"""Flask blueprint serving the ``/spoke`` endpoints called by the hub."""

import sys
import json
from typing import Optional

import aiohttp
from flask import Blueprint
from flask import current_app
from flask import request
from flask.json import jsonify
from werkzeug.exceptions import HTTPException
from werkzeug.exceptions import abort

from capsulflask.shared import my_exec_info_message, authorized_as_hub

bp = Blueprint("spoke", __name__, url_prefix="/spoke")

# Commands accepted by the "vm_state_command" operation.
_VM_STATE_COMMANDS = ("stop", "force-stop", "start", "restart")


@bp.route("/heartbeat", methods=("POST",))
def heartbeat():
    """Relay a heartbeat to the hub and report whether the hub accepted it.

    Returns "OK" when the hub answers 200. Aborts with 401 when the caller is
    not authorized as the hub, 503 when the HTTP client reports status -1
    (logged as "timed out or cannot be reached"), and 502 for any other
    non-200 hub response.
    """
    if not authorized_as_hub(request.headers):
        current_app.logger.info("/spoke/heartbeat returned 401: invalid hub token")
        return abort(401, "invalid hub token")

    url = f"{current_app.config['HUB_URL']}/hub/heartbeat/{current_app.config['SPOKE_HOST_ID']}"
    authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
    result = current_app.config['HTTP_CLIENT'].do_http_sync(
        url, body=None, authorization_header=authorization_header
    )

    if result.status_code == -1:
        current_app.logger.info(f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached")
        return abort(503, "Service Unavailable: hub timed out or cannot be reached")
    if result.status_code == 401:
        current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} rejected our token")
        return abort(502, "hub rejected our token")
    if result.status_code != 200:
        current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} returned {result.status_code}")
        return abort(502, "Bad Gateway: hub did not return 200")

    return "OK"


# The URL variable is required: without it Flask calls the view with no
# arguments and the missing operation_id raises a TypeError.
@bp.route("/operation/<int:operation_id>", methods=("POST",))
def operation_with_id(operation_id: int):
    """Handle an operation request that carries an operation id in the URL."""
    return operation_impl(operation_id)


@bp.route("/operation", methods=("POST",))
def operation_without_id():
    """Handle an operation request that has no operation id."""
    return operation_impl(None)


def operation_impl(operation_id: Optional[int]):
    """Authorize the caller, decode the request body and dispatch on its 'type'.

    Aborts with 401 when the caller is not authorized as the hub and with 400
    when the body is not valid json, has no 'type', or names an unknown type.
    HTTP errors raised by a handler (via ``abort``) propagate unchanged; any
    other exception from a handler is logged and returned as a json body
    containing ``error_message``.
    """
    if not authorized_as_hub(request.headers):
        current_app.logger.warning("/hosts/operation returned 401: invalid hub token")
        return abort(401, "invalid hub token")

    request_body = request.json
    # The original code always ran json.loads on request.json, which implies
    # the body arrives as a JSON-encoded string. Decode in that case, but also
    # accept a body that is already a parsed object.
    if isinstance(request_body, str):
        try:
            request_body = json.loads(request_body)
        except ValueError:
            current_app.logger.error("/hosts/operation returned 400: request body is not valid json")
            return abort(400, "bad request; request body is not valid json")

    handlers = {
        "capacity_avaliable": handle_capacity_avaliable,
        "get": handle_get,
        "list_ids": handle_list_ids,
        "create": handle_create,
        "destroy": handle_destroy,
        "vm_state_command": handle_vm_state_command,
    }

    if not isinstance(request_body, dict) or 'type' not in request_body:
        error_message = "'type' json property is required"
    else:
        operation_type = request_body['type']
        # A non-string type (e.g. a list) is unhashable and cannot be a key.
        if isinstance(operation_type, str) and operation_type in handlers:
            try:
                return handlers[operation_type](operation_id, request_body)
            except HTTPException:
                # Handlers signal 400/503 with abort(); do not convert those
                # into a 200 response carrying an error_message.
                raise
            except Exception:
                error_message = my_exec_info_message(sys.exc_info())
                current_app.logger.error(f"unhandled exception in {operation_type} handler: {error_message}")
                return jsonify(dict(error_message=error_message))
        types_csv = ", ".join(handlers)
        error_message = f"'type' must be one of {types_csv}"

    current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
    return abort(400, f"bad request; {error_message}")


def handle_capacity_avaliable(operation_id, request_body):
    """Report whether the spoke model has capacity for 'additional_ram_bytes'.

    Aborts with 400 when 'additional_ram_bytes' is missing from the request.
    """
    if 'additional_ram_bytes' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable")
        return abort(400, "bad request; additional_ram_bytes is required for capacity_avaliable")

    has_capacity = current_app.config['SPOKE_MODEL'].capacity_avaliable(request_body['additional_ram_bytes'])
    return jsonify(dict(assignment_status="assigned", capacity_avaliable=has_capacity))


