fixing error messages and naming functions better
This commit is contained in:
parent
0de5305eac
commit
9389c80cb6
@ -27,7 +27,7 @@ class MyHTTPClient:
|
|||||||
self.client_session = None
|
self.client_session = None
|
||||||
|
|
||||||
|
|
||||||
def make_requests_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
|
def do_multi_http_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
|
||||||
future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
|
future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
|
||||||
fromOtherThread = future.result()
|
fromOtherThread = future.result()
|
||||||
toReturn = []
|
toReturn = []
|
||||||
@ -38,8 +38,8 @@ class MyHTTPClient:
|
|||||||
|
|
||||||
return toReturn
|
return toReturn
|
||||||
|
|
||||||
def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
|
def do_http_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
|
||||||
future = run_coroutine(self.post_json(method=method, url=url, body=body, authorization_header=authorization_header))
|
future = run_coroutine(self.do_http(method=method, url=url, body=body, authorization_header=authorization_header))
|
||||||
fromOtherThread = future.result()
|
fromOtherThread = future.result()
|
||||||
if fromOtherThread.error != None and fromOtherThread.error != "":
|
if fromOtherThread.error != None and fromOtherThread.error != "":
|
||||||
current_app.logger.error(fromOtherThread.error)
|
current_app.logger.error(fromOtherThread.error)
|
||||||
@ -51,7 +51,7 @@ class MyHTTPClient:
|
|||||||
|
|
||||||
return self.client_session
|
return self.client_session
|
||||||
|
|
||||||
async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> InterThreadResult:
|
async def do_http(self, url: str, body: str, method="POST", authorization_header=None) -> InterThreadResult:
|
||||||
# TODO make a configuration option where this throws an error if the url does not start with https://
|
# TODO make a configuration option where this throws an error if the url does not start with https://
|
||||||
response = None
|
response = None
|
||||||
try:
|
try:
|
||||||
@ -69,11 +69,11 @@ class MyHTTPClient:
|
|||||||
)
|
)
|
||||||
except:
|
except:
|
||||||
error_message = my_exec_info_message(sys.exc_info())
|
error_message = my_exec_info_message(sys.exc_info())
|
||||||
response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
|
response_body = json.dumps({"error_message": f"do_http (HTTP {method} {url}) {error_message}"})
|
||||||
|
|
||||||
return InterThreadResult(
|
return InterThreadResult(
|
||||||
HTTPResult(-1, response_body),
|
HTTPResult(-1, response_body),
|
||||||
f"""error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
|
f"""do_http (HTTP {method} {url}) failed with: {error_message}"""
|
||||||
)
|
)
|
||||||
|
|
||||||
response_body = None
|
response_body = None
|
||||||
@ -81,20 +81,20 @@ class MyHTTPClient:
|
|||||||
response_body = await response.text()
|
response_body = await response.text()
|
||||||
except:
|
except:
|
||||||
error_message = my_exec_info_message(sys.exc_info())
|
error_message = my_exec_info_message(sys.exc_info())
|
||||||
response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
|
response_body = json.dumps({"error_message": f"error reading response: HTTP {method} {url} (status {response.status}) failed with: {error_message}"})
|
||||||
return InterThreadResult(
|
return InterThreadResult(
|
||||||
HTTPResult(response.status, response_body),
|
HTTPResult(response.status, response_body),
|
||||||
f"""error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
|
f"""error reading response: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
|
||||||
)
|
)
|
||||||
|
|
||||||
return InterThreadResult(HTTPResult(response.status, response_body), None)
|
return InterThreadResult(HTTPResult(response.status, response_body), None)
|
||||||
|
|
||||||
async def make_requests(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[InterThreadResult]:
|
async def do_multi_http(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[InterThreadResult]:
|
||||||
tasks = []
|
tasks = []
|
||||||
# append to tasks in the same order as online_hosts
|
# append to tasks in the same order as online_hosts
|
||||||
for host in online_hosts:
|
for host in online_hosts:
|
||||||
tasks.append(
|
tasks.append(
|
||||||
self.post_json(url=f"{host.url}{url_suffix}", body=body, authorization_header=authorization_header)
|
self.do_http(url=f"{host.url}{url_suffix}", body=body, authorization_header=authorization_header)
|
||||||
)
|
)
|
||||||
# gather is like Promise.all from javascript, it returns a future which resolves to an array of results
|
# gather is like Promise.all from javascript, it returns a future which resolves to an array of results
|
||||||
# in the same order as the tasks that we passed in -- which were in the same order as online_hosts
|
# in the same order as the tasks that we passed in -- which were in the same order as online_hosts
|
||||||
|
@ -21,7 +21,7 @@ def ping_all_hosts_task():
|
|||||||
all_hosts = get_model().get_all_hosts()
|
all_hosts = get_model().get_all_hosts()
|
||||||
current_app.logger.debug(f"pinging {len(all_hosts)} hosts...")
|
current_app.logger.debug(f"pinging {len(all_hosts)} hosts...")
|
||||||
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
|
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
|
||||||
results = current_app.config["HTTP_CLIENT"].make_requests_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header)
|
results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(all_hosts, "/spoke/heartbeat", None, authorization_header=authorization_header)
|
||||||
for i in range(len(all_hosts)):
|
for i in range(len(all_hosts)):
|
||||||
host = all_hosts[i]
|
host = all_hosts[i]
|
||||||
result = results[i]
|
result = results[i]
|
||||||
|
@ -64,7 +64,7 @@ class CapsulFlaskHub(VirtualizationInterface):
|
|||||||
operation_id = get_model().create_operation(hosts, email, payload)
|
operation_id = get_model().create_operation(hosts, email, payload)
|
||||||
url_path = f"/spoke/operation/{operation_id}"
|
url_path = f"/spoke/operation/{operation_id}"
|
||||||
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
|
authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
|
||||||
results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, url_path, payload, authorization_header=authorization_header)
|
results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(hosts, url_path, payload, authorization_header=authorization_header)
|
||||||
|
|
||||||
for i in range(len(hosts)):
|
for i in range(len(hosts)):
|
||||||
host = hosts[i]
|
host = hosts[i]
|
||||||
|
@ -17,20 +17,20 @@ def heartbeat():
|
|||||||
if authorized_as_hub(request.headers):
|
if authorized_as_hub(request.headers):
|
||||||
url = f"{current_app.config['HUB_URL']}/hub/heartbeat/{current_app.config['SPOKE_HOST_ID']}"
|
url = f"{current_app.config['HUB_URL']}/hub/heartbeat/{current_app.config['SPOKE_HOST_ID']}"
|
||||||
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
|
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
|
||||||
result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header)
|
result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header)
|
||||||
if result.status_code == -1:
|
if result.status_code == -1:
|
||||||
current_app.logger.info(f"/hosts/heartbeat returned 503: hub at {url} timed out or cannot be reached")
|
current_app.logger.info(f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached")
|
||||||
return abort(503, "Service Unavailable: hub timed out or cannot be reached")
|
return abort(503, "Service Unavailable: hub timed out or cannot be reached")
|
||||||
if result.status_code == 401:
|
if result.status_code == 401:
|
||||||
current_app.logger.info(f"/hosts/heartbeat returned 502: hub at {url} rejected our token")
|
current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} rejected our token")
|
||||||
return abort(502, "hub rejected our token")
|
return abort(502, "hub rejected our token")
|
||||||
if result.status_code != 200:
|
if result.status_code != 200:
|
||||||
current_app.logger.info(f"/hosts/heartbeat returned 502: hub at {url} returned {result.status_code}")
|
current_app.logger.info(f"/spoke/heartbeat returned 502: hub at {url} returned {result.status_code}")
|
||||||
return abort(502, "Bad Gateway: hub did not return 200")
|
return abort(502, "Bad Gateway: hub did not return 200")
|
||||||
|
|
||||||
return "OK"
|
return "OK"
|
||||||
else:
|
else:
|
||||||
current_app.logger.info(f"/hosts/heartbeat returned 401: invalid hub token")
|
current_app.logger.info(f"/spoke/heartbeat returned 401: invalid hub token")
|
||||||
return abort(401, "invalid hub token")
|
return abort(401, "invalid hub token")
|
||||||
|
|
||||||
@bp.route("/operation/<int:operation_id>", methods=("POST",))
|
@bp.route("/operation/<int:operation_id>", methods=("POST",))
|
||||||
@ -120,7 +120,7 @@ def handle_create(operation_id, request_body):
|
|||||||
# only one host will win this race
|
# only one host will win this race
|
||||||
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
|
authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
|
||||||
url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}"
|
url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}"
|
||||||
result = current_app.config['HTTP_CLIENT'].post_json_sync(url, body=None, authorization_header=authorization_header)
|
result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header)
|
||||||
|
|
||||||
assignment_status = ""
|
assignment_status = ""
|
||||||
if result.status_code == 200:
|
if result.status_code == 200:
|
||||||
|
Loading…
Reference in New Issue
Block a user