From 42a8e0c8860d68fded56bd4a9a3e28c12f9de5a2 Mon Sep 17 00:00:00 2001 From: forest Date: Sun, 3 Jan 2021 15:19:29 -0600 Subject: [PATCH] http client class --- capsulflask/__init__.py | 3 +- capsulflask/http_client.py | 69 ++++++++++++++++++++++++++++++++++ capsulflask/hub_model.py | 77 +++++++++----------------------------- capsulflask/spoke_api.py | 7 +++- capsulflask/spoke_model.py | 29 ++------------ 5 files changed, 99 insertions(+), 86 deletions(-) create mode 100644 capsulflask/http_client.py diff --git a/capsulflask/__init__.py b/capsulflask/__init__.py index faeba94..21dc047 100644 --- a/capsulflask/__init__.py +++ b/capsulflask/__init__.py @@ -14,6 +14,7 @@ from flask import current_app from capsulflask import hub_model, spoke_model, cli from capsulflask.btcpay import client as btcpay +from capsulflask.http_client import MyHTTPClient load_dotenv(find_dotenv()) @@ -81,7 +82,7 @@ stripe.api_key = app.config['STRIPE_SECRET_KEY'] stripe.api_version = app.config['STRIPE_API_VERSION'] app.config['FLASK_MAIL_INSTANCE'] = Mail(app) - +app.config['HTTP_CLIENT'] = MyHTTPClient(timeout_seconds=5) app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY']) if app.config['HUB_MODE_ENABLED']: diff --git a/capsulflask/http_client.py b/capsulflask/http_client.py new file mode 100644 index 0000000..d9b7c26 --- /dev/null +++ b/capsulflask/http_client.py @@ -0,0 +1,69 @@ + +import sys +import json + +import aiohttp +import asyncio +from flask import current_app +from capsulflask.db import my_exec_info_message +from capsulflask.db_model import OnlineHost +from typing import List + +class HTTPResult: + def __init__(self, status_code, body=None): + self.status_code = status_code + self.body = body + +class MyHTTPClient: + def __init__(self, timeout_seconds = 5): + self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) + self.event_loop = asyncio.get_event_loop() + + def make_requests_sync(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): + self.event_loop.run_until_complete(self.make_requests(online_hosts=online_hosts, body=body)) + + def post_json_sync(self, method: str, url: str, body: str) -> HTTPResult: + self.event_loop.run_until_complete(self.post_json_sync(method=method, url=url, body=body)) + + async def post_json(self, method: str, url: str, body: str) -> HTTPResult: + response = None + try: + response = await self.client_session.request( + method=method, + url=url, + json=body, + auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']), + verify_ssl=True, + ) + except: + error_message = my_exec_info_message(sys.exc_info()) + response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"}) + current_app.logger.error(f""" + error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}""" + ) + return HTTPResult(-1, response_body) + + response_body = None + try: + response_body = await response.text() + except: + error_message = my_exec_info_message(sys.exc_info()) + response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"}) + current_app.logger.error(f""" + error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}""" + ) + + return HTTPResult(response.status, response_body) + + async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): + tasks = [] + # append to tasks in the same order as online_hosts + for host in online_hosts: + tasks.append( + self.post_json(method="POST", url=host.url, body=body) + ) + # gather is like Promise.all from javascript, it returns a future which resolves to an array of results + # in the same order as the tasks that we passed in -- which were in the same order as online_hosts + results = await asyncio.gather(*tasks) + + return results \ No newline at end of file diff --git a/capsulflask/hub_model.py b/capsulflask/hub_model.py index 6489327..0669624 100644 --- a/capsulflask/hub_model.py +++ b/capsulflask/hub_model.py @@ -13,16 +13,18 @@ from os.path import join from subprocess import run from capsulflask.db_model import OnlineHost -from capsulflask.spoke_model import VirtualMachine from capsulflask.spoke_model import validate_capsul_id from capsulflask.db import get_model, my_exec_info_message +from capsulflask.http_client import HTTPResult -class HTTPResult: - def __init__(self, status_code, body=None): - self.status_code = status_code - self.body = body +class VirtualMachine: + def __init__(self, id, host, ipv4=None, ipv6=None): + self.id = id + self.host = host + self.ipv4 = ipv4 + self.ipv6 = ipv6 -class HubInterface: +class VirtualizationInterface: def capacity_avaliable(self, additional_ram_bytes: int) -> bool: pass @@ -38,7 +40,7 @@ class HubInterface: def destroy(self, email: str, id: str): pass -class MockHub(HubInterface): +class MockHub(VirtualizationInterface): def capacity_avaliable(self, additional_ram_bytes): return True @@ -58,58 +60,11 @@ class MockHub(HubInterface): current_app.logger.info(f"mock destroy: {id} for {email}") -class CapsulFlaskHub(HubInterface): - - - async def post_json(self, method: str, url: str, body: str, session: aiohttp.ClientSession) -> HTTPResult: - response = None - try: - response = await session.request( - method=method, - url=url, - json=body, - auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']), - verify_ssl=True, - ) - except: - error_message = my_exec_info_message(sys.exc_info()) - response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"}) - current_app.logger.error(f""" - error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}""" - ) - return HTTPResult(-1, response_body) - - response_body = None - try: - response_body = await response.text() - except: - error_message = my_exec_info_message(sys.exc_info()) - response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"}) - current_app.logger.error(f""" - error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}""" - ) - - return HTTPResult(response.status, response_body) - - async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): - timeout_seconds = 5 - async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) as session: - tasks = [] - # append to tasks in the same order as online_hosts - for host in online_hosts: - tasks.append( - self.post_json(method="POST", url=host.url, body=body, session=session) - ) - # gather is like Promise.all from javascript, it returns a future which resolves to an array of results - # in the same order as the tasks that we passed in -- which were in the same order as online_hosts - results = await asyncio.gather(*tasks) - - return results +class CapsulFlaskHub(VirtualizationInterface): - - async def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: + def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: operation_id = get_model().create_operation(hosts, payload) - results = await self.make_requests(hosts, payload) + results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload) for i in range(len(hosts)): host = hosts[i] result = results[i] @@ -130,6 +85,7 @@ class CapsulFlaskHub(HubInterface): result_has_status = False result_has_valid_status = False assignment_status = "invalid_response_from_host" + error_message = "" try: if immediate_mode: task_result = result.body @@ -140,6 +96,8 @@ class CapsulFlaskHub(HubInterface): result_has_valid_status = result_has_status and result_body['assignment_status'] in valid_statuses if result_has_valid_status: assignment_status = result_body['assignment_status'] + if result_is_dict and "error_message" in result_body: + error_message = result_body['error_message'] except: pass @@ -149,6 +107,7 @@ class CapsulFlaskHub(HubInterface): result_is_dict: {result_is_dict} result_has_status: {result_has_status} result_has_valid_status: {result_has_valid_status} + error_message: {error_message} """ ) @@ -156,10 +115,10 @@ class CapsulFlaskHub(HubInterface): return results - async def capacity_avaliable(self, additional_ram_bytes): + def capacity_avaliable(self, additional_ram_bytes): online_hosts = get_model().get_online_hosts() payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes)) - op = await self.generic_operation(online_hosts, payload, True) + op = self.generic_operation(online_hosts, payload, True) results = op[1] for result in results: try: diff --git a/capsulflask/spoke_api.py b/capsulflask/spoke_api.py index cc3f67f..467cbc3 100644 --- a/capsulflask/spoke_api.py +++ b/capsulflask/spoke_api.py @@ -41,7 +41,12 @@ def operation(): types_csv = ", ".join(handlers.keys()) if isinstance(request_body, dict) and 'type' in request_body: if request_body['type'] in handlers: - return handlers[request_body['type']](request_body) + try: + return handlers[request_body['type']](request_body) + except: + error_message = my_exec_info_message(sys.exc_info()) + current_app.logger.error(f"unhandled exception in {request_body['type']} handler: {error_message}") + return jsonify(dict(error_message=error_message)) else: error_message = f"'type' must be one of {types_csv}" else: diff --git a/capsulflask/spoke_model.py b/capsulflask/spoke_model.py index e01b390..5b65868 100644 --- a/capsulflask/spoke_model.py +++ b/capsulflask/spoke_model.py @@ -8,34 +8,13 @@ from subprocess import run from capsulflask.db import get_model +from capsulflask.hub_model import VirtualizationInterface, VirtualMachine + def validate_capsul_id(id): if not re.match(r"^(cvm|capsul)-[a-z0-9]{10}$", id): raise ValueError(f"vm id \"{id}\" must match \"^capsul-[a-z0-9]{{10}}$\"") -class VirtualMachine: - def __init__(self, id, host, ipv4=None, ipv6=None): - self.id = id - self.host = host - self.ipv4 = ipv4 - self.ipv6 = ipv6 - -class SpokeInterface: - def capacity_avaliable(self, additional_ram_bytes: int) -> bool: - pass - - def get(self, id: str) -> VirtualMachine: - pass - - def list_ids(self) -> list: - pass - - def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list): - pass - - def destroy(self, email: str, id: str): - pass - -class MockSpoke(SpokeInterface): +class MockSpoke(VirtualizationInterface): def capacity_avaliable(self, additional_ram_bytes): return True @@ -54,7 +33,7 @@ class MockSpoke(SpokeInterface): def destroy(self, email: str, id: str): current_app.logger.info(f"mock destroy: {id} for {email}") -class ShellScriptSpoke(SpokeInterface): +class ShellScriptSpoke(VirtualizationInterface): def validate_completed_process(self, completedProcess, email=None): emailPart = ""