tests-with-logs (FORCE the logs to be avaliable when running / debugging the tests) #8

Merged
3wordchant merged 4 commits from tests-with-logs into tests 2021-08-05 23:51:44 +00:00
21 changed files with 289 additions and 167 deletions

1
.gitignore vendored
View File

@ -17,3 +17,4 @@ build/
*.egg-info/ *.egg-info/
.venv .venv
unittest-log-output.log

3
app.py
View File

@ -1,4 +1,5 @@
from capsulflask import create_app from capsulflask import create_app
from capsulflask.http_client import MyHTTPClient
app = create_app() app = create_app(lambda timeout_seconds: MyHTTPClient(timeout_seconds=timeout_seconds))

View File

@ -18,14 +18,12 @@ from flask import current_app
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from capsulflask.shared import my_exec_info_message from capsulflask.shared import *
from capsulflask import hub_model, spoke_model, cli from capsulflask import hub_model, spoke_model, cli
from capsulflask.btcpay import client as btcpay from capsulflask.btcpay import client as btcpay
from capsulflask.http_client import MyHTTPClient
def create_app(http_client_factory):
def create_app():
for var_name in [ for var_name in [
"SPOKE_HOST_TOKEN", "HUB_TOKEN", "STRIPE_SECRET_KEY", "SPOKE_HOST_TOKEN", "HUB_TOKEN", "STRIPE_SECRET_KEY",
"BTCPAY_PRIVATE_KEY", "MAIL_PASSWORD" "BTCPAY_PRIVATE_KEY", "MAIL_PASSWORD"
@ -50,7 +48,7 @@ def create_app():
app = Flask(__name__) app = Flask(__name__)
app.config.from_mapping( app.config.from_mapping(
TESTING=config.get("TESTING", False), TESTING=config.get("TESTING", "False").lower() in ['true', '1', 't', 'y', 'yes'],
BASE_URL=config.get("BASE_URL", "http://localhost:5000"), BASE_URL=config.get("BASE_URL", "http://localhost:5000"),
SECRET_KEY=config.get("SECRET_KEY", "dev"), SECRET_KEY=config.get("SECRET_KEY", "dev"),
HUB_MODE_ENABLED=config.get("HUB_MODE_ENABLED", "True").lower() in ['true', '1', 't', 'y', 'yes'], HUB_MODE_ENABLED=config.get("HUB_MODE_ENABLED", "True").lower() in ['true', '1', 't', 'y', 'yes'],
@ -96,28 +94,21 @@ def create_app():
app.config['HUB_URL'] = config.get("HUB_URL", app.config['BASE_URL']) app.config['HUB_URL'] = config.get("HUB_URL", app.config['BASE_URL'])
log_filters = {
'setLogLevelToDebugForHeartbeatRelatedMessages': {
'()': SetLogLevelToDebugForHeartbeatRelatedMessagesFilter,
}
}
if app.config['TESTING'] != False:
log_filters['captureLogOutputDuringTests'] = {
'()': CaptureLogOutputDuringTestsFilter,
}
logging_dict_config({ logging_dict_config({
'version': 1, 'version': 1,
'formatters': {'default': { 'formatters': {'default': {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', 'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
}}, }},
'filters': log_filters, 'filters': {
'setLogLevelToDebugForHeartbeatRelatedMessages': {
'()': SetLogLevelToDebugForHeartbeatRelatedMessagesFilter,
}
},
'handlers': {'wsgi': { 'handlers': {'wsgi': {
'class': 'logging.StreamHandler', 'class': 'logging.StreamHandler',
'stream': 'ext://flask.logging.wsgi_errors_stream', 'stream': 'ext://flask.logging.wsgi_errors_stream',
'formatter': 'default', 'formatter': 'default',
'filters': list(log_filters.keys()) 'filters': ['setLogLevelToDebugForHeartbeatRelatedMessages']
}}, }},
'root': { 'root': {
'level': app.config['LOG_LEVEL'], 'level': app.config['LOG_LEVEL'],
@ -125,11 +116,11 @@ def create_app():
} }
}) })
# app.logger.critical("critical") # mylog_critical(app, "critical")
# app.logger.error("error") # mylog_error(app, "error")
# app.logger.warning("warning") # mylog_warning(app, "warning")
# app.logger.info("info") # mylog_info(app, "info")
# app.logger.debug("debug") # mylog_debug(app, "debug")
stripe.api_key = app.config['STRIPE_SECRET_KEY'] stripe.api_key = app.config['STRIPE_SECRET_KEY']
stripe.api_version = app.config['STRIPE_API_VERSION'] stripe.api_version = app.config['STRIPE_API_VERSION']
@ -137,10 +128,13 @@ def create_app():
if app.config['MAIL_SERVER'] != "": if app.config['MAIL_SERVER'] != "":
app.config['FLASK_MAIL_INSTANCE'] = Mail(app) app.config['FLASK_MAIL_INSTANCE'] = Mail(app)
else: else:
app.logger.warning("No MAIL_SERVER configured. capsul will simply print emails to stdout.") mylog_warning(app, "No MAIL_SERVER configured. capsul will simply print emails to stdout.")
app.config['FLASK_MAIL_INSTANCE'] = StdoutMockFlaskMail() app.config['FLASK_MAIL_INSTANCE'] = StdoutMockFlaskMail()
app.config['HTTP_CLIENT'] = MyHTTPClient(timeout_seconds=int(app.config['INTERNAL_HTTP_TIMEOUT_SECONDS'])) # allow a mock http client to be injected by the test code.
app.config['HTTP_CLIENT'] = http_client_factory(int(app.config['INTERNAL_HTTP_TIMEOUT_SECONDS']))
app.config['BTCPAY_ENABLED'] = False app.config['BTCPAY_ENABLED'] = False
if app.config['BTCPAY_URL'] != "": if app.config['BTCPAY_URL'] != "":
@ -148,7 +142,7 @@ def create_app():
app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY']) app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY'])
app.config['BTCPAY_ENABLED'] = True app.config['BTCPAY_ENABLED'] = True
except: except:
app.logger.warning("unable to create btcpay client. Capsul will work fine except cryptocurrency payments will not work. The error was: " + my_exec_info_message(sys.exc_info())) mylog_warning(app, "unable to create btcpay client. Capsul will work fine except cryptocurrency payments will not work. The error was: " + my_exec_info_message(sys.exc_info()))
# only start the scheduler and attempt to migrate the database if we are running the app. # only start the scheduler and attempt to migrate the database if we are running the app.
# otherwise we are running a CLI command. # otherwise we are running a CLI command.
@ -159,7 +153,7 @@ def create_app():
('test' in command_line) ('test' in command_line)
) )
app.logger.info(f"is_running_server: {is_running_server}") mylog_info(app, f"is_running_server: {is_running_server}")
if app.config['HUB_MODE_ENABLED']: if app.config['HUB_MODE_ENABLED']:
if app.config['HUB_MODEL'] == "capsul-flask": if app.config['HUB_MODEL'] == "capsul-flask":
@ -167,7 +161,7 @@ def create_app():
# debug mode (flask reloader) runs two copies of the app. When running in debug mode, # debug mode (flask reloader) runs two copies of the app. When running in debug mode,
# we only want to start the scheduler one time. # we only want to start the scheduler one time.
if is_running_server and (not app.debug or config.get('WERKZEUG_RUN_MAIN') == 'true'): if is_running_server and not app.config['TESTING'] and (not app.debug or config.get('WERKZEUG_RUN_MAIN') == 'true'):
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task" heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task"
heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"} heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"}
@ -252,7 +246,7 @@ def url_for_with_cache_bust(endpoint, **values):
class StdoutMockFlaskMail: class StdoutMockFlaskMail:
def send(self, message: Message): def send(self, message: Message):
current_app.logger.info(f"Email would have been sent if configured:\n\nto: {','.join(message.recipients)}\nsubject: {message.subject}\nbody:\n\n{message.body}\n\n") mylog_info(current_app, f"Email would have been sent if configured:\n\nto: {','.join(message.recipients)}\nsubject: {message.subject}\nbody:\n\n{message.body}\n\n")
class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter): class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter):
def isHeartbeatRelatedString(self, thing): def isHeartbeatRelatedString(self, thing):
@ -279,13 +273,4 @@ class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter):
return True return True
class CaptureLogOutputDuringTestsFilter(logging.Filter):
def filter(self, record):
file_object = open('unittest-output.log', 'a')
file_object.write("%s" % record.msg)
for arg in record.args:
file_object.write("%s" % arg)
file_object.write("\n")
file_object.close()
return True

View File

