breaking up after abusive relationship with logger

This commit is contained in:
forest 2021-07-27 14:28:42 -05:00 committed by 3wc
parent 6a587ac7fc
commit 66dee4d87a
19 changed files with 158 additions and 139 deletions

1
.gitignore vendored
View File

@ -17,3 +17,4 @@ build/
*.egg-info/
.venv
unittest-output.log

View File

@ -18,7 +18,7 @@ from flask import current_app
from apscheduler.schedulers.background import BackgroundScheduler
from capsulflask.shared import my_exec_info_message
from capsulflask.shared import *
from capsulflask import hub_model, spoke_model, cli
from capsulflask.btcpay import client as btcpay
from capsulflask.http_client import MyHTTPClient
@ -50,7 +50,7 @@ def create_app():
app = Flask(__name__)
app.config.from_mapping(
TESTING=config.get("TESTING", False),
TESTING=config.get("TESTING", "False").lower() in ['true', '1', 't', 'y', 'yes'],
BASE_URL=config.get("BASE_URL", "http://localhost:5000"),
SECRET_KEY=config.get("SECRET_KEY", "dev"),
HUB_MODE_ENABLED=config.get("HUB_MODE_ENABLED", "True").lower() in ['true', '1', 't', 'y', 'yes'],
@ -96,28 +96,21 @@ def create_app():
app.config['HUB_URL'] = config.get("HUB_URL", app.config['BASE_URL'])
log_filters = {
'setLogLevelToDebugForHeartbeatRelatedMessages': {
'()': SetLogLevelToDebugForHeartbeatRelatedMessagesFilter,
}
}
if app.config['TESTING'] != False:
log_filters['captureLogOutputDuringTests'] = {
'()': CaptureLogOutputDuringTestsFilter,
}
logging_dict_config({
'version': 1,
'formatters': {'default': {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
}},
'filters': log_filters,
'filters': {
'setLogLevelToDebugForHeartbeatRelatedMessages': {
'()': SetLogLevelToDebugForHeartbeatRelatedMessagesFilter,
}
},
'handlers': {'wsgi': {
'class': 'logging.StreamHandler',
'stream': 'ext://flask.logging.wsgi_errors_stream',
'formatter': 'default',
'filters': list(log_filters.keys())
'filters': ['setLogLevelToDebugForHeartbeatRelatedMessages']
}},
'root': {
'level': app.config['LOG_LEVEL'],
@ -125,11 +118,11 @@ def create_app():
}
})
# app.logger.critical("critical")
# app.logger.error("error")
# app.logger.warning("warning")
# app.logger.info("info")
# app.logger.debug("debug")
# mylog_critical(app, "critical")
# mylog_error(app, "error")
# mylog_warning(app, "warning")
# mylog_info(app, "info")
# mylog_debug(app, "debug")
stripe.api_key = app.config['STRIPE_SECRET_KEY']
stripe.api_version = app.config['STRIPE_API_VERSION']
@ -137,7 +130,7 @@ def create_app():
if app.config['MAIL_SERVER'] != "":
app.config['FLASK_MAIL_INSTANCE'] = Mail(app)
else:
app.logger.warning("No MAIL_SERVER configured. capsul will simply print emails to stdout.")
mylog_warning(app, "No MAIL_SERVER configured. capsul will simply print emails to stdout.")
app.config['FLASK_MAIL_INSTANCE'] = StdoutMockFlaskMail()
app.config['HTTP_CLIENT'] = MyHTTPClient(timeout_seconds=int(app.config['INTERNAL_HTTP_TIMEOUT_SECONDS']))
@ -148,7 +141,7 @@ def create_app():
app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY'])
app.config['BTCPAY_ENABLED'] = True
except:
app.logger.warning("unable to create btcpay client. Capsul will work fine except cryptocurrency payments will not work. The error was: " + my_exec_info_message(sys.exc_info()))
mylog_warning(app, "unable to create btcpay client. Capsul will work fine except cryptocurrency payments will not work. The error was: " + my_exec_info_message(sys.exc_info()))
# only start the scheduler and attempt to migrate the database if we are running the app.
# otherwise we are running a CLI command.
@ -159,7 +152,7 @@ def create_app():
('test' in command_line)
)
app.logger.info(f"is_running_server: {is_running_server}")
mylog_info(app, f"is_running_server: {is_running_server}")
if app.config['HUB_MODE_ENABLED']:
if app.config['HUB_MODEL'] == "capsul-flask":
@ -252,7 +245,7 @@ def url_for_with_cache_bust(endpoint, **values):
class StdoutMockFlaskMail:
def send(self, message: Message):
current_app.logger.info(f"Email would have been sent if configured:\n\nto: {','.join(message.recipients)}\nsubject: {message.subject}\nbody:\n\n{message.body}\n\n")
mylog_info(current_app, f"Email would have been sent if configured:\n\nto: {','.join(message.recipients)}\nsubject: {message.subject}\nbody:\n\n{message.body}\n\n")
class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter):
def isHeartbeatRelatedString(self, thing):
@ -279,13 +272,4 @@ class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter):
return True
class CaptureLogOutputDuringTestsFilter(logging.Filter):
def filter(self, record):
file_object = open('unittest-output.log', 'a')
file_object.write("%s" % record.msg)
for arg in record.args:
file_object.write("%s" % arg)
file_object.write("\n")
file_object.close()
return True

View File

