trying to set up assignment of create operation

This commit is contained in:
forest 2021-01-03 19:17:30 -06:00
parent 42a8e0c886
commit d9c30e1ef8
6 changed files with 86 additions and 17 deletions

View File

@ -1,4 +1,6 @@
# I was never able to get this type hinting to work correctly
# from psycopg2.extensions import connection as Psycopg2Connection, cursor as Psycopg2Cursor
from nanoid import generate from nanoid import generate
from flask import current_app from flask import current_app
from typing import List from typing import List
@ -10,6 +12,7 @@ class OnlineHost:
self.url = url self.url = url
class DBModel: class DBModel:
#def __init__(self, connection: Psycopg2Connection, cursor: Psycopg2Cursor):
def __init__(self, connection, cursor): def __init__(self, connection, cursor):
self.connection = connection self.connection = connection
self.cursor = cursor self.cursor = cursor
@ -311,6 +314,30 @@ class DBModel:
else: else:
return None return None
def host_operation_exists(self, operation_id: int, host_id: str) -> bool:
self.cursor.execute("SELECT operation FROM host_operation WHERE host = %s AND operation = %s",(host_id, operation_id))
return len(self.cursor.fetchall()) != 0
def claim_operation(self, operation_id: int, host_id: str) -> bool:
self.cursor.execute("""
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN TRANSACTION;
UPDATE host_operation SET assignment_status = 'assigned'
WHERE host = %s AND operation = %s AND operation != (
SELECT COALESCE(
(SELECT operation FROM host_operation WHERE operation = %s AND assignment_status = 'assigned'),
-1
) as already_assigned_operation_id
);
COMMIT TRANSACTION;
""", (host_id, operation_id, operation_id))
to_return = self.cursor.rowcount != 0
self.connection.commit()
return to_return

View File

@ -22,16 +22,20 @@ class MyHTTPClient:
def make_requests_sync(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): def make_requests_sync(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult):
self.event_loop.run_until_complete(self.make_requests(online_hosts=online_hosts, body=body)) self.event_loop.run_until_complete(self.make_requests(online_hosts=online_hosts, body=body))
def post_json_sync(self, method: str, url: str, body: str) -> HTTPResult: def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
self.event_loop.run_until_complete(self.post_json_sync(method=method, url=url, body=body)) self.event_loop.run_until_complete(self.post_json_sync(method=method, url=url, body=body))
async def post_json(self, method: str, url: str, body: str) -> HTTPResult: async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
response = None response = None
try: try:
headers = {}
if authorization_header != None:
headers['Authorization'] = authorization_header
response = await self.client_session.request( response = await self.client_session.request(
method=method, method=method,
url=url, url=url,
json=body, json=body,
headers=headers,
auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']), auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']),
verify_ssl=True, verify_ssl=True,
) )
@ -60,7 +64,7 @@ class MyHTTPClient:
# append to tasks in the same order as online_hosts # append to tasks in the same order as online_hosts
for host in online_hosts: for host in online_hosts:
tasks.append( tasks.append(
self.post_json(method="POST", url=host.url, body=body) self.post_json(url=host.url, body=body)
) )
# gather is like Promise.all from javascript, it returns a future which resolves to an array of results # gather is like Promise.all from javascript, it returns a future which resolves to an array of results
# in the same order as the tasks that we passed in -- which were in the same order as online_hosts # in the same order as the tasks that we passed in -- which were in the same order as online_hosts

View File

@ -12,11 +12,26 @@ def authorized_for_host(id):
auth_header_value = request.headers.get('Authorization').replace("Bearer ", "") auth_header_value = request.headers.get('Authorization').replace("Bearer ", "")
return get_model().authorized_for_host(id, auth_header_value) return get_model().authorized_for_host(id, auth_header_value)
@bp.route("/heartbeat/<string:id>", methods=("POST")) @bp.route("/heartbeat/<string:host_id>", methods=("POST"))
def heartbeat(id): def heartbeat(host_id):
if authorized_for_host(id): if authorized_for_host(host_id):
get_model().host_heartbeat(id) get_model().host_heartbeat(host_id)
else: else:
current_app.logger.info(f"/hub/heartbeat/{id} returned 401: invalid token") current_app.logger.info(f"/hub/heartbeat/{host_id} returned 401: invalid token")
return abort(401, "invalid host id or token")
@bp.route("/claim-operation/<int:operation_id>/<string:host_id>", methods=("POST"))
def claim_operation(operation_id: int, host_id: str):
if authorized_for_host(host_id):
exists = get_model().host_operation_exists(operation_id, host_id)
if not exists:
return abort(404, "host operation not found")
claimed = get_model().claim_operation(operation_id, host_id)
if claimed:
return "ok"
else:
return abort(409, "operation was already assigned to another host")
else:
current_app.logger.info(f"/hub/claim-operation/{operation_id}/{host_id} returned 401: invalid token")
return abort(401, "invalid host id or token") return abort(401, "invalid host id or token")

