283 lines
10 KiB
Python
283 lines
10 KiB
Python
|
import subprocess
|
||
|
import re
|
||
|
import sys
|
||
|
import requests
|
||
|
import json
|
||
|
import asyncio
|
||
|
from typing import List, Tuple
|
||
|
|
||
|
import aiohttp
|
||
|
from flask import current_app
|
||
|
from time import sleep
|
||
|
from os.path import join
|
||
|
from subprocess import run
|
||
|
|
||
|
from capsulflask.db_model import OnlineHost
|
||
|
from capsulflask.spoke_model import VirtualMachine
|
||
|
from capsulflask.spoke_model import validate_capsul_id
|
||
|
from capsulflask.db import get_model, my_exec_info_message
|
||
|
|
||
|
class HTTPResult:
    """Plain holder for the outcome of one HTTP exchange.

    Attributes:
        status_code: the status code recorded for the exchange.
        body: the response body, or None when none was supplied.
    """

    def __init__(self, status_code, body=None):
        # Stored as given; no validation or conversion is performed.
        self.body = body
        self.status_code = status_code
|
||
|
|
||
|
class HubInterface:
    """Common set of operations shared by the hub implementations in this module.

    Every method here is a placeholder that does nothing and returns None;
    subclasses override them with real behaviour.
    """

    def capacity_avaliable(self, additional_ram_bytes: int) -> bool:
        """Placeholder: subclasses report whether `additional_ram_bytes` more RAM can be accommodated."""

    def get(self, id: str) -> VirtualMachine:
        """Placeholder: subclasses look up the virtual machine identified by `id`."""

    def list_ids(self) -> list:
        """Placeholder: subclasses return a list of ids."""

    def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list):
        """Placeholder: subclasses create the virtual machine `id` for `email` from the given parameters."""

    def destroy(self, email: str, id: str):
        """Placeholder: subclasses destroy the virtual machine `id` on behalf of `email`."""
|
||
|
|
||
|
class MockHub(HubInterface):
    """Hub stand-in that answers locally instead of contacting any host."""

    def capacity_avaliable(self, additional_ram_bytes):
        """Always report that capacity is available, whatever amount is requested."""
        return True

    def get(self, id):
        """Validate `id`, then return a VirtualMachine on the configured spoke host with a fixed ipv4."""
        validate_capsul_id(id)
        spoke_host_id = current_app.config["SPOKE_HOST_ID"]
        return VirtualMachine(id, spoke_host_id, ipv4="1.1.1.1")

    def list_ids(self) -> list:
        """Return the ids the data model reports as non-deleted."""
        model = get_model()
        return model.all_non_deleted_vm_ids()

    def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list):
        """Validate `id`, log the request, and pause for one second; nothing is actually created."""
        validate_capsul_id(id)
        log = current_app.logger
        log.info(f"mock create: {id} for {email}")
        sleep(1)

    def destroy(self, email: str, id: str):
        """Log the request; nothing is actually destroyed."""
        log = current_app.logger
        log.info(f"mock destroy: {id} for {email}")
