capsul-flask/capsulflask/hub_model.py

287 lines
11 KiB
Python
Raw Normal View History

import subprocess
import re
import sys
import requests
import json
import asyncio
from typing import List, Tuple
import aiohttp
from flask import current_app
from time import sleep
from os.path import join
from subprocess import run
from capsulflask.db_model import OnlineHost
from capsulflask.spoke_model import VirtualMachine
from capsulflask.spoke_model import validate_capsul_id
from capsulflask.db import get_model, my_exec_info_message
class HTTPResult:
def __init__(self, status_code, body=None):
self.status_code = status_code
self.body = body
class HubInterface:
def capacity_avaliable(self, additional_ram_bytes: int) -> bool:
pass
def get(self, id: str) -> VirtualMachine:
pass
def list_ids(self) -> list:
pass
def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list):
pass
def destroy(self, email: str, id: str):
pass
class MockHub(HubInterface):
def capacity_avaliable(self, additional_ram_bytes):
return True
def get(self, id):
validate_capsul_id(id)
return VirtualMachine(id, current_app.config["SPOKE_HOST_ID"], ipv4="1.1.1.1")
def list_ids(self) -> list:
return get_model().all_non_deleted_vm_ids()
def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list):
validate_capsul_id(id)
current_app.logger.info(f"mock create: {id} for {email}")
sleep(1)
def destroy(self, email: str, id: str):
current_app.logger.info(f"mock destroy: {id} for {email}")
class CapsulFlaskHub(HubInterface):
async def post_json(self, method: str, url: str, body: str, session: aiohttp.ClientSession) -> HTTPResult:
response = None
try:
response = await session.request(
method=method,
url=url,
json=body,
auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']),
verify_ssl=True,
)
except:
error_message = my_exec_info_message(sys.exc_info())
response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
current_app.logger.error(f"""
error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
)
return HTTPResult(-1, response_body)
response_body = None
try:
response_body = await response.text()
except:
error_message = my_exec_info_message(sys.exc_info())
response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
current_app.logger.error(f"""
error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
)
return HTTPResult(response.status, response_body)
async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List(HTTPResult):
timeout_seconds = 5
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) as session:
tasks = []
# append to tasks in the same order as online_hosts
for host in online_hosts:
tasks.append(
self.post_json(method="POST", url=host.url, body=body, session=session)
)
# gather is like Promise.all from javascript, it returns a future which resolves to an array of results
# in the same order as the tasks that we passed in -- which were in the same order as online_hosts
results = await asyncio.gather(*tasks)
return results
async def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
operation_id = get_model().create_operation(hosts, payload)
results = await self.make_requests(hosts, payload)
for i in range(len(hosts)):
host = hosts[i]
result = results[i]
task_result = None
assignment_status = "pending"
if result.status_code == -1:
assignment_status = "no_response_from_host"
if result.status_code != 200:
assignment_status = "error_response_from_host"
else:
valid_statuses = {
"assigned": True,
"not_applicable": True,
"assigned_to_other_host": True,
}
result_is_json = False
result_is_dict = False
result_has_status = False
result_has_valid_status = False
assignment_status = "invalid_response_from_host"
try:
if immediate_mode:
task_result = result.body
result_body = json.loads(result.body)
result_is_json = True
result_is_dict = isinstance(result_body, dict)
result_has_status = result_is_dict and 'assignment_status' in result_body
result_has_valid_status = result_has_status and result_body['assignment_status'] in valid_statuses
if result_has_valid_status:
assignment_status = result_body['assignment_status']
except:
pass
if not result_has_valid_status:
current_app.logger.error(f"""error reading assignment_status for operation {operation_id} from host {host.id}:
result_is_json: {result_is_json}
result_is_dict: {result_is_dict}
result_has_status: {result_has_status}
result_has_valid_status: {result_has_valid_status}
"""
)
get_model().update_host_operation(host.id, operation_id, assignment_status, task_result)
return results
async def capacity_avaliable(self, additional_ram_bytes):
online_hosts = get_model().get_online_hosts()
payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes))
op = await self.generic_operation(online_hosts, payload, True)
results = op[1]
for result in results:
try:
result_body = json.loads(result.body)
if isinstance(result_body, dict) and 'capacity_avaliable' in result_body and result_body['capacity_avaliable'] == True:
return True
except:
pass
return False
async def get(self, id) -> VirtualMachine:
validate_capsul_id(id)
host = get_model().host_of_capsul(id)
if host is not None:
payload = json.dumps(dict(type="get", id=id))
op = await self.generic_operation([host], payload, True)
results = op[1]
for result in results:
try:
result_body = json.loads(result.body)
if isinstance(result_body, dict) and ('ipv4' in result_body or 'ipv6' in result_body):
return VirtualMachine(id, host=host, ipv4=result_body['ipv4'], ipv6=result_body['ipv6'])
except:
pass
return None
def list_ids(self) -> list:
online_hosts = get_model().get_online_hosts()
payload = json.dumps(dict(type="list_ids"))
op = await self.generic_operation(online_hosts, payload, False)
operation_id = op[0]
results = op[1]
to_return = []
for i in range(len(results)):
host = online_hosts[i]
result = results[i]
try:
result_body = json.loads(result.body)
if isinstance(result_body, dict) and 'ids' in result_body and isinstance(result_body['ids'], list):
all_valid = True
for id in result_body['ids']:
try:
validate_capsul_id(id)
to_return.append(id)
except:
all_valid = False
if all_valid:
get_model().update_host_operation(host.id, operation_id, None, result.body)
else:
result_json_string = json.dumps({"error_message": "invalid capsul id returned"})
get_model().update_host_operation(host.id, operation_id, None, result_json_string)
current_app.logger.error(f"""error reading ids for list_ids operation {operation_id}, host {host.id}""")
else:
result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"})
get_model().update_host_operation(host.id, operation_id, "invalid_response_from_host", result_json_string)
current_app.logger.error(f"""missing 'ids' list for list_ids operation {operation_id}, host {host.id}""")
except:
# no need to do anything here since if it cant be parsed then generic_operation will handle it.
pass
return to_return
def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list):
validate_capsul_id(id)
online_hosts = get_model().get_online_hosts()
payload = json.dumps(dict(
type="create",
email=email,
id=id,
template_image_file_name=template_image_file_name,
vcpus=vcpus,
memory_mb=memory_mb,
ssh_public_keys=ssh_public_keys,
))
op = await self.generic_operation(online_hosts, payload, False)
operation_id = op[0]
results = op[1]
number_of_assigned = 0
2021-01-03 20:44:56 +00:00
error_message = ""
assigned_hosts = []
for i in range(len(results)):
host = online_hosts[i]
result = results[i]
try:
result_body = json.loads(result.body)
if isinstance(result_body, dict) and 'assignment_status' in result_body and result_body['assignment_status'] == "assigned":
number_of_assigned += 1
assigned_hosts.append(host.id)
2021-01-03 20:44:56 +00:00
if isinstance(result_body, dict) and 'error_message' in result_body:
error_message = result_body['error_message']
except:
# no need to do anything here since if it cant be parsed then generic_operation will handle it.
pass
if number_of_assigned != 1:
assigned_hosts_string = ", ".join(assigned_hosts)
raise ValueError(f"expected create capsul operation {operation_id} to be assigned to one host, it was assigned to {number_of_assigned} ({assigned_hosts_string})")
2021-01-03 20:44:56 +00:00
if error_message != "":
raise ValueError(f"create capsul operation {operation_id} on {assigned_hosts_string} failed with {error_message}")
def destroy(self, email: str, id: str):
validate_capsul_id(id)
result_status = None
host = get_model().host_of_capsul(id)
if host is not None:
payload = json.dumps(dict(type="destroy", id=id))
op = await self.generic_operation([host], payload, True)
results = op[1]
result_json_string = "<no response from host>"
for result in results:
try:
result_json_string = result.body
result_body = json.loads(result_json_string)
if isinstance(result_body, dict) and 'status' in result_body:
result_status = result_body['status']
except:
pass
if not result_status == "success":
raise ValueError(f"""failed to destroy vm "{id}" on host "{host}" for {email}: {result_json_string}""")