import json import ipaddress from flask import Blueprint from flask import current_app from flask import request, make_response from werkzeug.exceptions import abort from capsulflask.db import get_model from capsulflask.shared import my_exec_info_message, authorized_as_hub bp = Blueprint("hub", __name__, url_prefix="/hub") def authorized_for_host(id): if request.headers.get('Authorization'): auth_header_value = request.headers.get('Authorization').replace("Bearer ", "") return get_model().authorized_for_host(id, auth_header_value) return False @bp.route("/heartbeat-task", methods=("POST",)) def ping_all_hosts_task(): if authorized_as_hub(request.headers): all_hosts = get_model().get_all_hosts() current_app.logger.debug(f"pinging {len(all_hosts)} hosts...") authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}" results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header) for i in range(len(all_hosts)): host = all_hosts[i] result = results[i] current_app.logger.debug(f"response from {host.id} ({host.url}): {result.status_code} {result.body}") if result.status_code == 200: get_model().host_heartbeat(host.id) return "ok" else: current_app.logger.warning(f"/hub/heartbeat-task returned 401: invalid hub token") return abort(401, "invalid hub token") @bp.route("/heartbeat/", methods=("POST",)) def heartbeat(host_id): if authorized_for_host(host_id): return "ok" else: current_app.logger.warning(f"/hub/heartbeat/{host_id} returned 401: invalid token") return abort(401, "invalid host id or token") @bp.route("/claim-operation//", methods=("POST",)) def claim_operation(operation_id: int, host_id: str): if authorized_for_host(host_id): payload_json = get_model().get_payload_json_from_host_operation(operation_id, host_id) if payload_json is None: error_message = f"{host_id} can't claim operation {operation_id} because host_operation row not found" current_app.logger.error(error_message) return abort(404, error_message) # right now if there is a can_claim_handler there needs to be a corresponding on_claimed_handler. # this is sole-ly due to laziness in error handling. can_claim_handlers = { "create": can_claim_create, } on_claimed_handlers = { "create": on_create_claimed, } error_message = "" payload = None payload_is_dict = False payload_has_type = False payload_has_valid_type = False try: payload = json.loads(payload_json) payload_is_dict = isinstance(payload, dict) payload_has_type = payload_is_dict and 'type' in payload payload_has_valid_type = payload_has_type and payload['type'] in can_claim_handlers if not payload_is_dict: error_message = "invalid json: expected an object" elif not payload_has_type: error_message = "invalid json: 'type' field is required" elif not payload_has_valid_type: error_message = f"invalid json: expected type \"{payload['type']}\" to be one of [{', '.join(can_claim_handlers.keys())}]" except: error_message = "could not parse payload as json" if error_message != "": error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" current_app.logger.error(error_message) return abort(400, error_message) # we will only return this payload as json if claiming succeeds, so might as well do this now... payload['assignment_status'] = 'assigned' # invoke the appropriate can_claim_handler for this operation type result_tuple = can_claim_handlers[payload['type']](payload, host_id) modified_payload = result_tuple[0] error_message = result_tuple[1] if error_message != "": error_message = f"{host_id} can't claim operation {operation_id} because {error_message}" current_app.logger.error(error_message) return abort(400, error_message) claimed = get_model().claim_operation(operation_id, host_id) if claimed: modified_payload_json = json.dumps(modified_payload) get_model().update_operation(operation_id, modified_payload_json) on_claimed_handlers[payload['type']](modified_payload, host_id) response = make_response(modified_payload_json) response.headers.set("Content-Type", "application/json") return response else: return abort(409, f"operation was already assigned to another host") else: current_app.logger.warning(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token") return abort(401, "invalid host id or token") def can_claim_create(payload, host_id) -> (str, str): hosts = get_model().list_hosts_with_networks(host_id) if host_id not in hosts: return "", f"the host \"{host_id}\" does not appear to have any networks." networks = hosts[host_id]['networks'] vms_by_host_and_network = get_model().non_deleted_vms_by_host_and_network(host_id) vms_by_network = dict() if host_id in vms_by_host_and_network: vms_by_network = vms_by_host_and_network[host_id] allocated_ipv4_address = None allocated_network_name = None for network in networks: vms = [] if network["network_name"] in vms_by_network: vms = vms_by_network[network["network_name"]] claimed_ipv4s = dict() for vm in vms: claimed_ipv4s[vm['public_ipv4']] = True ipv4_network = ipaddress.ip_network(network["public_ipv4_cidr_block"], False) ipv4_first_usable_ip = network["public_ipv4_first_usable_ip"] ipv4_last_usable_ip = network["public_ipv4_last_usable_ip"] for ipv4_address in ipv4_network: within_usable_range = ipv4_first_usable_ip <= str(ipv4_address) and str(ipv4_address) <= ipv4_last_usable_ip if within_usable_range and str(ipv4_address) not in claimed_ipv4s: allocated_ipv4_address = str(ipv4_address) break if allocated_ipv4_address is not None: allocated_network_name = network["network_name"] break if allocated_network_name is None or allocated_ipv4_address is None: return "", f"host \"{host_id}\" does not have any avaliable IP addresses on any of its networks." # payload["network_name"] = allocated_network_name # payload["public_ipv4"] = allocated_ipv4_address # hard-code the network name and IP for now until we can implement https://git.cyberia.club/cyberia/capsul-flask/issues/11 # enable static IP -> capsul mapping via libvirt (manage MAC addresses) payload["network_name"] = 'public3' payload["public_ipv4"] = "" return payload, "" def on_create_claimed(payload, host_id): # TODO this happens when the create operation is assigned to one spoke, not when the create actually succeeds... # VMs should probably start out in the "creating" state, and change to "booting" when the create.sh script succeeds. # VMs should not be billable until they reach the booting state. # see the corresponding TODO in spoke_api.handle_create get_model().create_vm( email=payload['email'], id=payload['id'], size=payload['size'], os=payload['os'], host=host_id, network_name=payload['network_name'], public_ipv4=payload['public_ipv4'], ssh_authorized_keys=list(map(lambda x: x["name"], payload['ssh_authorized_keys'])), )