# Source: capsul-flask/capsulflask/hub_model.py (279 lines, 12 KiB, Python)

import subprocess
import re
import sys
import requests
import json
import asyncio
from typing import List, Tuple
import aiohttp
from flask import current_app
from time import sleep
from os.path import join
from subprocess import run
from capsulflask.db import get_model
# 2021-01-03 21:19:29 +00:00
from capsulflask.http_client import HTTPResult
from capsulflask.shared import VirtualizationInterface, VirtualMachine, OnlineHost, validate_capsul_id, my_exec_info_message
# 2021-01-03 21:19:29 +00:00
class MockHub(VirtualizationInterface):
    """VirtualizationInterface stand-in that answers locally with canned data.

    No spoke host is contacted: capacity is always reported as available,
    every capsul gets the same network name and IPv4 address, and destroy /
    state commands are only logged.
    """

    def __init__(self):
        # Fixed values handed to every mock capsul.
        self.default_ipv4 = "1.1.1.1"
        self.default_network = "public1"

    def capacity_avaliable(self, additional_ram_bytes):
        """Return True regardless of how much RAM is requested."""
        return True

    def get(self, id, get_ssh_host_keys):
        """Build a VirtualMachine for ``id`` on the configured spoke host.

        The id is validated first. When ``get_ssh_host_keys`` is truthy the
        returned machine also carries a hard-coded list of three SSH host keys
        (ED25519, RSA and ECDSA).
        """
        validate_capsul_id(id)

        if not get_ssh_host_keys:
            return VirtualMachine(id, current_app.config["SPOKE_HOST_ID"], ipv4=self.default_ipv4)

        canned_host_keys = json.loads("""[
{"key_type":"ED25519", "content":"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIN8cna0zeKSKl/r8whdn/KmDWhdzuWRVV0GaKIM+eshh", "sha256":"V4X2apAF6btGAfS45gmpldknoDX0ipJ5c6DLfZR2ttQ"},
{"key_type":"RSA", "content":"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCvotgzgEP65JUQ8S8OoNKy1uEEPEAcFetSp7QpONe6hj4wPgyFNgVtdoWdNcU19dX3hpdse0G8OlaMUTnNVuRlbIZXuifXQ2jTtCFUA2mmJ5bF+XjGm3TXKMNGh9PN+wEPUeWd14vZL+QPUMev5LmA8cawPiU5+vVMLid93HRBj118aCJFQxLgrdP48VPfKHFRfCR6TIjg1ii3dH4acdJAvlmJ3GFB6ICT42EmBqskz2MPe0rIFxH8YohCBbAbrbWYcptHt4e48h4UdpZdYOhEdv89GrT8BF2C5cbQ5i9qVpI57bXKrj8hPZU5of48UHLSpXG8mbH0YDiOQOfKX/Mt", "sha256":"ghee6KzRnBJhND2kEUZSaouk7CD6o6z2aAc8GPkV+GQ"},
{"key_type":"ECDSA", "content":"ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBLLgOoATz9R4aS2kk7vWoxX+lshK63t9+5BIHdzZeFE1o+shlcf0Wji8cN/L1+m3bi0uSETZDOAWMP3rHLJj9Hk=", "sha256":"aCYG1aD8cv/TjzJL0bi9jdabMGksdkfa7R8dCGm1yYs"}
]""")
        return VirtualMachine(
            id,
            current_app.config["SPOKE_HOST_ID"],
            ipv4=self.default_ipv4,
            ssh_host_keys=canned_host_keys,
        )

    def list_ids(self) -> list:
        """Return the ids of all non-deleted VMs recorded in the database model."""
        model = get_model()
        return model.all_non_deleted_vm_ids()

    def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
        """Record a new VM in the database after a one-second pause.

        Only the ``"name"`` entry of each item in ``ssh_authorized_keys`` is
        stored. ``template_image_file_name``, ``vcpus`` and ``memory_mb`` are
        accepted but not used here.
        """
        validate_capsul_id(id)
        current_app.logger.info(f"mock create: {id} for {email}")
        sleep(1)
        get_model().create_vm(
            email=email,
            id=id,
            size=size,
            os=os,
            host=current_app.config["SPOKE_HOST_ID"],
            network_name=self.default_network,
            public_ipv4=self.default_ipv4,
            ssh_authorized_keys=[key["name"] for key in ssh_authorized_keys],
        )

    def destroy(self, email: str, id: str):
        """Log the destroy request; nothing else happens."""
        current_app.logger.info(f"mock destroy: {id} for {email}")

    def vm_state_command(self, email: str, id: str, command: str):
        """Log the requested state command; nothing else happens."""
        current_app.logger.info(f"mock {command}: {id} for {email}")
