From 79ef90c380a831c44906eb565059ea2368528590 Mon Sep 17 00:00:00 2001 From: forest Date: Fri, 9 Jul 2021 17:08:38 -0500 Subject: [PATCH] hub allocate capsul IP addr when the create operation is being claimed create.sh will now be passed two extra arguments from the web app: network_name and public_ipv4_address network_name will be virbr1 or virbr2 or whatever the network is called and public_ipv4_address will be an ipv4 from that network which is not currently being used --- capsulflask/admin.py | 6 +-- capsulflask/db_model.py | 42 ++++++++++++--- capsulflask/hub_api.py | 108 ++++++++++++++++++++++++++++++++++--- capsulflask/spoke_api.py | 23 +++++++- capsulflask/spoke_model.py | 18 +++++-- 5 files changed, 175 insertions(+), 22 deletions(-) diff --git a/capsulflask/admin.py b/capsulflask/admin.py index ccdf5cd..22f94d6 100644 --- a/capsulflask/admin.py +++ b/capsulflask/admin.py @@ -17,9 +17,9 @@ bp = Blueprint("admin", __name__, url_prefix="/admin") @bp.route("/") @admin_account_required def index(): - hosts = get_model().list_hosts_with_networks() - vms_by_host_and_network = get_model().all_non_deleted_vms_by_host_and_network() - network_display_width_px = float(500); + hosts = get_model().list_hosts_with_networks(None) + vms_by_host_and_network = get_model().non_deleted_vms_by_host_and_network(None) + network_display_width_px = float(500) #operations = get_model().list_all_operations() display_hosts = [] diff --git a/capsulflask/db_model.py b/capsulflask/db_model.py index fcd99f4..d7df878 100644 --- a/capsulflask/db_model.py +++ b/capsulflask/db_model.py @@ -56,8 +56,12 @@ class DBModel: # ------ VM & ACCOUNT MANAGEMENT --------- - def all_non_deleted_vms_by_host_and_network(self): - self.cursor.execute("SELECT id, host, network_name, public_ipv4, public_ipv6 FROM vms WHERE deleted IS NULL") + def non_deleted_vms_by_host_and_network(self, host_id): + query = "SELECT id, host, network_name, public_ipv4, public_ipv6 FROM vms WHERE deleted IS NULL" + if host_id is None: + self.cursor.execute(query) + else: + self.cursor.execute(f"{query} AND host = %s", (host_id)) hosts = dict() for row in self.cursor.fetchall(): @@ -318,11 +322,15 @@ class DBModel: # ------ HOSTS --------- - def list_hosts_with_networks(self): - self.cursor.execute(""" + def list_hosts_with_networks(self, host_id: str): + query = """ SELECT hosts.id, hosts.last_health_check, host_network.network_name, host_network.public_ipv4_cidr_block FROM hosts JOIN host_network ON host_network.host = hosts.id - """) + """ + if host_id is None: + self.cursor.execute(query) + else: + self.cursor.execute(f"{query} WHERE hosts.id = %s", (host_id)) hosts = dict() for row in self.cursor.fetchall(): @@ -379,6 +387,13 @@ class DBModel: self.connection.commit() return operation_id + def update_operation(self, operation_id: int, payload: str): + self.cursor.execute( + "UPDATE operations SET payload = %s WHERE id = %s", + (payload, operation_id) + ) + self.connection.commit() + def update_host_operation(self, host_id: str, operation_id: int, assignment_status: str, result: str): if assignment_status and not result: self.cursor.execute( @@ -405,9 +420,20 @@ class DBModel: else: return None - def host_operation_exists(self, operation_id: int, host_id: str) -> bool: - self.cursor.execute("SELECT operation FROM host_operation WHERE host = %s AND operation = %s",(host_id, operation_id)) - return len(self.cursor.fetchall()) != 0 + def get_payload_json_from_host_operation(self, operation_id: int, host_id: str) -> str: + self.cursor.execute( + """ + SELECT operations.payload FROM operations + JOIN host_operation ON host_operation.operation = operations.id + WHERE host_operation.host = %s AND host_operation.operation = %s + """, + (host_id, operation_id) + ) + row = self.cursor.fetchone() + if row: + return row[0] + else: + return None def claim_operation(self, operation_id: int, host_id: str) -> bool: # have to make a new cursor to set isolation level diff --git a/capsulflask/hub_api.py b/capsulflask/hub_api.py index 04ea4cc..11b91f9 100644 --- a/capsulflask/hub_api.py +++ b/capsulflask/hub_api.py @@ -1,7 +1,9 @@ +import json +import ipaddress from flask import Blueprint from flask import current_app -from flask import request +from flask import request, make_response from werkzeug.exceptions import abort from capsulflask.db import get_model @@ -46,15 +48,109 @@ def heartbeat(host_id): @bp.route("/claim-operation//", methods=("POST",)) def claim_operation(operation_id: int, host_id: str): if authorized_for_host(host_id): - exists = get_model().host_operation_exists(operation_id, host_id) - if not exists: - return abort(404, "host operation not found") + payload_json = get_model().get_payload_json_from_host_operation(operation_id, host_id) + if payload_json is None: + error_message = f"{host_id} can't claim operation {operation_id} because host_operation row not found" + current_app.logger.error(error_message) + return abort(404, error_message) + + can_claim_handlers = { + "create": can_claim_create, + } + error_message = "" + payload = None + payload_is_dict = False + payload_has_type = False + payload_has_valid_type = False + try: + payload = json.loads(payload_json) + payload_is_dict = isinstance(payload, dict) + payload_has_type = payload_is_dict and 'type' in payload + payload_has_valid_type = payload_has_type and payload['type'] in can_claim_handlers + + if not payload_is_dict: + error_message = "invalid json: expected an object" + elif not payload_has_type: + error_message = "invalid json: 'type' field is required" + elif not payload_has_valid_type: + error_message = f"invalid json: expected type \"{payload['type']}\" to be one of [{', '.join(can_claim_handlers.keys())}]" + except: + error_message = "could not parse payload as json" + + if error_message is not "": + error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" + current_app.logger.error(error_message) + return abort(400, error_message) + + # we will only return this payload as json if claiming succeeds, so might as well do this now... + payload['assignment_status'] = 'assigned' + + # invoke the appropriate can_claim_handler for this operation type + result_tuple = can_claim_handlers[payload['type']](payload, host_id) + payload_json = result_tuple[0] + error_message = result_tuple[1] + + if error_message is not "": + error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" + current_app.logger.error(error_message) + return abort(400, error_message) + claimed = get_model().claim_operation(operation_id, host_id) if claimed: - return "ok" + get_model().update_operation(operation_id, payload_json) + + response = make_response(payload_json) + response.header.set("Content-Type", "application/json") + + return response else: - return abort(409, "operation was already assigned to another host") + return abort(409, f"operation was already assigned to another host") else: current_app.logger.warning(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token") return abort(401, "invalid host id or token") +def can_claim_create(payload, host_id) -> (str, str): + + hosts = get_model().list_hosts_with_networks(host_id) + + if host_id not in hosts: + return "", f"the host \"{host_id}\" does not appear to have any networks." + + networks = hosts[host_id].networks + + vms_by_host_and_network = get_model().non_deleted_vms_by_host_and_network(host_id) + + vms_by_network = dict() + if host_id in vms_by_host_and_network: + vms_by_network = vms_by_host_and_network[host_id] + + allocated_ipv4_address = None + allocated_network_name = None + for network in networks: + vms = [] + if network["network_name"] in vms_by_network: + vms = vms_by_network[network["network_name"]] + + claimed_ipv4s = dict() + for vm in vms: + claimed_ipv4s[vm['public_ipv4']] = True + + ipv4_network = ipaddress.ip_network(network["public_ipv4_cidr_block"], False) + i = 0 + for ipv4_address in ipv4_network: + i += 1 + if i > 2 and str(ipv4_address) not in claimed_ipv4s: + allocated_ipv4_address = str(ipv4_address) + break + + if allocated_ipv4_address is not None: + allocated_network_name = network["network_name"] + break + + if allocated_network_name is None or allocated_ipv4_address is None: + return "", f"host \"{host_id}\" does not have any avaliable IP addresses on any of its networks." + + payload["network_name"] = allocated_network_name + payload["public_ipv4_address"] = allocated_ipv4_address + + return json.dumps(payload), "" \ No newline at end of file diff --git a/capsulflask/spoke_api.py b/capsulflask/spoke_api.py index 01fbe54..16be975 100644 --- a/capsulflask/spoke_api.py +++ b/capsulflask/spoke_api.py @@ -115,14 +115,29 @@ def handle_create(operation_id, request_body): return abort(400, f"bad request; {error_message}") # only one host should create the vm, so we first race to assign this create operation to ourselves. - # only one host will win this race + # only one host will win this race. authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}" result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header) assignment_status = "" if result.status_code == 200: + try: + assignment_info = json.loads(result.body) + if not isinstance(assignment_info, dict): + return abort(503, f"hub at '{url}' returned 200, but did not return assignment_info json object") + if 'network_name' not in assignment_info: + return abort(503, f"hub at '{url}' returned 200, but the returned assignment_info object did not include network_name") + if 'public_ipv4_address' not in assignment_info: + return abort(503, f"hub at '{url}' returned 200, but the returned assignment_info object did not include public_ipv4_address") + + request_body['network_name'] = assignment_info['network_name'] + request_body['public_ipv4_address'] = assignment_info['public_ipv4_address'] + except: + return abort(503, f"hub at '{url}' returned 200, but did not return valid json") + assignment_status = "assigned" + elif result.status_code == 409: assignment_status = "assigned_to_other_host" else: @@ -138,6 +153,8 @@ def handle_create(operation_id, request_body): vcpus=request_body['vcpus'], memory_mb=request_body['memory_mb'], ssh_authorized_keys=request_body['ssh_authorized_keys'], + network_name=request_body['network_name'], + public_ipv4_address=request_body['public_ipv4_address'], ) except: error_message = my_exec_info_message(sys.exc_info()) @@ -147,7 +164,11 @@ def handle_create(operation_id, request_body): params= f"{params} vcpus='{request_body['vcpus'] if 'vcpus' in request_body else 'KeyError'}', " params= f"{params} memory_mb='{request_body['memory_mb'] if 'memory_mb' in request_body else 'KeyError'}', " params= f"{params} ssh_authorized_keys='{request_body['ssh_authorized_keys'] if 'ssh_authorized_keys' in request_body else 'KeyError'}', " + params= f"{params} network_name='{request_body['network_name'] if 'network_name' in request_body else 'KeyError'}', " + params= f"{params} public_ipv4_address='{request_body['public_ipv4_address'] if 'public_ipv4_address' in request_body else 'KeyError'}', " + current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}") + return jsonify(dict(assignment_status=assignment_status, error_message=error_message)) return jsonify(dict(assignment_status=assignment_status)) diff --git a/capsulflask/spoke_model.py b/capsulflask/spoke_model.py index a8ad013..a11270d 100644 --- a/capsulflask/spoke_model.py +++ b/capsulflask/spoke_model.py @@ -129,7 +129,7 @@ class ShellScriptSpoke(VirtualizationInterface): self.validate_completed_process(completedProcess) return list(map(lambda x: x.decode("utf-8"), completedProcess.stdout.splitlines() )) - def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list): + def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list, network_name: str, public_ipv4_address: str): validate_capsul_id(id) if not re.match(r"^[a-zA-Z0-9/_.-]+$", template_image_file_name): @@ -139,12 +139,18 @@ class ShellScriptSpoke(VirtualizationInterface): if not re.match(r"^(ssh|ecdsa)-[0-9A-Za-z+/_=@:. -]+$", ssh_authorized_key): raise ValueError(f"ssh_authorized_key \"{ssh_authorized_key}\" must match \"^(ssh|ecdsa)-[0-9A-Za-z+/_=@:. -]+$\"") - if vcpus < 1 or vcpus > 8: + if isinstance(vcpus, int) and (vcpus < 1 or vcpus > 8): raise ValueError(f"vcpus \"{vcpus}\" must match 1 <= vcpus <= 8") - if memory_mb < 512 or memory_mb > 16384: + if isinstance(memory_mb, int) and (memory_mb < 512 or memory_mb > 16384): raise ValueError(f"memory_mb \"{memory_mb}\" must match 512 <= memory_mb <= 16384") + if not re.match(r"^[a-zA-Z0-9_-]+$", network_name): + raise ValueError(f"network_name \"{network_name}\" must match \"^[a-zA-Z0-9_-]+\"") + + if not re.match(r"^[0-9.]+$", public_ipv4_address): + raise ValueError(f"public_ipv4_address \"{public_ipv4_address}\" must match \"^[0-9.]+$\"") + ssh_keys_string = "\n".join(ssh_authorized_keys) completedProcess = run([ @@ -153,7 +159,9 @@ class ShellScriptSpoke(VirtualizationInterface): template_image_file_name, str(vcpus), str(memory_mb), - ssh_keys_string + ssh_keys_string, + network_name, + public_ipv4_address ], capture_output=True) self.validate_completed_process(completedProcess, email) @@ -166,6 +174,8 @@ class ShellScriptSpoke(VirtualizationInterface): vcpus={str(vcpus)} memory={str(memory_mb)} ssh_authorized_keys={ssh_keys_string} + network_name={network_name} + public_ipv4_address={public_ipv4_address} """ if not status == "success":