@ -10,7 +10,7 @@ from nanoid import generate
from capsulflask.metrics import durations as metric_durations
from capsulflask.auth import admin_account_required
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message
from capsulflask.shared import *
bp = Blueprint("admin", __name__, url_prefix="/admin")
@ -58,7 +58,7 @@ def index():
)
network['allocations'].append(allocation)
else:
current_app.logger.warning(f"/admin: capsul {vm['id']} has public_ipv4 {vm['public_ipv4']} which is out of range for its host network {host_id} {network['network_name']} {network['public_ipv4_cidr_block']}")
mylog_warning(current_app, f"/admin: capsul {vm['id']} has public_ipv4 {vm['public_ipv4']} which is out of range for its host network {host_id} {network['network_name']} {network['public_ipv4_cidr_block']}")
display_hosts.append(display_host)

View File

@ -16,6 +16,7 @@ from flask_mail import Message
from werkzeug.exceptions import abort
from capsulflask.db import get_model
from capsulflask.shared import *
bp = Blueprint("auth", __name__, url_prefix="/auth")

View File

@ -12,7 +12,7 @@ from psycopg2 import ProgrammingError
from flask_mail import Message
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message
from capsulflask.shared import *
from capsulflask.console import get_account_balance
bp = Blueprint('cli', __name__)
@ -68,19 +68,19 @@ def sql_script(f, c):
def cron_task():
# make sure btcpay payments get completed (in case we miss a webhook), otherwise invalidate the payment
current_app.logger.info("cron_task: starting clean_up_unresolved_btcpay_invoices")
mylog_info(current_app, "cron_task: starting clean_up_unresolved_btcpay_invoices")
clean_up_unresolved_btcpay_invoices()
current_app.logger.info("cron_task: finished clean_up_unresolved_btcpay_invoices")
mylog_info(current_app, "cron_task: finished clean_up_unresolved_btcpay_invoices")
# notify when funds run out
current_app.logger.info("cron_task: starting notify_users_about_account_balance")
mylog_info(current_app, "cron_task: starting notify_users_about_account_balance")
notify_users_about_account_balance()
current_app.logger.info("cron_task: finished notify_users_about_account_balance")
mylog_info(current_app, "cron_task: finished notify_users_about_account_balance")
# make sure vm system and DB are synced
current_app.logger.info("cron_task: starting ensure_vms_and_db_are_synced")
mylog_info(current_app, "cron_task: starting ensure_vms_and_db_are_synced")
ensure_vms_and_db_are_synced()
current_app.logger.info("cron_task: finished ensure_vms_and_db_are_synced")
mylog_info(current_app, "cron_task: finished ensure_vms_and_db_are_synced")
@ -92,7 +92,7 @@ def clean_up_unresolved_btcpay_invoices():
try:
btcpay_invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id)
except:
current_app.logger.error(f"""
mylog_error(current_app, f"""
error was thrown when contacting btcpay server for invoice {invoice_id}:
{my_exec_info_message(sys.exc_info())}"""
)
@ -101,13 +101,13 @@ def clean_up_unresolved_btcpay_invoices():
days = float((datetime.now() - unresolved_invoice['created']).total_seconds())/float(60*60*24)
if btcpay_invoice['status'] == "complete":
current_app.logger.info(
mylog_info(current_app,
f"resolving btcpay invoice {invoice_id} "
f"({unresolved_invoice['email']}, ${unresolved_invoice['dollars']}) as completed "
)
get_model().btcpay_invoice_resolved(invoice_id, True)
elif days >= 1:
current_app.logger.info(
mylog_info(current_app,
f"resolving btcpay invoice {invoice_id} "
f"({unresolved_invoice['email']}, ${unresolved_invoice['dollars']}) as invalidated, "
f"btcpay server invoice status: {btcpay_invoice['status']}"
@ -236,7 +236,7 @@ def notify_users_about_account_balance():
index_to_send = i
if index_to_send > -1:
current_app.logger.info(f"cron_task: sending {warnings[index_to_send]['id']} warning email to {account['email']}.")
mylog_info(current_app, f"cron_task: sending {warnings[index_to_send]['id']} warning email to {account['email']}.")
get_body = warnings[index_to_send]['get_body']
get_subject = warnings[index_to_send]['get_subject']
current_app.config["FLASK_MAIL_INSTANCE"].send(
@ -250,7 +250,7 @@ def notify_users_about_account_balance():
get_model().set_account_balance_warning(account['email'], warnings[index_to_send]['id'])
if index_to_send == len(warnings)-1:
for vm in vms:
current_app.logger.warning(f"cron_task: deleting {vm['id']} ( {account['email']} ) due to negative account balance.")
mylog_warning(current_app, f"cron_task: deleting {vm['id']} ( {account['email']} ) due to negative account balance.")
current_app.config["HUB_MODEL"].destroy(email=account["email"], id=vm['id'])
get_model().delete_vm(email=account["email"], id=vm['id'])
@ -282,9 +282,9 @@ def ensure_vms_and_db_are_synced():
email_addresses_raw = current_app.config['ADMIN_EMAIL_ADDRESSES'].split(",")
email_addresses = list(filter(lambda x: len(x) > 6, map(lambda x: x.strip(), email_addresses_raw ) ))
current_app.logger.info(f"cron_task: sending inconsistency warning email to {','.join(email_addresses)}:")
mylog_info(current_app, f"cron_task: sending inconsistency warning email to {','.join(email_addresses)}:")
for error in errors:
current_app.logger.info(f"cron_task: {error}.")
mylog_info(current_app, f"cron_task: {error}.")
current_app.config["FLASK_MAIL_INSTANCE"].send(
Message(

View File

@ -17,7 +17,7 @@ from nanoid import generate
from capsulflask.metrics import durations as metric_durations
from capsulflask.auth import account_required
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message
from capsulflask.shared import *
from capsulflask.payment import poll_btcpay_session
from capsulflask import cli
@ -37,7 +37,7 @@ def double_check_capsul_address(id, ipv4, get_ssh_host_keys):
if result != None and result.ssh_host_keys != None and get_ssh_host_keys:
get_model().update_vm_ssh_host_keys(email=session["account"], id=id, ssh_host_keys=result.ssh_host_keys)
except:
current_app.logger.error(f"""
mylog_error(current_app, f"""
the virtualization model threw an error in double_check_capsul_address of {id}:
{my_exec_info_message(sys.exc_info())}"""
)
@ -137,7 +137,7 @@ def detail(id):
delete=True
)
else:
current_app.logger.info(f"deleting {vm['id']} per user request ({session['account']})")
mylog_info(current_app, f"deleting {vm['id']} per user request ({session['account']})")
current_app.config["HUB_MODEL"].destroy(email=session['account'], id=id)
get_model().delete_vm(email=session['account'], id=id)
@ -151,7 +151,7 @@ def detail(id):
force_stop=True,
)
else:
current_app.logger.info(f"force stopping {vm['id']} per user request ({session['account']})")
mylog_info(current_app, f"force stopping {vm['id']} per user request ({session['account']})")
current_app.config["HUB_MODEL"].vm_state_command(email=session['account'], id=id, command="force-stop")
vm["state"] = "stopped"
@ -195,9 +195,12 @@ def detail(id):
@account_required
def create():
raise "console.create()!"
#raise "console.create()!"
# file_object = open('unittest-output.log', 'a')
# file_object.write("console.create()!\n")
# file_object.close()
current_app.logger.error("console.create()!")
mylog_error(current_app, "console.create()!")
vm_sizes = get_model().vm_sizes_dict()
operating_systems = get_model().operating_systems_dict()
@ -281,7 +284,7 @@ def create():
flash(error)
if not capacity_avaliable:
current_app.logger.warning(f"when capsul capacity is restored, send an email to {session['account']}")
mylog_warning(current_app, f"when capsul capacity is restored, send an email to {session['account']}")
return render_template(
"create-capsul.html",

View File

@ -8,7 +8,7 @@ from flask import current_app
from flask import g
from capsulflask.db_model import DBModel
from capsulflask.shared import my_exec_info_message
from capsulflask.shared import *
def init_app(app, is_running_server):
@ -28,11 +28,11 @@ def init_app(app, is_running_server):
schemaMigrations = {}
schemaMigrationsPath = join(app.root_path, 'schema_migrations')
app.logger.info("loading schema migration scripts from {}".format(schemaMigrationsPath))
mylog_info(app, "loading schema migration scripts from {}".format(schemaMigrationsPath))
for filename in listdir(schemaMigrationsPath):
result = re.search(r"^\d+_(up|down)", filename)
if not result:
app.logger.error(f"schemaVersion {filename} must match ^\\d+_(up|down). exiting.")
mylog_error(app, f"schemaVersion {filename} must match ^\\d+_(up|down). exiting.")
exit(1)
key = result.group()
with open(join(schemaMigrationsPath, filename), 'rb') as file:
@ -57,12 +57,12 @@ def init_app(app, is_running_server):
hasSchemaVersionTable = True
if hasSchemaVersionTable == False:
app.logger.info("no table named schemaversion found in the {} schema. running migration 01_up".format(app.config['DATABASE_SCHEMA']))
mylog_info(app, "no table named schemaversion found in the {} schema. running migration 01_up".format(app.config['DATABASE_SCHEMA']))
try:
cursor.execute(schemaMigrations["01_up"])
connection.commit()
except:
app.logger.error("unable to create the schemaversion table because: {}".format(my_exec_info_message(sys.exc_info())))
mylog_error(app, "unable to create the schemaversion table because: {}".format(my_exec_info_message(sys.exc_info())))
exit(1)
actionWasTaken = True
@ -70,24 +70,24 @@ def init_app(app, is_running_server):
schemaVersion = cursor.fetchall()[0][0]
if schemaVersion > desiredSchemaVersion:
app.logger.critical("schemaVersion ({}) > desiredSchemaVersion ({}). schema downgrades are not supported yet. exiting.".format(
mylog_critical(app, "schemaVersion ({}) > desiredSchemaVersion ({}). schema downgrades are not supported yet. exiting.".format(
schemaVersion, desiredSchemaVersion
))
exit(1)
while schemaVersion < desiredSchemaVersion:
migrationKey = "%02d_up" % (schemaVersion+1)
app.logger.info("schemaVersion ({}) < desiredSchemaVersion ({}). running migration {}".format(
mylog_info(app, "schemaVersion ({}) < desiredSchemaVersion ({}). running migration {}".format(
schemaVersion, desiredSchemaVersion, migrationKey
))
try:
cursor.execute(schemaMigrations[migrationKey])
connection.commit()
except KeyError:
app.logger.critical("missing schema migration script: {}_xyz.sql".format(migrationKey))
mylog_critical(app, "missing schema migration script: {}_xyz.sql".format(migrationKey))
exit(1)
except:
app.logger.critical("unable to execute the schema migration {} because: {}".format(migrationKey, my_exec_info_message(sys.exc_info())))
mylog_critical(app, "unable to execute the schema migration {} because: {}".format(migrationKey, my_exec_info_message(sys.exc_info())))
exit(1)
actionWasTaken = True
@ -96,7 +96,7 @@ def init_app(app, is_running_server):
versionFromDatabase = cursor.fetchall()[0][0]
if schemaVersion != versionFromDatabase:
app.logger.critical("incorrect schema version value \"{}\" after running migration {}, expected \"{}\". exiting.".format(
mylog_critical(app, "incorrect schema version value \"{}\" after running migration {}, expected \"{}\". exiting.".format(
versionFromDatabase,
migrationKey,
schemaVersion
@ -107,7 +107,7 @@ def init_app(app, is_running_server):
app.config['PSYCOPG2_CONNECTION_POOL'].putconn(connection)
app.logger.info("{} current schemaVersion: \"{}\"".format(
mylog_info(app, "{} current schemaVersion: \"{}\"".format(
("schema migration completed." if actionWasTaken else "schema is already up to date. "), schemaVersion
))

View File

@ -7,7 +7,7 @@ from nanoid import generate
from flask import current_app
from typing import List
from capsulflask.shared import OnlineHost
from capsulflask.shared import *
class DBModel:
@ -270,7 +270,7 @@ class DBModel:
row = self.cursor.fetchone()
if row:
if int(row[1]) != int(dollars):
current_app.logger.warning(f"""
mylog_warning(current_app, f"""
{payment_type} gave us a completed payment session with a different dollar amount than what we had recorded!!
id: {id}
account: {row[0]}

View File

@ -8,7 +8,7 @@ import threading
import aiohttp
import asyncio
from flask import current_app
from capsulflask.shared import OnlineHost, my_exec_info_message
from capsulflask.shared import *
from typing import List
class HTTPResult:
@ -33,7 +33,7 @@ class MyHTTPClient:
toReturn = []
for individualResult in fromOtherThread:
if individualResult.error != None and individualResult.error != "":
current_app.logger.error(individualResult.error)
mylog_error(current_app, individualResult.error)
toReturn.append(individualResult.http_result)
return toReturn
@ -42,7 +42,7 @@ class MyHTTPClient:
future = run_coroutine(self.do_http(method=method, url=url, body=body, authorization_header=authorization_header))
fromOtherThread = future.result()
if fromOtherThread.error != None and fromOtherThread.error != "":
current_app.logger.error(fromOtherThread.error)
mylog_error(current_app, fromOtherThread.error)
return fromOtherThread.http_result
def get_client_session(self):

View File

@ -7,7 +7,7 @@ from flask import request, make_response
from werkzeug.exceptions import abort
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message, authorized_as_hub
from capsulflask.shared import *
bp = Blueprint("hub", __name__, url_prefix="/hub")
@ -21,19 +21,19 @@ def authorized_for_host(id):
def ping_all_hosts_task():
if authorized_as_hub(request.headers):
all_hosts = get_model().get_all_hosts()
current_app.logger.debug(f"pinging {len(all_hosts)} hosts...")
mylog_debug(current_app, f"pinging {len(all_hosts)} hosts...")
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header)
for i in range(len(all_hosts)):
host = all_hosts[i]
result = results[i]
current_app.logger.debug(f"response from {host.id} ({host.url}): {result.status_code} {result.body}")
mylog_debug(current_app, f"response from {host.id} ({host.url}): {result.status_code} {result.body}")
if result.status_code == 200:
get_model().host_heartbeat(host.id)
return "ok"
else:
current_app.logger.warning(f"/hub/heartbeat-task returned 401: invalid hub token")
mylog_warning(current_app, f"/hub/heartbeat-task returned 401: invalid hub token")
return abort(401, "invalid hub token")
@ -42,7 +42,7 @@ def heartbeat(host_id):
if authorized_for_host(host_id):
return "ok"
else:
current_app.logger.warning(f"/hub/heartbeat/{host_id} returned 401: invalid token")
mylog_warning(current_app, f"/hub/heartbeat/{host_id} returned 401: invalid token")
return abort(401, "invalid host id or token")
@bp.route("/claim-operation/<int:operation_id>/<string:host_id>", methods=("POST",))
@ -51,7 +51,7 @@ def claim_operation(operation_id: int, host_id: str):
payload_json = get_model().get_payload_json_from_host_operation(operation_id, host_id)
if payload_json is None:
error_message = f"{host_id} can't claim operation {operation_id} because host_operation row not found"
current_app.logger.error(error_message)
mylog_error(current_app, error_message)
return abort(404, error_message)
# right now if there is a can_claim_handler there needs to be a corresponding on_claimed_handler.
@ -84,7 +84,7 @@ def claim_operation(operation_id: int, host_id: str):
if error_message != "":
error_message = f"{host_id} can't claim operation {operation_id} because {error_message}"
current_app.logger.error(error_message)
mylog_error(current_app, error_message)
return abort(400, error_message)
# we will only return this payload as json if claiming succeeds, so might as well do this now...
@ -97,7 +97,7 @@ def claim_operation(operation_id: int, host_id: str):
if error_message != "":
error_message = f"{host_id} can't claim operation {operation_id} because {error_message}"
current_app.logger.error(error_message)
mylog_error(current_app, error_message)
return abort(400, error_message)
claimed = get_model().claim_operation(operation_id, host_id)
@ -113,7 +113,7 @@ def claim_operation(operation_id: int, host_id: str):
else:
return abort(409, f"operation was already assigned to another host")
else:
current_app.logger.warning(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token")
mylog_warning(current_app, f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token")
return abort(401, "invalid host id or token")
def can_claim_create(payload, host_id) -> (str, str):

View File

@ -14,7 +14,7 @@ from subprocess import run
from capsulflask.db import get_model
from capsulflask.http_client import HTTPResult
from capsulflask.shared import VirtualizationInterface, VirtualMachine, OnlineHost, validate_capsul_id, my_exec_info_message
from capsulflask.shared import *
class MockHub(VirtualizationInterface):
def __init__(self):
@ -42,7 +42,7 @@ class MockHub(VirtualizationInterface):
def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
validate_capsul_id(id)
current_app.logger.info(f"mock create: {id} for {email}")
mylog_info(current_app, f"mock create: {id} for {email}")
sleep(1)
get_model().create_vm(
email=email,
@ -56,10 +56,10 @@ class MockHub(VirtualizationInterface):
)
def destroy(self, email: str, id: str):
current_app.logger.info(f"mock destroy: {id} for {email}")
mylog_info(current_app, f"mock destroy: {id} for {email}")
def vm_state_command(self, email: str, id: str, command: str):
current_app.logger.info(f"mock {command}: {id} for {email}")
mylog_info(current_app, f"mock {command}: {id} for {email}")
class CapsulFlaskHub(VirtualizationInterface):
@ -119,7 +119,7 @@ class CapsulFlaskHub(VirtualizationInterface):
operation_desc = ""
if operation_id:
operation_desc = f"for operation {operation_id}"
current_app.logger.error(f"""error reading assignment_status {operation_desc} from host {host.id}:
mylog_error(current_app, f"""error reading assignment_status {operation_desc} from host {host.id}:
result_is_json: {result_is_json}
result_is_dict: {result_is_dict}
result_has_status: {result_has_status}
@ -183,10 +183,10 @@ class CapsulFlaskHub(VirtualizationInterface):
except:
all_valid = False
if not all_valid:
current_app.logger.error(f"""error reading ids for list_ids operation, host {host.id}""")
mylog_error(current_app, f"""error reading ids for list_ids operation, host {host.id}""")
else:
result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"})
current_app.logger.error(f"""missing 'ids' list for list_ids operation, host {host.id}""")
mylog_error(current_app, f"""missing 'ids' list for list_ids operation, host {host.id}""")
except:
# no need to do anything here since if it cant be parsed then generic_operation will handle it.
pass
@ -196,7 +196,7 @@ class CapsulFlaskHub(VirtualizationInterface):
def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
validate_capsul_id(id)
online_hosts = get_model().get_online_hosts()
current_app.logger.debug(f"hub_model.create(): ${len(online_hosts)} hosts")
mylog_debug(current_app, f"hub_model.create(): ${len(online_hosts)} hosts")
payload = json.dumps(dict(
type="create",
email=email,

View File

@ -3,6 +3,7 @@ from flask import render_template
from flask import current_app
from capsulflask.db import get_model
from capsulflask.shared import *
bp = Blueprint("landing", __name__, url_prefix="/")

View File

@ -15,6 +15,7 @@ from werkzeug.exceptions import abort
from capsulflask.db import get_model
from capsulflask.auth import account_required
from capsulflask.shared import *
mutex = Lock()
bp = Blueprint("metrics", __name__, url_prefix="/metrics")
@ -143,7 +144,7 @@ def get_plot_bytes(metric, capsulid, duration, size):
def draw_plot_png_bytes(data, scale, size_x=3, size_y=1):
#current_app.logger.info(json.dumps(data, indent=4, default=str))
#mylog_info(current_app, json.dumps(data, indent=4, default=str))
pyplot.style.use("seaborn-dark")
fig, my_plot = pyplot.subplots(figsize=(size_x, size_y))

View File

@ -21,7 +21,7 @@ from werkzeug.exceptions import abort
from capsulflask.auth import account_required
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message
from capsulflask.shared import *
bp = Blueprint("payment", __name__, url_prefix="/payment")
@ -67,11 +67,11 @@ def btcpay_payment():
notificationURL=f"{current_app.config['BASE_URL']}/payment/btcpay/webhook"
))
current_app.logger.info(f"created btcpay invoice: {invoice}")
mylog_info(current_app, f"created btcpay invoice: {invoice}")
# print(invoice)
current_app.logger.info(f"created btcpay invoice_id={invoice['id']} ( {session['account']}, ${dollars} )")
mylog_info(current_app, f"created btcpay invoice_id={invoice['id']} ( {session['account']}, ${dollars} )")
get_model().create_payment_session("btcpay", invoice["id"], session["account"], dollars)
@ -89,7 +89,7 @@ def poll_btcpay_session(invoice_id):
try:
invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id)
except:
current_app.logger.error(f"""
mylog_error(current_app, f"""
error was thrown when contacting btcpay server:
{my_exec_info_message(sys.exc_info())}"""
)
@ -101,13 +101,13 @@ def poll_btcpay_session(invoice_id):
dollars = invoice['price']
current_app.logger.info(f"poll_btcpay_session invoice_id={invoice_id}, status={invoice['status']} dollars={dollars}")
mylog_info(current_app, f"poll_btcpay_session invoice_id={invoice_id}, status={invoice['status']} dollars={dollars}")
if invoice['status'] == "paid" or invoice['status'] == "confirmed" or invoice['status'] == "complete":
success_account = get_model().consume_payment_session("btcpay", invoice_id, dollars)
if success_account:
current_app.logger.info(f"{success_account} paid ${dollars} successfully (btcpay_invoice_id={invoice_id})")
mylog_info(current_app, f"{success_account} paid ${dollars} successfully (btcpay_invoice_id={invoice_id})")
if invoice['status'] == "complete":
get_model().btcpay_invoice_resolved(invoice_id, True)
@ -120,7 +120,7 @@ def poll_btcpay_session(invoice_id):
@bp.route("/btcpay/webhook", methods=("POST",))
def btcpay_webhook():
current_app.logger.info(f"got btcpay webhook")
mylog_info(current_app, f"got btcpay webhook")
# IMPORTANT! there is no signature or credential to authenticate the data sent into this webhook :facepalm:
# its just a notification, thats all.
@ -148,7 +148,7 @@ def stripe_payment():
if len(errors) == 0:
current_app.logger.info(f"creating stripe checkout session for {session['account']}, ${dollars}")
mylog_info(current_app, f"creating stripe checkout session for {session['account']}, ${dollars}")
checkout_session = stripe.checkout.Session.create(
success_url=current_app.config['BASE_URL'] + "/payment/stripe/success?session_id={CHECKOUT_SESSION_ID}",
@ -167,7 +167,7 @@ def stripe_payment():
)
stripe_checkout_session_id = checkout_session['id']
current_app.logger.info(f"stripe_checkout_session_id={stripe_checkout_session_id} ( {session['account']}, ${dollars} )")
mylog_info(current_app, f"stripe_checkout_session_id={stripe_checkout_session_id} ( {session['account']}, ${dollars} )")
get_model().create_payment_session("stripe", stripe_checkout_session_id, session["account"], dollars)
@ -251,13 +251,13 @@ def validate_stripe_checkout_session(stripe_checkout_session_id):
def success():
stripe_checkout_session_id = request.args.get('session_id')
if not stripe_checkout_session_id:
current_app.logger.info("/payment/stripe/success returned 400: missing required URL parameter session_id")
mylog_info(current_app, "/payment/stripe/success returned 400: missing required URL parameter session_id")
abort(400, "missing required URL parameter session_id")
else:
for _ in range(0, 5):
paid = validate_stripe_checkout_session(stripe_checkout_session_id)
if paid:
current_app.logger.info(f"{paid['email']} paid ${paid['dollars']} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")
mylog_info(current_app, f"{paid['email']} paid ${paid['dollars']} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")
return redirect(url_for("console.account_balance"))
else:
sleep(1)

View File

@ -1,6 +1,6 @@
import re
from flask import current_app
from flask import current_app, Flask
from typing import List
class OnlineHost:
@ -54,4 +54,30 @@ def authorized_as_hub(headers):
return False
def my_exec_info_message(exec_info):
return "{}: {}".format(".".join([exec_info[0].__module__, exec_info[0].__name__]), exec_info[1])
return "{}: {}".format(".".join([exec_info[0].__module__, exec_info[0].__name__]), exec_info[1])
def log_output_for_tests(app: Flask, message: str):
if app.config['TESTING'] != False:
file_object = open('unittest-log-output.log', 'a')
file_object.write(message)
file_object.close()
def mylog_debug(app: Flask, message: str):
log_output_for_tests(app, f"DEBUG: {message}\n")
app.logger.debug(message)
def mylog_info(app: Flask, message: str):
log_output_for_tests(app, f"INFO: {message}\n")
app.logger.info(message)
def mylog_warning(app: Flask, message: str):
log_output_for_tests(app, f"WARNING: {message}\n")
app.logger.warning(message)
def mylog_error(app: Flask, message: str):
log_output_for_tests(app, f"ERROR: {message}\n")
app.logger.error(message)
def mylog_critical(app: Flask, message: str):
log_output_for_tests(app, f"CRITICAL: {message}\n")
app.logger.critical(message)

View File

@ -8,7 +8,7 @@ from flask import request
from flask.json import jsonify
from werkzeug.exceptions import abort
from capsulflask.shared import my_exec_info_message, authorized_as_hub
from capsulflask.shared import *
bp = Blueprint("spoke", __name__, url_prefix="/spoke")
@ -19,18 +19,18 @@ def heartbeat():
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header)
if result.status_code == -1:
current_app.logger.info(f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached")
mylog_info(current_app, f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached")
return abort(503, "Service Unavailable: hub timed out or cannot be reached")
if result.status_code == 401:
current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} rejected our token")
mylog_info(current_app, f"/spoke/heartbeat returned 502: hub at {url} rejected our token")
return abort(502, "hub rejected our token")
if result.status_code != 200:
current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} returned {result.status_code}")
mylog_info(current_app, f"/spoke/heartbeat returned 502: hub at {url} returned {result.status_code}")
return abort(502, "Bad Gateway: hub did not return 200")
return "OK"
else:
current_app.logger.info(f"/spoke/heartbeat returned 401: invalid hub token")
mylog_info(current_app, f"/spoke/heartbeat returned 401: invalid hub token")
return abort(401, "invalid hub token")
@bp.route("/operation/<int:operation_id>", methods=("POST",))
@ -45,7 +45,7 @@ def operation_impl(operation_id: int):
if authorized_as_hub(request.headers):
request_body_json = request.json
request_body = json.loads(request_body_json)
#current_app.logger.info(f"request.json: {request_body}")
#mylog_info(current_app, f"request.json: {request_body}")
handlers = {
"capacity_avaliable": handle_capacity_avaliable,
"get": handle_get,
@ -63,7 +63,7 @@ def operation_impl(operation_id: int):
return handlers[request_body['type']](operation_id, request_body)
except:
error_message = my_exec_info_message(sys.exc_info())
current_app.logger.error(f"unhandled exception in {request_body['type']} handler: {error_message}")
mylog_error(current_app, f"unhandled exception in {request_body['type']} handler: {error_message}")
return jsonify(dict(error_message=error_message))
else:
error_message = f"'type' must be one of {types_csv}"
@ -71,15 +71,15 @@ def operation_impl(operation_id: int):
error_message = "'type' json property is required"
if error_message != "":
current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
mylog_error(current_app, f"/hosts/operation returned 400: {error_message}")
return abort(400, f"bad request; {error_message}")
else:
current_app.logger.warning(f"/hosts/operation returned 401: invalid hub token")
mylog_warning(current_app, f"/hosts/operation returned 401: invalid hub token")
return abort(401, "invalid hub token")
def handle_capacity_avaliable(operation_id, request_body):
if 'additional_ram_bytes' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable")
mylog_error(current_app, f"/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable")
return abort(400, f"bad request; additional_ram_bytes is required for capacity_avaliable")
has_capacity = current_app.config['SPOKE_MODEL'].capacity_avaliable(request_body['additional_ram_bytes'])
@ -87,7 +87,7 @@ def handle_capacity_avaliable(operation_id, request_body):
def handle_get(operation_id, request_body):
if 'id' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: id is required for get")
mylog_error(current_app, f"/hosts/operation returned 400: id is required for get")
return abort(400, f"bad request; id is required for get")
vm = current_app.config['SPOKE_MODEL'].get(request_body['id'], request_body['get_ssh_host_keys'])
@ -101,7 +101,7 @@ def handle_list_ids(operation_id, request_body):
def handle_create(operation_id, request_body):
if not operation_id:
current_app.logger.error(f"/hosts/operation returned 400: operation_id is required for create ")
mylog_error(current_app, f"/hosts/operation returned 400: operation_id is required for create ")
return abort(400, f"bad request; operation_id is required. try POST /spoke/operation/<id>")
parameters = ["email", "id", "os", "size", "template_image_file_name", "vcpus", "memory_mb", "ssh_authorized_keys"]
@ -111,7 +111,7 @@ def handle_create(operation_id, request_body):
error_message = f"{error_message}\n{parameter} is required for create"
if error_message != "":
current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
mylog_error(current_app, f"/hosts/operation returned 400: {error_message}")
return abort(400, f"bad request; {error_message}")
# only one host should create the vm, so we first race to assign this create operation to ourselves.
@ -140,7 +140,7 @@ def handle_create(operation_id, request_body):
elif result.status_code == 409:
assignment_status = "assigned_to_other_host"
else:
current_app.logger.error(f"{url} returned {result.status_code}: {result.body}")
mylog_error(current_app, f"{url} returned {result.status_code}: {result.body}")
return abort(503, f"hub did not cleanly handle our request to claim the create operation")
if assignment_status == "assigned":
@ -166,7 +166,7 @@ def handle_create(operation_id, request_body):
params= f"{params} network_name='{request_body['network_name'] if 'network_name' in request_body else 'KeyError'}', "
params= f"{params} public_ipv4='{request_body['public_ipv4'] if 'public_ipv4' in request_body else 'KeyError'}', "
current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}")
mylog_error(current_app, f"spoke_model.create({params}) failed: {error_message}")
return jsonify(dict(assignment_status=assignment_status, error_message=error_message))
@ -174,11 +174,11 @@ def handle_create(operation_id, request_body):
def handle_destroy(operation_id, request_body):
if 'id' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: id is required for destroy")
mylog_error(current_app, f"/hosts/operation returned 400: id is required for destroy")
return abort(400, f"bad request; id is required for destroy")
if 'email' not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: email is required for destroy")
mylog_error(current_app, f"/hosts/operation returned 400: email is required for destroy")
return abort(400, f"bad request; email is required for destroy")
try:
@ -187,7 +187,7 @@ def handle_destroy(operation_id, request_body):
error_message = my_exec_info_message(sys.exc_info())
params = f"email='{request_body['email'] if 'email' in request_body else 'KeyError'}', "
params= f"{params} id='{request_body['id'] if 'id' in request_body else 'KeyError'}', "
current_app.logger.error(f"current_app.config['SPOKE_MODEL'].destroy({params}) failed: {error_message}")
mylog_error(current_app, f"current_app.config['SPOKE_MODEL'].destroy({params}) failed: {error_message}")
return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))
return jsonify(dict(assignment_status="assigned", status="success"))
@ -198,11 +198,11 @@ def handle_vm_state_command(operation_id, request_body):
required_properties = ['id', 'email', 'command']
for required_property in required_properties:
if required_property not in request_body:
current_app.logger.error(f"/hosts/operation returned 400: {required_property} is required for vm_state_command")
mylog_error(current_app, f"/hosts/operation returned 400: {required_property} is required for vm_state_command")
return abort(400, f"bad request; {required_property} is required for vm_state_command")
if request_body['command'] not in ["stop", "force-stop", "start", "restart"]:
current_app.logger.error(f"/hosts/operation returned 400: command ({request_body['command']}) must be one of stop, force-stop, start, or restart")
mylog_error(current_app, f"/hosts/operation returned 400: command ({request_body['command']}) must be one of stop, force-stop, start, or restart")
return abort(400, f"bad request; command ({request_body['command']}) must be one of stop, force-stop, start, or restart")
try:
@ -212,7 +212,7 @@ def handle_vm_state_command(operation_id, request_body):
params = f"email='{request_body['email'] if 'email' in request_body else 'KeyError'}', "
params= f"{params} id='{request_body['id'] if 'id' in request_body else 'KeyError'}', "
params= f"{params} command='{request_body['command'] if 'command' in request_body else 'KeyError'}', "
current_app.logger.error(f"current_app.config['SPOKE_MODEL'].vm_state_command({params}) failed: {error_message}")
mylog_error(current_app, f"current_app.config['SPOKE_MODEL'].vm_state_command({params}) failed: {error_message}")
return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))
return jsonify(dict(assignment_status="assigned", status="success"))

View File

@ -10,7 +10,7 @@ from subprocess import run
from capsulflask.db import get_model
from capsulflask.shared import VirtualizationInterface, VirtualMachine, validate_capsul_id, my_exec_info_message
from capsulflask.shared import *
class MockSpoke(VirtualizationInterface):
@ -43,15 +43,15 @@ class MockSpoke(VirtualizationInterface):
def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list, network_name: str, public_ipv4: str):
validate_capsul_id(id)
current_app.logger.info(f"mock create: {id} for {email}")
mylog_info(current_app, f"mock create: {id} for {email}")
self.capsuls[id] = dict(email=email, id=id, network_name=network_name, public_ipv4=public_ipv4)
sleep(1)
def destroy(self, email: str, id: str):
current_app.logger.info(f"mock destroy: {id} for {email}")
mylog_info(current_app, f"mock destroy: {id} for {email}")
def vm_state_command(self, email: str, id: str, command: str):
current_app.logger.info(f"mock {command}: {id} for {email}")
mylog_info(current_app, f"mock {command}: {id} for {email}")
class ShellScriptSpoke(VirtualizationInterface):
@ -73,7 +73,7 @@ class ShellScriptSpoke(VirtualizationInterface):
completedProcess = run(my_args, capture_output=True)
if completedProcess.returncode != 0:
current_app.logger.error(f"""
mylog_error(current_app, f"""
capacity-avaliable.sh exited {completedProcess.returncode} with
stdout:
{completedProcess.stdout}
@ -85,7 +85,7 @@ class ShellScriptSpoke(VirtualizationInterface):
lines = completedProcess.stdout.splitlines()
output = lines[len(lines)-1]
if not output == b"yes":
current_app.logger.error(f"capacity-avaliable.sh exited 0 and returned {output} but did not return \"yes\" ")
mylog_error(current_app, f"capacity-avaliable.sh exited 0 and returned {output} but did not return \"yes\" ")
return False
return True
@ -96,14 +96,14 @@ class ShellScriptSpoke(VirtualizationInterface):
self.validate_completed_process(completedProcess)
lines = completedProcess.stdout.splitlines()
if len(lines) == 0:
current_app.logger.warning("shell_scripts/get.sh returned zero lines!")
mylog_warning(current_app, "shell_scripts/get.sh returned zero lines!")
return None
result_string = lines[0].decode("utf-8")
fields = result_string.split(" ")
if fields[0] != "true":
current_app.logger.warning(f"shell_scripts/get.sh was called for {id} which libvirt says does not exist.")
mylog_warning(current_app, f"shell_scripts/get.sh was called for {id} which libvirt says does not exist.")
return None
if len(fields) < 2:
@ -126,7 +126,7 @@ class ShellScriptSpoke(VirtualizationInterface):
ssh_host_keys = json.loads(completedProcess2.stdout.decode("utf-8"))
return VirtualMachine(id, current_app.config["SPOKE_HOST_ID"], state=state, ipv4=ipaddr, ssh_host_keys=ssh_host_keys)
except:
current_app.logger.warning(f"""
mylog_warning(current_app, f"""
failed to ssh-keyscan {id} at {ipaddr}:
{my_exec_info_message(sys.exc_info())}"""
)
@ -218,5 +218,5 @@ class ShellScriptSpoke(VirtualizationInterface):
completedProcess = run([join(current_app.root_path, f"shell_scripts/{command}.sh"), id], capture_output=True)
self.validate_completed_process(completedProcess, email)
returned_string = completedProcess.stdout.decode("utf-8")
current_app.logger.info(f"{command} vm {id} for {email} returned: {returned_string}")
mylog_info(current_app, f"{command} vm {id} for {email} returned: {returned_string}")

View File

@ -58,6 +58,7 @@ class ConsoleTests(BaseTestCase):
def test_create_fails_capacity(self):
with self.client as client:
client.get(url_for("console.create"))
csrf_token = self.get_context_variable('csrf_token')
@ -82,7 +83,7 @@ class ConsoleTests(BaseTestCase):
len(get_model().list_vms_for_account('test@example.com')),
0
)
file_object = open('unittest-output.log', 'a')
file_object.write(f"{self.id()} captured output:\n{self.logs_from_test.getvalue()}\n")
file_object.close()
@ -207,3 +208,4 @@ class ConsoleTests(BaseTestCase):
get_model().cursor.execute("DELETE FROM vms")
get_model().cursor.execute("DELETE FROM payments")
get_model().cursor.connection.commit()

View File

@ -9,12 +9,13 @@ from flask import current_app
from capsulflask import create_app
from capsulflask.db import get_model
from capsulflask.shared import *
class BaseTestCase(TestCase):
def create_app(self):
# Use default connection paramaters
os.environ['POSTGRES_CONNECTION_PARAMETERS'] = "host=localhost port=5432 user=postgres password=dev dbname=capsulflask_test"
os.environ['TESTING'] = '1'
os.environ['TESTING'] = 'True'
os.environ['LOG_LEVEL'] = 'DEBUG'
os.environ['SPOKE_MODEL'] = 'mock'
os.environ['HUB_MODEL'] = 'capsul-flask'
@ -22,14 +23,13 @@ class BaseTestCase(TestCase):
return self.app
def setUp(self):
pass
mylog_info(self.app, f"setting up {self.id()}")
def tearDown(self):
pass
mylog_info(self.app, f"tearing down {self.id()}")
def _login(self, user_email):
get_model().login(user_email)
with self.client.session_transaction() as session:
session['account'] = user_email
session['csrf-token'] = generate()