# 2021-01-03 21:19:29 +00:00
class CapsulFlaskHub(VirtualizationInterface):
    """VirtualizationInterface that forwards each operation to spoke hosts over HTTP.

    Every public operation serialises its request as a JSON payload, sends it
    through ``generic_operation`` and interprets the JSON bodies that come
    back from the hosts.
    """

    # The only assignment statuses a host may legitimately report.
    _VALID_ASSIGNMENT_STATUSES = frozenset({
        "assigned",
        "not_applicable",
        "assigned_to_other_host",
    })

    def synchronous_operation(self, hosts: List[OnlineHost], email: str, payload: str) -> List[HTTPResult]:
        """Send ``payload`` to ``hosts`` in immediate mode and return only the HTTP results."""
        return self.generic_operation(hosts, email, payload, True)[1]

    def asynchronous_operation(self, hosts: List[OnlineHost], email: str, payload: str) -> Tuple[int, List[HTTPResult]]:
        """Send ``payload`` to ``hosts`` as a tracked operation.

        Returns the ``(operation_id, results)`` tuple produced by
        ``generic_operation`` with ``immediate_mode=False``.
        """
        return self.generic_operation(hosts, email, payload, False)

    def generic_operation(self, hosts: List[OnlineHost], email: str, payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
        """Post ``payload`` to every host and classify each host's reply.

        In immediate mode the request goes to ``/spoke/operation`` and nothing
        is written to the database. Otherwise an operation is created first,
        the request goes to ``/spoke/operation/<operation_id>``, and the
        assignment status derived from each reply is stored with
        ``update_host_operation``.

        Returns:
            ``(operation_id, results)``; ``operation_id`` is ``None`` in
            immediate mode.

        Raises:
            ValueError: a tracked (non-immediate) operation was requested
                without an email.
        """
        url_path = "/spoke/operation"
        operation_id = None
        if not immediate_mode:
            if not email:
                raise ValueError("can't create_operation in the db cuz no email was provided")
            operation_id = get_model().create_operation(hosts, email, payload)
            url_path = f"/spoke/operation/{operation_id}"

        authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
        results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(
            hosts, url_path, payload, authorization_header=authorization_header
        )

        for host, result in zip(hosts, results):
            if result.status_code == -1:
                assignment_status = "no_response_from_host"
            elif result.status_code != 200:
                # Must be ``elif``: -1 is also != 200, so a plain ``if`` would
                # overwrite "no_response_from_host" on every unreachable host.
                assignment_status = "error_response_from_host"
            else:
                assignment_status = self._read_assignment_status(host, result, operation_id)

            if not immediate_mode:
                get_model().update_host_operation(host.id, operation_id, assignment_status, None)

        return (operation_id, results)

    def _read_assignment_status(self, host, result, operation_id):
        """Extract the assignment status from a 200 response, logging anything malformed.

        Returns the host-reported status when it is one of the valid statuses,
        otherwise ``"invalid_response_from_host"``.
        """
        result_is_json = False
        result_is_dict = False
        result_has_status = False
        result_has_valid_status = False
        assignment_status = "invalid_response_from_host"
        error_message = ""

        try:
            result_body = json.loads(result.body)
        except (TypeError, ValueError):
            # Not JSON (or not even text); the flags below stay False and get logged.
            result_body = None
        else:
            result_is_json = True

        if result_is_json:
            result_is_dict = isinstance(result_body, dict)
            result_has_status = result_is_dict and 'assignment_status' in result_body
            if result_has_status:
                reported_status = result_body['assignment_status']
                # Only strings can be valid; this also keeps unhashable values
                # (lists, dicts) away from the set membership test.
                result_has_valid_status = (
                    isinstance(reported_status, str)
                    and reported_status in self._VALID_ASSIGNMENT_STATUSES
                )
                if result_has_valid_status:
                    assignment_status = reported_status
            if result_is_dict and "error_message" in result_body:
                error_message = result_body['error_message']

        if not result_has_valid_status:
            operation_desc = ""
            if operation_id:
                operation_desc = f"for operation {operation_id}"
            current_app.logger.error(
                f"error reading assignment_status {operation_desc} from host {host.id}:\n"
                f"result_is_json: {result_is_json}\n"
                f"result_is_dict: {result_is_dict}\n"
                f"result_has_status: {result_has_status}\n"
                f"result_has_valid_status: {result_has_valid_status}\n"
                f"error_message: {error_message}\n"
            )

        return assignment_status

    def capacity_avaliable(self, additional_ram_bytes):
        """Return True if any online host reports capacity for ``additional_ram_bytes``.

        Hosts whose reply cannot be parsed as JSON are skipped.
        """
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes))
        for result in self.synchronous_operation(online_hosts, None, payload):
            try:
                result_body = json.loads(result.body)
            except (TypeError, ValueError):
                continue
            # ``== True`` (not ``is True``) is kept on purpose so a host
            # answering 1 is still accepted, as before.
            if isinstance(result_body, dict) and result_body.get('capacity_avaliable') == True:
                return True
        return False

    def get(self, id, get_ssh_host_keys) -> VirtualMachine:
        """Ask the capsul's host for its current state.

        Returns a ``VirtualMachine`` built from the first reply that is a JSON
        object containing ``state``, ``ipv4``, ``ipv6`` and ``ssh_host_keys``.
        Returns ``None`` when no host is known for the capsul or no reply has
        that shape.
        """
        validate_capsul_id(id)
        host = get_model().host_of_capsul(id)
        if host is None:
            return None

        required_fields = ('state', 'ipv4', 'ipv6', 'ssh_host_keys')
        payload = json.dumps(dict(type="get", id=id, get_ssh_host_keys=get_ssh_host_keys))
        for result in self.synchronous_operation([host], None, payload):
            try:
                result_body = json.loads(result.body)
            except (TypeError, ValueError):
                continue
            if not isinstance(result_body, dict):
                continue
            # A reply missing any field is skipped, matching the earlier
            # behaviour where the resulting KeyError was swallowed.
            if not all(field in result_body for field in required_fields):
                continue
            return VirtualMachine(
                id,
                host=host,
                state=result_body['state'],
                ipv4=result_body['ipv4'],
                ipv6=result_body['ipv6'],
                ssh_host_keys=result_body['ssh_host_keys'],
            )
        return None

    def list_ids(self) -> list:
        """Collect the valid capsul ids reported by every online host.

        Ids that fail ``validate_capsul_id`` are dropped and an error is
        logged for the host that sent them. A reply without an ``ids`` list is
        logged as well.
        """
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="list_ids"))
        results = self.synchronous_operation(online_hosts, None, payload)

        to_return = []
        for host, result in zip(online_hosts, results):
            try:
                result_body = json.loads(result.body)
            except (TypeError, ValueError):
                # no need to do anything here since if it cant be parsed then generic_operation will handle it.
                continue

            reported_ids = result_body.get('ids') if isinstance(result_body, dict) else None
            if not isinstance(reported_ids, list):
                current_app.logger.error(f"""missing 'ids' list for list_ids operation, host {host.id}""")
                continue

            all_valid = True
            for capsul_id in reported_ids:
                try:
                    validate_capsul_id(capsul_id)
                except Exception:
                    # The exception type raised by validate_capsul_id is not
                    # visible from this file, so any failure counts as invalid.
                    all_valid = False
                else:
                    to_return.append(capsul_id)

            if not all_valid:
                current_app.logger.error(f"""error reading ids for list_ids operation, host {host.id}""")

        return to_return

    def create(self, email: str, id: str, os: str, size: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
        """Ask the online hosts to create a capsul and check exactly one accepted it.

        Raises:
            ValueError: a host returned an ``error_message``, or the operation
                was assigned to a number of hosts other than one.
        """
        validate_capsul_id(id)
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(
            type="create",
            email=email,
            id=id,
            os=os,
            size=size,
            template_image_file_name=template_image_file_name,
            vcpus=vcpus,
            memory_mb=memory_mb,
            ssh_authorized_keys=ssh_authorized_keys,
        ))
        operation_id, results = self.asynchronous_operation(online_hosts, email, payload)

        error_message = ""
        assigned_hosts = []
        for host, result in zip(online_hosts, results):
            try:
                result_body = json.loads(result.body)
            except (TypeError, ValueError):
                # no need to do anything here since if it cant be parsed then generic_operation will handle it.
                continue
            if not isinstance(result_body, dict):
                continue
            if result_body.get('assignment_status') == "assigned":
                assigned_hosts.append(host.id)
            if 'error_message' in result_body:
                # If several hosts report an error, the last one wins.
                error_message = result_body['error_message']

        if error_message != "":
            raise ValueError(f"create capsul operation {operation_id} on {assigned_hosts} failed with {error_message}")

        number_of_assigned = len(assigned_hosts)
        if number_of_assigned != 1:
            assigned_hosts_string = ", ".join(str(host_id) for host_id in assigned_hosts)
            raise ValueError(f"expected create capsul operation {operation_id} to be assigned to one host, it was assigned to {number_of_assigned} ({assigned_hosts_string})")

    def destroy(self, email: str, id: str):
        """Ask the capsul's host to destroy it.

        Raises:
            ValueError: no host is known for the capsul, or the host did not
                answer with ``"status": "success"``.
        """
        validate_capsul_id(id)
        payload = json.dumps(dict(type="destroy", email=email, id=id))
        self._run_on_capsul_host(email, id, payload, "destroy")

    def vm_state_command(self, email: str, id: str, command: str):
        """Ask the capsul's host to run the state command ``command`` on it.

        Raises:
            ValueError: no host is known for the capsul, or the host did not
                answer with ``"status": "success"``.
        """
        validate_capsul_id(id)
        payload = json.dumps(dict(type="vm_state_command", email=email, id=id, command=command))
        self._run_on_capsul_host(email, id, payload, command)

    def _run_on_capsul_host(self, email, id, payload, action):
        """Send ``payload`` to the host of capsul ``id`` and require a success status.

        ``action`` is only used to word the error message.
        """
        host = get_model().host_of_capsul(id)
        if host is None:
            # Fail loudly: without a host the command cannot have been carried out.
            raise ValueError(f"""failed to {action} vm "{id}" for {email}: no host is known for this capsul""")

        result_status = None
        result_json_string = "<no response from host>"
        for result in self.synchronous_operation([host], email, payload):
            # Keep the raw body even if it turns out not to be JSON, so the
            # error message can show what the host actually sent.
            result_json_string = result.body
            try:
                result_body = json.loads(result_json_string)
            except (TypeError, ValueError):
                continue
            if isinstance(result_body, dict) and 'status' in result_body:
                result_status = result_body['status']

        if result_status != "success":
            raise ValueError(f"""failed to {action} vm "{id}" on host "{host.id}" for {email}: {result_json_string}""")