From 6764c5c97de1e7364feee041947a8dea05cd06c9 Mon Sep 17 00:00:00 2001 From: forest Date: Mon, 4 Jan 2021 13:32:52 -0600 Subject: [PATCH] got httpclient working, spoke heartbeat is working --- Pipfile | 1 + Pipfile.lock | 88 ++++++++------ capsulflask/__init__.py | 22 +++- capsulflask/cli.py | 3 +- capsulflask/console.py | 3 +- capsulflask/db.py | 5 +- capsulflask/db_model.py | 15 +-- capsulflask/http_client.py | 111 ++++++++++++++++-- capsulflask/hub_api.py | 35 +++++- capsulflask/hub_model.py | 40 ++----- capsulflask/payment.py | 3 +- .../09_up_introduce_hosts.sql | 4 +- capsulflask/shared.py | 44 +++++++ capsulflask/spoke_api.py | 14 +-- capsulflask/spoke_model.py | 5 +- 15 files changed, 281 insertions(+), 112 deletions(-) create mode 100644 capsulflask/shared.py diff --git a/Pipfile b/Pipfile index 218d982..efea2fc 100644 --- a/Pipfile +++ b/Pipfile @@ -30,6 +30,7 @@ requests = "*" python-dotenv = "*" ecdsa = "*" aiohttp = "*" +apscheduler = "*" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 81fbcc2..d4badac 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "c05ae73ec64f0f248d1406a399de1ed83b7d472cf54ff0743f35d125b6d1e98f" + "sha256": "b3a8d161c35cb90f0909c0531e41d0ff4598e10dfecd1e8f1ac74a3c5d6ef170" }, "pipfile-spec": 6, "requires": { @@ -59,6 +59,14 @@ "index": "pypi", "version": "==3.7.3" }, + "apscheduler": { + "hashes": [ + "sha256:3bb5229eed6fbbdafc13ce962712ae66e175aa214c69bed35a06bffcf0c5e244", + "sha256:e8b1ecdb4c7cb2818913f766d5898183c7cb8936680710a4d3a966e02262e526" + ], + "index": "pypi", + "version": "==3.6.3" + }, "astroid": { "hashes": [ "sha256:4c17cea3e592c21b6e222f673868961bad77e1f985cb1694ed077475a89229c1", @@ -412,36 +420,36 @@ }, "pillow": { "hashes": [ - "sha256:006de60d7580d81f4a1a7e9f0173dc90a932e3905cc4d47ea909bc946302311a", - "sha256:0a2e8d03787ec7ad71dc18aec9367c946ef8ef50e1e78c71f743bc3a770f9fae", - "sha256:0eeeae397e5a79dc088d8297a4c2c6f901f8fb30db47795113a4a605d0f1e5ce", - "sha256:11c5c6e9b02c9dac08af04f093eb5a2f84857df70a7d4a6a6ad461aca803fb9e", - "sha256:2fb113757a369a6cdb189f8df3226e995acfed0a8919a72416626af1a0a71140", - "sha256:4b0ef2470c4979e345e4e0cc1bbac65fda11d0d7b789dbac035e4c6ce3f98adb", - "sha256:59e903ca800c8cfd1ebe482349ec7c35687b95e98cefae213e271c8c7fffa021", - "sha256:5abd653a23c35d980b332bc0431d39663b1709d64142e3652890df4c9b6970f6", - "sha256:5f9403af9c790cc18411ea398a6950ee2def2a830ad0cfe6dc9122e6d528b302", - "sha256:6b4a8fd632b4ebee28282a9fef4c341835a1aa8671e2770b6f89adc8e8c2703c", - "sha256:6c1aca8231625115104a06e4389fcd9ec88f0c9befbabd80dc206c35561be271", - "sha256:795e91a60f291e75de2e20e6bdd67770f793c8605b553cb6e4387ce0cb302e09", - "sha256:7ba0ba61252ab23052e642abdb17fd08fdcfdbbf3b74c969a30c58ac1ade7cd3", - "sha256:7c9401e68730d6c4245b8e361d3d13e1035cbc94db86b49dc7da8bec235d0015", - "sha256:81f812d8f5e8a09b246515fac141e9d10113229bc33ea073fec11403b016bcf3", - "sha256:895d54c0ddc78a478c80f9c438579ac15f3e27bf442c2a9aa74d41d0e4d12544", - "sha256:8de332053707c80963b589b22f8e0229f1be1f3ca862a932c1bcd48dafb18dd8", - "sha256:92c882b70a40c79de9f5294dc99390671e07fc0b0113d472cbea3fde15db1792", - "sha256:95edb1ed513e68bddc2aee3de66ceaf743590bf16c023fb9977adc4be15bd3f0", - "sha256:b63d4ff734263ae4ce6593798bcfee6dbfb00523c82753a3a03cbc05555a9cc3", - "sha256:bd7bf289e05470b1bc74889d1466d9ad4a56d201f24397557b6f65c24a6844b8", - "sha256:cc3ea6b23954da84dbee8025c616040d9aa5eaf34ea6895a0a762ee9d3e12e11", - "sha256:cc9ec588c6ef3a1325fa032ec14d97b7309db493782ea8c304666fb10c3bd9a7", - "sha256:d3d07c86d4efa1facdf32aa878bd508c0dc4f87c48125cc16b937baa4e5b5e11", - "sha256:d8a96747df78cda35980905bf26e72960cba6d355ace4780d4bdde3b217cdf1e", - "sha256:e38d58d9138ef972fceb7aeec4be02e3f01d383723965bfcef14d174c8ccd039", - "sha256:eb472586374dc66b31e36e14720747595c2b265ae962987261f044e5cce644b5", - "sha256:fbd922f702582cb0d71ef94442bfca57624352622d75e3be7a1e7e9360b07e72" + "sha256:165c88bc9d8dba670110c689e3cc5c71dbe4bfb984ffa7cbebf1fac9554071d6", + "sha256:22d070ca2e60c99929ef274cfced04294d2368193e935c5d6febfd8b601bf865", + "sha256:2353834b2c49b95e1313fb34edf18fca4d57446675d05298bb694bca4b194174", + "sha256:39725acf2d2e9c17356e6835dccebe7a697db55f25a09207e38b835d5e1bc032", + "sha256:3de6b2ee4f78c6b3d89d184ade5d8fa68af0848f9b6b6da2b9ab7943ec46971a", + "sha256:47c0d93ee9c8b181f353dbead6530b26980fe4f5485aa18be8f1fd3c3cbc685e", + "sha256:5e2fe3bb2363b862671eba632537cd3a823847db4d98be95690b7e382f3d6378", + "sha256:604815c55fd92e735f9738f65dabf4edc3e79f88541c221d292faec1904a4b17", + "sha256:6c5275bd82711cd3dcd0af8ce0bb99113ae8911fc2952805f1d012de7d600a4c", + "sha256:731ca5aabe9085160cf68b2dbef95fc1991015bc0a3a6ea46a371ab88f3d0913", + "sha256:7612520e5e1a371d77e1d1ca3a3ee6227eef00d0a9cddb4ef7ecb0b7396eddf7", + "sha256:7916cbc94f1c6b1301ac04510d0881b9e9feb20ae34094d3615a8a7c3db0dcc0", + "sha256:81c3fa9a75d9f1afafdb916d5995633f319db09bd773cb56b8e39f1e98d90820", + "sha256:887668e792b7edbfb1d3c9d8b5d8c859269a0f0eba4dda562adb95500f60dbba", + "sha256:93a473b53cc6e0b3ce6bf51b1b95b7b1e7e6084be3a07e40f79b42e83503fbf2", + "sha256:96d4dc103d1a0fa6d47c6c55a47de5f5dafd5ef0114fa10c85a1fd8e0216284b", + "sha256:a3d3e086474ef12ef13d42e5f9b7bbf09d39cf6bd4940f982263d6954b13f6a9", + "sha256:b02a0b9f332086657852b1f7cb380f6a42403a6d9c42a4c34a561aa4530d5234", + "sha256:b09e10ec453de97f9a23a5aa5e30b334195e8d2ddd1ce76cc32e52ba63c8b31d", + "sha256:b6f00ad5ebe846cc91763b1d0c6d30a8042e02b2316e27b05de04fa6ec831ec5", + "sha256:bba80df38cfc17f490ec651c73bb37cd896bc2400cfba27d078c2135223c1206", + "sha256:c3d911614b008e8a576b8e5303e3db29224b455d3d66d1b2848ba6ca83f9ece9", + "sha256:ca20739e303254287138234485579b28cb0d524401f83d5129b5ff9d606cb0a8", + "sha256:cb192176b477d49b0a327b2a5a4979552b7a58cd42037034316b8018ac3ebb59", + "sha256:cdbbe7dff4a677fb555a54f9bc0450f2a21a93c5ba2b44e09e54fcb72d2bd13d", + "sha256:d355502dce85ade85a2511b40b4c61a128902f246504f7de29bbeec1ae27933a", + "sha256:dc577f4cfdda354db3ae37a572428a90ffdbe4e51eda7849bf442fb803f09c9b", + "sha256:dd9eef866c70d2cbbea1ae58134eaffda0d4bfea403025f4db6859724b18ab3d" ], - "version": "==8.0.1" + "version": "==8.1.0" }, "psycopg2": { "hashes": [ @@ -492,14 +500,21 @@ "index": "pypi", "version": "==0.15.0" }, + "pytz": { + "hashes": [ + "sha256:16962c5fb8db4a8f63a26646d8886e9d769b6c511543557bc84e9569fb9a9cb4", + "sha256:180befebb1927b16f6b57101720075a984c019ac16b1b7575673bea42c6c3da5" + ], + "version": "==2020.5" + }, "requests": { "hashes": [ - "sha256:7f1a0b932f4a60a1a65caa4263921bb7d9ee911957e0ae4a23a6dd08185ad5f8", - "sha256:e786fa28d8c9154e6a4de5d46a1d921b8749f8b74e28bde23768e5e16eece998" + "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804", + "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e" ], "index": "pypi", "markers": "python_version >= '3.0'", - "version": "==2.25.0" + "version": "==2.25.1" }, "six": { "hashes": [ @@ -570,6 +585,13 @@ ], "version": "==3.7.4.3" }, + "tzlocal": { + "hashes": [ + "sha256:643c97c5294aedc737780a49d9df30889321cbe1204eac2c2ec6134035a92e44", + "sha256:e2cb6c6b5b604af38597403e9852872d7f534962ae2954c7f35efcb1ccacf4a4" + ], + "version": "==2.1" + }, "urllib3": { "hashes": [ "sha256:19188f96923873c92ccb987120ec4acaa12f0461fa9ce5d3d0772bc965a39e08", diff --git a/capsulflask/__init__.py b/capsulflask/__init__.py index 21dc047..eba8ed9 100644 --- a/capsulflask/__init__.py +++ b/capsulflask/__init__.py @@ -1,8 +1,10 @@ import logging from logging.config import dictConfig as logging_dict_config +import atexit import os import hashlib +import requests import stripe from dotenv import load_dotenv, find_dotenv @@ -11,11 +13,13 @@ from flask_mail import Mail from flask import render_template from flask import url_for from flask import current_app +from apscheduler.schedulers.background import BackgroundScheduler from capsulflask import hub_model, spoke_model, cli from capsulflask.btcpay import client as btcpay from capsulflask.http_client import MyHTTPClient + load_dotenv(find_dotenv()) app = Flask(__name__) @@ -32,7 +36,6 @@ app.config.from_mapping( SPOKE_HOST_ID=os.environ.get("SPOKE_HOST_ID", default="default"), SPOKE_HOST_TOKEN=os.environ.get("SPOKE_HOST_TOKEN", default="default"), HUB_TOKEN=os.environ.get("HUB_TOKEN", default="default"), - HUB_URL=os.environ.get("HUB_URL", default="https://capsul.org"), DATABASE_URL=os.environ.get("DATABASE_URL", default="sql://postgres:dev@localhost:5432/postgres"), DATABASE_SCHEMA=os.environ.get("DATABASE_SCHEMA", default="public"), @@ -56,6 +59,8 @@ app.config.from_mapping( BTCPAY_URL=os.environ.get("BTCPAY_URL", default="https://btcpay.cyberia.club") ) +app.config['HUB_URL'] = os.environ.get("HUB_URL", default=app.config['BASE_URL']) + logging_dict_config({ 'version': 1, 'formatters': {'default': { @@ -89,6 +94,19 @@ if app.config['HUB_MODE_ENABLED']: if app.config['HUB_MODEL'] == "capsul-flask": app.config['HUB_MODEL'] = hub_model.CapsulFlaskHub() + + # debug mode (flask reloader) runs two copies of the app. When running in debug mode, + # we only want to start the scheduler one time. + if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true': + scheduler = BackgroundScheduler() + heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task" + heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"} + heartbeat_task = lambda: requests.post(heartbeat_task_url, headers=heartbeat_task_headers) + scheduler.add_job(func=heartbeat_task, trigger="interval", seconds=5) + scheduler.start() + + atexit.register(lambda: scheduler.shutdown()) + else: app.config['HUB_MODEL'] = hub_model.MockHub() @@ -107,6 +125,8 @@ if app.config['HUB_MODE_ENABLED']: app.add_url_rule("/", endpoint="index") + + if app.config['SPOKE_MODE_ENABLED']: if app.config['SPOKE_MODEL'] == "shell-scripts": diff --git a/capsulflask/cli.py b/capsulflask/cli.py index 09b5d47..790627a 100644 --- a/capsulflask/cli.py +++ b/capsulflask/cli.py @@ -11,7 +11,8 @@ from flask import current_app from psycopg2 import ProgrammingError from flask_mail import Message -from capsulflask.db import get_model, my_exec_info_message +from capsulflask.db import get_model +from capsulflask.shared import my_exec_info_message from capsulflask.console import get_account_balance bp = Blueprint('cli', __name__) diff --git a/capsulflask/console.py b/capsulflask/console.py index d5fd7fb..5630158 100644 --- a/capsulflask/console.py +++ b/capsulflask/console.py @@ -15,7 +15,8 @@ from nanoid import generate from capsulflask.metrics import durations as metric_durations from capsulflask.auth import account_required -from capsulflask.db import get_model, my_exec_info_message +from capsulflask.db import get_model +from capsulflask.shared import my_exec_info_message from capsulflask.payment import poll_btcpay_session from capsulflask import cli diff --git a/capsulflask/db.py b/capsulflask/db.py index 60cc103..09ad5c8 100644 --- a/capsulflask/db.py +++ b/capsulflask/db.py @@ -40,7 +40,7 @@ def init_app(app): hasSchemaVersionTable = False actionWasTaken = False schemaVersion = 0 - desiredSchemaVersion = 8 + desiredSchemaVersion = 9 cursor = connection.cursor() @@ -126,5 +126,4 @@ def close_db(e=None): db_model.cursor.close() current_app.config['PSYCOPG2_CONNECTION_POOL'].putconn(db_model.connection) -def my_exec_info_message(exec_info): - return "{}: {}".format(".".join([exec_info[0].__module__, exec_info[0].__name__]), exec_info[1]) + diff --git a/capsulflask/db_model.py b/capsulflask/db_model.py index a641dae..240c887 100644 --- a/capsulflask/db_model.py +++ b/capsulflask/db_model.py @@ -4,12 +4,9 @@ from nanoid import generate from flask import current_app from typing import List -from capsulflask.hub_model import HTTPResult -class OnlineHost: - def __init__(self, id: str, url: str): - self.id = id - self.url = url +from capsulflask.shared import OnlineHost + class DBModel: #def __init__(self, connection: Psycopg2Connection, cursor: Psycopg2Cursor): @@ -277,15 +274,19 @@ class DBModel: # ------ HOSTS --------- def authorized_for_host(self, id, token) -> bool: - self.cursor.execute("SELECT id FROM hosts WHERE id = %s token = %s", (id, token)) + self.cursor.execute("SELECT id FROM hosts WHERE id = %s AND token = %s", (id, token)) return self.cursor.fetchone() != None def host_heartbeat(self, id) -> None: self.cursor.execute("UPDATE hosts SET last_health_check = NOW() WHERE id = %s", (id,)) self.connection.commit() + def get_all_hosts(self) -> List[OnlineHost]: + self.cursor.execute("SELECT id, https_url FROM hosts") + return list(map(lambda x: OnlineHost(id=x[0], url=x[1]), self.cursor.fetchall())) + def get_online_hosts(self) -> List[OnlineHost]: - self.cursor.execute("SELECT id, https_url FROM hosts WHERE last_health_check > NOW() - INTERVAL '10 seconds'") + self.cursor.execute("SELECT id, https_url FROM hosts WHERE last_health_check > NOW() - INTERVAL '20 seconds'") return list(map(lambda x: OnlineHost(id=x[0], url=x[1]), self.cursor.fetchall())) def create_operation(self, online_hosts: List[OnlineHost], email: str, payload: str) -> int: diff --git a/capsulflask/http_client.py b/capsulflask/http_client.py index ea03c89..30b76a9 100644 --- a/capsulflask/http_client.py +++ b/capsulflask/http_client.py @@ -1,12 +1,14 @@ import sys import json +import itertools +import time +import threading import aiohttp import asyncio from flask import current_app -from capsulflask.db import my_exec_info_message -from capsulflask.db_model import OnlineHost +from capsulflask.shared import OnlineHost, my_exec_info_message from typing import List class HTTPResult: @@ -16,14 +18,23 @@ class HTTPResult: class MyHTTPClient: def __init__(self, timeout_seconds = 5): - self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) - self.event_loop = asyncio.get_event_loop() + self.timeout_seconds = timeout_seconds + self.client_session = None - def make_requests_sync(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): - self.event_loop.run_until_complete(self.make_requests(online_hosts=online_hosts, body=body)) + + def make_requests_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]: + future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header)) + return future.result() def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult: - self.event_loop.run_until_complete(self.post_json_sync(method=method, url=url, body=body)) + future = run_coroutine(self.post_json(method=method, url=url, body=body, authorization_header=authorization_header)) + return future.result() + + def get_client_session(self): + if not self.client_session: + self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout_seconds)) + + return self.client_session async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult: response = None @@ -31,12 +42,11 @@ class MyHTTPClient: headers = {} if authorization_header != None: headers['Authorization'] = authorization_header - response = await self.client_session.request( + response = await self.get_client_session().request( method=method, url=url, json=body, headers=headers, - auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']), verify_ssl=True, ) except: @@ -59,15 +69,92 @@ class MyHTTPClient: return HTTPResult(response.status, response_body) - async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): + async def make_requests(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]: tasks = [] # append to tasks in the same order as online_hosts for host in online_hosts: tasks.append( - self.post_json(url=host.url, body=body) + self.post_json(url=f"{host.url}/{url_suffix}", body=body, authorization_header=authorization_header) ) # gather is like Promise.all from javascript, it returns a future which resolves to an array of results # in the same order as the tasks that we passed in -- which were in the same order as online_hosts results = await asyncio.gather(*tasks) - return results \ No newline at end of file + return results + + +# i lifted this direct from https://stackoverflow.com/a/58616001 +# this is the bridge between Flask's one-thread-per-request world +# and aiohttp's event-loop based world + +class EventLoopThread(threading.Thread): + loop = None + _count = itertools.count(0) + + def __init__(self): + name = f"{type(self).__name__}-{next(self._count)}" + super().__init__(name=name, daemon=True) + + def __repr__(self): + loop, r, c, d = self.loop, False, True, False + if loop is not None: + r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug() + return ( + f"<{type(self).__name__} {self.name} id={self.ident} " + f"running={r} closed={c} debug={d}>" + ) + + def run(self): + self.loop = loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_forever() + finally: + try: + shutdown_asyncgens = loop.shutdown_asyncgens() + except AttributeError: + pass + else: + loop.run_until_complete(shutdown_asyncgens) + loop.close() + asyncio.set_event_loop(None) + + def stop(self): + loop, self.loop = self.loop, None + if loop is None: + return + loop.call_soon_threadsafe(loop.stop) + self.join() + +_lock = threading.Lock() +_loop_thread = None + +def get_event_loop(): + global _loop_thread + + if _loop_thread is None: + with _lock: + if _loop_thread is None: + _loop_thread = EventLoopThread() + _loop_thread.start() + # give the thread up to a second to produce a loop + deadline = time.time() + 1 + while not _loop_thread.loop and time.time() < deadline: + time.sleep(0.001) + + return _loop_thread.loop + +def stop_event_loop(): + global _loop_thread + with _lock: + if _loop_thread is not None: + _loop_thread.stop() + _loop_thread = None + +def run_coroutine(coro): + """Run the coroutine in the event loop running in a separate thread + Returns a Future, call Future.result() to get the output + """ + + return asyncio.run_coroutine_threadsafe(coro, get_event_loop()) \ No newline at end of file diff --git a/capsulflask/hub_api.py b/capsulflask/hub_api.py index b35bf78..cb47614 100644 --- a/capsulflask/hub_api.py +++ b/capsulflask/hub_api.py @@ -4,23 +4,46 @@ from flask import current_app from flask import request from werkzeug.exceptions import abort -from capsulflask.db import get_model, my_exec_info_message +from capsulflask.db import get_model +from capsulflask.shared import my_exec_info_message, authorized_as_hub bp = Blueprint("hub", __name__, url_prefix="/hub") def authorized_for_host(id): - auth_header_value = request.headers.get('Authorization').replace("Bearer ", "") - return get_model().authorized_for_host(id, auth_header_value) + if request.headers.get('Authorization'): + auth_header_value = request.headers.get('Authorization').replace("Bearer ", "") + return get_model().authorized_for_host(id, auth_header_value) + return False -@bp.route("/heartbeat/", methods=("POST")) +@bp.route("/heartbeat-task", methods=("POST",)) +def ping_all_hosts_task(): + if authorized_as_hub(request.headers): + all_hosts = get_model().get_all_hosts() + current_app.logger.info(f"pinging {len(all_hosts)} hosts...") + authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}" + results = current_app.config["HTTP_CLIENT"].make_requests_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header) + for i in range(len(all_hosts)): + host = all_hosts[i] + result = results[i] + current_app.logger.info(f"response from {host.id} ({host.url}): {result.status_code} {result.body}") + if result.status_code == 200: + get_model().host_heartbeat(host.id) + + return "ok" + else: + current_app.logger.info(f"/hub/heartbeat-task returned 401: invalid hub token") + return abort(401, "invalid hub token") + + +@bp.route("/heartbeat/", methods=("POST",)) def heartbeat(host_id): if authorized_for_host(host_id): - get_model().host_heartbeat(host_id) + return "ok" else: current_app.logger.info(f"/hub/heartbeat/{host_id} returned 401: invalid token") return abort(401, "invalid host id or token") -@bp.route("/claim-operation//", methods=("POST")) +@bp.route("/claim-operation//", methods=("POST",)) def claim_operation(operation_id: int, host_id: str): if authorized_for_host(host_id): exists = get_model().host_operation_exists(operation_id, host_id) diff --git a/capsulflask/hub_model.py b/capsulflask/hub_model.py index 230e345..d5d05fc 100644 --- a/capsulflask/hub_model.py +++ b/capsulflask/hub_model.py @@ -12,33 +12,9 @@ from time import sleep from os.path import join from subprocess import run -from capsulflask.db_model import OnlineHost -from capsulflask.spoke_model import validate_capsul_id -from capsulflask.db import get_model, my_exec_info_message +from capsulflask.db import get_model from capsulflask.http_client import HTTPResult - -class VirtualMachine: - def __init__(self, id, host, ipv4=None, ipv6=None): - self.id = id - self.host = host - self.ipv4 = ipv4 - self.ipv6 = ipv6 - -class VirtualizationInterface: - def capacity_avaliable(self, additional_ram_bytes: int) -> bool: - pass - - def get(self, id: str) -> VirtualMachine: - pass - - def list_ids(self) -> list: - pass - - def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list): - pass - - def destroy(self, email: str, id: str): - pass +from capsulflask.shared import VirtualizationInterface, VirtualMachine, OnlineHost, validate_capsul_id, my_exec_info_message class MockHub(VirtualizationInterface): def capacity_avaliable(self, additional_ram_bytes): @@ -61,11 +37,11 @@ class MockHub(VirtualizationInterface): class CapsulFlaskHub(VirtualizationInterface): - + def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: operation_id = get_model().create_operation(hosts, payload) authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}" - results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload, authorization_header=authorization_header) + results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, "/spoke/operation", payload, authorization_header=authorization_header) for i in range(len(hosts)): host = hosts[i] result = results[i] @@ -137,7 +113,7 @@ class CapsulFlaskHub(VirtualizationInterface): host = get_model().host_of_capsul(id) if host is not None: payload = json.dumps(dict(type="get", id=id)) - op = await self.generic_operation([host], payload, True) + op = self.generic_operation([host], payload, True) results = op[1] for result in results: try: @@ -152,7 +128,7 @@ class CapsulFlaskHub(VirtualizationInterface): def list_ids(self) -> list: online_hosts = get_model().get_online_hosts() payload = json.dumps(dict(type="list_ids")) - op = await self.generic_operation(online_hosts, payload, False) + op = self.generic_operation(online_hosts, payload, False) operation_id = op[0] results = op[1] to_return = [] @@ -197,7 +173,7 @@ class CapsulFlaskHub(VirtualizationInterface): memory_mb=memory_mb, ssh_public_keys=ssh_public_keys, )) - op = await self.generic_operation(online_hosts, payload, False) + op = self.generic_operation(online_hosts, payload, False) operation_id = op[0] results = op[1] number_of_assigned = 0 @@ -230,7 +206,7 @@ class CapsulFlaskHub(VirtualizationInterface): host = get_model().host_of_capsul(id) if host is not None: payload = json.dumps(dict(type="destroy", id=id)) - op = await self.generic_operation([host], payload, True) + op = self.generic_operation([host], payload, True) results = op[1] result_json_string = "" for result in results: diff --git a/capsulflask/payment.py b/capsulflask/payment.py index 2f65015..6219de5 100644 --- a/capsulflask/payment.py +++ b/capsulflask/payment.py @@ -20,7 +20,8 @@ from werkzeug.exceptions import abort from capsulflask.auth import account_required -from capsulflask.db import get_model, my_exec_info_message +from capsulflask.db import get_model +from capsulflask.shared import my_exec_info_message bp = Blueprint("payment", __name__, url_prefix="/payment") diff --git a/capsulflask/schema_migrations/09_up_introduce_hosts.sql b/capsulflask/schema_migrations/09_up_introduce_hosts.sql index feac791..2f6628c 100644 --- a/capsulflask/schema_migrations/09_up_introduce_hosts.sql +++ b/capsulflask/schema_migrations/09_up_introduce_hosts.sql @@ -7,7 +7,7 @@ CREATE TABLE hosts ( token TEXT NOT NULL ); -INSERT INTO hosts (id, token) VALUES ('baikal', 'changeme'); +INSERT INTO hosts (id, https_url, token) VALUES ('baikal', 'http://localhost:5000', 'changeme'); ALTER TABLE vms ADD COLUMN host TEXT REFERENCES hosts(id) ON DELETE RESTRICT DEFAULT 'baikal'; @@ -16,7 +16,7 @@ CREATE TABLE operations ( id SERIAL PRIMARY KEY , email TEXT REFERENCES accounts(email) ON DELETE RESTRICT, created TIMESTAMP NOT NULL DEFAULT NOW(), - payload TEXT NOT NULL, + payload TEXT NOT NULL ); CREATE TABLE host_operation ( diff --git a/capsulflask/shared.py b/capsulflask/shared.py new file mode 100644 index 0000000..867bb95 --- /dev/null +++ b/capsulflask/shared.py @@ -0,0 +1,44 @@ +import re + +from flask import current_app + +class OnlineHost: + def __init__(self, id: str, url: str): + self.id = id + self.url = url + +class VirtualMachine: + def __init__(self, id, host, ipv4=None, ipv6=None): + self.id = id + self.host = host + self.ipv4 = ipv4 + self.ipv6 = ipv6 + +class VirtualizationInterface: + def capacity_avaliable(self, additional_ram_bytes: int) -> bool: + pass + + def get(self, id: str) -> VirtualMachine: + pass + + def list_ids(self) -> list: + pass + + def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list): + pass + + def destroy(self, email: str, id: str): + pass + +def validate_capsul_id(id): + if not re.match(r"^(cvm|capsul)-[a-z0-9]{10}$", id): + raise ValueError(f"vm id \"{id}\" must match \"^capsul-[a-z0-9]{{10}}$\"") + +def authorized_as_hub(headers): + if headers.get('Authorization'): + auth_header_value = headers.get('Authorization').replace("Bearer ", "") + return auth_header_value == current_app.config["HUB_TOKEN"] + return False + +def my_exec_info_message(exec_info): + return "{}: {}".format(".".join([exec_info[0].__module__, exec_info[0].__name__]), exec_info[1]) \ No newline at end of file diff --git a/capsulflask/spoke_api.py b/capsulflask/spoke_api.py index 12a7dcc..4d12eac 100644 --- a/capsulflask/spoke_api.py +++ b/capsulflask/spoke_api.py @@ -7,17 +7,13 @@ from flask import request from flask.json import jsonify from werkzeug.exceptions import abort -from capsulflask.db import my_exec_info_message +from capsulflask.shared import my_exec_info_message, authorized_as_hub bp = Blueprint("spoke", __name__, url_prefix="/spoke") -def authorized_as_hub(id): - auth_header_value = request.headers.get('Authorization').replace("Bearer ", "") - return auth_header_value == current_app.config["HUB_TOKEN"] - -@bp.route("/heartbeat", methods=("POST")) +@bp.route("/heartbeat", methods=("POST",)) def heartbeat(): - if authorized_as_hub(id): + if authorized_as_hub(request.headers): url = f"{current_app.config['HUB_URL']}/hub/heartbeat/{current_app.config['SPOKE_HOST_ID']}" authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header) @@ -36,9 +32,9 @@ def heartbeat(): current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token") return abort(401, "invalid hub token") -@bp.route("/operation", methods=("POST")) +@bp.route("/operation", methods=("POST",)) def operation(): - if authorized_as_hub(id): + if authorized_as_hub(request.headers): request_body = request.json() handlers = { "capacity_avaliable": handle_capacity_avaliable, diff --git a/capsulflask/spoke_model.py b/capsulflask/spoke_model.py index 5b65868..15ab6e0 100644 --- a/capsulflask/spoke_model.py +++ b/capsulflask/spoke_model.py @@ -8,11 +8,8 @@ from subprocess import run from capsulflask.db import get_model -from capsulflask.hub_model import VirtualizationInterface, VirtualMachine +from capsulflask.shared import VirtualizationInterface, VirtualMachine, validate_capsul_id -def validate_capsul_id(id): - if not re.match(r"^(cvm|capsul)-[a-z0-9]{10}$", id): - raise ValueError(f"vm id \"{id}\" must match \"^capsul-[a-z0-9]{{10}}$\"") class MockSpoke(VirtualizationInterface): def capacity_avaliable(self, additional_ram_bytes):