create capsul is working
This commit is contained in:
parent
44e918a974
commit
4833c6250b
@ -102,7 +102,7 @@ if app.config['HUB_MODE_ENABLED']:
|
||||
heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task"
|
||||
heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"}
|
||||
heartbeat_task = lambda: requests.post(heartbeat_task_url, headers=heartbeat_task_headers)
|
||||
scheduler.add_job(func=heartbeat_task, trigger="interval", seconds=5)
|
||||
scheduler.add_job(name="heartbeat_task", func=heartbeat_task, trigger="interval", seconds=5)
|
||||
scheduler.start()
|
||||
|
||||
atexit.register(lambda: scheduler.shutdown())
|
||||
|
@ -27,16 +27,20 @@ def makeCapsulId():
|
||||
return f"capsul-{lettersAndNumbers}"
|
||||
|
||||
def double_check_capsul_address(id, ipv4):
|
||||
try:
|
||||
result = current_app.config["HUB_MODEL"].get(id)
|
||||
if result.ipv4 != ipv4:
|
||||
ipv4 = result.ipv4
|
||||
get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4)
|
||||
except:
|
||||
current_app.logger.error(f"""
|
||||
the virtualization model threw an error in double_check_capsul_address of {id}:
|
||||
{my_exec_info_message(sys.exc_info())}"""
|
||||
)
|
||||
result = current_app.config["HUB_MODEL"].get(id)
|
||||
if result.ipv4 != ipv4:
|
||||
ipv4 = result.ipv4
|
||||
get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4)
|
||||
# try:
|
||||
# result = current_app.config["HUB_MODEL"].get(id)
|
||||
# if result.ipv4 != ipv4:
|
||||
# ipv4 = result.ipv4
|
||||
# get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4)
|
||||
# except:
|
||||
# current_app.logger.error(f"""
|
||||
# the virtualization model threw an error in double_check_capsul_address of {id}:
|
||||
# {my_exec_info_message(sys.exc_info())}"""
|
||||
# )
|
||||
|
||||
return ipv4
|
||||
|
||||
|
@ -13,6 +13,7 @@ class DBModel:
|
||||
def __init__(self, connection, cursor):
|
||||
self.connection = connection
|
||||
self.cursor = cursor
|
||||
self.cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
|
||||
|
||||
|
||||
# ------ LOGIN ---------
|
||||
@ -300,18 +301,29 @@ class DBModel:
|
||||
self.connection.commit()
|
||||
return operation_id
|
||||
|
||||
def update_host_operation(self, host_id: str, operation_id: int, assignment_status: str):
|
||||
self.cursor.execute(
|
||||
"UPDATE host_operation SET assignment_status = %s WHERE host = %s AND operation = %s",
|
||||
(assignment_status, host_id, operation_id)
|
||||
)
|
||||
def update_host_operation(self, host_id: str, operation_id: int, assignment_status: str, result: str):
|
||||
if assignment_status and not result:
|
||||
self.cursor.execute(
|
||||
"UPDATE host_operation SET assignment_status = %s, assigned = NOW() WHERE host = %s AND operation = %s",
|
||||
(assignment_status, host_id, operation_id)
|
||||
)
|
||||
elif not assignment_status and result:
|
||||
self.cursor.execute(
|
||||
"UPDATE host_operation SET results = %s, completed = NOW() WHERE host = %s AND operation = %s",
|
||||
(result, host_id, operation_id)
|
||||
)
|
||||
elif assignment_status and result:
|
||||
self.cursor.execute(
|
||||
"UPDATE host_operation SET assignment_status = %s, assigned = NOW(), results = %s, completed = NOW() WHERE host = %s AND operation = %s",
|
||||
(assignment_status, result, host_id, operation_id)
|
||||
)
|
||||
self.connection.commit()
|
||||
|
||||
def host_of_capsul(self, capsul_id: str):
|
||||
self.cursor.execute("SELECT host from vms where id = %s", (capsul_id,))
|
||||
def host_of_capsul(self, capsul_id: str) -> OnlineHost:
|
||||
self.cursor.execute("SELECT hosts.id, hosts.https_url from vms JOIN hosts on hosts.id = vms.host where vms.id = %s", (capsul_id,))
|
||||
row = self.cursor.fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
return OnlineHost(row[0], row[1])
|
||||
else:
|
||||
return None
|
||||
|
||||
@ -320,8 +332,9 @@ class DBModel:
|
||||
return len(self.cursor.fetchall()) != 0
|
||||
|
||||
def claim_operation(self, operation_id: int, host_id: str) -> bool:
|
||||
# have to make a new cursor to set isolation level
|
||||
# cursor = self.connection.cursor()
|
||||
self.cursor.execute("""
|
||||
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
|
||||
BEGIN TRANSACTION;
|
||||
UPDATE host_operation SET assignment_status = 'assigned'
|
||||
WHERE host = %s AND operation = %s AND operation != (
|
||||
@ -336,6 +349,7 @@ class DBModel:
|
||||
to_return = self.cursor.rowcount != 0
|
||||
|
||||
self.connection.commit()
|
||||
#cursor.close()
|
||||
|
||||
return to_return
|
||||
|
||||
|
@ -37,11 +37,14 @@ class MyHTTPClient:
|
||||
return self.client_session
|
||||
|
||||
async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
|
||||
# TODO make a configuration option where this throws an error if the url does not start with https://
|
||||
response = None
|
||||
try:
|
||||
headers = {}
|
||||
if authorization_header != None:
|
||||
headers['Authorization'] = authorization_header
|
||||
if body:
|
||||
headers['Content-Type'] = "application/json"
|
||||
response = await self.get_client_session().request(
|
||||
method=method,
|
||||
url=url,
|
||||
@ -85,7 +88,7 @@ class MyHTTPClient:
|
||||
|
||||
# i lifted this direct from https://stackoverflow.com/a/58616001
|
||||
# this is the bridge between Flask's one-thread-per-request world
|
||||
# and aiohttp's event-loop based world
|
||||
# and aiohttp's event-loop based world -- it allows us to call run_coroutine from a flask request handler
|
||||
|
||||
class EventLoopThread(threading.Thread):
|
||||
loop = None
|
||||
|
@ -8,6 +8,7 @@ from typing import List, Tuple
|
||||
|
||||
import aiohttp
|
||||
from flask import current_app
|
||||
from flask import session
|
||||
from time import sleep
|
||||
from os.path import join
|
||||
from subprocess import run
|
||||
@ -38,14 +39,27 @@ class MockHub(VirtualizationInterface):
|
||||
|
||||
class CapsulFlaskHub(VirtualizationInterface):
|
||||
|
||||
def synchronous_operation(self, hosts: List[OnlineHost], payload: str) -> List[HTTPResult]:
|
||||
return self.generic_operation(hosts, payload, True)[1]
|
||||
|
||||
def asynchronous_operation(self, hosts: List[OnlineHost], payload: str) -> Tuple[int, List[HTTPResult]]:
|
||||
return self.generic_operation(hosts, payload, False)
|
||||
|
||||
def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
|
||||
operation_id = get_model().create_operation(hosts, payload)
|
||||
email = session["account"]
|
||||
if not email or email == "":
|
||||
raise ValueError("generic_operation was called but user was not logged in")
|
||||
url_path = "/spoke/operation"
|
||||
operation_id = None
|
||||
if not immediate_mode:
|
||||
operation_id = get_model().create_operation(hosts, email, payload)
|
||||
url_path = f"/spoke/operation/{operation_id}"
|
||||
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
|
||||
results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, "/spoke/operation", payload, authorization_header=authorization_header)
|
||||
results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, url_path, payload, authorization_header=authorization_header)
|
||||
|
||||
for i in range(len(hosts)):
|
||||
host = hosts[i]
|
||||
result = results[i]
|
||||
task_result = None
|
||||
assignment_status = "pending"
|
||||
if result.status_code == -1:
|
||||
assignment_status = "no_response_from_host"
|
||||
@ -64,8 +78,6 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
assignment_status = "invalid_response_from_host"
|
||||
error_message = ""
|
||||
try:
|
||||
if immediate_mode:
|
||||
task_result = result.body
|
||||
result_body = json.loads(result.body)
|
||||
result_is_json = True
|
||||
result_is_dict = isinstance(result_body, dict)
|
||||
@ -87,16 +99,16 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
error_message: {error_message}
|
||||
"""
|
||||
)
|
||||
|
||||
get_model().update_host_operation(host.id, operation_id, assignment_status, task_result)
|
||||
|
||||
if not immediate_mode:
|
||||
get_model().update_host_operation(host.id, operation_id, assignment_status, None)
|
||||
|
||||
return results
|
||||
return (operation_id, results)
|
||||
|
||||
def capacity_avaliable(self, additional_ram_bytes):
|
||||
online_hosts = get_model().get_online_hosts()
|
||||
payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes))
|
||||
op = self.generic_operation(online_hosts, payload, True)
|
||||
results = op[1]
|
||||
results = self.synchronous_operation(online_hosts, payload)
|
||||
for result in results:
|
||||
try:
|
||||
result_body = json.loads(result.body)
|
||||
@ -108,13 +120,12 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
return False
|
||||
|
||||
|
||||
async def get(self, id) -> VirtualMachine:
|
||||
def get(self, id) -> VirtualMachine:
|
||||
validate_capsul_id(id)
|
||||
host = get_model().host_of_capsul(id)
|
||||
if host is not None:
|
||||
payload = json.dumps(dict(type="get", id=id))
|
||||
op = self.generic_operation([host], payload, True)
|
||||
results = op[1]
|
||||
results = self.synchronous_operation([host], payload)
|
||||
for result in results:
|
||||
try:
|
||||
result_body = json.loads(result.body)
|
||||
@ -128,9 +139,7 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
def list_ids(self) -> list:
|
||||
online_hosts = get_model().get_online_hosts()
|
||||
payload = json.dumps(dict(type="list_ids"))
|
||||
op = self.generic_operation(online_hosts, payload, False)
|
||||
operation_id = op[0]
|
||||
results = op[1]
|
||||
results = self.synchronous_operation(online_hosts, payload)
|
||||
to_return = []
|
||||
for i in range(len(results)):
|
||||
host = online_hosts[i]
|
||||
@ -145,11 +154,8 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
to_return.append(id)
|
||||
except:
|
||||
all_valid = False
|
||||
if all_valid:
|
||||
get_model().update_host_operation(host.id, operation_id, None, result.body)
|
||||
else:
|
||||
if not all_valid:
|
||||
result_json_string = json.dumps({"error_message": "invalid capsul id returned"})
|
||||
get_model().update_host_operation(host.id, operation_id, None, result_json_string)
|
||||
current_app.logger.error(f"""error reading ids for list_ids operation {operation_id}, host {host.id}""")
|
||||
else:
|
||||
result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"})
|
||||
@ -173,7 +179,7 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
memory_mb=memory_mb,
|
||||
ssh_public_keys=ssh_public_keys,
|
||||
))
|
||||
op = self.generic_operation(online_hosts, payload, False)
|
||||
op = self.asynchronous_operation(online_hosts, payload)
|
||||
operation_id = op[0]
|
||||
results = op[1]
|
||||
number_of_assigned = 0
|
||||
@ -206,8 +212,7 @@ class CapsulFlaskHub(VirtualizationInterface):
|
||||
host = get_model().host_of_capsul(id)
|
||||
if host is not None:
|
||||
payload = json.dumps(dict(type="destroy", id=id))
|
||||
op = self.generic_operation([host], payload, True)
|
||||
results = op[1]
|
||||
results = self.synchronous_operation([host], payload)
|
||||
result_json_string = "<no response from host>"
|
||||
for result in results:
|
||||
try:
|
||||
|
@ -1,5 +1,6 @@
|
||||
|
||||
import sys
|
||||
import json
|
||||
import aiohttp
|
||||
from flask import Blueprint
|
||||
from flask import current_app
|
||||
@ -32,10 +33,19 @@ def heartbeat():
|
||||
current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token")
|
||||
return abort(401, "invalid hub token")
|
||||
|
||||
@bp.route("/operation/<int:operation_id>", methods=("POST",))
|
||||
def operation_with_id(operation_id: int):
|
||||
return operation_impl(operation_id)
|
||||
|
||||
@bp.route("/operation", methods=("POST",))
|
||||
def operation():
|
||||
def operation_without_id():
|
||||
return operation_impl(None)
|
||||
|
||||
def operation_impl(operation_id: int):
|
||||
if authorized_as_hub(request.headers):
|
||||
request_body = request.json()
|
||||
request_body_json = request.json
|
||||
request_body = json.loads(request_body_json)
|
||||
current_app.logger.info(f"request.json: {request_body}")
|
||||
handlers = {
|
||||
"capacity_avaliable": handle_capacity_avaliable,
|
||||
"get": handle_get,
|
||||
@ -49,7 +59,7 @@ def operation():
|
||||
if isinstance(request_body, dict) and 'type' in request_body:
|
||||
if request_body['type'] in handlers:
|
||||
try:
|
||||
return handlers[request_body['type']](request_body)
|
||||
return handlers[request_body['type']](operation_id, request_body)
|
||||
except:
|
||||
error_message = my_exec_info_message(sys.exc_info())
|
||||
current_app.logger.error(f"unhandled exception in {request_body['type']} handler: {error_message}")
|
||||
@ -66,7 +76,7 @@ def operation():
|
||||
current_app.logger.info(f"/hosts/operation returned 401: invalid hub token")
|
||||
return abort(401, "invalid hub token")
|
||||
|
||||
def handle_capacity_avaliable(request_body):
|
||||
def handle_capacity_avaliable(operation_id, request_body):
|
||||
if 'additional_ram_bytes' not in request_body:
|
||||
current_app.logger.info(f"/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable")
|
||||
return abort(400, f"bad request; additional_ram_bytes is required for capacity_avaliable")
|
||||
@ -74,7 +84,7 @@ def handle_capacity_avaliable(request_body):
|
||||
has_capacity = current_app.config['SPOKE_MODEL'].capacity_avaliable(request_body['additional_ram_bytes'])
|
||||
return jsonify(dict(assignment_status="assigned", capacity_avaliable=has_capacity))
|
||||
|
||||
def handle_get(request_body):
|
||||
def handle_get(operation_id, request_body):
|
||||
if 'id' not in request_body:
|
||||
current_app.logger.info(f"/hosts/operation returned 400: id is required for get")
|
||||
return abort(400, f"bad request; id is required for get")
|
||||
@ -83,24 +93,28 @@ def handle_get(request_body):
|
||||
|
||||
return jsonify(dict(assignment_status="assigned", id=vm.id, host=vm.host, ipv4=vm.ipv4, ipv6=vm.ipv6))
|
||||
|
||||
def handle_list_ids(request_body):
|
||||
def handle_list_ids(operation_id, request_body):
|
||||
return jsonify(dict(assignment_status="assigned", ids=current_app.config['SPOKE_MODEL'].list_ids()))
|
||||
|
||||
def handle_create(request_body):
|
||||
parameters = ["operation_id", "email", "id", "template_image_file_name", "vcpus", "memory_mb", "ssh_public_keys"]
|
||||
def handle_create(operation_id, request_body):
|
||||
if not operation_id:
|
||||
current_app.logger.info(f"/hosts/operation returned 400: operation_id is required for create ")
|
||||
return abort(400, f"bad request; operation_id is required. try POST /spoke/operation/<id>")
|
||||
|
||||
parameters = ["email", "id", "template_image_file_name", "vcpus", "memory_mb", "ssh_public_keys"]
|
||||
error_message = ""
|
||||
for parameter in parameters:
|
||||
if parameter not in request_body:
|
||||
error_message = f"{error_message}\n{parameter} is required for create"
|
||||
|
||||
if error_message != "":
|
||||
current_app.logger.info(f"/hosts/operation returned 400: {error_message}")
|
||||
current_app.logger.info(f"/hosts/opasdascasderation returned 400: {error_message}")
|
||||
return abort(400, f"bad request; {error_message}")
|
||||
|
||||
# only one host should create the vm, so we first race to assign this create operation to ourselves.
|
||||
# only one host will win this race
|
||||
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
|
||||
url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{request_body['operation_id']}/{current_app.config['SPOKE_HOST_ID']}"
|
||||
url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}"
|
||||
result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header)
|
||||
|
||||
assignment_status = ""
|
||||
@ -132,7 +146,7 @@ def handle_create(request_body):
|
||||
|
||||
return jsonify(dict(assignment_status=assignment_status))
|
||||
|
||||
def handle_destroy(request_body):
|
||||
def handle_destroy(operation_id, request_body):
|
||||
if 'id' not in request_body:
|
||||
current_app.logger.info(f"/hosts/operation returned 400: id is required for destroy")
|
||||
return abort(400, f"bad request; id is required for destroy")
|
||||
|
Loading…
Reference in New Issue
Block a user