@ -10,7 +10,7 @@ from nanoid import generate
from capsulflask.metrics import durations as metric_durations from capsulflask.metrics import durations as metric_durations
from capsulflask.auth import admin_account_required from capsulflask.auth import admin_account_required
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message from capsulflask.shared import *
bp = Blueprint("admin", __name__, url_prefix="/admin") bp = Blueprint("admin", __name__, url_prefix="/admin")
@ -58,7 +58,7 @@ def index():
) )
network['allocations'].append(allocation) network['allocations'].append(allocation)
else: else:
current_app.logger.warning(f"/admin: capsul {vm['id']} has public_ipv4 {vm['public_ipv4']} which is out of range for its host network {host_id} {network['network_name']} {network['public_ipv4_cidr_block']}") mylog_warning(current_app, f"/admin: capsul {vm['id']} has public_ipv4 {vm['public_ipv4']} which is out of range for its host network {host_id} {network['network_name']} {network['public_ipv4_cidr_block']}")
display_hosts.append(display_host) display_hosts.append(display_host)

View File

@ -16,6 +16,7 @@ from flask_mail import Message
from werkzeug.exceptions import abort from werkzeug.exceptions import abort
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import *
bp = Blueprint("auth", __name__, url_prefix="/auth") bp = Blueprint("auth", __name__, url_prefix="/auth")

View File

