From d9c30e1ef844a02981676ede4c48217248c7e29d Mon Sep 17 00:00:00 2001 From: forest Date: Sun, 3 Jan 2021 19:17:30 -0600 Subject: [PATCH] trying to set up assignment of create operation --- capsulflask/db_model.py | 27 ++++++++++++++ capsulflask/http_client.py | 10 ++++-- capsulflask/hub_api.py | 25 ++++++++++--- capsulflask/hub_model.py | 3 +- .../09_up_introduce_hosts.sql | 3 +- capsulflask/spoke_api.py | 35 +++++++++++++++---- 6 files changed, 86 insertions(+), 17 deletions(-) diff --git a/capsulflask/db_model.py b/capsulflask/db_model.py index 3a7baa6..a641dae 100644 --- a/capsulflask/db_model.py +++ b/capsulflask/db_model.py @@ -1,4 +1,6 @@ +# I was never able to get this type hinting to work correctly +# from psycopg2.extensions import connection as Psycopg2Connection, cursor as Psycopg2Cursor from nanoid import generate from flask import current_app from typing import List @@ -10,6 +12,7 @@ class OnlineHost: self.url = url class DBModel: + #def __init__(self, connection: Psycopg2Connection, cursor: Psycopg2Cursor): def __init__(self, connection, cursor): self.connection = connection self.cursor = cursor @@ -311,6 +314,30 @@ class DBModel: else: return None + def host_operation_exists(self, operation_id: int, host_id: str) -> bool: + self.cursor.execute("SELECT operation FROM host_operation WHERE host = %s AND operation = %s",(host_id, operation_id)) + return len(self.cursor.fetchall()) != 0 + + def claim_operation(self, operation_id: int, host_id: str) -> bool: + self.cursor.execute(""" + SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; + BEGIN TRANSACTION; + UPDATE host_operation SET assignment_status = 'assigned' + WHERE host = %s AND operation = %s AND operation != ( + SELECT COALESCE( + (SELECT operation FROM host_operation WHERE operation = %s AND assignment_status = 'assigned'), + -1 + ) as already_assigned_operation_id + ); + COMMIT TRANSACTION; + """, (host_id, operation_id, operation_id)) + + to_return = self.cursor.rowcount != 0 + + self.connection.commit() + + return to_return + diff --git a/capsulflask/http_client.py b/capsulflask/http_client.py index d9b7c26..ea03c89 100644 --- a/capsulflask/http_client.py +++ b/capsulflask/http_client.py @@ -22,16 +22,20 @@ class MyHTTPClient: def make_requests_sync(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): self.event_loop.run_until_complete(self.make_requests(online_hosts=online_hosts, body=body)) - def post_json_sync(self, method: str, url: str, body: str) -> HTTPResult: + def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult: self.event_loop.run_until_complete(self.post_json_sync(method=method, url=url, body=body)) - async def post_json(self, method: str, url: str, body: str) -> HTTPResult: + async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult: response = None try: + headers = {} + if authorization_header != None: + headers['Authorization'] = authorization_header response = await self.client_session.request( method=method, url=url, json=body, + headers=headers, auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']), verify_ssl=True, ) @@ -60,7 +64,7 @@ class MyHTTPClient: # append to tasks in the same order as online_hosts for host in online_hosts: tasks.append( - self.post_json(method="POST", url=host.url, body=body) + self.post_json(url=host.url, body=body) ) # gather is like Promise.all from javascript, it returns a future which resolves to an array of results # in the same order as the tasks that we passed in -- which were in the same order as online_hosts diff --git a/capsulflask/hub_api.py b/capsulflask/hub_api.py index f071353..b35bf78 100644 --- a/capsulflask/hub_api.py +++ b/capsulflask/hub_api.py @@ -12,11 +12,26 @@ def authorized_for_host(id): auth_header_value = request.headers.get('Authorization').replace("Bearer ", "") return get_model().authorized_for_host(id, auth_header_value) -@bp.route("/heartbeat/", methods=("POST")) -def heartbeat(id): - if authorized_for_host(id): - get_model().host_heartbeat(id) +@bp.route("/heartbeat/", methods=("POST")) +def heartbeat(host_id): + if authorized_for_host(host_id): + get_model().host_heartbeat(host_id) else: - current_app.logger.info(f"/hub/heartbeat/{id} returned 401: invalid token") + current_app.logger.info(f"/hub/heartbeat/{host_id} returned 401: invalid token") + return abort(401, "invalid host id or token") + +@bp.route("/claim-operation//", methods=("POST")) +def claim_operation(operation_id: int, host_id: str): + if authorized_for_host(host_id): + exists = get_model().host_operation_exists(operation_id, host_id) + if not exists: + return abort(404, "host operation not found") + claimed = get_model().claim_operation(operation_id, host_id) + if claimed: + return "ok" + else: + return abort(409, "operation was already assigned to another host") + else: + current_app.logger.info(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token") return abort(401, "invalid host id or token") diff --git a/capsulflask/hub_model.py b/capsulflask/hub_model.py index 0669624..230e345 100644 --- a/capsulflask/hub_model.py +++ b/capsulflask/hub_model.py @@ -64,7 +64,8 @@ class CapsulFlaskHub(VirtualizationInterface): def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: operation_id = get_model().create_operation(hosts, payload) - results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload) + authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}" + results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload, authorization_header=authorization_header) for i in range(len(hosts)): host = hosts[i] result = results[i] diff --git a/capsulflask/schema_migrations/09_up_introduce_hosts.sql b/capsulflask/schema_migrations/09_up_introduce_hosts.sql index b5902a4..feac791 100644 --- a/capsulflask/schema_migrations/09_up_introduce_hosts.sql +++ b/capsulflask/schema_migrations/09_up_introduce_hosts.sql @@ -29,5 +29,4 @@ CREATE TABLE host_operation ( PRIMARY KEY (host, operation) ); - -UPDATE schemaversion SET version = 9; \ No newline at end of file +UPDATE schemaversion SET version = 9; diff --git a/capsulflask/spoke_api.py b/capsulflask/spoke_api.py index 467cbc3..12a7dcc 100644 --- a/capsulflask/spoke_api.py +++ b/capsulflask/spoke_api.py @@ -18,9 +18,20 @@ def authorized_as_hub(id): @bp.route("/heartbeat", methods=("POST")) def heartbeat(): if authorized_as_hub(id): - # make request to hub-domain.com/hub/heartbeat/{current_app.config["SPOKE_HOST_ID"]} - # succeed or fail based on whether the request succeeds or fails. - pass + url = f"{current_app.config['HUB_URL']}/hub/heartbeat/{current_app.config['SPOKE_HOST_ID']}" + authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" + result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header) + if result.status_code == -1: + current_app.logger.info(f"/hosts/heartbeat returned 503: hub at {url} timed out or cannot be reached") + return abort(503, "Service Unavailable: hub timed out or cannot be reached") + if result.status_code == 401: + current_app.logger.info(f"/hosts/heartbeat returned 502: hub at {url} rejected our token") + return abort(502, "hub rejected our token") + if result.status_code != 200: + current_app.logger.info(f"/hosts/heartbeat returned 502: hub at {url} returned {result.status_code}") + return abort(502, "Bad Gateway: hub did not return 200") + + return "OK" else: current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token") return abort(401, "invalid hub token") @@ -90,9 +101,21 @@ def handle_create(request_body): current_app.logger.info(f"/hosts/operation returned 400: {error_message}") return abort(400, f"bad request; {error_message}") - # try to aquire operation_id - assignment_status = "assigned" + # only one host should create the vm, so we first race to assign this create operation to ourselves. + # only one host will win this race + authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" + url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{request_body['operation_id']}/{current_app.config['SPOKE_HOST_ID']}" + result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header) + assignment_status = "" + if result.status_code == 200: + assignment_status = "assigned" + elif result.status_code == 409: + assignment_status = "assigned_to_other_host" + else: + current_app.logger.info(f"{url} returned {result.status_code}: {result.body}") + return abort(503, f"hub did not cleanly handle our request to claim the create operation") + if assignment_status == "assigned": try: current_app.config['SPOKE_MODEL'].create( @@ -108,7 +131,7 @@ def handle_create(request_body): params = f"email='{request_body['email']}', id='{request_body['id']}', " params = f"{params}, template_image_file_name='{request_body['template_image_file_name']}', vcpus='{request_body['vcpus']}'" params = f"{params}, memory_mb='{request_body['memory_mb']}', ssh_public_keys='{request_body['ssh_public_keys']}'" - current_app.logger.error(f"current_app.config['SPOKE_MODEL'].create({params}) failed: {error_message}") + current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}") return jsonify(dict(assignment_status=assignment_status, error_message=error_message)) return jsonify(dict(assignment_status=assignment_status))