forked from 3wordchant/capsul-flask
introduce InterThreadResult to ensure that current_app is only accessed
from the main flask thread
This commit is contained in:
parent
0a70c974ec
commit
47b2b3ee13
@ -16,6 +16,11 @@ class HTTPResult:
|
|||||||
self.status_code = status_code
|
self.status_code = status_code
|
||||||
self.body = body
|
self.body = body
|
||||||
|
|
||||||
|
class InterThreadResult:
|
||||||
|
def __init__(self, http_result, error=None):
|
||||||
|
self.http_result = http_result
|
||||||
|
self.error = error
|
||||||
|
|
||||||
class MyHTTPClient:
|
class MyHTTPClient:
|
||||||
def __init__(self, timeout_seconds = 5):
|
def __init__(self, timeout_seconds = 5):
|
||||||
self.timeout_seconds = timeout_seconds
|
self.timeout_seconds = timeout_seconds
|
||||||
@ -24,11 +29,21 @@ class MyHTTPClient:
|
|||||||
|
|
||||||
def make_requests_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
|
def make_requests_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
|
||||||
future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
|
future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
|
||||||
return future.result()
|
fromOtherThread = future.result()
|
||||||
|
toReturn = []
|
||||||
|
for individualResult in fromOtherThread:
|
||||||
|
if individualResult.error != None and individualResult.error != "":
|
||||||
|
current_app.logger.error(fromOtherThread.error)
|
||||||
|
toReturn = toReturn.append(individualResult.http_result)
|
||||||
|
|
||||||
|
return toReturn
|
||||||
|
|
||||||
def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
|
def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
|
||||||
future = run_coroutine(self.post_json(method=method, url=url, body=body, authorization_header=authorization_header))
|
future = run_coroutine(self.post_json(method=method, url=url, body=body, authorization_header=authorization_header))
|
||||||
return future.result()
|
fromOtherThread = future.result()
|
||||||
|
if fromOtherThread.error != None and fromOtherThread.error != "":
|
||||||
|
current_app.logger.error(fromOtherThread.error)
|
||||||
|
return fromOtherThread.http_result
|
||||||
|
|
||||||
def get_client_session(self):
|
def get_client_session(self):
|
||||||
if not self.client_session:
|
if not self.client_session:
|
||||||
@ -36,7 +51,7 @@ class MyHTTPClient:
|
|||||||
|
|
||||||
return self.client_session
|
return self.client_session
|
||||||
|
|
||||||
async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
|
async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> InterThreadResult:
|
||||||
# TODO make a configuration option where this throws an error if the url does not start with https://
|
# TODO make a configuration option where this throws an error if the url does not start with https://
|
||||||
response = None
|
response = None
|
||||||
try:
|
try:
|
||||||
@ -55,10 +70,11 @@ class MyHTTPClient:
|
|||||||
except:
|
except:
|
||||||
error_message = my_exec_info_message(sys.exc_info())
|
error_message = my_exec_info_message(sys.exc_info())
|
||||||
response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
|
response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
|
||||||
current_app.logger.error(f"""
|
|
||||||
error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
|
return InterThreadResult(
|
||||||
|
HTTPResult(-1, response_body),
|
||||||
|
f"""error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
|
||||||
)
|
)
|
||||||
return HTTPResult(-1, response_body)
|
|
||||||
|
|
||||||
response_body = None
|
response_body = None
|
||||||
try:
|
try:
|
||||||
@ -66,13 +82,14 @@ class MyHTTPClient:
|
|||||||
except:
|
except:
|
||||||
error_message = my_exec_info_message(sys.exc_info())
|
error_message = my_exec_info_message(sys.exc_info())
|
||||||
response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
|
response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
|
||||||
current_app.logger.error(f"""
|
return InterThreadResult(
|
||||||
error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
|
HTTPResult(response.status, response_body),
|
||||||
|
f"""error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
|
||||||
)
|
)
|
||||||
|
|
||||||
return HTTPResult(response.status, response_body)
|
return InterThreadResult(HTTPResult(response.status, response_body), None)
|
||||||
|
|
||||||
async def make_requests(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
|
async def make_requests(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[InterThreadResult]:
|
||||||
tasks = []
|
tasks = []
|
||||||
# append to tasks in the same order as online_hosts
|
# append to tasks in the same order as online_hosts
|
||||||
for host in online_hosts:
|
for host in online_hosts:
|
||||||
|
Loading…
Reference in New Issue
Block a user