@ -12,7 +12,7 @@ from psycopg2 import ProgrammingError
from flask_mail import Message from flask_mail import Message
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message from capsulflask.shared import *
from capsulflask.console import get_account_balance from capsulflask.console import get_account_balance
bp = Blueprint('cli', __name__) bp = Blueprint('cli', __name__)
@ -68,19 +68,19 @@ def sql_script(f, c):
def cron_task(): def cron_task():
# make sure btcpay payments get completed (in case we miss a webhook), otherwise invalidate the payment # make sure btcpay payments get completed (in case we miss a webhook), otherwise invalidate the payment
current_app.logger.info("cron_task: starting clean_up_unresolved_btcpay_invoices") mylog_info(current_app, "cron_task: starting clean_up_unresolved_btcpay_invoices")
clean_up_unresolved_btcpay_invoices() clean_up_unresolved_btcpay_invoices()
current_app.logger.info("cron_task: finished clean_up_unresolved_btcpay_invoices") mylog_info(current_app, "cron_task: finished clean_up_unresolved_btcpay_invoices")
# notify when funds run out # notify when funds run out
current_app.logger.info("cron_task: starting notify_users_about_account_balance") mylog_info(current_app, "cron_task: starting notify_users_about_account_balance")
notify_users_about_account_balance() notify_users_about_account_balance()
current_app.logger.info("cron_task: finished notify_users_about_account_balance") mylog_info(current_app, "cron_task: finished notify_users_about_account_balance")
# make sure vm system and DB are synced # make sure vm system and DB are synced
current_app.logger.info("cron_task: starting ensure_vms_and_db_are_synced") mylog_info(current_app, "cron_task: starting ensure_vms_and_db_are_synced")
ensure_vms_and_db_are_synced() ensure_vms_and_db_are_synced()
current_app.logger.info("cron_task: finished ensure_vms_and_db_are_synced") mylog_info(current_app, "cron_task: finished ensure_vms_and_db_are_synced")
@ -92,7 +92,7 @@ def clean_up_unresolved_btcpay_invoices():
try: try:
btcpay_invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id) btcpay_invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id)
except: except:
current_app.logger.error(f""" mylog_error(current_app, f"""
error was thrown when contacting btcpay server for invoice {invoice_id}: error was thrown when contacting btcpay server for invoice {invoice_id}:
{my_exec_info_message(sys.exc_info())}""" {my_exec_info_message(sys.exc_info())}"""
) )
@ -101,13 +101,13 @@ def clean_up_unresolved_btcpay_invoices():
days = float((datetime.now() - unresolved_invoice['created']).total_seconds())/float(60*60*24) days = float((datetime.now() - unresolved_invoice['created']).total_seconds())/float(60*60*24)
if btcpay_invoice['status'] == "complete": if btcpay_invoice['status'] == "complete":
current_app.logger.info( mylog_info(current_app,
f"resolving btcpay invoice {invoice_id} " f"resolving btcpay invoice {invoice_id} "
f"({unresolved_invoice['email']}, ${unresolved_invoice['dollars']}) as completed " f"({unresolved_invoice['email']}, ${unresolved_invoice['dollars']}) as completed "
) )
get_model().btcpay_invoice_resolved(invoice_id, True) get_model().btcpay_invoice_resolved(invoice_id, True)
elif days >= 1: elif days >= 1:
current_app.logger.info( mylog_info(current_app,
f"resolving btcpay invoice {invoice_id} " f"resolving btcpay invoice {invoice_id} "
f"({unresolved_invoice['email']}, ${unresolved_invoice['dollars']}) as invalidated, " f"({unresolved_invoice['email']}, ${unresolved_invoice['dollars']}) as invalidated, "
f"btcpay server invoice status: {btcpay_invoice['status']}" f"btcpay server invoice status: {btcpay_invoice['status']}"
@ -236,7 +236,7 @@ def notify_users_about_account_balance():
index_to_send = i index_to_send = i
if index_to_send > -1: if index_to_send > -1:
current_app.logger.info(f"cron_task: sending {warnings[index_to_send]['id']} warning email to {account['email']}.") mylog_info(current_app, f"cron_task: sending {warnings[index_to_send]['id']} warning email to {account['email']}.")
get_body = warnings[index_to_send]['get_body'] get_body = warnings[index_to_send]['get_body']
get_subject = warnings[index_to_send]['get_subject'] get_subject = warnings[index_to_send]['get_subject']
current_app.config["FLASK_MAIL_INSTANCE"].send( current_app.config["FLASK_MAIL_INSTANCE"].send(
@ -250,7 +250,7 @@ def notify_users_about_account_balance():
get_model().set_account_balance_warning(account['email'], warnings[index_to_send]['id']) get_model().set_account_balance_warning(account['email'], warnings[index_to_send]['id'])
if index_to_send == len(warnings)-1: if index_to_send == len(warnings)-1:
for vm in vms: for vm in vms:
current_app.logger.warning(f"cron_task: deleting {vm['id']} ( {account['email']} ) due to negative account balance.") mylog_warning(current_app, f"cron_task: deleting {vm['id']} ( {account['email']} ) due to negative account balance.")
current_app.config["HUB_MODEL"].destroy(email=account["email"], id=vm['id']) current_app.config["HUB_MODEL"].destroy(email=account["email"], id=vm['id'])
get_model().delete_vm(email=account["email"], id=vm['id']) get_model().delete_vm(email=account["email"], id=vm['id'])
@ -282,9 +282,9 @@ def ensure_vms_and_db_are_synced():
email_addresses_raw = current_app.config['ADMIN_EMAIL_ADDRESSES'].split(",") email_addresses_raw = current_app.config['ADMIN_EMAIL_ADDRESSES'].split(",")
email_addresses = list(filter(lambda x: len(x) > 6, map(lambda x: x.strip(), email_addresses_raw ) )) email_addresses = list(filter(lambda x: len(x) > 6, map(lambda x: x.strip(), email_addresses_raw ) ))
current_app.logger.info(f"cron_task: sending inconsistency warning email to {','.join(email_addresses)}:") mylog_info(current_app, f"cron_task: sending inconsistency warning email to {','.join(email_addresses)}:")
for error in errors: for error in errors:
current_app.logger.info(f"cron_task: {error}.") mylog_info(current_app, f"cron_task: {error}.")
current_app.config["FLASK_MAIL_INSTANCE"].send( current_app.config["FLASK_MAIL_INSTANCE"].send(
Message( Message(

View File

@ -17,7 +17,7 @@ from nanoid import generate
from capsulflask.metrics import durations as metric_durations from capsulflask.metrics import durations as metric_durations
from capsulflask.auth import account_required from capsulflask.auth import account_required
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message from capsulflask.shared import *
from capsulflask.payment import poll_btcpay_session from capsulflask.payment import poll_btcpay_session
from capsulflask import cli from capsulflask import cli
@ -37,7 +37,7 @@ def double_check_capsul_address(id, ipv4, get_ssh_host_keys):
if result != None and result.ssh_host_keys != None and get_ssh_host_keys: if result != None and result.ssh_host_keys != None and get_ssh_host_keys:
get_model().update_vm_ssh_host_keys(email=session["account"], id=id, ssh_host_keys=result.ssh_host_keys) get_model().update_vm_ssh_host_keys(email=session["account"], id=id, ssh_host_keys=result.ssh_host_keys)
except: except:
current_app.logger.error(f""" mylog_error(current_app, f"""
the virtualization model threw an error in double_check_capsul_address of {id}: the virtualization model threw an error in double_check_capsul_address of {id}:
{my_exec_info_message(sys.exc_info())}""" {my_exec_info_message(sys.exc_info())}"""
) )
@ -135,7 +135,7 @@ def detail(id):
delete=True delete=True
) )
else: else:
current_app.logger.info(f"deleting {vm['id']} per user request ({session['account']})") mylog_info(current_app, f"deleting {vm['id']} per user request ({session['account']})")
current_app.config["HUB_MODEL"].destroy(email=session['account'], id=id) current_app.config["HUB_MODEL"].destroy(email=session['account'], id=id)
get_model().delete_vm(email=session['account'], id=id) get_model().delete_vm(email=session['account'], id=id)
@ -149,7 +149,7 @@ def detail(id):
force_stop=True, force_stop=True,
) )
else: else:
current_app.logger.info(f"force stopping {vm['id']} per user request ({session['account']})") mylog_info(current_app, f"force stopping {vm['id']} per user request ({session['account']})")
current_app.config["HUB_MODEL"].vm_state_command(email=session['account'], id=id, command="force-stop") current_app.config["HUB_MODEL"].vm_state_command(email=session['account'], id=id, command="force-stop")
vm["state"] = "stopped" vm["state"] = "stopped"
@ -193,10 +193,6 @@ def detail(id):
@account_required @account_required
def create(): def create():
raise "console.create()!"
current_app.logger.error("console.create()!")
vm_sizes = get_model().vm_sizes_dict() vm_sizes = get_model().vm_sizes_dict()
operating_systems = get_model().operating_systems_dict() operating_systems = get_model().operating_systems_dict()
public_keys_for_account = get_model().list_ssh_public_keys_for_account(session["account"]) public_keys_for_account = get_model().list_ssh_public_keys_for_account(session["account"])
@ -279,7 +275,7 @@ def create():
flash(error) flash(error)
if not capacity_avaliable: if not capacity_avaliable:
current_app.logger.warning(f"when capsul capacity is restored, send an email to {session['account']}") mylog_warning(current_app, f"when capsul capacity is restored, send an email to {session['account']}")
return render_template( return render_template(
"create-capsul.html", "create-capsul.html",

View File

@ -8,7 +8,7 @@ from flask import current_app
from flask import g from flask import g
from capsulflask.db_model import DBModel from capsulflask.db_model import DBModel
from capsulflask.shared import my_exec_info_message from capsulflask.shared import *
def init_app(app, is_running_server): def init_app(app, is_running_server):
@ -28,11 +28,11 @@ def init_app(app, is_running_server):
schemaMigrations = {} schemaMigrations = {}
schemaMigrationsPath = join(app.root_path, 'schema_migrations') schemaMigrationsPath = join(app.root_path, 'schema_migrations')
app.logger.info("loading schema migration scripts from {}".format(schemaMigrationsPath)) mylog_info(app, "loading schema migration scripts from {}".format(schemaMigrationsPath))
for filename in listdir(schemaMigrationsPath): for filename in listdir(schemaMigrationsPath):
result = re.search(r"^\d+_(up|down)", filename) result = re.search(r"^\d+_(up|down)", filename)
if not result: if not result:
app.logger.error(f"schemaVersion {filename} must match ^\\d+_(up|down). exiting.") mylog_error(app, f"schemaVersion {filename} must match ^\\d+_(up|down). exiting.")
exit(1) exit(1)
key = result.group() key = result.group()
with open(join(schemaMigrationsPath, filename), 'rb') as file: with open(join(schemaMigrationsPath, filename), 'rb') as file:
@ -57,12 +57,12 @@ def init_app(app, is_running_server):
hasSchemaVersionTable = True hasSchemaVersionTable = True
if hasSchemaVersionTable == False: if hasSchemaVersionTable == False:
app.logger.info("no table named schemaversion found in the {} schema. running migration 01_up".format(app.config['DATABASE_SCHEMA'])) mylog_info(app, "no table named schemaversion found in the {} schema. running migration 01_up".format(app.config['DATABASE_SCHEMA']))
try: try:
cursor.execute(schemaMigrations["01_up"]) cursor.execute(schemaMigrations["01_up"])
connection.commit() connection.commit()
except: except:
app.logger.error("unable to create the schemaversion table because: {}".format(my_exec_info_message(sys.exc_info()))) mylog_error(app, "unable to create the schemaversion table because: {}".format(my_exec_info_message(sys.exc_info())))
exit(1) exit(1)
actionWasTaken = True actionWasTaken = True
@ -70,24 +70,24 @@ def init_app(app, is_running_server):
schemaVersion = cursor.fetchall()[0][0] schemaVersion = cursor.fetchall()[0][0]
if schemaVersion > desiredSchemaVersion: if schemaVersion > desiredSchemaVersion:
app.logger.critical("schemaVersion ({}) > desiredSchemaVersion ({}). schema downgrades are not supported yet. exiting.".format( mylog_critical(app, "schemaVersion ({}) > desiredSchemaVersion ({}). schema downgrades are not supported yet. exiting.".format(
schemaVersion, desiredSchemaVersion schemaVersion, desiredSchemaVersion
)) ))
exit(1) exit(1)
while schemaVersion < desiredSchemaVersion: while schemaVersion < desiredSchemaVersion:
migrationKey = "%02d_up" % (schemaVersion+1) migrationKey = "%02d_up" % (schemaVersion+1)
app.logger.info("schemaVersion ({}) < desiredSchemaVersion ({}). running migration {}".format( mylog_info(app, "schemaVersion ({}) < desiredSchemaVersion ({}). running migration {}".format(
schemaVersion, desiredSchemaVersion, migrationKey schemaVersion, desiredSchemaVersion, migrationKey
)) ))
try: try:
cursor.execute(schemaMigrations[migrationKey]) cursor.execute(schemaMigrations[migrationKey])
connection.commit() connection.commit()
except KeyError: except KeyError:
app.logger.critical("missing schema migration script: {}_xyz.sql".format(migrationKey)) mylog_critical(app, "missing schema migration script: {}_xyz.sql".format(migrationKey))
exit(1) exit(1)
except: except:
app.logger.critical("unable to execute the schema migration {} because: {}".format(migrationKey, my_exec_info_message(sys.exc_info()))) mylog_critical(app, "unable to execute the schema migration {} because: {}".format(migrationKey, my_exec_info_message(sys.exc_info())))
exit(1) exit(1)
actionWasTaken = True actionWasTaken = True
@ -96,7 +96,7 @@ def init_app(app, is_running_server):
versionFromDatabase = cursor.fetchall()[0][0] versionFromDatabase = cursor.fetchall()[0][0]
if schemaVersion != versionFromDatabase: if schemaVersion != versionFromDatabase:
app.logger.critical("incorrect schema version value \"{}\" after running migration {}, expected \"{}\". exiting.".format( mylog_critical(app, "incorrect schema version value \"{}\" after running migration {}, expected \"{}\". exiting.".format(
versionFromDatabase, versionFromDatabase,
migrationKey, migrationKey,
schemaVersion schemaVersion
@ -107,7 +107,7 @@ def init_app(app, is_running_server):
app.config['PSYCOPG2_CONNECTION_POOL'].putconn(connection) app.config['PSYCOPG2_CONNECTION_POOL'].putconn(connection)
app.logger.info("{} current schemaVersion: \"{}\"".format( mylog_info(app, "{} current schemaVersion: \"{}\"".format(
("schema migration completed." if actionWasTaken else "schema is already up to date. "), schemaVersion ("schema migration completed." if actionWasTaken else "schema is already up to date. "), schemaVersion
)) ))

View File

@ -7,7 +7,7 @@ from nanoid import generate
from flask import current_app from flask import current_app
from typing import List from typing import List
from capsulflask.shared import OnlineHost from capsulflask.shared import *
class DBModel: class DBModel:
@ -270,7 +270,7 @@ class DBModel:
row = self.cursor.fetchone() row = self.cursor.fetchone()
if row: if row:
if int(row[1]) != int(dollars): if int(row[1]) != int(dollars):
current_app.logger.warning(f""" mylog_warning(current_app, f"""
{payment_type} gave us a completed payment session with a different dollar amount than what we had recorded!! {payment_type} gave us a completed payment session with a different dollar amount than what we had recorded!!
id: {id} id: {id}
account: {row[0]} account: {row[0]}

View File

@ -8,7 +8,7 @@ import threading
import aiohttp import aiohttp
import asyncio import asyncio
from flask import current_app from flask import current_app
from capsulflask.shared import OnlineHost, my_exec_info_message from capsulflask.shared import *
from typing import List from typing import List
class HTTPResult: class HTTPResult:
@ -33,7 +33,7 @@ class MyHTTPClient:
toReturn = [] toReturn = []
for individualResult in fromOtherThread: for individualResult in fromOtherThread:
if individualResult.error != None and individualResult.error != "": if individualResult.error != None and individualResult.error != "":
current_app.logger.error(individualResult.error) mylog_error(current_app, individualResult.error)
toReturn.append(individualResult.http_result) toReturn.append(individualResult.http_result)
return toReturn return toReturn
@ -42,7 +42,7 @@ class MyHTTPClient:
future = run_coroutine(self.do_http(method=method, url=url, body=body, authorization_header=authorization_header)) future = run_coroutine(self.do_http(method=method, url=url, body=body, authorization_header=authorization_header))
fromOtherThread = future.result() fromOtherThread = future.result()
if fromOtherThread.error != None and fromOtherThread.error != "": if fromOtherThread.error != None and fromOtherThread.error != "":
current_app.logger.error(fromOtherThread.error) mylog_error(current_app, fromOtherThread.error)
return fromOtherThread.http_result return fromOtherThread.http_result
def get_client_session(self): def get_client_session(self):

View File

@ -7,7 +7,7 @@ from flask import request, make_response
from werkzeug.exceptions import abort from werkzeug.exceptions import abort
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message, authorized_as_hub from capsulflask.shared import *
bp = Blueprint("hub", __name__, url_prefix="/hub") bp = Blueprint("hub", __name__, url_prefix="/hub")
@ -21,19 +21,19 @@ def authorized_for_host(id):
def ping_all_hosts_task(): def ping_all_hosts_task():
if authorized_as_hub(request.headers): if authorized_as_hub(request.headers):
all_hosts = get_model().get_all_hosts() all_hosts = get_model().get_all_hosts()
current_app.logger.debug(f"pinging {len(all_hosts)} hosts...") mylog_debug(current_app, f"pinging {len(all_hosts)} hosts...")
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}" authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header) results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header)
for i in range(len(all_hosts)): for i in range(len(all_hosts)):
host = all_hosts[i] host = all_hosts[i]
result = results[i] result = results[i]
current_app.logger.debug(f"response from {host.id} ({host.url}): {result.status_code} {result.body}") mylog_debug(current_app, f"response from {host.id} ({host.url}): {result.status_code} {result.body}")
if result.status_code == 200: if result.status_code == 200:
get_model().host_heartbeat(host.id) get_model().host_heartbeat(host.id)
return "ok" return "ok"
else: else:
current_app.logger.warning(f"/hub/heartbeat-task returned 401: invalid hub token") mylog_warning(current_app, f"/hub/heartbeat-task returned 401: invalid hub token")
return abort(401, "invalid hub token") return abort(401, "invalid hub token")
@ -42,7 +42,7 @@ def heartbeat(host_id):
if authorized_for_host(host_id): if authorized_for_host(host_id):
return "ok" return "ok"
else: else:
current_app.logger.warning(f"/hub/heartbeat/{host_id} returned 401: invalid token") mylog_warning(current_app, f"/hub/heartbeat/{host_id} returned 401: invalid token")
return abort(401, "invalid host id or token") return abort(401, "invalid host id or token")
@bp.route("/claim-operation/<int:operation_id>/<string:host_id>", methods=("POST",)) @bp.route("/claim-operation/<int:operation_id>/<string:host_id>", methods=("POST",))
@ -51,7 +51,7 @@ def claim_operation(operation_id: int, host_id: str):
payload_json = get_model().get_payload_json_from_host_operation(operation_id, host_id) payload_json = get_model().get_payload_json_from_host_operation(operation_id, host_id)
if payload_json is None: if payload_json is None:
error_message = f"{host_id} can't claim operation {operation_id} because host_operation row not found" error_message = f"{host_id} can't claim operation {operation_id} because host_operation row not found"
current_app.logger.error(error_message) mylog_error(current_app, error_message)
return abort(404, error_message) return abort(404, error_message)
# right now if there is a can_claim_handler there needs to be a corresponding on_claimed_handler. # right now if there is a can_claim_handler there needs to be a corresponding on_claimed_handler.
@ -84,7 +84,7 @@ def claim_operation(operation_id: int, host_id: str):
if error_message != "": if error_message != "":
error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" error_message = f"{host_id} can't claim operation {operation_id} because {error_message}"
current_app.logger.error(error_message) mylog_error(current_app, error_message)
return abort(400, error_message) return abort(400, error_message)
# we will only return this payload as json if claiming succeeds, so might as well do this now... # we will only return this payload as json if claiming succeeds, so might as well do this now...
@ -97,7 +97,7 @@ def claim_operation(operation_id: int, host_id: str):
if error_message != "": if error_message != "":
error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" error_message = f"{host_id} can't claim operation {operation_id} because {error_message}"
current_app.logger.error(error_message) mylog_error(current_app, error_message)
return abort(400, error_message) return abort(400, error_message)
claimed = get_model().claim_operation(operation_id, host_id) claimed = get_model().claim_operation(operation_id, host_id)
@ -113,7 +113,7 @@ def claim_operation(operation_id: int, host_id: str):
else: else:
return abort(409, f"operation was already assigned to another host") return abort(409, f"operation was already assigned to another host")
else: else:
current_app.logger.warning(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token") mylog_warning(current_app, f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token")
return abort(401, "invalid host id or token") return abort(401, "invalid host id or token")
def can_claim_create(payload, host_id) -> (str, str): def can_claim_create(payload, host_id) -> (str, str):

View File

@ -14,7 +14,7 @@ from subprocess import run
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.http_client import HTTPResult from capsulflask.http_client import HTTPResult
from capsulflask.shared import VirtualizationInterface, VirtualMachine, OnlineHost, validate_capsul_id, my_exec_info_message from capsulflask.shared import *
class MockHub(VirtualizationInterface): class MockHub(VirtualizationInterface):
def __init__(self): def __init__(self):
@ -42,7 +42,7 @@ class MockHub(VirtualizationInterface):
def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list): def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
validate_capsul_id(id) validate_capsul_id(id)
current_app.logger.info(f"mock create: {id} for {email}") mylog_info(current_app, f"mock create: {id} for {email}")
sleep(1) sleep(1)
get_model().create_vm( get_model().create_vm(
email=email, email=email,
@ -56,10 +56,10 @@ class MockHub(VirtualizationInterface):
) )
def destroy(self, email: str, id: str): def destroy(self, email: str, id: str):
current_app.logger.info(f"mock destroy: {id} for {email}") mylog_info(current_app, f"mock destroy: {id} for {email}")
def vm_state_command(self, email: str, id: str, command: str): def vm_state_command(self, email: str, id: str, command: str):
current_app.logger.info(f"mock {command}: {id} for {email}") mylog_info(current_app, f"mock {command}: {id} for {email}")
class CapsulFlaskHub(VirtualizationInterface): class CapsulFlaskHub(VirtualizationInterface):
@ -119,7 +119,7 @@ class CapsulFlaskHub(VirtualizationInterface):
operation_desc = "" operation_desc = ""
if operation_id: if operation_id:
operation_desc = f"for operation {operation_id}" operation_desc = f"for operation {operation_id}"
current_app.logger.error(f"""error reading assignment_status {operation_desc} from host {host.id}: mylog_error(current_app, f"""error reading assignment_status {operation_desc} from host {host.id}:
result_is_json: {result_is_json} result_is_json: {result_is_json}
result_is_dict: {result_is_dict} result_is_dict: {result_is_dict}
result_has_status: {result_has_status} result_has_status: {result_has_status}
@ -183,10 +183,10 @@ class CapsulFlaskHub(VirtualizationInterface):
except: except:
all_valid = False all_valid = False
if not all_valid: if not all_valid:
current_app.logger.error(f"""error reading ids for list_ids operation, host {host.id}""") mylog_error(current_app, f"""error reading ids for list_ids operation, host {host.id}""")
else: else:
result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"}) result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"})
current_app.logger.error(f"""missing 'ids' list for list_ids operation, host {host.id}""") mylog_error(current_app, f"""missing 'ids' list for list_ids operation, host {host.id}""")
except: except:
# no need to do anything here since if it cant be parsed then generic_operation will handle it. # no need to do anything here since if it cant be parsed then generic_operation will handle it.
pass pass
@ -196,7 +196,6 @@ class CapsulFlaskHub(VirtualizationInterface):
def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list): def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
validate_capsul_id(id) validate_capsul_id(id)
online_hosts = get_model().get_online_hosts() online_hosts = get_model().get_online_hosts()
current_app.logger.debug(f"hub_model.create(): ${len(online_hosts)} hosts")
payload = json.dumps(dict( payload = json.dumps(dict(
type="create", type="create",
email=email, email=email,

View File

@ -3,6 +3,7 @@ from flask import render_template
from flask import current_app from flask import current_app
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import *
bp = Blueprint("landing", __name__, url_prefix="/") bp = Blueprint("landing", __name__, url_prefix="/")

View File

@ -15,6 +15,7 @@ from werkzeug.exceptions import abort
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.auth import account_required from capsulflask.auth import account_required
from capsulflask.shared import *
mutex = Lock() mutex = Lock()
bp = Blueprint("metrics", __name__, url_prefix="/metrics") bp = Blueprint("metrics", __name__, url_prefix="/metrics")
@ -143,7 +144,7 @@ def get_plot_bytes(metric, capsulid, duration, size):
def draw_plot_png_bytes(data, scale, size_x=3, size_y=1): def draw_plot_png_bytes(data, scale, size_x=3, size_y=1):
#current_app.logger.info(json.dumps(data, indent=4, default=str)) #mylog_info(current_app, json.dumps(data, indent=4, default=str))
pyplot.style.use("seaborn-dark") pyplot.style.use("seaborn-dark")
fig, my_plot = pyplot.subplots(figsize=(size_x, size_y)) fig, my_plot = pyplot.subplots(figsize=(size_x, size_y))

View File

@ -21,7 +21,7 @@ from werkzeug.exceptions import abort
from capsulflask.auth import account_required from capsulflask.auth import account_required
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message from capsulflask.shared import *
bp = Blueprint("payment", __name__, url_prefix="/payment") bp = Blueprint("payment", __name__, url_prefix="/payment")
@ -67,11 +67,11 @@ def btcpay_payment():
notificationURL=f"{current_app.config['BASE_URL']}/payment/btcpay/webhook" notificationURL=f"{current_app.config['BASE_URL']}/payment/btcpay/webhook"
)) ))
current_app.logger.info(f"created btcpay invoice: {invoice}") mylog_info(current_app, f"created btcpay invoice: {invoice}")
# print(invoice) # print(invoice)
current_app.logger.info(f"created btcpay invoice_id={invoice['id']} ( {session['account']}, ${dollars} )") mylog_info(current_app, f"created btcpay invoice_id={invoice['id']} ( {session['account']}, ${dollars} )")
get_model().create_payment_session("btcpay", invoice["id"], session["account"], dollars) get_model().create_payment_session("btcpay", invoice["id"], session["account"], dollars)
@ -89,7 +89,7 @@ def poll_btcpay_session(invoice_id):
try: try:
invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id) invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id)
except: except:
current_app.logger.error(f""" mylog_error(current_app, f"""
error was thrown when contacting btcpay server: error was thrown when contacting btcpay server:
{my_exec_info_message(sys.exc_info())}""" {my_exec_info_message(sys.exc_info())}"""
) )
@ -101,13 +101,13 @@ def poll_btcpay_session(invoice_id):
dollars = invoice['price'] dollars = invoice['price']
current_app.logger.info(f"poll_btcpay_session invoice_id={invoice_id}, status={invoice['status']} dollars={dollars}") mylog_info(current_app, f"poll_btcpay_session invoice_id={invoice_id}, status={invoice['status']} dollars={dollars}")
if invoice['status'] == "paid" or invoice['status'] == "confirmed" or invoice['status'] == "complete": if invoice['status'] == "paid" or invoice['status'] == "confirmed" or invoice['status'] == "complete":
success_account = get_model().consume_payment_session("btcpay", invoice_id, dollars) success_account = get_model().consume_payment_session("btcpay", invoice_id, dollars)
if success_account: if success_account:
current_app.logger.info(f"{success_account} paid ${dollars} successfully (btcpay_invoice_id={invoice_id})") mylog_info(current_app, f"{success_account} paid ${dollars} successfully (btcpay_invoice_id={invoice_id})")
if invoice['status'] == "complete": if invoice['status'] == "complete":
get_model().btcpay_invoice_resolved(invoice_id, True) get_model().btcpay_invoice_resolved(invoice_id, True)
@ -120,7 +120,7 @@ def poll_btcpay_session(invoice_id):
@bp.route("/btcpay/webhook", methods=("POST",)) @bp.route("/btcpay/webhook", methods=("POST",))
def btcpay_webhook(): def btcpay_webhook():
current_app.logger.info(f"got btcpay webhook") mylog_info(current_app, f"got btcpay webhook")
# IMPORTANT! there is no signature or credential to authenticate the data sent into this webhook :facepalm: # IMPORTANT! there is no signature or credential to authenticate the data sent into this webhook :facepalm:
# its just a notification, thats all. # its just a notification, thats all.
@ -148,7 +148,7 @@ def stripe_payment():
if len(errors) == 0: if len(errors) == 0:
current_app.logger.info(f"creating stripe checkout session for {session['account']}, ${dollars}") mylog_info(current_app, f"creating stripe checkout session for {session['account']}, ${dollars}")
checkout_session = stripe.checkout.Session.create( checkout_session = stripe.checkout.Session.create(
success_url=current_app.config['BASE_URL'] + "/payment/stripe/success?session_id={CHECKOUT_SESSION_ID}", success_url=current_app.config['BASE_URL'] + "/payment/stripe/success?session_id={CHECKOUT_SESSION_ID}",
@ -167,7 +167,7 @@ def stripe_payment():
) )
stripe_checkout_session_id = checkout_session['id'] stripe_checkout_session_id = checkout_session['id']
current_app.logger.info(f"stripe_checkout_session_id={stripe_checkout_session_id} ( {session['account']}, ${dollars} )") mylog_info(current_app, f"stripe_checkout_session_id={stripe_checkout_session_id} ( {session['account']}, ${dollars} )")
get_model().create_payment_session("stripe", stripe_checkout_session_id, session["account"], dollars) get_model().create_payment_session("stripe", stripe_checkout_session_id, session["account"], dollars)
@ -251,13 +251,13 @@ def validate_stripe_checkout_session(stripe_checkout_session_id):
def success(): def success():
stripe_checkout_session_id = request.args.get('session_id') stripe_checkout_session_id = request.args.get('session_id')
if not stripe_checkout_session_id: if not stripe_checkout_session_id:
current_app.logger.info("/payment/stripe/success returned 400: missing required URL parameter session_id") mylog_info(current_app, "/payment/stripe/success returned 400: missing required URL parameter session_id")
abort(400, "missing required URL parameter session_id") abort(400, "missing required URL parameter session_id")
else: else:
for _ in range(0, 5): for _ in range(0, 5):
paid = validate_stripe_checkout_session(stripe_checkout_session_id) paid = validate_stripe_checkout_session(stripe_checkout_session_id)
if paid: if paid:
current_app.logger.info(f"{paid['email']} paid ${paid['dollars']} successfully (stripe_checkout_session_id={stripe_checkout_session_id})") mylog_info(current_app, f"{paid['email']} paid ${paid['dollars']} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")
return redirect(url_for("console.account_balance")) return redirect(url_for("console.account_balance"))
else: else:
sleep(1) sleep(1)

View File

@ -1,7 +1,8 @@
import re import re
from flask import current_app from flask import current_app, Flask
from typing import List from typing import List
from threading import Lock
class OnlineHost: class OnlineHost:
def __init__(self, id: str, url: str): def __init__(self, id: str, url: str):
@ -55,3 +56,42 @@ def authorized_as_hub(headers):
def my_exec_info_message(exec_info): def my_exec_info_message(exec_info):
return "{}: {}".format(".".join([exec_info[0].__module__, exec_info[0].__name__]), exec_info[1]) return "{}: {}".format(".".join([exec_info[0].__module__, exec_info[0].__name__]), exec_info[1])
mylog_current_test_id_container = {
'value': '',
'mutex': Lock()
}
def set_mylog_test_id(test_id):
mylog_current_test_id_container['value'] = ".".join(test_id.split(".")[-2:])
def log_output_for_tests(app: Flask, message: str):
if app.config['TESTING'] and mylog_current_test_id_container['value'] != "":
mylog_current_test_id_container['mutex'].acquire()
file_object = open('unittest-log-output.log', 'a')
file_object.write(f"{mylog_current_test_id_container['value']}: {message}\n")
file_object.close()
mylog_current_test_id_container['mutex'].release()
def mylog_debug(app: Flask, message: str):
log_output_for_tests(app, f"DEBUG: {message}")
app.logger.debug(message)
def mylog_info(app: Flask, message: str):
log_output_for_tests(app, f"INFO: {message}")
app.logger.info(message)
def mylog_warning(app: Flask, message: str):
log_output_for_tests(app, f"WARNING: {message}")
app.logger.warning(message)
def mylog_error(app: Flask, message: str):
log_output_for_tests(app, f"ERROR: {message}")
app.logger.error(message)
def mylog_critical(app: Flask, message: str):
log_output_for_tests(app, f"CRITICAL: {message}")
app.logger.critical(message)

View File

@ -8,7 +8,7 @@ from flask import request
from flask.json import jsonify from flask.json import jsonify
from werkzeug.exceptions import abort from werkzeug.exceptions import abort
from capsulflask.shared import my_exec_info_message, authorized_as_hub from capsulflask.shared import *
bp = Blueprint("spoke", __name__, url_prefix="/spoke") bp = Blueprint("spoke", __name__, url_prefix="/spoke")
@ -19,18 +19,18 @@ def heartbeat():
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header) result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header)
if result.status_code == -1: if result.status_code == -1:
current_app.logger.info(f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached") mylog_info(current_app, f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached")
return abort(503, "Service Unavailable: hub timed out or cannot be reached") return abort(503, "Service Unavailable: hub timed out or cannot be reached")
if result.status_code == 401: if result.status_code == 401:
current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} rejected our token") mylog_info(current_app, f"/spoke/heartbeat returned 502: hub at {url} rejected our token")
return abort(502, "hub rejected our token") return abort(502, "hub rejected our token")
if result.status_code != 200: if result.status_code != 200:
current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} returned {result.status_code}") mylog_info(current_app, f"/spoke/heartbeat returned 502: hub at {url} returned {result.status_code}")
return abort(502, "Bad Gateway: hub did not return 200") return abort(502, "Bad Gateway: hub did not return 200")
return "OK" return "OK"
else: else:
current_app.logger.info(f"/spoke/heartbeat returned 401: invalid hub token") mylog_info(current_app, f"/spoke/heartbeat returned 401: invalid hub token")
return abort(401, "invalid hub token") return abort(401, "invalid hub token")
@bp.route("/operation/<int:operation_id>", methods=("POST",)) @bp.route("/operation/<int:operation_id>", methods=("POST",))
@ -43,9 +43,10 @@ def operation_without_id():
def operation_impl(operation_id: int): def operation_impl(operation_id: int):
if authorized_as_hub(request.headers): if authorized_as_hub(request.headers):
request_body_json = request.json request_body = request.json
request_body = json.loads(request_body_json) if not isinstance(request.json, dict) and not isinstance(request.json, list):
#current_app.logger.info(f"request.json: {request_body}") request_body = json.loads(request.json)
handlers = { handlers = {
"capacity_avaliable": handle_capacity_avaliable, "capacity_avaliable": handle_capacity_avaliable,
"get": handle_get, "get": handle_get,
@ -63,7 +64,7 @@ def operation_impl(operation_id: int):
return handlers[request_body['type']](operation_id, request_body) return handlers[request_body['type']](operation_id, request_body)
except: except:
error_message = my_exec_info_message(sys.exc_info()) error_message = my_exec_info_message(sys.exc_info())
current_app.logger.error(f"unhandled exception in {request_body['type']} handler: {error_message}") mylog_error(current_app, f"unhandled exception in {request_body['type']} handler: {error_message}")
return jsonify(dict(error_message=error_message)) return jsonify(dict(error_message=error_message))
else: else:
error_message = f"'type' must be one of {types_csv}" error_message = f"'type' must be one of {types_csv}"
@ -71,15 +72,15 @@ def operation_impl(operation_id: int):
error_message = "'type' json property is required" error_message = "'type' json property is required"
if error_message != "": if error_message != "":
current_app.logger.error(f"/hosts/operation returned 400: {error_message}") mylog_error(current_app, f"/hosts/operation returned 400: {error_message}")
return abort(400, f"bad request; {error_message}") return abort(400, f"bad request; {error_message}")
else: else:
current_app.logger.warning(f"/hosts/operation returned 401: invalid hub token") mylog_warning(current_app, f"/hosts/operation returned 401: invalid hub token")
return abort(401, "invalid hub token") return abort(401, "invalid hub token")
def handle_capacity_avaliable(operation_id, request_body): def handle_capacity_avaliable(operation_id, request_body):
if 'additional_ram_bytes' not in request_body: if 'additional_ram_bytes' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable") mylog_error(current_app, f"/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable")
return abort(400, f"bad request; additional_ram_bytes is required for capacity_avaliable") return abort(400, f"bad request; additional_ram_bytes is required for capacity_avaliable")
has_capacity = current_app.config['SPOKE_MODEL'].capacity_avaliable(request_body['additional_ram_bytes']) has_capacity = current_app.config['SPOKE_MODEL'].capacity_avaliable(request_body['additional_ram_bytes'])
@ -87,7 +88,7 @@ def handle_capacity_avaliable(operation_id, request_body):
def handle_get(operation_id, request_body): def handle_get(operation_id, request_body):
if 'id' not in request_body: if 'id' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: id is required for get") mylog_error(current_app, f"/hosts/operation returned 400: id is required for get")
return abort(400, f"bad request; id is required for get") return abort(400, f"bad request; id is required for get")
vm = current_app.config['SPOKE_MODEL'].get(request_body['id'], request_body['get_ssh_host_keys']) vm = current_app.config['SPOKE_MODEL'].get(request_body['id'], request_body['get_ssh_host_keys'])
@ -101,7 +102,7 @@ def handle_list_ids(operation_id, request_body):
def handle_create(operation_id, request_body): def handle_create(operation_id, request_body):
if not operation_id: if not operation_id:
current_app.logger.error(f"/hosts/operation returned 400: operation_id is required for create ") mylog_error(current_app, f"/hosts/operation returned 400: operation_id is required for create ")
return abort(400, f"bad request; operation_id is required. try POST /spoke/operation/<id>") return abort(400, f"bad request; operation_id is required. try POST /spoke/operation/<id>")
parameters = ["email", "id", "os", "size", "template_image_file_name", "vcpus", "memory_mb", "ssh_authorized_keys"] parameters = ["email", "id", "os", "size", "template_image_file_name", "vcpus", "memory_mb", "ssh_authorized_keys"]
@ -111,7 +112,7 @@ def handle_create(operation_id, request_body):
error_message = f"{error_message}\n{parameter} is required for create" error_message = f"{error_message}\n{parameter} is required for create"
if error_message != "": if error_message != "":
current_app.logger.error(f"/hosts/operation returned 400: {error_message}") mylog_error(current_app, f"/hosts/operation returned 400: {error_message}")
return abort(400, f"bad request; {error_message}") return abort(400, f"bad request; {error_message}")
# only one host should create the vm, so we first race to assign this create operation to ourselves. # only one host should create the vm, so we first race to assign this create operation to ourselves.
@ -140,7 +141,7 @@ def handle_create(operation_id, request_body):
elif result.status_code == 409: elif result.status_code == 409:
assignment_status = "assigned_to_other_host" assignment_status = "assigned_to_other_host"
else: else:
current_app.logger.error(f"{url} returned {result.status_code}: {result.body}") mylog_error(current_app, f"{url} returned {result.status_code}: {result.body}")
return abort(503, f"hub did not cleanly handle our request to claim the create operation") return abort(503, f"hub did not cleanly handle our request to claim the create operation")
if assignment_status == "assigned": if assignment_status == "assigned":
@ -166,7 +167,7 @@ def handle_create(operation_id, request_body):
params= f"{params} network_name='{request_body['network_name'] if 'network_name' in request_body else 'KeyError'}', " params= f"{params} network_name='{request_body['network_name'] if 'network_name' in request_body else 'KeyError'}', "
params= f"{params} public_ipv4='{request_body['public_ipv4'] if 'public_ipv4' in request_body else 'KeyError'}', " params= f"{params} public_ipv4='{request_body['public_ipv4'] if 'public_ipv4' in request_body else 'KeyError'}', "
current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}") mylog_error(current_app, f"spoke_model.create({params}) failed: {error_message}")
return jsonify(dict(assignment_status=assignment_status, error_message=error_message)) return jsonify(dict(assignment_status=assignment_status, error_message=error_message))
@ -174,11 +175,11 @@ def handle_create(operation_id, request_body):
def handle_destroy(operation_id, request_body): def handle_destroy(operation_id, request_body):
if 'id' not in request_body: if 'id' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: id is required for destroy") mylog_error(current_app, f"/hosts/operation returned 400: id is required for destroy")
return abort(400, f"bad request; id is required for destroy") return abort(400, f"bad request; id is required for destroy")
if 'email' not in request_body: if 'email' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: email is required for destroy") mylog_error(current_app, f"/hosts/operation returned 400: email is required for destroy")
return abort(400, f"bad request; email is required for destroy") return abort(400, f"bad request; email is required for destroy")
try: try:
@ -187,7 +188,7 @@ def handle_destroy(operation_id, request_body):
error_message = my_exec_info_message(sys.exc_info()) error_message = my_exec_info_message(sys.exc_info())
params = f"email='{request_body['email'] if 'email' in request_body else 'KeyError'}', " params = f"email='{request_body['email'] if 'email' in request_body else 'KeyError'}', "
params= f"{params} id='{request_body['id'] if 'id' in request_body else 'KeyError'}', " params= f"{params} id='{request_body['id'] if 'id' in request_body else 'KeyError'}', "
current_app.logger.error(f"current_app.config['SPOKE_MODEL'].destroy({params}) failed: {error_message}") mylog_error(current_app, f"current_app.config['SPOKE_MODEL'].destroy({params}) failed: {error_message}")
return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message)) return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))
return jsonify(dict(assignment_status="assigned", status="success")) return jsonify(dict(assignment_status="assigned", status="success"))
@ -198,11 +199,11 @@ def handle_vm_state_command(operation_id, request_body):
required_properties = ['id', 'email', 'command'] required_properties = ['id', 'email', 'command']
for required_property in required_properties: for required_property in required_properties:
if required_property not in request_body: if required_property not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: {required_property} is required for vm_state_command") mylog_error(current_app, f"/hosts/operation returned 400: {required_property} is required for vm_state_command")
return abort(400, f"bad request; {required_property} is required for vm_state_command") return abort(400, f"bad request; {required_property} is required for vm_state_command")
if request_body['command'] not in ["stop", "force-stop", "start", "restart"]: if request_body['command'] not in ["stop", "force-stop", "start", "restart"]:
current_app.logger.error(f"/hosts/operation returned 400: command ({request_body['command']}) must be one of stop, force-stop, start, or restart") mylog_error(current_app, f"/hosts/operation returned 400: command ({request_body['command']}) must be one of stop, force-stop, start, or restart")
return abort(400, f"bad request; command ({request_body['command']}) must be one of stop, force-stop, start, or restart") return abort(400, f"bad request; command ({request_body['command']}) must be one of stop, force-stop, start, or restart")
try: try:
@ -212,7 +213,7 @@ def handle_vm_state_command(operation_id, request_body):
params = f"email='{request_body['email'] if 'email' in request_body else 'KeyError'}', " params = f"email='{request_body['email'] if 'email' in request_body else 'KeyError'}', "
params= f"{params} id='{request_body['id'] if 'id' in request_body else 'KeyError'}', " params= f"{params} id='{request_body['id'] if 'id' in request_body else 'KeyError'}', "
params= f"{params} command='{request_body['command'] if 'command' in request_body else 'KeyError'}', " params= f"{params} command='{request_body['command'] if 'command' in request_body else 'KeyError'}', "
current_app.logger.error(f"current_app.config['SPOKE_MODEL'].vm_state_command({params}) failed: {error_message}") mylog_error(current_app, f"current_app.config['SPOKE_MODEL'].vm_state_command({params}) failed: {error_message}")
return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message)) return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))
return jsonify(dict(assignment_status="assigned", status="success")) return jsonify(dict(assignment_status="assigned", status="success"))

