forked from 3wordchant/capsul-flask
		
	http client class
This commit is contained in:
		| @ -14,6 +14,7 @@ from flask import current_app | ||||
|  | ||||
| from capsulflask import hub_model, spoke_model, cli | ||||
| from capsulflask.btcpay import client as btcpay | ||||
| from capsulflask.http_client import MyHTTPClient | ||||
|  | ||||
| load_dotenv(find_dotenv()) | ||||
|  | ||||
| @ -81,7 +82,7 @@ stripe.api_key = app.config['STRIPE_SECRET_KEY'] | ||||
| stripe.api_version = app.config['STRIPE_API_VERSION'] | ||||
|  | ||||
| app.config['FLASK_MAIL_INSTANCE'] = Mail(app) | ||||
|  | ||||
| app.config['HTTP_CLIENT'] = MyHTTPClient(timeout_seconds=5) | ||||
| app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY']) | ||||
|  | ||||
| if app.config['HUB_MODE_ENABLED']: | ||||
|  | ||||
							
								
								
									
										69
									
								
								capsulflask/http_client.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								capsulflask/http_client.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,69 @@ | ||||
|  | ||||
| import sys | ||||
| import json | ||||
|  | ||||
| import aiohttp | ||||
| import asyncio | ||||
| from flask import current_app | ||||
| from capsulflask.db import  my_exec_info_message | ||||
| from capsulflask.db_model import OnlineHost | ||||
| from typing import List | ||||
|  | ||||
| class HTTPResult: | ||||
|   def __init__(self, status_code, body=None): | ||||
|     self.status_code = status_code | ||||
|     self.body = body | ||||
|  | ||||
| class MyHTTPClient: | ||||
|   def __init__(self, timeout_seconds = 5): | ||||
|     self.client_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) | ||||
|     self.event_loop = asyncio.get_event_loop() | ||||
|  | ||||
|   def make_requests_sync(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): | ||||
|     self.event_loop.run_until_complete(self.make_requests(online_hosts=online_hosts, body=body)) | ||||
|  | ||||
|   def post_json_sync(self, method: str, url: str,  body: str) -> HTTPResult: | ||||
|       self.event_loop.run_until_complete(self.post_json_sync(method=method, url=url, body=body)) | ||||
|  | ||||
|   async def post_json(self, method: str, url: str,  body: str) -> HTTPResult: | ||||
|     response = None | ||||
|     try: | ||||
|       response = await self.client_session.request( | ||||
|         method=method,  | ||||
|         url=url,  | ||||
|         json=body,  | ||||
|         auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']),  | ||||
|         verify_ssl=True, | ||||
|       ) | ||||
|     except: | ||||
|       error_message = my_exec_info_message(sys.exc_info()) | ||||
|       response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"}) | ||||
|       current_app.logger.error(f""" | ||||
|         error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}""" | ||||
|       ) | ||||
|       return HTTPResult(-1, response_body) | ||||
|  | ||||
|     response_body = None | ||||
|     try: | ||||
|       response_body = await response.text() | ||||
|     except: | ||||
|       error_message = my_exec_info_message(sys.exc_info()) | ||||
|       response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"}) | ||||
|       current_app.logger.error(f""" | ||||
|         error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}""" | ||||
|       ) | ||||
|        | ||||
|     return HTTPResult(response.status, response_body) | ||||
|  | ||||
|   async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): | ||||
|     tasks = [] | ||||
|     # append to tasks in the same order as online_hosts | ||||
|     for host in online_hosts: | ||||
|         tasks.append( | ||||
|             self.post_json(method="POST", url=host.url, body=body) | ||||
|         ) | ||||
|     # gather is like Promise.all from javascript, it returns a future which resolves to an array of results  | ||||
|     # in the same order as the tasks that we passed in -- which were in the same order as online_hosts | ||||
|     results = await asyncio.gather(*tasks) | ||||
|  | ||||
|     return results | ||||
| @ -13,16 +13,18 @@ from os.path import join | ||||
| from subprocess import run | ||||
|  | ||||
| from capsulflask.db_model import OnlineHost | ||||
| from capsulflask.spoke_model import VirtualMachine | ||||
| from capsulflask.spoke_model import validate_capsul_id | ||||
| from capsulflask.db import get_model, my_exec_info_message | ||||
| from capsulflask.http_client import HTTPResult | ||||
|  | ||||
| class HTTPResult: | ||||
|   def __init__(self, status_code, body=None): | ||||
|     self.status_code = status_code | ||||
|     self.body = body | ||||
| class VirtualMachine: | ||||
|   def __init__(self, id, host, ipv4=None, ipv6=None): | ||||
|     self.id = id | ||||
|     self.host = host | ||||
|     self.ipv4 = ipv4 | ||||
|     self.ipv6 = ipv6 | ||||
|  | ||||
| class HubInterface: | ||||
| class VirtualizationInterface: | ||||
|   def capacity_avaliable(self, additional_ram_bytes: int) -> bool: | ||||
|     pass | ||||
|  | ||||
| @ -38,7 +40,7 @@ class HubInterface: | ||||
|   def destroy(self, email: str, id: str): | ||||
|     pass | ||||
|  | ||||
| class MockHub(HubInterface): | ||||
| class MockHub(VirtualizationInterface): | ||||
|   def capacity_avaliable(self, additional_ram_bytes): | ||||
|     return True | ||||
|  | ||||
| @ -58,58 +60,11 @@ class MockHub(HubInterface): | ||||
|     current_app.logger.info(f"mock destroy: {id} for {email}") | ||||
|  | ||||
|  | ||||
| class CapsulFlaskHub(HubInterface): | ||||
|  | ||||
|    | ||||
|   async def post_json(self, method: str, url: str,  body: str, session: aiohttp.ClientSession) -> HTTPResult: | ||||
|       response = None | ||||
|       try: | ||||
|         response = await session.request( | ||||
|           method=method,  | ||||
|           url=url,  | ||||
|           json=body,  | ||||
|           auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']),  | ||||
|           verify_ssl=True, | ||||
|         ) | ||||
|       except: | ||||
|         error_message = my_exec_info_message(sys.exc_info()) | ||||
|         response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"}) | ||||
|         current_app.logger.error(f""" | ||||
|           error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}""" | ||||
|         ) | ||||
|         return HTTPResult(-1, response_body) | ||||
|  | ||||
|       response_body = None | ||||
|       try: | ||||
|         response_body = await response.text() | ||||
|       except: | ||||
|         error_message = my_exec_info_message(sys.exc_info()) | ||||
|         response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"}) | ||||
|         current_app.logger.error(f""" | ||||
|           error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}""" | ||||
|         ) | ||||
|          | ||||
|       return HTTPResult(response.status, response_body) | ||||
|  | ||||
|   async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult): | ||||
|       timeout_seconds = 5 | ||||
|       async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) as session: | ||||
|           tasks = [] | ||||
|           # append to tasks in the same order as online_hosts | ||||
|           for host in online_hosts: | ||||
|               tasks.append( | ||||
|                   self.post_json(method="POST", url=host.url, body=body, session=session) | ||||
|               ) | ||||
|           # gather is like Promise.all from javascript, it returns a future which resolves to an array of results  | ||||
|           # in the same order as the tasks that we passed in -- which were in the same order as online_hosts | ||||
|           results = await asyncio.gather(*tasks) | ||||
|  | ||||
|           return results | ||||
| class CapsulFlaskHub(VirtualizationInterface): | ||||
|            | ||||
|  | ||||
|   async def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: | ||||
|   def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]: | ||||
|     operation_id = get_model().create_operation(hosts, payload) | ||||
|     results = await self.make_requests(hosts, payload) | ||||
|     results = current_app.config["HTTP_CLIENT"].make_requests_sync(hosts, payload) | ||||
|     for i in range(len(hosts)): | ||||
|       host = hosts[i] | ||||
|       result = results[i] | ||||
| @ -130,6 +85,7 @@ class CapsulFlaskHub(HubInterface): | ||||
|         result_has_status = False | ||||
|         result_has_valid_status = False | ||||
|         assignment_status = "invalid_response_from_host" | ||||
|         error_message = "" | ||||
|         try: | ||||
|           if immediate_mode: | ||||
|             task_result = result.body | ||||
| @ -140,6 +96,8 @@ class CapsulFlaskHub(HubInterface): | ||||
|           result_has_valid_status = result_has_status and result_body['assignment_status'] in valid_statuses | ||||
|           if result_has_valid_status: | ||||
|             assignment_status = result_body['assignment_status'] | ||||
|           if result_is_dict and "error_message" in result_body: | ||||
|             error_message = result_body['error_message'] | ||||
|         except: | ||||
|           pass | ||||
|  | ||||
| @ -149,6 +107,7 @@ class CapsulFlaskHub(HubInterface): | ||||
|               result_is_dict: {result_is_dict} | ||||
|               result_has_status: {result_has_status} | ||||
|               result_has_valid_status: {result_has_valid_status} | ||||
|               error_message: {error_message} | ||||
|             """ | ||||
|           ) | ||||
|  | ||||
| @ -156,10 +115,10 @@ class CapsulFlaskHub(HubInterface): | ||||
|      | ||||
|     return results | ||||
|  | ||||
|   async def capacity_avaliable(self, additional_ram_bytes): | ||||
|   def capacity_avaliable(self, additional_ram_bytes): | ||||
|     online_hosts = get_model().get_online_hosts() | ||||
|     payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes)) | ||||
|     op = await self.generic_operation(online_hosts, payload, True) | ||||
|     op = self.generic_operation(online_hosts, payload, True) | ||||
|     results = op[1] | ||||
|     for result in results: | ||||
|       try: | ||||
|  | ||||
| @ -41,7 +41,12 @@ def operation(): | ||||
|     types_csv = ", ".join(handlers.keys()) | ||||
|     if isinstance(request_body, dict) and 'type' in request_body: | ||||
|       if request_body['type'] in handlers: | ||||
|         return handlers[request_body['type']](request_body) | ||||
|         try: | ||||
|           return handlers[request_body['type']](request_body) | ||||
|         except: | ||||
|           error_message = my_exec_info_message(sys.exc_info()) | ||||
|           current_app.logger.error(f"unhandled exception in {request_body['type']} handler: {error_message}") | ||||
|           return jsonify(dict(error_message=error_message)) | ||||
|       else: | ||||
|         error_message = f"'type' must be one of {types_csv}" | ||||
|     else: | ||||
|  | ||||
| @ -8,34 +8,13 @@ from subprocess import run | ||||
|  | ||||
| from capsulflask.db import get_model | ||||
|  | ||||
| from capsulflask.hub_model import VirtualizationInterface, VirtualMachine | ||||
|  | ||||
| def validate_capsul_id(id): | ||||
|   if not re.match(r"^(cvm|capsul)-[a-z0-9]{10}$", id): | ||||
|     raise ValueError(f"vm id \"{id}\" must match \"^capsul-[a-z0-9]{{10}}$\"") | ||||
|  | ||||
| class VirtualMachine: | ||||
|   def __init__(self, id, host, ipv4=None, ipv6=None): | ||||
|     self.id = id | ||||
|     self.host = host | ||||
|     self.ipv4 = ipv4 | ||||
|     self.ipv6 = ipv6 | ||||
|  | ||||
| class SpokeInterface: | ||||
|   def capacity_avaliable(self, additional_ram_bytes: int) -> bool: | ||||
|     pass | ||||
|  | ||||
|   def get(self, id: str) -> VirtualMachine: | ||||
|     pass | ||||
|  | ||||
|   def list_ids(self) -> list: | ||||
|     pass | ||||
|  | ||||
|   def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list): | ||||
|     pass | ||||
|  | ||||
|   def destroy(self, email: str, id: str): | ||||
|     pass | ||||
|  | ||||
| class MockSpoke(SpokeInterface): | ||||
| class MockSpoke(VirtualizationInterface): | ||||
|   def capacity_avaliable(self, additional_ram_bytes): | ||||
|     return True | ||||
|  | ||||
| @ -54,7 +33,7 @@ class MockSpoke(SpokeInterface): | ||||
|   def destroy(self, email: str, id: str): | ||||
|     current_app.logger.info(f"mock destroy: {id} for {email}") | ||||
|  | ||||
| class ShellScriptSpoke(SpokeInterface): | ||||
| class ShellScriptSpoke(VirtualizationInterface): | ||||
|  | ||||
|   def validate_completed_process(self, completedProcess, email=None): | ||||
|     emailPart = "" | ||||
|  | ||||
		Reference in New Issue
	
	Block a user