diff --git a/capsulflask/__init__.py b/capsulflask/__init__.py index eba8ed9..388f263 100644 --- a/capsulflask/__init__.py +++ b/capsulflask/__init__.py @@ -102,7 +102,7 @@ if app.config['HUB_MODE_ENABLED']: heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task" heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"} heartbeat_task = lambda: requests.post(heartbeat_task_url, headers=heartbeat_task_headers) - scheduler.add_job(func=heartbeat_task, trigger="interval", seconds=5) + scheduler.add_job(name="heartbeat_task", func=heartbeat_task, trigger="interval", seconds=5) scheduler.start() atexit.register(lambda: scheduler.shutdown()) diff --git a/capsulflask/console.py b/capsulflask/console.py index 5630158..b8d3b18 100644 --- a/capsulflask/console.py +++ b/capsulflask/console.py @@ -27,16 +27,20 @@ def makeCapsulId(): return f"capsul-{lettersAndNumbers}" def double_check_capsul_address(id, ipv4): - try: - result = current_app.config["HUB_MODEL"].get(id) - if result.ipv4 != ipv4: - ipv4 = result.ipv4 - get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4) - except: - current_app.logger.error(f""" - the virtualization model threw an error in double_check_capsul_address of {id}: - {my_exec_info_message(sys.exc_info())}""" - ) + result = current_app.config["HUB_MODEL"].get(id) + if result.ipv4 != ipv4: + ipv4 = result.ipv4 + get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4) + # try: + # result = current_app.config["HUB_MODEL"].get(id) + # if result.ipv4 != ipv4: + # ipv4 = result.ipv4 + # get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4) + # except: + # current_app.logger.error(f""" + # the virtualization model threw an error in double_check_capsul_address of {id}: + # {my_exec_info_message(sys.exc_info())}""" + # ) return ipv4 diff --git a/capsulflask/db_model.py b/capsulflask/db_model.py index 240c887..6bad609 100644 --- a/capsulflask/db_model.py +++ b/capsulflask/db_model.py @@ -13,6 +13,7 @@ class DBModel: def __init__(self, connection, cursor): self.connection = connection self.cursor = cursor + self.cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;") # ------ LOGIN --------- @@ -300,18 +301,29 @@ class DBModel: self.connection.commit() return operation_id - def update_host_operation(self, host_id: str, operation_id: int, assignment_status: str): - self.cursor.execute( - "UPDATE host_operation SET assignment_status = %s WHERE host = %s AND operation = %s", - (assignment_status, host_id, operation_id) - ) + def update_host_operation(self, host_id: str, operation_id: int, assignment_status: str, result: str): + if assignment_status and not result: + self.cursor.execute( + "UPDATE host_operation SET assignment_status = %s, assigned = NOW() WHERE host = %s AND operation = %s", + (assignment_status, host_id, operation_id) + ) + elif not assignment_status and result: + self.cursor.execute( + "UPDATE host_operation SET results = %s, completed = NOW() WHERE host = %s AND operation = %s", + (result, host_id, operation_id) + ) + elif assignment_status and result: + self.cursor.execute( + "UPDATE host_operation SET assignment_status = %s, assigned = NOW(), results = %s, completed = NOW() WHERE host = %s AND operation = %s", + (assignment_status, result, host_id, operation_id) + ) self.connection.commit() - def host_of_capsul(self, capsul_id: str): - self.cursor.execute("SELECT host from vms where id = %s", (capsul_id,)) + def host_of_capsul(self, capsul_id: str) -> OnlineHost: + self.cursor.execute("SELECT hosts.id, hosts.https_url from vms JOIN hosts on hosts.id = vms.host where vms.id = %s", (capsul_id,)) row = self.cursor.fetchone() if row: - return row[0] + return OnlineHost(row[0], row[1]) else: return None @@ -320,8 +332,9 @@ class DBModel: return len(self.cursor.fetchall()) != 0 def claim_operation(self, operation_id: int, host_id: str) -> bool: + # have to make a new cursor to set isolation level + # cursor = self.connection.cursor() self.cursor.execute(""" - SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; BEGIN TRANSACTION; UPDATE host_operation SET assignment_status = 'assigned' WHERE host = %s AND operation = %s AND operation != ( @@ -336,6 +349,7 @@ class DBModel: to_return = self.cursor.rowcount != 0 self.connection.commit() + #cursor.close() return to_return diff --git a/capsulflask/http_client.py b/capsulflask/http_client.py index 0f1d0ea..f7bb766 100644 --- a/capsulflask/http_client.py +++ b/capsulflask/http_client.py @@ -37,11 +37,14 @@ class MyHTTPClient: return self.client_session async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult: + # TODO make a configuration option where this throws an error if the url does not start with https:// response = None try: headers = {} if authorization_header != None: headers['Authorization'] = authorization_header + if body: + headers['Content-Type'] = "application/json" response = await self.get_client_session().request( method=method, url=url, @@ -85,7 +88,7 @@ class MyHTTPClient: # i lifted this direct from https://stackoverflow.com/a/58616001 # this is the bridge between Flask's one-thread-per-request world -# and aiohttp's event-loop based world +# and aiohttp's event-loop based world -- it allows us to call run_coroutine from a flask request handler class EventLoopThread(threading.Thread): loop = None diff --git a/capsulflask/hub_model.py b/capsulflask/hub_model.py index d5d05fc..48a6a6c 100644 --- a/capsulflask/hub_model.py +++ b/capsulflask/hub_model.py @@ -8,6 +8,7 @@ from typing import List, Tuple import aiohttp from flask import current_app +from flask import session from time import sleep from os.path import join from subprocess import run @@ -38,14 +39,27 @@ class MockHub(VirtualizationInterface): class CapsulFlaskHub(VirtualizationInterface): + def synchronous_operation(self, hosts: List[OnlineHost], payload: str) -> List[HTTPResult]: + return self.generic_operation(hosts, payload, True)[1] + + def asynchronous_operation(self, hosts: List[OnlineHost], payload: str) -> Tuple[int, List[HTTPResult]]: + return self.generic_operation(hosts, payload, False) + def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: - operation_id = get_model().create_operation(hosts, payload) + email = session["account"] + if not email or email == "": + raise ValueError("generic_operation was called but user was not logged in") + url_path = "/spoke/operation" + operation_id = None + if not immediate_mode: + operation_id = get_model().create_operation(hosts, email, payload) + url_path = f"/spoke/operation/{operation_id}" authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}" - results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, "/spoke/operation", payload, authorization_header=authorization_header) + results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, url_path, payload, authorization_header=authorization_header) + for i in range(len(hosts)): host = hosts[i] result = results[i] - task_result = None assignment_status = "pending" if result.status_code == -1: assignment_status = "no_response_from_host" @@ -64,8 +78,6 @@ class CapsulFlaskHub(VirtualizationInterface): assignment_status = "invalid_response_from_host" error_message = "" try: - if immediate_mode: - task_result = result.body result_body = json.loads(result.body) result_is_json = True result_is_dict = isinstance(result_body, dict) @@ -87,16 +99,16 @@ class CapsulFlaskHub(VirtualizationInterface): error_message: {error_message} """ ) - - get_model().update_host_operation(host.id, operation_id, assignment_status, task_result) + + if not immediate_mode: + get_model().update_host_operation(host.id, operation_id, assignment_status, None) - return results + return (operation_id, results) def capacity_avaliable(self, additional_ram_bytes): online_hosts = get_model().get_online_hosts() payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes)) - op = self.generic_operation(online_hosts, payload, True) - results = op[1] + results = self.synchronous_operation(online_hosts, payload) for result in results: try: result_body = json.loads(result.body) @@ -108,13 +120,12 @@ class CapsulFlaskHub(VirtualizationInterface): return False - async def get(self, id) -> VirtualMachine: + def get(self, id) -> VirtualMachine: validate_capsul_id(id) host = get_model().host_of_capsul(id) if host is not None: payload = json.dumps(dict(type="get", id=id)) - op = self.generic_operation([host], payload, True) - results = op[1] + results = self.synchronous_operation([host], payload) for result in results: try: result_body = json.loads(result.body) @@ -128,9 +139,7 @@ class CapsulFlaskHub(VirtualizationInterface): def list_ids(self) -> list: online_hosts = get_model().get_online_hosts() payload = json.dumps(dict(type="list_ids")) - op = self.generic_operation(online_hosts, payload, False) - operation_id = op[0] - results = op[1] + results = self.synchronous_operation(online_hosts, payload) to_return = [] for i in range(len(results)): host = online_hosts[i] @@ -145,11 +154,8 @@ class CapsulFlaskHub(VirtualizationInterface): to_return.append(id) except: all_valid = False - if all_valid: - get_model().update_host_operation(host.id, operation_id, None, result.body) - else: + if not all_valid: result_json_string = json.dumps({"error_message": "invalid capsul id returned"}) - get_model().update_host_operation(host.id, operation_id, None, result_json_string) current_app.logger.error(f"""error reading ids for list_ids operation {operation_id}, host {host.id}""") else: result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"}) @@ -173,7 +179,7 @@ class CapsulFlaskHub(VirtualizationInterface): memory_mb=memory_mb, ssh_public_keys=ssh_public_keys, )) - op = self.generic_operation(online_hosts, payload, False) + op = self.asynchronous_operation(online_hosts, payload) operation_id = op[0] results = op[1] number_of_assigned = 0 @@ -206,8 +212,7 @@ class CapsulFlaskHub(VirtualizationInterface): host = get_model().host_of_capsul(id) if host is not None: payload = json.dumps(dict(type="destroy", id=id)) - op = self.generic_operation([host], payload, True) - results = op[1] + results = self.synchronous_operation([host], payload) result_json_string = "" for result in results: try: diff --git a/capsulflask/spoke_api.py b/capsulflask/spoke_api.py index 4d12eac..f85da02 100644 --- a/capsulflask/spoke_api.py +++ b/capsulflask/spoke_api.py @@ -1,5 +1,6 @@ import sys +import json import aiohttp from flask import Blueprint from flask import current_app @@ -32,10 +33,19 @@ def heartbeat(): current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token") return abort(401, "invalid hub token") +@bp.route("/operation/", methods=("POST",)) +def operation_with_id(operation_id: int): + return operation_impl(operation_id) + @bp.route("/operation", methods=("POST",)) -def operation(): +def operation_without_id(): + return operation_impl(None) + +def operation_impl(operation_id: int): if authorized_as_hub(request.headers): - request_body = request.json() + request_body_json = request.json + request_body = json.loads(request_body_json) + current_app.logger.info(f"request.json: {request_body}") handlers = { "capacity_avaliable": handle_capacity_avaliable, "get": handle_get, @@ -49,7 +59,7 @@ def operation(): if isinstance(request_body, dict) and 'type' in request_body: if request_body['type'] in handlers: try: - return handlers[request_body['type']](request_body) + return handlers[request_body['type']](operation_id, request_body) except: error_message = my_exec_info_message(sys.exc_info()) current_app.logger.error(f"unhandled exception in {request_body['type']} handler: {error_message}") @@ -66,7 +76,7 @@ def operation(): current_app.logger.info(f"/hosts/operation returned 401: invalid hub token") return abort(401, "invalid hub token") -def handle_capacity_avaliable(request_body): +def handle_capacity_avaliable(operation_id, request_body): if 'additional_ram_bytes' not in request_body: current_app.logger.info(f"/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable") return abort(400, f"bad request; additional_ram_bytes is required for capacity_avaliable") @@ -74,7 +84,7 @@ def handle_capacity_avaliable(request_body): has_capacity = current_app.config['SPOKE_MODEL'].capacity_avaliable(request_body['additional_ram_bytes']) return jsonify(dict(assignment_status="assigned", capacity_avaliable=has_capacity)) -def handle_get(request_body): +def handle_get(operation_id, request_body): if 'id' not in request_body: current_app.logger.info(f"/hosts/operation returned 400: id is required for get") return abort(400, f"bad request; id is required for get") @@ -83,24 +93,28 @@ def handle_get(request_body): return jsonify(dict(assignment_status="assigned", id=vm.id, host=vm.host, ipv4=vm.ipv4, ipv6=vm.ipv6)) -def handle_list_ids(request_body): +def handle_list_ids(operation_id, request_body): return jsonify(dict(assignment_status="assigned", ids=current_app.config['SPOKE_MODEL'].list_ids())) -def handle_create(request_body): - parameters = ["operation_id", "email", "id", "template_image_file_name", "vcpus", "memory_mb", "ssh_public_keys"] +def handle_create(operation_id, request_body): + if not operation_id: + current_app.logger.info(f"/hosts/operation returned 400: operation_id is required for create ") + return abort(400, f"bad request; operation_id is required. try POST /spoke/operation/") + + parameters = ["email", "id", "template_image_file_name", "vcpus", "memory_mb", "ssh_public_keys"] error_message = "" for parameter in parameters: if parameter not in request_body: error_message = f"{error_message}\n{parameter} is required for create" if error_message != "": - current_app.logger.info(f"/hosts/operation returned 400: {error_message}") + current_app.logger.info(f"/hosts/opasdascasderation returned 400: {error_message}") return abort(400, f"bad request; {error_message}") # only one host should create the vm, so we first race to assign this create operation to ourselves. # only one host will win this race authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" - url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{request_body['operation_id']}/{current_app.config['SPOKE_HOST_ID']}" + url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}" result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header) assignment_status = "" @@ -132,7 +146,7 @@ def handle_create(request_body): return jsonify(dict(assignment_status=assignment_status)) -def handle_destroy(request_body): +def handle_destroy(operation_id, request_body): if 'id' not in request_body: current_app.logger.info(f"/hosts/operation returned 400: id is required for destroy") return abort(400, f"bad request; id is required for destroy")