View File

@ -10,7 +10,7 @@ from subprocess import run
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.shared import VirtualizationInterface, VirtualMachine, validate_capsul_id, my_exec_info_message from capsulflask.shared import *
class MockSpoke(VirtualizationInterface): class MockSpoke(VirtualizationInterface):
@ -43,15 +43,15 @@ class MockSpoke(VirtualizationInterface):
def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list, network_name: str, public_ipv4: str): def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list, network_name: str, public_ipv4: str):
validate_capsul_id(id) validate_capsul_id(id)
current_app.logger.info(f"mock create: {id} for {email}") mylog_info(current_app, f"mock create: {id} for {email}")
self.capsuls[id] = dict(email=email, id=id, network_name=network_name, public_ipv4=public_ipv4) self.capsuls[id] = dict(email=email, id=id, network_name=network_name, public_ipv4=public_ipv4)
sleep(1) sleep(1)
def destroy(self, email: str, id: str): def destroy(self, email: str, id: str):
current_app.logger.info(f"mock destroy: {id} for {email}") mylog_info(current_app, f"mock destroy: {id} for {email}")
def vm_state_command(self, email: str, id: str, command: str): def vm_state_command(self, email: str, id: str, command: str):
current_app.logger.info(f"mock {command}: {id} for {email}") mylog_info(current_app, f"mock {command}: {id} for {email}")
class ShellScriptSpoke(VirtualizationInterface): class ShellScriptSpoke(VirtualizationInterface):
@ -73,7 +73,7 @@ class ShellScriptSpoke(VirtualizationInterface):
completedProcess = run(my_args, capture_output=True) completedProcess = run(my_args, capture_output=True)
if completedProcess.returncode != 0: if completedProcess.returncode != 0:
current_app.logger.error(f""" mylog_error(current_app, f"""
capacity-avaliable.sh exited {completedProcess.returncode} with capacity-avaliable.sh exited {completedProcess.returncode} with
stdout: stdout:
{completedProcess.stdout} {completedProcess.stdout}
@ -85,7 +85,7 @@ class ShellScriptSpoke(VirtualizationInterface):
lines = completedProcess.stdout.splitlines() lines = completedProcess.stdout.splitlines()
output = lines[len(lines)-1] output = lines[len(lines)-1]
if not output == b"yes": if not output == b"yes":
current_app.logger.error(f"capacity-avaliable.sh exited 0 and returned {output} but did not return \"yes\" ") mylog_error(current_app, f"capacity-avaliable.sh exited 0 and returned {output} but did not return \"yes\" ")
return False return False
return True return True
@ -96,14 +96,14 @@ class ShellScriptSpoke(VirtualizationInterface):
self.validate_completed_process(completedProcess) self.validate_completed_process(completedProcess)
lines = completedProcess.stdout.splitlines() lines = completedProcess.stdout.splitlines()
if len(lines) == 0: if len(lines) == 0:
current_app.logger.warning("shell_scripts/get.sh returned zero lines!") mylog_warning(current_app, "shell_scripts/get.sh returned zero lines!")
return None return None
result_string = lines[0].decode("utf-8") result_string = lines[0].decode("utf-8")
fields = result_string.split(" ") fields = result_string.split(" ")
if fields[0] != "true": if fields[0] != "true":
current_app.logger.warning(f"shell_scripts/get.sh was called for {id} which libvirt says does not exist.") mylog_warning(current_app, f"shell_scripts/get.sh was called for {id} which libvirt says does not exist.")
return None return None
if len(fields) < 2: if len(fields) < 2:
@ -126,7 +126,7 @@ class ShellScriptSpoke(VirtualizationInterface):
ssh_host_keys = json.loads(completedProcess2.stdout.decode("utf-8")) ssh_host_keys = json.loads(completedProcess2.stdout.decode("utf-8"))
return VirtualMachine(id, current_app.config["SPOKE_HOST_ID"], state=state, ipv4=ipaddr, ssh_host_keys=ssh_host_keys) return VirtualMachine(id, current_app.config["SPOKE_HOST_ID"], state=state, ipv4=ipaddr, ssh_host_keys=ssh_host_keys)
except: except:
current_app.logger.warning(f""" mylog_warning(current_app, f"""
failed to ssh-keyscan {id} at {ipaddr}: failed to ssh-keyscan {id} at {ipaddr}:
{my_exec_info_message(sys.exc_info())}""" {my_exec_info_message(sys.exc_info())}"""
) )
@ -218,5 +218,5 @@ class ShellScriptSpoke(VirtualizationInterface):
completedProcess = run([join(current_app.root_path, f"shell_scripts/{command}.sh"), id], capture_output=True) completedProcess = run([join(current_app.root_path, f"shell_scripts/{command}.sh"), id], capture_output=True)
self.validate_completed_process(completedProcess, email) self.validate_completed_process(completedProcess, email)
returned_string = completedProcess.stdout.decode("utf-8") returned_string = completedProcess.stdout.decode("utf-8")
current_app.logger.info(f"{command} vm {id} for {email} returned: {returned_string}") mylog_info(current_app, f"{command} vm {id} for {email} returned: {returned_string}")

