forked from 3wordchant/capsul-flask
180 lines
6.2 KiB
Python
180 lines
6.2 KiB
Python
|
|
import sys
|
|
import json
|
|
import itertools
|
|
import time
|
|
import threading
|
|
|
|
import aiohttp
|
|
import asyncio
|
|
from flask import current_app
|
|
from capsulflask.shared import *
|
|
from typing import List
|
|
|
|
class HTTPResult:
    """Outcome of a single HTTP request: a status code plus the response body."""

    def __init__(self, status_code, body=None):
        """Store the status code and the (optional) response body as attributes."""
        self.status_code, self.body = status_code, body
|
|
|
|
class InterThreadResult:
    """Pairs an HTTP result with an optional error message.

    The error travels alongside the result so that the code receiving this
    object can decide where to log it.
    """

    def __init__(self, http_result, error=None):
        """Store the HTTP result and the (optional) error as attributes."""
        self.http_result, self.error = http_result, error
|
|
|
|
class MyHTTPClient:
    """HTTP client that lets synchronous code perform aiohttp requests.

    The ``*_sync`` methods hand a coroutine to ``run_coroutine`` and block on
    the returned future; the ``async`` methods perform the actual requests
    through a lazily created, shared ``aiohttp.ClientSession``.
    """

    def __init__(self, timeout_seconds=5):
        """
        :param timeout_seconds: total timeout (seconds) configured on the
            shared client session.
        """
        self.timeout_seconds = timeout_seconds
        # Created on first use by get_client_session().
        self.client_session = None

    def do_multi_http_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
        """POST ``body`` to ``host.url + url_suffix`` for every host and wait for all results.

        Errors reported by the individual requests are logged via
        ``mylog_error``; the returned list always has one ``HTTPResult`` per
        host, in the same order as ``online_hosts``.
        """
        future = run_coroutine(self.do_multi_http(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
        results = future.result()
        for result in results:
            # error is None (or empty) when the request succeeded
            if result.error:
                mylog_error(current_app, result.error)

        return [result.http_result for result in results]

    def do_http_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
        """Perform one HTTP request and block until its result is available.

        Any error reported by the request is logged via ``mylog_error``; the
        ``HTTPResult`` is returned either way.
        """
        future = run_coroutine(self.do_http(method=method, url=url, body=body, authorization_header=authorization_header))
        result = future.result()
        if result.error:
            mylog_error(current_app, result.error)

        return result.http_result

    def get_client_session(self):
        """Return the shared ``aiohttp.ClientSession``, creating it on first call."""
        if not self.client_session:
            self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout_seconds))

        return self.client_session

    async def do_http(self, url: str, body: str, method="POST", authorization_header=None) -> InterThreadResult:
        """Perform one HTTP request and report the outcome without raising.

        :returns: an ``InterThreadResult`` whose ``http_result`` carries the
            status code and body text. When the request itself fails the
            status code is ``-1`` and the body is a JSON object with an
            ``error_message`` key; ``error`` then holds a description.

        Only ``Exception`` subclasses are converted into error results, so
        task cancellation (``asyncio.CancelledError`` on Python 3.8+),
        ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller.
        """
        # TODO make a configuration option where this throws an error if the url does not start with https://
        headers = {}
        if authorization_header is not None:
            headers['Authorization'] = authorization_header
        if body:
            headers['Content-Type'] = "application/json"

        try:
            # NOTE(review): body is handed to aiohttp's json= parameter, which
            # serializes it again; if callers pass an already-serialized JSON
            # string the receiver gets a JSON string literal -- confirm the
            # receiving side expects that before changing it.
            response = await self.get_client_session().request(
                method=method,
                url=url,
                json=body,
                headers=headers,
                verify_ssl=True,
            )
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"do_http (HTTP {method} {url}) {error_message}"})

            return InterThreadResult(
                HTTPResult(-1, response_body),
                f"""do_http (HTTP {method} {url}) failed with: {error_message}"""
            )

        try:
            response_body = await response.text()
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            # The same text is used for the JSON body and for the logged error.
            detail = f"error reading response: HTTP {method} {url} (status {response.status}) failed with: {error_message}"
            response_body = json.dumps({"error_message": detail})
            return InterThreadResult(
                HTTPResult(response.status, response_body),
                detail
            )

        return InterThreadResult(HTTPResult(response.status, response_body), None)

    async def do_multi_http(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[InterThreadResult]:
        """Run ``do_http`` against ``host.url + url_suffix`` for every host concurrently.

        :returns: one ``InterThreadResult`` per host, in the same order as
            ``online_hosts``.
        """
        # build the coroutines in the same order as online_hosts
        tasks = [
            self.do_http(url=f"{host.url}{url_suffix}", body=body, authorization_header=authorization_header)
            for host in online_hosts
        ]
        # gather is like Promise.all from javascript, it returns a future which resolves to an array of results
        # in the same order as the tasks that we passed in -- which were in the same order as online_hosts
        results = await asyncio.gather(*tasks)

        return results
|
|
|
|
|
|
# This was lifted directly from https://stackoverflow.com/a/58616001
# It is the bridge between Flask's one-thread-per-request world
# and aiohttp's event-loop-based world -- it allows us to call run_coroutine from a Flask request handler.
|
|
|
|
class EventLoopThread(threading.Thread):
    """Daemon thread that creates an asyncio event loop and runs it until stopped."""

    # The loop created by run(); None before run() starts and after stop().
    loop = None
    # Class-level counter used to give every instance a distinct name.
    _count = itertools.count(0)

    def __init__(self):
        """Name the thread ``<ClassName>-<n>`` and mark it as a daemon."""
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        """Describe the thread together with the state of its loop, if any."""
        loop = self.loop
        if loop is None:
            # no loop: report it as not running, closed, debug off
            r, c, d = False, True, False
        else:
            r = loop.is_running()
            c = loop.is_closed()
            d = loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        """Thread body: create the loop, run it forever, then clean it up."""
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            # Finalize async generators when the loop supports it; an
            # AttributeError here is deliberately ignored.
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        """Ask the loop to stop and wait for the thread to finish.

        Does nothing when there is no loop (never started or already stopped).
        """
        loop = self.loop
        self.loop = None
        if loop is not None:
            # loop.stop must be scheduled from this (foreign) thread
            loop.call_soon_threadsafe(loop.stop)
            self.join()
|
|
|
|
# The shared EventLoopThread instance, or None while none has been created.
_loop_thread = None
# Held while _loop_thread is being created or torn down.
_lock = threading.Lock()
|
|
|
|
def get_event_loop():
    """Return the event loop of the shared EventLoopThread, starting it if needed.

    The thread is created at most once, under ``_lock``. It is stored in the
    module-level ``_loop_thread`` only after it has had up to one second to
    create its loop, so a caller that skips the lock because ``_loop_thread``
    is already set does not observe a thread whose loop is still missing.

    :returns: the thread's event loop; ``None`` if the thread did not produce
        a loop within the one-second deadline (or was stopped concurrently).
    """
    global _loop_thread

    # Work on a local snapshot so a concurrent stop_event_loop() resetting the
    # global cannot turn the final attribute access into a lookup on None.
    thread = _loop_thread
    if thread is None:
        with _lock:
            thread = _loop_thread
            if thread is None:
                thread = EventLoopThread()
                thread.start()
                # give the thread up to a second to produce a loop
                deadline = time.monotonic() + 1
                while not thread.loop and time.monotonic() < deadline:
                    time.sleep(0.001)
                # publish only now, after the wait above
                _loop_thread = thread

    return thread.loop
|
|
|
|
def stop_event_loop():
    """Stop the shared event loop thread, if there is one, and forget it."""
    global _loop_thread
    with _lock:
        if _loop_thread is None:
            # nothing was started, nothing to stop
            return
        _loop_thread.stop()
        _loop_thread = None
|
|
|
|
def run_coroutine(coro):
    """Schedule ``coro`` on the event loop that runs in a separate thread.

    Returns a Future; call ``Future.result()`` to wait for and obtain the
    coroutine's output.
    """
    loop = get_event_loop()
    return asyncio.run_coroutine_threadsafe(coro, loop)