def handle_get(operation_id, request_body):
    """Look up a vm by 'id' and return its fields as json.

    'get_ssh_host_keys' is optional and treated as False when absent. When the
    spoke model returns None only ``assignment_status`` is returned. Aborts
    with 400 when 'id' is missing.
    """
    if 'id' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: id is required for get")
        return abort(400, "bad request; id is required for get")

    vm = current_app.config['SPOKE_MODEL'].get(
        request_body['id'], request_body.get('get_ssh_host_keys', False)
    )
    if vm is None:
        return jsonify(dict(assignment_status="assigned"))

    return jsonify(dict(
        assignment_status="assigned",
        id=vm.id,
        host=vm.host,
        ipv4=vm.ipv4,
        ipv6=vm.ipv6,
        ssh_host_keys=vm.ssh_host_keys,
    ))


def handle_list_ids(operation_id, request_body):
    """Return the ids reported by the spoke model's ``list_ids``."""
    return jsonify(dict(assignment_status="assigned", ids=current_app.config['SPOKE_MODEL'].list_ids()))


def handle_create(operation_id, request_body):
    """Claim a create operation from the hub and, if claimed, create the vm.

    Aborts with 400 when operation_id or any required parameter is missing,
    and with 503 when the hub answers the claim with anything other than 200
    or 409. A 409 yields ``assignment_status="assigned_to_other_host"`` and no
    vm is created. A failure inside the spoke model's ``create`` is logged and
    returned as ``error_message`` alongside the assignment status.
    """
    if not operation_id:
        current_app.logger.error("/hosts/operation returned 400: operation_id is required for create ")
        return abort(400, "bad request; operation_id is required. try POST /spoke/operation/")

    parameters = ["email", "id", "template_image_file_name", "vcpus", "memory_mb", "ssh_authorized_keys"]
    error_message = "".join(
        f"\n{parameter} is required for create"
        for parameter in parameters
        if parameter not in request_body
    )
    if error_message != "":
        current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
        return abort(400, f"bad request; {error_message}")

    # only one host should create the vm, so we first race to assign this create operation to ourselves.
    # only one host will win this race
    authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
    url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}"
    result = current_app.config['HTTP_CLIENT'].do_http_sync(
        url, body=None, authorization_header=authorization_header
    )

    if result.status_code == 200:
        assignment_status = "assigned"
    elif result.status_code == 409:
        assignment_status = "assigned_to_other_host"
    else:
        current_app.logger.error(f"{url} returned {result.status_code}: {result.body}")
        return abort(503, "hub did not cleanly handle our request to claim the create operation")

    if assignment_status == "assigned":
        try:
            current_app.config['SPOKE_MODEL'].create(
                email=request_body['email'],
                id=request_body['id'],
                template_image_file_name=request_body['template_image_file_name'],
                vcpus=request_body['vcpus'],
                memory_mb=request_body['memory_mb'],
                ssh_authorized_keys=request_body['ssh_authorized_keys'],
            )
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            params = ", ".join(f"{name}='{request_body[name]}'" for name in parameters)
            current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}")
            return jsonify(dict(assignment_status=assignment_status, error_message=error_message))

    return jsonify(dict(assignment_status=assignment_status))


def handle_destroy(operation_id, request_body):
    """Destroy the vm identified by 'id' and 'email'.

    Aborts with 400 when 'id' or 'email' is missing. A failure inside the
    spoke model's ``destroy`` is logged and returned with ``status="error"``.
    """
    if 'id' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: id is required for destroy")
        return abort(400, "bad request; id is required for destroy")

    if 'email' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: email is required for destroy")
        return abort(400, "bad request; email is required for destroy")

    try:
        current_app.config['SPOKE_MODEL'].destroy(id=request_body['id'], email=request_body['email'])
    except Exception:
        error_message = my_exec_info_message(sys.exc_info())
        current_app.logger.error(f"current_app.config['SPOKE_MODEL'].destroy(id='{request_body['id']}', email='{request_body['email']}') failed: {error_message}")
        return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))

    return jsonify(dict(assignment_status="assigned", status="success"))


def handle_vm_state_command(operation_id, request_body):
    """Run a stop / force-stop / start / restart command against a vm.

    Aborts with 400 when 'id', 'email' or 'command' is missing, or when
    'command' is not one of the accepted values. A failure inside the spoke
    model's ``vm_state_command`` is logged and returned with
    ``status="error"``.
    """
    for required_property in ('id', 'email', 'command'):
        if required_property not in request_body:
            current_app.logger.error(f"/hosts/operation returned 400: {required_property} is required for vm_state_command")
            return abort(400, f"bad request; {required_property} is required for vm_state_command")

    if request_body['command'] not in _VM_STATE_COMMANDS:
        current_app.logger.error(f"/hosts/operation returned 400: command ({request_body['command']}) must be one of stop, force-stop, start, or restart")
        return abort(400, f"bad request; command ({request_body['command']}) must be one of stop, force-stop, start, or restart")

    try:
        current_app.config['SPOKE_MODEL'].vm_state_command(
            id=request_body['id'],
            email=request_body['email'],
            command=request_body['command'],
        )
    except Exception:
        error_message = my_exec_info_message(sys.exc_info())
        current_app.logger.error(f"current_app.config['SPOKE_MODEL'].vm_state_command(id='{request_body['id']}', email='{request_body['email']}', command='{request_body['command']}') failed: {error_message}")
        return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))

    return jsonify(dict(assignment_status="assigned", status="success"))