View File

@ -8,6 +8,7 @@ from flask import url_for
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.tests_base import BaseTestCase from capsulflask.tests_base import BaseTestCase
from capsulflask.shared import *
from capsulflask.spoke_model import MockSpoke from capsulflask.spoke_model import MockSpoke
@ -25,6 +26,30 @@ class ConsoleTests(BaseTestCase):
"content": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDntq1t8Ddsa2q4p+PM7W4CLYYmxakokRRVLlf7AQlsTJFPsgBe9u0zuoOaKDMkBr0dlnuLm4Eub1Mj+BrdqAokto0YDiAnxUKRuYQKuHySKK8bLkisi2k47jGBDikx/jihgiuFTawo1mYsJJepC7PPwZGsoCImJEgq1L+ug0p3Zrj3QkUx4h25MpCSs2yvfgWjDyN8hEC76O42P+4ETezYrzrd1Kj26hdzHRnrxygvIUOtfau+5ydlaz8xQBEPrEY6/+pKDuwtXg1pBL7GmoUxBXVfHQSgq5s9jIJH+G0CR0ZoHMB25Ln4X/bsCQbLOu21+IGYKSDVM5TIMLtkKUkERQMVWvnpOp1LZKir4dC0m7SW74wpA8+2b1IsURIr9ARYGJpCEv1Q1Wz/X3yTf6Mfey7992MjUc9HcgjgU01/+kYomoXHprzolk+22Gjfgo3a4dRIoTY82GO8kkUKiaWHvDkkVURCY5dpteLA05sk3Z9aRMYsNXPLeOOPfzTlDA0=" "content": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDntq1t8Ddsa2q4p+PM7W4CLYYmxakokRRVLlf7AQlsTJFPsgBe9u0zuoOaKDMkBr0dlnuLm4Eub1Mj+BrdqAokto0YDiAnxUKRuYQKuHySKK8bLkisi2k47jGBDikx/jihgiuFTawo1mYsJJepC7PPwZGsoCImJEgq1L+ug0p3Zrj3QkUx4h25MpCSs2yvfgWjDyN8hEC76O42P+4ETezYrzrd1Kj26hdzHRnrxygvIUOtfau+5ydlaz8xQBEPrEY6/+pKDuwtXg1pBL7GmoUxBXVfHQSgq5s9jIJH+G0CR0ZoHMB25Ln4X/bsCQbLOu21+IGYKSDVM5TIMLtkKUkERQMVWvnpOp1LZKir4dC0m7SW74wpA8+2b1IsURIr9ARYGJpCEv1Q1Wz/X3yTf6Mfey7992MjUc9HcgjgU01/+kYomoXHprzolk+22Gjfgo3a4dRIoTY82GO8kkUKiaWHvDkkVURCY5dpteLA05sk3Z9aRMYsNXPLeOOPfzTlDA0="
} }
def setUp(self):
super().setUp()
get_model().cursor.execute("DELETE FROM host_operation")
get_model().cursor.execute("DELETE FROM operations")
get_model().cursor.execute("DELETE FROM vm_ssh_host_key")
get_model().cursor.execute("DELETE FROM vm_ssh_authorized_key")
get_model().cursor.execute("DELETE FROM ssh_public_keys")
get_model().cursor.execute("DELETE FROM login_tokens")
get_model().cursor.execute("DELETE FROM vms")
get_model().cursor.execute("DELETE FROM payments")
get_model().cursor.connection.commit()
self._login('test@example.com')
get_model().create_ssh_public_key('test@example.com', 'key', 'foo')
# heartbeat all the spokes so that the hub <--> spoke communication can work as normal.
host_ids = get_model().list_hosts_with_networks(None).keys()
for host_id in host_ids:
get_model().host_heartbeat(host_id)
def test_index(self): def test_index(self):
self._login('test@example.com') self._login('test@example.com')
with self.client as client: with self.client as client:
@ -58,6 +83,7 @@ class ConsoleTests(BaseTestCase):
def test_create_fails_capacity(self): def test_create_fails_capacity(self):
with self.client as client: with self.client as client:
client.get(url_for("console.create")) client.get(url_for("console.create"))
csrf_token = self.get_context_variable('csrf_token') csrf_token = self.get_context_variable('csrf_token')
@ -83,10 +109,6 @@ class ConsoleTests(BaseTestCase):
0 0
) )
file_object = open('unittest-output.log', 'a')
file_object.write(f"{self.id()} captured output:\n{self.logs_from_test.getvalue()}\n")
file_object.close()
def test_create_fails_invalid(self): def test_create_fails_invalid(self):
with self.client as client: with self.client as client:
client.get(url_for("console.create")) client.get(url_for("console.create"))
@ -120,8 +142,7 @@ class ConsoleTests(BaseTestCase):
response = client.post(url_for("console.create"), data=data) response = client.post(url_for("console.create"), data=data)
# mylog_info(self.app, get_model().list_all_operations())
self.assertEqual( self.assertEqual(
len(get_model().list_all_operations()), len(get_model().list_all_operations()),
@ -195,15 +216,4 @@ class ConsoleTests(BaseTestCase):
category='message' category='message'
) )
def setUp(self):
super().setUp()
self._login('test@example.com')
get_model().create_ssh_public_key('test@example.com', 'key', 'foo')
def tearDown(self):
super().tearDown()
get_model().cursor.execute("DELETE FROM ssh_public_keys")
get_model().cursor.execute("DELETE FROM login_tokens")
get_model().cursor.execute("DELETE FROM vms")
get_model().cursor.execute("DELETE FROM payments")
get_model().cursor.connection.commit()