View File

@ -64,7 +64,8 @@ class CapsulFlaskHub(VirtualizationInterface):
def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
operation_id = get_model().create_operation(hosts, payload) operation_id = get_model().create_operation(hosts, payload)
results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload) authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload, authorization_header=authorization_header)
for i in range(len(hosts)): for i in range(len(hosts)):
host = hosts[i] host = hosts[i]
result = results[i] result = results[i]

View File

@ -29,5 +29,4 @@ CREATE TABLE host_operation (
PRIMARY KEY (host, operation) PRIMARY KEY (host, operation)
); );
UPDATE schemaversion SET version = 9;
UPDATE schemaversion SET version = 9;

View File

@ -18,9 +18,20 @@ def authorized_as_hub(id):
@bp.route("/heartbeat", methods=("POST")) @bp.route("/heartbeat", methods=("POST"))
def heartbeat(): def heartbeat():
if authorized_as_hub(id): if authorized_as_hub(id):
# make request to hub-domain.com/hub/heartbeat/{current_app.config["SPOKE_HOST_ID"]} url = f"{current_app.config['HUB_URL']}/hub/heartbeat/{current_app.config['SPOKE_HOST_ID']}"
# succeed or fail based on whether the request succeeds or fails. authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
pass result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header)
if result.status_code == -1:
current_app.logger.info(f"/hosts/heartbeat returned 503: hub at {url} timed out or cannot be reached")
return abort(503, "Service Unavailable: hub timed out or cannot be reached")
if result.status_code == 401:
current_app.logger.info(f"/hosts/heartbeat returned 502: hub at {url} rejected our token")
return abort(502, "hub rejected our token")
if result.status_code != 200:
current_app.logger.info(f"/hosts/heartbeat returned 502: hub at {url} returned {result.status_code}")
return abort(502, "Bad Gateway: hub did not return 200")
return "OK"
else: else:
current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token") current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token")
return abort(401, "invalid hub token") return abort(401, "invalid hub token")
@ -90,9 +101,21 @@ def handle_create(request_body):
current_app.logger.info(f"/hosts/operation returned 400: {error_message}") current_app.logger.info(f"/hosts/operation returned 400: {error_message}")
return abort(400, f"bad request; {error_message}") return abort(400, f"bad request; {error_message}")
# try to aquire operation_id # only one host should create the vm, so we first race to assign this create operation to ourselves.
assignment_status = "assigned" # only one host will win this race
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{request_body['operation_id']}/{current_app.config['SPOKE_HOST_ID']}"
result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header)
assignment_status = ""
if result.status_code == 200:
assignment_status = "assigned"
elif result.status_code == 409:
assignment_status = "assigned_to_other_host"
else:
current_app.logger.info(f"{url} returned {result.status_code}: {result.body}")
return abort(503, f"hub did not cleanly handle our request to claim the create operation")
if assignment_status == "assigned": if assignment_status == "assigned":
try: try:
current_app.config['SPOKE_MODEL'].create( current_app.config['SPOKE_MODEL'].create(
@ -108,7 +131,7 @@ def handle_create(request_body):
params = f"email='{request_body['email']}', id='{request_body['id']}', " params = f"email='{request_body['email']}', id='{request_body['id']}', "
params = f"{params}, template_image_file_name='{request_body['template_image_file_name']}', vcpus='{request_body['vcpus']}'" params = f"{params}, template_image_file_name='{request_body['template_image_file_name']}', vcpus='{request_body['vcpus']}'"
params = f"{params}, memory_mb='{request_body['memory_mb']}', ssh_public_keys='{request_body['ssh_public_keys']}'" params = f"{params}, memory_mb='{request_body['memory_mb']}', ssh_public_keys='{request_body['ssh_public_keys']}'"
current_app.logger.error(f"current_app.config['SPOKE_MODEL'].create({params}) failed: {error_message}") current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}")
return jsonify(dict(assignment_status=assignment_status, error_message=error_message)) return jsonify(dict(assignment_status=assignment_status, error_message=error_message))
return jsonify(dict(assignment_status=assignment_status)) return jsonify(dict(assignment_status=assignment_status))