capsul-flask/capsulflask/http_client.py

160 lines
5.3 KiB
Python
Raw Normal View History

2021-01-03 21:19:29 +00:00
import sys
import json
import itertools
import time
import threading
2021-01-03 21:19:29 +00:00
import aiohttp
import asyncio
from flask import current_app
from capsulflask.shared import OnlineHost, my_exec_info_message
2021-01-03 21:19:29 +00:00
from typing import List
class HTTPResult:
def __init__(self, status_code, body=None):
self.status_code = status_code
self.body = body
class MyHTTPClient:
def __init__(self, timeout_seconds = 5):
self.timeout_seconds = timeout_seconds
self.client_session = None
2021-01-03 21:19:29 +00:00
def make_requests_sync(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
future = run_coroutine(self.make_requests(online_hosts=online_hosts, url_suffix=url_suffix, body=body, authorization_header=authorization_header))
return future.result()
2021-01-03 21:19:29 +00:00
def post_json_sync(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
future = run_coroutine(self.post_json(method=method, url=url, body=body, authorization_header=authorization_header))
return future.result()
def get_client_session(self):
if not self.client_session:
self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout_seconds))
return self.client_session
2021-01-03 21:19:29 +00:00
async def post_json(self, url: str, body: str, method="POST", authorization_header=None) -> HTTPResult:
2021-01-03 21:19:29 +00:00
response = None
try:
headers = {}
if authorization_header != None:
headers['Authorization'] = authorization_header
response = await self.get_client_session().request(
2021-01-03 21:19:29 +00:00
method=method,
url=url,
json=body,
headers=headers,
2021-01-03 21:19:29 +00:00
verify_ssl=True,
)
except:
error_message = my_exec_info_message(sys.exc_info())
response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
current_app.logger.error(f"""
error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
)
return HTTPResult(-1, response_body)
response_body = None
try:
response_body = await response.text()
except:
error_message = my_exec_info_message(sys.exc_info())
response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
current_app.logger.error(f"""
error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
)
return HTTPResult(response.status, response_body)
async def make_requests(self, online_hosts: List[OnlineHost], url_suffix: str, body: str, authorization_header=None) -> List[HTTPResult]:
2021-01-03 21:19:29 +00:00
tasks = []
# append to tasks in the same order as online_hosts
for host in online_hosts:
tasks.append(
self.post_json(url=f"{host.url}/{url_suffix}", body=body, authorization_header=authorization_header)
2021-01-03 21:19:29 +00:00
)
# gather is like Promise.all from javascript, it returns a future which resolves to an array of results
# in the same order as the tasks that we passed in -- which were in the same order as online_hosts
results = await asyncio.gather(*tasks)
return results
# i lifted this direct from https://stackoverflow.com/a/58616001
# this is the bridge between Flask's one-thread-per-request world
# and aiohttp's event-loop based world
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
loop.close()
asyncio.set_event_loop(None)
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
if _loop_thread is None:
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
# give the thread up to a second to produce a loop
deadline = time.time() + 1
while not _loop_thread.loop and time.time() < deadline:
time.sleep(0.001)
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
"""Run the coroutine in the event loop running in a separate thread
Returns a Future, call Future.result() to get the output
"""
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())