import sys import json import itertools import time import threading import aiohttp import asyncio from flask import current_app from capsulflask.shared import OnlineHost, my_exec_info_message from typing import List class HTTPResult: def __init__(self, status_code, body=None): self.status_code = status_code self.body = body class InterThreadResult: def __init__(self, http_result, error=None): self.http_result = http_result self.error = error class MyHTTPClient: def __init__(self, timeout_seconds = 5): self.timeout_seconds = timeout_seconds self.client_session = None def make_requests_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]: future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header)) fromOtherThread = future.result() toReturn = [] for individualResult in fromOtherThread: if individualResult.error != None and individualResult.error != "": current_app.logger.error(fromOtherThread.error) toReturn = toReturn.append(individualResult.http_result) return toReturn def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult: future = run_coroutine(self.post_json(method=method, url=url, body=body, authorization_header=authorization_header)) fromOtherThread = future.result() if fromOtherThread.error != None and fromOtherThread.error != "": current_app.logger.error(fromOtherThread.error) return fromOtherThread.http_result def get_client_session(self): if not self.client_session: self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout_seconds)) return self.client_session async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> InterThreadResult: # TODO make a configuration option where this throws an error if the url does not start with https:// response = None try: headers = {} if authorization_header != None: headers['Authorization'] = authorization_header if body: headers['Content-Type'] = "application/json" response = await self.get_client_session().request( method=method, url=url, json=body, headers=headers, verify_ssl=True, ) except: error_message = my_exec_info_message(sys.exc_info()) response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"}) return InterThreadResult( HTTPResult(-1, response_body), f"""error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}""" ) response_body = None try: response_body = await response.text() except: error_message = my_exec_info_message(sys.exc_info()) response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"}) return InterThreadResult( HTTPResult(response.status, response_body), f"""error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}""" ) return InterThreadResult(HTTPResult(response.status, response_body), None) async def make_requests(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[InterThreadResult]: tasks = [] # append to tasks in the same order as online_hosts for host in online_hosts: tasks.append( self.post_json(url=f"{host.url}{url_suffix}", body=body, authorization_header=authorization_header) ) # gather is like Promise.all from javascript, it returns a future which resolves to an array of results # in the same order as the tasks that we passed in -- which were in the same order as online_hosts results = await asyncio.gather(*tasks) return results # i lifted this direct from https://stackoverflow.com/a/58616001 # this is the bridge between Flask's one-thread-per-request world # and aiohttp's event-loop based world -- it allows us to call run_coroutine from a flask request handler class EventLoopThread(threading.Thread): loop = None _count = itertools.count(0) def __init__(self): name = f"{type(self).__name__}-{next(self._count)}" super().__init__(name=name, daemon=True) def __repr__(self): loop, r, c, d = self.loop, False, True, False if loop is not None: r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug() return ( f"<{type(self).__name__} {self.name} id={self.ident} " f"running={r} closed={c} debug={d}>" ) def run(self): self.loop = loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_forever() finally: try: shutdown_asyncgens = loop.shutdown_asyncgens() except AttributeError: pass else: loop.run_until_complete(shutdown_asyncgens) loop.close() asyncio.set_event_loop(None) def stop(self): loop, self.loop = self.loop, None if loop is None: return loop.call_soon_threadsafe(loop.stop) self.join() _lock = threading.Lock() _loop_thread = None def get_event_loop(): global _loop_thread if _loop_thread is None: with _lock: if _loop_thread is None: _loop_thread = EventLoopThread() _loop_thread.start() # give the thread up to a second to produce a loop deadline = time.time() + 1 while not _loop_thread.loop and time.time() < deadline: time.sleep(0.001) return _loop_thread.loop def stop_event_loop(): global _loop_thread with _lock: if _loop_thread is not None: _loop_thread.stop() _loop_thread = None def run_coroutine(coro): """Run the coroutine in the event loop running in a separate thread Returns a Future, call Future.result() to get the output """ return asyncio.run_coroutine_threadsafe(coro, get_event_loop())