View File

@ -1,31 +1,46 @@
from io import StringIO
import logging import logging
import unittest import unittest
import os import os
import sys
import json
import itertools
import time
import threading
import asyncio
import traceback
from urllib.parse import urlparse
from typing import List
from nanoid import generate from nanoid import generate
from concurrent.futures import ThreadPoolExecutor
from flask_testing import TestCase from flask_testing import TestCase
from flask import current_app from flask import current_app
from capsulflask import create_app from capsulflask import create_app
from capsulflask.db import get_model from capsulflask.db import get_model
from capsulflask.http_client import *
from capsulflask.shared import *
class BaseTestCase(TestCase): class BaseTestCase(TestCase):
def create_app(self): def create_app(self):
# Use default connection paramaters # Use default connection paramaters
os.environ['POSTGRES_CONNECTION_PARAMETERS'] = "host=localhost port=5432 user=postgres password=dev dbname=capsulflask_test" os.environ['POSTGRES_CONNECTION_PARAMETERS'] = "host=localhost port=5432 user=postgres password=dev dbname=capsulflask_test"
os.environ['TESTING'] = '1' os.environ['TESTING'] = 'True'
os.environ['LOG_LEVEL'] = 'DEBUG' os.environ['LOG_LEVEL'] = 'DEBUG'
os.environ['SPOKE_MODEL'] = 'mock' os.environ['SPOKE_MODEL'] = 'mock'
os.environ['HUB_MODEL'] = 'capsul-flask' os.environ['HUB_MODEL'] = 'capsul-flask'
self.app = create_app() self1 = self
get_app = lambda: self1.app
self.app = create_app(lambda timeout_seconds: TestHTTPClient(get_app, timeout_seconds))
return self.app return self.app
def setUp(self): def setUp(self):
pass set_mylog_test_id(self.id())
def tearDown(self): def tearDown(self):
pass set_mylog_test_id("")
def _login(self, user_email): def _login(self, user_email):
get_model().login(user_email) get_model().login(user_email)
@ -33,3 +48,72 @@ class BaseTestCase(TestCase):
session['account'] = user_email session['account'] = user_email
session['csrf-token'] = generate() session['csrf-token'] = generate()
class TestHTTPClient:
def __init__(self, get_app, timeout_seconds = 5):
self.timeout_seconds = timeout_seconds
self.get_app = get_app
self.executor = ThreadPoolExecutor()
def do_multi_http_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
future = run_coroutine(self.do_multi_http(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
fromOtherThread = future.result()
toReturn = []
for individualResult in fromOtherThread:
if individualResult.error != None and individualResult.error != "":
mylog_error(self.get_app(), individualResult.error)
toReturn.append(individualResult.http_result)
return toReturn
def do_http_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
future = run_coroutine(self.do_http(method=method, url=url, body=body, authorization_header=authorization_header))
fromOtherThread = future.result()
if fromOtherThread.error != None and fromOtherThread.error != "":
mylog_error(self.get_app(), fromOtherThread.error)
return fromOtherThread.http_result
async def do_http(self, url: str, body: str, method="POST", authorization_header=None) -> InterThreadResult:
path = urlparse(url).path
headers = {}
if authorization_header != None:
headers['Authorization'] = authorization_header
if body:
headers['Content-Type'] = "application/json"
#mylog_info(self.get_app(), f"path, data=body, headers=headers: {path}, {body}, {headers}")
do_request = None
if method == "POST":
do_request = lambda: self.get_app().test_client().post(path, data=body, headers=headers)
if method == "GET":
do_request = lambda: self.get_app().test_client().get(path, headers=headers)
response = None
try:
response = await get_event_loop().run_in_executor(self.executor, do_request)
except:
traceback.print_exc()
error_message = my_exec_info_message(sys.exc_info())
response_body = json.dumps({"error_message": f"do_http (HTTP {method} {url}) {error_message}"})
return InterThreadResult(
HTTPResult(-1, response_body),
f"""do_http (HTTP {method} {url}) failed with: {error_message}"""
)
return InterThreadResult(HTTPResult(response.status_code, response.get_data()), None)
async def do_multi_http(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[InterThreadResult]:
tasks = []
# append to tasks in the same order as online_hosts
for host in online_hosts:
tasks.append(
self.do_http(url=f"{host.url}{url_suffix}", body=body, authorization_header=authorization_header)
)
# gather is like Promise.all from javascript, it returns a future which resolves to an array of results
# in the same order as the tasks that we passed in -- which were in the same order as online_hosts
results = await asyncio.gather(*tasks)
return results

View File

@ -6,7 +6,9 @@ To run tests:
1. create a Postgres database called `capsulflask_test` 1. create a Postgres database called `capsulflask_test`
- e.g.: `docker exec -it 98e1ddfbbffb createdb -U postgres -O postgres capsulflask_test` - e.g.: `docker exec -it 98e1ddfbbffb createdb -U postgres -O postgres capsulflask_test`
- (`98e1ddfbbffb` is the docker container ID of the postgres container) - (`98e1ddfbbffb` is the docker container ID of the postgres container)
2. run `python3 -m unittest` 2. run `python3 -m unittest; cat unittest-log-output.log; rm unittest-log-output.log`
**NOTE** that right now we can't figure out how to get the tests to properly output the log messages that happened when they failed, (or passed), so for now we have hacked it to write to a file.
### Architecture ### Architecture