|
||
|
|
||
|
|
||
|
class CapsulFlaskHub(HubInterface):
    """Hub implementation that forwards each operation to spoke hosts over HTTP.

    Every public method is a coroutine and must be awaited (or driven by an
    event loop).

    NOTE(review): HubInterface and MockHub declare these methods as plain
    synchronous functions; callers written against those need to be adapted
    or this class wrapped -- confirm the intended calling convention.
    """

    # assignment_status values a host is allowed to report back.
    # Kept as a tuple so membership tests never fail on unhashable JSON values.
    _VALID_ASSIGNMENT_STATUSES = ("assigned", "not_applicable", "assigned_to_other_host")

    @staticmethod
    def _parse_json_dict(body):
        """Return `body` decoded as a JSON object, or None if it is not valid JSON or not an object."""
        try:
            parsed = json.loads(body)
        except (TypeError, ValueError):
            return None
        return parsed if isinstance(parsed, dict) else None

    async def post_json(self, method: str, url: str, body: str, session: aiohttp.ClientSession) -> HTTPResult:
        """Send one authenticated request and capture the outcome as an HTTPResult.

        Never raises for transport problems. A failure to send yields
        status_code -1 with a JSON error body; a failure to read the response
        keeps the received status and substitutes a JSON error body.
        """
        try:
            response = await session.request(
                method=method,
                url=url,
                # NOTE(review): every caller in this class passes an already
                # serialised JSON string, so json= will encode it a second
                # time -- confirm this is what the receiving side expects.
                json=body,
                auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']),
                verify_ssl=True,
            )
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
            current_app.logger.error(f"""
                error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
            )
            return HTTPResult(-1, response_body)

        try:
            response_body = await response.text()
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
            current_app.logger.error(f"""
                error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
            )

        return HTTPResult(response.status, response_body)

    async def make_requests(self, online_hosts: List[OnlineHost], body: str) -> List[HTTPResult]:
        """POST `body` to every host concurrently.

        Returns one HTTPResult per host, in the same order as `online_hosts`.
        The whole session uses a 5 second total timeout.
        """
        timeout_seconds = 5
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_seconds)) as session:
            # gather resolves to the results in the order the awaitables were
            # passed in, which is the order of online_hosts
            tasks = [
                self.post_json(method="POST", url=host.url, body=body, session=session)
                for host in online_hosts
            ]
            results = await asyncio.gather(*tasks)

        return results

    async def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
        """Record a new operation, send `payload` to `hosts`, and store each host's assignment status.

        Args:
            hosts: hosts to contact.
            payload: request body sent to every host.
            immediate_mode: when true, a 200 response body is also stored as
                the host's task result.

        Returns:
            (operation_id, results) where results[i] belongs to hosts[i].
        """
        operation_id = get_model().create_operation(hosts, payload)
        results = await self.make_requests(hosts, payload)

        for host, result in zip(hosts, results):
            task_result = None
            if result.status_code == -1:
                assignment_status = "no_response_from_host"
            elif result.status_code != 200:
                assignment_status = "error_response_from_host"
            else:
                assignment_status = "invalid_response_from_host"
                result_is_json = False
                result_is_dict = False
                result_has_status = False
                result_has_valid_status = False

                if immediate_mode:
                    task_result = result.body

                try:
                    result_body = json.loads(result.body)
                except (TypeError, ValueError):
                    result_body = None
                else:
                    result_is_json = True
                    result_is_dict = isinstance(result_body, dict)
                    result_has_status = result_is_dict and 'assignment_status' in result_body
                    result_has_valid_status = (
                        result_has_status
                        and result_body['assignment_status'] in self._VALID_ASSIGNMENT_STATUSES
                    )

                if result_has_valid_status:
                    assignment_status = result_body['assignment_status']
                else:
                    current_app.logger.error(f"""error reading assignment_status for operation {operation_id} from host {host.id}:
                        result_is_json: {result_is_json}
                        result_is_dict: {result_is_dict}
                        result_has_status: {result_has_status}
                        result_has_valid_status: {result_has_valid_status}
                        """
                    )

            get_model().update_host_operation(host.id, operation_id, assignment_status, task_result)

        return operation_id, results

    async def capacity_avaliable(self, additional_ram_bytes):
        """Return True if any online host answers with a true `capacity_avaliable`, otherwise False."""
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes))
        _, results = await self.generic_operation(online_hosts, payload, True)

        for result in results:
            result_body = self._parse_json_dict(result.body)
            # == (not `is`) so that a JSON 1 still counts, as it did before
            if result_body is not None and result_body.get('capacity_avaliable') == True:  # noqa: E712
                return True

        return False

    async def get(self, id) -> VirtualMachine:
        """Fetch the addresses of capsul `id` from its host.

        Returns a VirtualMachine when the host's response carries an `ipv4`
        and/or `ipv6` key (a missing one is passed as None), or None when the
        capsul has no host or no usable response was received.

        Raises whatever validate_capsul_id raises for an invalid `id`.
        """
        validate_capsul_id(id)
        host = get_model().host_of_capsul(id)
        if host is None:
            return None

        payload = json.dumps(dict(type="get", id=id))
        _, results = await self.generic_operation([host], payload, True)

        for result in results:
            result_body = self._parse_json_dict(result.body)
            if result_body is not None and ('ipv4' in result_body or 'ipv6' in result_body):
                return VirtualMachine(
                    id,
                    host=host,
                    ipv4=result_body.get('ipv4'),
                    ipv6=result_body.get('ipv6'),
                )

        return None

    async def list_ids(self) -> list:
        """Collect the capsul ids reported by every online host.

        Ids that fail validate_capsul_id are left out of the returned list.
        Each host's outcome is stored against the operation via
        update_host_operation.
        """
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="list_ids"))
        operation_id, results = await self.generic_operation(online_hosts, payload, False)

        to_return = []
        for host, result in zip(online_hosts, results):
            try:
                result_body = json.loads(result.body)
            except (TypeError, ValueError):
                # nothing to do here: generic_operation has already recorded
                # a status for a response that cannot be parsed
                continue

            ids = result_body.get('ids') if isinstance(result_body, dict) else None
            if not isinstance(ids, list):
                # NOTE(review): the JSON error body produced for an unreachable
                # host also lands here, replacing the status that
                # generic_operation stored for it -- confirm that is intended.
                result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"})
                get_model().update_host_operation(host.id, operation_id, "invalid_response_from_host", result_json_string)
                current_app.logger.error(f"""missing 'ids' list for list_ids operation {operation_id}, host {host.id}""")
                continue

            all_valid = True
            for capsul_id in ids:
                try:
                    validate_capsul_id(capsul_id)
                except Exception:
                    all_valid = False
                else:
                    to_return.append(capsul_id)

            if all_valid:
                get_model().update_host_operation(host.id, operation_id, None, result.body)
            else:
                result_json_string = json.dumps({"error_message": "invalid capsul id returned"})
                get_model().update_host_operation(host.id, operation_id, None, result_json_string)
                current_app.logger.error(f"""error reading ids for list_ids operation {operation_id}, host {host.id}""")

        return to_return

    async def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list):
        """Ask the online hosts to create capsul `id`.

        Raises:
            ValueError: unless exactly one host reports assignment_status "assigned".
        """
        validate_capsul_id(id)
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(
            type="create",
            email=email,
            id=id,
            template_image_file_name=template_image_file_name,
            vcpus=vcpus,
            memory_mb=memory_mb,
            ssh_public_keys=ssh_public_keys,
        ))
        operation_id, results = await self.generic_operation(online_hosts, payload, False)

        # unparseable responses are skipped here; generic_operation already recorded them
        assigned_hosts = []
        for host, result in zip(online_hosts, results):
            result_body = self._parse_json_dict(result.body)
            if result_body is not None and result_body.get('assignment_status') == "assigned":
                assigned_hosts.append(host.id)

        number_of_assigned = len(assigned_hosts)
        if number_of_assigned != 1:
            assigned_hosts_string = ", ".join(map(str, assigned_hosts))
            raise ValueError(f"expected create capsul operation {operation_id} to be assigned to one host, it was assigned to {number_of_assigned} ({assigned_hosts_string})")

    async def destroy(self, email: str, id: str):
        """Ask the host of capsul `id` to destroy it.

        Raises:
            ValueError: when the capsul has no host, or the host's response
                does not carry status "success".
        """
        validate_capsul_id(id)
        result_status = None
        # initialised up front so the error below can be built even when no host was found
        result_json_string = "<no response from host>"
        host = get_model().host_of_capsul(id)

        if host is not None:
            payload = json.dumps(dict(type="destroy", id=id))
            _, results = await self.generic_operation([host], payload, True)
            for result in results:
                result_json_string = result.body
                result_body = self._parse_json_dict(result_json_string)
                if result_body is not None and 'status' in result_body:
                    result_status = result_body['status']

        if result_status != "success":
            raise ValueError(f"""failed to destroy vm "{id}" on host "{host}" for {email}: {result_json_string}""")