forked from 3wordchant/capsul-flask
		
	hub allocate capsul IP addr when the create operation is being claimed
create.sh will now be passed two extra arguments from the web app: network_name and public_ipv4_address network_name will be virbr1 or virbr2 or whatever the network is called and public_ipv4_address will be an ipv4 from that network which is not currently being used
This commit is contained in:
		| @ -17,9 +17,9 @@ bp = Blueprint("admin", __name__, url_prefix="/admin") | ||||
| @bp.route("/") | ||||
| @admin_account_required | ||||
| def index(): | ||||
|   hosts = get_model().list_hosts_with_networks() | ||||
|   vms_by_host_and_network = get_model().all_non_deleted_vms_by_host_and_network() | ||||
|   network_display_width_px = float(500); | ||||
|   hosts = get_model().list_hosts_with_networks(None) | ||||
|   vms_by_host_and_network = get_model().non_deleted_vms_by_host_and_network(None) | ||||
|   network_display_width_px = float(500) | ||||
|   #operations = get_model().list_all_operations() | ||||
|  | ||||
|   display_hosts = [] | ||||
|  | ||||
| @ -56,8 +56,12 @@ class DBModel: | ||||
|  | ||||
|   #     ------    VM & ACCOUNT MANAGEMENT     ---------   | ||||
|  | ||||
|   def all_non_deleted_vms_by_host_and_network(self): | ||||
|     self.cursor.execute("SELECT id, host, network_name, public_ipv4, public_ipv6 FROM vms WHERE deleted IS NULL") | ||||
|   def non_deleted_vms_by_host_and_network(self, host_id): | ||||
|     query = "SELECT id, host, network_name, public_ipv4, public_ipv6 FROM vms WHERE deleted IS NULL" | ||||
|     if host_id is None:  | ||||
|       self.cursor.execute(query) | ||||
|     else: | ||||
|       self.cursor.execute(f"{query} AND host = %s", (host_id)) | ||||
|  | ||||
|     hosts = dict() | ||||
|     for row in self.cursor.fetchall(): | ||||
| @ -318,11 +322,15 @@ class DBModel: | ||||
|  | ||||
|   #     ------    HOSTS     ---------   | ||||
|  | ||||
|   def list_hosts_with_networks(self): | ||||
|     self.cursor.execute(""" | ||||
|   def list_hosts_with_networks(self, host_id: str): | ||||
|     query = """ | ||||
|     SELECT hosts.id, hosts.last_health_check, host_network.network_name, host_network.public_ipv4_cidr_block FROM hosts | ||||
|     JOIN host_network ON host_network.host = hosts.id | ||||
|     """) | ||||
|     """ | ||||
|     if host_id is None:  | ||||
|       self.cursor.execute(query) | ||||
|     else: | ||||
|       self.cursor.execute(f"{query} WHERE hosts.id = %s", (host_id)) | ||||
|  | ||||
|     hosts = dict() | ||||
|     for row in self.cursor.fetchall(): | ||||
| @ -379,6 +387,13 @@ class DBModel: | ||||
|     self.connection.commit() | ||||
|     return operation_id | ||||
|  | ||||
|   def update_operation(self, operation_id: int, payload: str): | ||||
|     self.cursor.execute( | ||||
|       "UPDATE operations SET payload = %s WHERE id = %s",  | ||||
|       (payload, operation_id) | ||||
|     ) | ||||
|     self.connection.commit() | ||||
|  | ||||
|   def update_host_operation(self, host_id: str, operation_id: int, assignment_status: str, result: str): | ||||
|     if assignment_status and not result: | ||||
|       self.cursor.execute( | ||||
| @ -405,9 +420,20 @@ class DBModel: | ||||
|     else: | ||||
|       return None | ||||
|  | ||||
|   def host_operation_exists(self, operation_id: int, host_id: str) -> bool: | ||||
|     self.cursor.execute("SELECT operation FROM host_operation WHERE host = %s AND operation = %s",(host_id, operation_id)) | ||||
|     return len(self.cursor.fetchall()) != 0 | ||||
|   def get_payload_json_from_host_operation(self, operation_id: int, host_id: str) -> str: | ||||
|     self.cursor.execute( | ||||
|       """ | ||||
|         SELECT operations.payload FROM operations  | ||||
|           JOIN host_operation ON host_operation.operation = operations.id | ||||
|           WHERE host_operation.host = %s AND host_operation.operation = %s | ||||
|       """, | ||||
|       (host_id, operation_id) | ||||
|     ) | ||||
|     row = self.cursor.fetchone() | ||||
|     if row: | ||||
|       return row[0] | ||||
|     else: | ||||
|       return None | ||||
|  | ||||
|   def claim_operation(self, operation_id: int, host_id: str) -> bool: | ||||
|     # have to make a new cursor to set isolation level | ||||
|  | ||||
| @ -1,7 +1,9 @@ | ||||
| import json | ||||
| import ipaddress | ||||
|  | ||||
| from flask import Blueprint | ||||
| from flask import current_app | ||||
| from flask import request | ||||
| from flask import request, make_response | ||||
| from werkzeug.exceptions import abort | ||||
|  | ||||
| from capsulflask.db import get_model | ||||
| @ -46,15 +48,109 @@ def heartbeat(host_id): | ||||
| @bp.route("/claim-operation/<int:operation_id>/<string:host_id>", methods=("POST",)) | ||||
| def claim_operation(operation_id: int, host_id: str): | ||||
|   if authorized_for_host(host_id): | ||||
|     exists = get_model().host_operation_exists(operation_id, host_id) | ||||
|     if not exists: | ||||
|       return abort(404, "host operation not found") | ||||
|     payload_json = get_model().get_payload_json_from_host_operation(operation_id, host_id) | ||||
|     if payload_json is None: | ||||
|       error_message = f"{host_id} can't claim operation {operation_id} because host_operation row not found" | ||||
|       current_app.logger.error(error_message) | ||||
|       return abort(404, error_message) | ||||
|  | ||||
|     can_claim_handlers = { | ||||
|       "create": can_claim_create, | ||||
|     } | ||||
|     error_message = "" | ||||
|     payload = None | ||||
|     payload_is_dict = False | ||||
|     payload_has_type = False | ||||
|     payload_has_valid_type = False | ||||
|     try: | ||||
|       payload = json.loads(payload_json) | ||||
|       payload_is_dict = isinstance(payload, dict) | ||||
|       payload_has_type = payload_is_dict and 'type' in payload | ||||
|       payload_has_valid_type = payload_has_type and payload['type'] in can_claim_handlers | ||||
|        | ||||
|       if not payload_is_dict: | ||||
|         error_message = "invalid json: expected an object" | ||||
|       elif not payload_has_type: | ||||
|         error_message = "invalid json: 'type' field is required" | ||||
|       elif not payload_has_valid_type: | ||||
|         error_message = f"invalid json: expected type \"{payload['type']}\" to be one of [{', '.join(can_claim_handlers.keys())}]" | ||||
|     except: | ||||
|       error_message = "could not parse payload as json" | ||||
|  | ||||
|     if error_message is not "": | ||||
|       error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" | ||||
|       current_app.logger.error(error_message) | ||||
|       return abort(400, error_message) | ||||
|  | ||||
|     # we will only return this payload as json if claiming succeeds, so might as well do this now... | ||||
|     payload['assignment_status'] = 'assigned' | ||||
|  | ||||
|     # invoke the appropriate can_claim_handler for this operation type | ||||
|     result_tuple = can_claim_handlers[payload['type']](payload, host_id) | ||||
|     payload_json = result_tuple[0] | ||||
|     error_message = result_tuple[1] | ||||
|  | ||||
|     if error_message is not "": | ||||
|       error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" | ||||
|       current_app.logger.error(error_message) | ||||
|       return abort(400, error_message) | ||||
|  | ||||
|     claimed = get_model().claim_operation(operation_id, host_id) | ||||
|     if claimed: | ||||
|       return "ok" | ||||
|       get_model().update_operation(operation_id, payload_json) | ||||
|        | ||||
|       response = make_response(payload_json) | ||||
|       response.header.set("Content-Type", "application/json") | ||||
|  | ||||
|       return response | ||||
|     else: | ||||
|       return abort(409, "operation was already assigned to another host") | ||||
|       return abort(409, f"operation was already assigned to another host") | ||||
|   else: | ||||
|     current_app.logger.warning(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token") | ||||
|     return abort(401, "invalid host id or token") | ||||
|  | ||||
| def can_claim_create(payload, host_id) -> (str, str): | ||||
|  | ||||
|   hosts = get_model().list_hosts_with_networks(host_id) | ||||
|  | ||||
|   if host_id not in hosts: | ||||
|     return "", f"the host \"{host_id}\" does not appear to have any networks." | ||||
|  | ||||
|   networks = hosts[host_id].networks | ||||
|  | ||||
|   vms_by_host_and_network = get_model().non_deleted_vms_by_host_and_network(host_id) | ||||
|  | ||||
|   vms_by_network = dict() | ||||
|   if host_id in vms_by_host_and_network: | ||||
|     vms_by_network = vms_by_host_and_network[host_id] | ||||
|  | ||||
|   allocated_ipv4_address = None | ||||
|   allocated_network_name = None | ||||
|   for network in networks: | ||||
|     vms = [] | ||||
|     if network["network_name"] in vms_by_network: | ||||
|       vms = vms_by_network[network["network_name"]] | ||||
|  | ||||
|     claimed_ipv4s = dict() | ||||
|     for vm in vms: | ||||
|       claimed_ipv4s[vm['public_ipv4']] = True | ||||
|  | ||||
|     ipv4_network = ipaddress.ip_network(network["public_ipv4_cidr_block"], False) | ||||
|     i = 0 | ||||
|     for ipv4_address in ipv4_network:  | ||||
|       i += 1 | ||||
|       if i > 2 and str(ipv4_address) not in claimed_ipv4s: | ||||
|         allocated_ipv4_address = str(ipv4_address) | ||||
|         break  | ||||
|  | ||||
|     if allocated_ipv4_address is not None: | ||||
|       allocated_network_name = network["network_name"] | ||||
|       break | ||||
|          | ||||
|   if allocated_network_name is None or allocated_ipv4_address is None: | ||||
|     return "", f"host \"{host_id}\" does not have any avaliable IP addresses on any of its networks." | ||||
|  | ||||
|   payload["network_name"] = allocated_network_name | ||||
|   payload["public_ipv4_address"] = allocated_ipv4_address | ||||
|  | ||||
|   return json.dumps(payload), "" | ||||
| @ -115,14 +115,29 @@ def handle_create(operation_id, request_body): | ||||
|     return abort(400, f"bad request; {error_message}") | ||||
|  | ||||
|   # only one host should create the vm, so we first race to assign this create operation to ourselves. | ||||
|   # only one host will win this race | ||||
|   # only one host will win this race. | ||||
|   authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}" | ||||
|   url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}" | ||||
|   result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header) | ||||
|  | ||||
|   assignment_status = "" | ||||
|   if result.status_code == 200: | ||||
|     try: | ||||
|       assignment_info = json.loads(result.body) | ||||
|       if not isinstance(assignment_info, dict): | ||||
|         return abort(503, f"hub at '{url}' returned 200, but did not return assignment_info json object") | ||||
|       if 'network_name' not in assignment_info: | ||||
|         return abort(503, f"hub at '{url}' returned 200, but the returned assignment_info object did not include network_name") | ||||
|       if 'public_ipv4_address' not in assignment_info: | ||||
|         return abort(503, f"hub at '{url}' returned 200, but the returned assignment_info object did not include public_ipv4_address") | ||||
|  | ||||
|       request_body['network_name'] = assignment_info['network_name'] | ||||
|       request_body['public_ipv4_address'] = assignment_info['public_ipv4_address'] | ||||
|     except: | ||||
|       return abort(503, f"hub at '{url}' returned 200, but did not return valid json") | ||||
|      | ||||
|     assignment_status = "assigned" | ||||
|      | ||||
|   elif result.status_code == 409: | ||||
|     assignment_status = "assigned_to_other_host" | ||||
|   else: | ||||
| @ -138,6 +153,8 @@ def handle_create(operation_id, request_body): | ||||
|         vcpus=request_body['vcpus'], | ||||
|         memory_mb=request_body['memory_mb'], | ||||
|         ssh_authorized_keys=request_body['ssh_authorized_keys'], | ||||
|         network_name=request_body['network_name'], | ||||
|         public_ipv4_address=request_body['public_ipv4_address'], | ||||
|       ) | ||||
|     except: | ||||
|       error_message = my_exec_info_message(sys.exc_info()) | ||||
| @ -147,7 +164,11 @@ def handle_create(operation_id, request_body): | ||||
|       params=  f"{params} vcpus='{request_body['vcpus'] if 'vcpus' in request_body else 'KeyError'}', " | ||||
|       params=  f"{params} memory_mb='{request_body['memory_mb'] if 'memory_mb' in request_body else 'KeyError'}', " | ||||
|       params=  f"{params} ssh_authorized_keys='{request_body['ssh_authorized_keys'] if 'ssh_authorized_keys' in request_body else 'KeyError'}', " | ||||
|       params=  f"{params} network_name='{request_body['network_name'] if 'network_name' in request_body else 'KeyError'}', " | ||||
|       params=  f"{params} public_ipv4_address='{request_body['public_ipv4_address'] if 'public_ipv4_address' in request_body else 'KeyError'}', " | ||||
|        | ||||
|       current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}") | ||||
|  | ||||
|       return jsonify(dict(assignment_status=assignment_status, error_message=error_message)) | ||||
|  | ||||
|   return jsonify(dict(assignment_status=assignment_status)) | ||||
|  | ||||
| @ -129,7 +129,7 @@ class ShellScriptSpoke(VirtualizationInterface): | ||||
|     self.validate_completed_process(completedProcess) | ||||
|     return list(map(lambda x: x.decode("utf-8"), completedProcess.stdout.splitlines() )) | ||||
|  | ||||
|   def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list): | ||||
|   def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list, network_name: str, public_ipv4_address: str): | ||||
|     validate_capsul_id(id) | ||||
|  | ||||
|     if not re.match(r"^[a-zA-Z0-9/_.-]+$", template_image_file_name): | ||||
| @ -139,12 +139,18 @@ class ShellScriptSpoke(VirtualizationInterface): | ||||
|       if not re.match(r"^(ssh|ecdsa)-[0-9A-Za-z+/_=@:. -]+$", ssh_authorized_key): | ||||
|         raise ValueError(f"ssh_authorized_key \"{ssh_authorized_key}\" must match \"^(ssh|ecdsa)-[0-9A-Za-z+/_=@:. -]+$\"") | ||||
|  | ||||
|     if vcpus < 1 or vcpus > 8: | ||||
|     if isinstance(vcpus, int) and (vcpus < 1 or vcpus > 8): | ||||
|       raise ValueError(f"vcpus \"{vcpus}\" must match 1 <= vcpus <= 8") | ||||
|  | ||||
|     if memory_mb < 512 or memory_mb > 16384: | ||||
|     if isinstance(memory_mb, int) and (memory_mb < 512 or memory_mb > 16384): | ||||
|       raise ValueError(f"memory_mb \"{memory_mb}\" must match 512 <= memory_mb <= 16384") | ||||
|  | ||||
|     if not re.match(r"^[a-zA-Z0-9_-]+$", network_name): | ||||
|       raise ValueError(f"network_name \"{network_name}\" must match \"^[a-zA-Z0-9_-]+\"") | ||||
|  | ||||
|     if not re.match(r"^[0-9.]+$", public_ipv4_address): | ||||
|       raise ValueError(f"public_ipv4_address \"{public_ipv4_address}\" must match \"^[0-9.]+$\"") | ||||
|  | ||||
|     ssh_keys_string = "\n".join(ssh_authorized_keys) | ||||
|  | ||||
|     completedProcess = run([ | ||||
| @ -153,7 +159,9 @@ class ShellScriptSpoke(VirtualizationInterface): | ||||
|       template_image_file_name, | ||||
|       str(vcpus), | ||||
|       str(memory_mb), | ||||
|       ssh_keys_string | ||||
|       ssh_keys_string, | ||||
|       network_name, | ||||
|       public_ipv4_address | ||||
|     ], capture_output=True) | ||||
|  | ||||
|     self.validate_completed_process(completedProcess, email) | ||||
| @ -166,6 +174,8 @@ class ShellScriptSpoke(VirtualizationInterface): | ||||
|       vcpus={str(vcpus)} | ||||
|       memory={str(memory_mb)} | ||||
|       ssh_authorized_keys={ssh_keys_string} | ||||
|       network_name={network_name} | ||||
|       public_ipv4_address={public_ipv4_address} | ||||
|     """ | ||||
|  | ||||
|     if not status == "success": | ||||
|  | ||||
		Reference in New Issue
	
	Block a user