forked from 3wordchant/capsul-flask
forest uncommitted changes on multi-host branch
This commit is contained in:
@ -12,7 +12,7 @@ from flask import render_template
|
||||
from flask import url_for
|
||||
from flask import current_app
|
||||
|
||||
from capsulflask import virt_model, cli
|
||||
from capsulflask import operation_model, cli
|
||||
from capsulflask.btcpay import client as btcpay
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
@ -22,7 +22,7 @@ app = Flask(__name__)
|
||||
app.config.from_mapping(
|
||||
BASE_URL=os.environ.get("BASE_URL", default="http://localhost:5000"),
|
||||
SECRET_KEY=os.environ.get("SECRET_KEY", default="dev"),
|
||||
VIRTUALIZATION_MODEL=os.environ.get("VIRTUALIZATION_MODEL", default="mock"),
|
||||
OPERATION_MODEL=os.environ.get("OPERATION_MODEL", default="mock"),
|
||||
LOG_LEVEL=os.environ.get("LOG_LEVEL", default="INFO"),
|
||||
ADMIN_EMAIL_ADDRESSES=os.environ.get("ADMIN_EMAIL_ADDRESSES", default="ops@cyberia.club"),
|
||||
|
||||
@ -74,10 +74,10 @@ stripe.api_version = app.config['STRIPE_API_VERSION']
|
||||
|
||||
app.config['FLASK_MAIL_INSTANCE'] = Mail(app)
|
||||
|
||||
if app.config['VIRTUALIZATION_MODEL'] == "shell_scripts":
|
||||
app.config['VIRTUALIZATION_MODEL'] = virt_model.ShellScriptVirtualization()
|
||||
if app.config['OPERATION_MODEL'] == "shell_scripts":
|
||||
app.config['OPERATION_MODEL'] = operation_model.GoshtOperation()
|
||||
else:
|
||||
app.config['VIRTUALIZATION_MODEL'] = virt_model.MockVirtualization()
|
||||
app.config['OPERATION_MODEL'] = operation_model.MockOperation()
|
||||
|
||||
app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY'])
|
||||
|
||||
|
@ -249,13 +249,13 @@ def notify_users_about_account_balance():
|
||||
if index_to_send == len(warnings)-1:
|
||||
for vm in vms:
|
||||
current_app.logger.warning(f"cron_task: deleting {vm['id']} ( {account['email']} ) due to negative account balance.")
|
||||
current_app.config["VIRTUALIZATION_MODEL"].destroy(email=account["email"], id=vm['id'])
|
||||
current_app.config["OPERATION_MODEL"].destroy(email=account["email"], id=vm['id'])
|
||||
get_model().delete_vm(email=account["email"], id=vm['id'])
|
||||
|
||||
|
||||
def ensure_vms_and_db_are_synced():
|
||||
db_ids = get_model().all_non_deleted_vm_ids()
|
||||
virt_ids = current_app.config["VIRTUALIZATION_MODEL"].list_ids()
|
||||
virt_ids = current_app.config["OPERATION_MODEL"].list_ids()
|
||||
|
||||
db_ids_dict = dict()
|
||||
virt_ids_dict = dict()
|
||||
|
@ -27,7 +27,7 @@ def makeCapsulId():
|
||||
|
||||
def double_check_capsul_address(id, ipv4):
|
||||
try:
|
||||
result = current_app.config["VIRTUALIZATION_MODEL"].get(id)
|
||||
result = current_app.config["OPERATION_MODEL"].get(id)
|
||||
if result.ipv4 != ipv4:
|
||||
ipv4 = result.ipv4
|
||||
get_model().update_vm_ip(email=session["account"], id=id, ipv4=result.ipv4)
|
||||
@ -98,7 +98,7 @@ def detail(id):
|
||||
)
|
||||
else:
|
||||
current_app.logger.info(f"deleting {vm['id']} per user request ({session['account']})")
|
||||
current_app.config["VIRTUALIZATION_MODEL"].destroy(email=session['account'], id=id)
|
||||
current_app.config["OPERATION_MODEL"].destroy(email=session['account'], id=id)
|
||||
get_model().delete_vm(email=session['account'], id=id)
|
||||
|
||||
return render_template("capsul-detail.html", vm=vm, delete=True, deleted=True)
|
||||
@ -125,7 +125,7 @@ def create():
|
||||
operating_systems = get_model().operating_systems_dict()
|
||||
ssh_public_keys = get_model().list_ssh_public_keys_for_account(session["account"])
|
||||
account_balance = get_account_balance(get_vms(), get_payments(), datetime.utcnow())
|
||||
capacity_avaliable = current_app.config["VIRTUALIZATION_MODEL"].capacity_avaliable(512*1024*1024)
|
||||
capacity_avaliable = current_app.config["OPERATION_MODEL"].capacity_avaliable(512*1024*1024)
|
||||
errors = list()
|
||||
|
||||
if request.method == "POST":
|
||||
@ -165,7 +165,7 @@ def create():
|
||||
if len(posted_keys) == 0:
|
||||
errors.append("At least one SSH Public Key is required")
|
||||
|
||||
capacity_avaliable = current_app.config["VIRTUALIZATION_MODEL"].capacity_avaliable(vm_sizes[size]['memory_mb']*1024*1024)
|
||||
capacity_avaliable = current_app.config["OPERATION_MODEL"].capacity_avaliable(vm_sizes[size]['memory_mb']*1024*1024)
|
||||
|
||||
if not capacity_avaliable:
|
||||
errors.append("""
|
||||
@ -181,7 +181,7 @@ def create():
|
||||
os=os,
|
||||
ssh_public_keys=list(map(lambda x: x["name"], posted_keys))
|
||||
)
|
||||
current_app.config["VIRTUALIZATION_MODEL"].create(
|
||||
current_app.config["OPERATION_MODEL"].create(
|
||||
email = session["account"],
|
||||
id=id,
|
||||
template_image_file_name=operating_systems[os]['template_image_file_name'],
|
||||
|
@ -1,7 +1,12 @@
|
||||
|
||||
from nanoid import generate
|
||||
from flask import current_app
|
||||
from typing import List
|
||||
|
||||
class OnlineHost:
|
||||
def __init__(self, id: str, url: str):
|
||||
self.id = id
|
||||
self.url = url
|
||||
|
||||
class DBModel:
|
||||
def __init__(self, connection, cursor):
|
||||
@ -267,14 +272,28 @@ class DBModel:
|
||||
|
||||
# ------ HOSTS ---------
|
||||
|
||||
def authorized_for_host(self, id, token):
|
||||
def authorized_for_host(self, id, token) -> bool:
|
||||
self.cursor.execute("SELECT id FROM hosts WHERE id = %s token = %s", (id, token))
|
||||
return self.cursor.fetchone() != None
|
||||
|
||||
def host_heartbeat(self, id):
|
||||
def host_heartbeat(self, id) -> None:
|
||||
self.cursor.execute("UPDATE hosts SET last_health_check = NOW() WHERE id = %s", (id,))
|
||||
self.connection.commit()
|
||||
|
||||
def get_online_hosts(self) -> List[OnlineHost]:
|
||||
self.cursor.execute("SELECT id, https_url FROM hosts WHERE last_health_check > NOW() - INTERVAL '10 seconds'")
|
||||
return list(map(lambda x: OnlineHost(id=x[0], url=x[1]), self.cursor.fetchall()))
|
||||
|
||||
def create_operation(self, online_hosts: List[OnlineHost], email: str, payload: str) -> None:
|
||||
|
||||
self.cursor.execute( "INSERT INTO operations (email, payload) VALUES (%s, %s) RETURNING id", (email, payload) )
|
||||
operation_id = self.cursor.fetchone()[0]
|
||||
|
||||
for host in online_hosts:
|
||||
self.cursor.execute( "INSERT INTO host_operation (host, operation) VALUES (%s, %s)", (host.id, operation_id) )
|
||||
|
||||
self.connection.commit()
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -1,11 +1,17 @@
|
||||
import subprocess
|
||||
import re
|
||||
import requests
|
||||
import json
|
||||
import asyncio
|
||||
from typing import List
|
||||
|
||||
from aiohttp import ClientSession
|
||||
from flask import current_app
|
||||
from time import sleep
|
||||
from os.path import join
|
||||
from subprocess import run
|
||||
|
||||
from capsulflask.db_model import OnlineHost
|
||||
from capsulflask.db import get_model
|
||||
|
||||
def validate_capsul_id(id):
|
||||
@ -18,7 +24,7 @@ class VirtualMachine:
|
||||
self.ipv4 = ipv4
|
||||
self.ipv6 = ipv6
|
||||
|
||||
class VirtualizationInterface:
|
||||
class OperationInterface:
|
||||
def capacity_avaliable(self, additional_ram_bytes: int) -> bool:
|
||||
pass
|
||||
|
||||
@ -34,7 +40,7 @@ class VirtualizationInterface:
|
||||
def destroy(self, email: str, id: str):
|
||||
pass
|
||||
|
||||
class MockVirtualization(VirtualizationInterface):
|
||||
class MockVirtualization(OperationInterface):
|
||||
def capacity_avaliable(self, additional_ram_bytes):
|
||||
return True
|
||||
|
||||
@ -54,7 +60,23 @@ class MockVirtualization(VirtualizationInterface):
|
||||
current_app.logger.info(f"mock destroy: {id} for {email}")
|
||||
|
||||
|
||||
class ShellScriptVirtualization(VirtualizationInterface):
|
||||
class GoshtOperation(OperationInterface):
|
||||
|
||||
|
||||
async def post_json(self, method: str, url: str, body: str, session: ClientSession, **kwargs) -> str:
|
||||
resp = await session.request(method=method, url=url, json=body, **kwargs)
|
||||
resp.raise_for_status()
|
||||
return await resp.text()
|
||||
|
||||
async def make_requests(self, online_hosts: List[OnlineHost], body: str, **kwargs) -> None:
|
||||
async with ClientSession() as session:
|
||||
tasks = []
|
||||
for host in online_hosts:
|
||||
tasks.append(
|
||||
self.post_json(method="POST", url=host.url, body=body, session=session, **kwargs)
|
||||
)
|
||||
results = await asyncio.gather(*tasks)
|
||||
# do something with results
|
||||
|
||||
def validate_completed_process(self, completedProcess, email=None):
|
||||
emailPart = ""
|
||||
@ -69,27 +91,17 @@ class ShellScriptVirtualization(VirtualizationInterface):
|
||||
{completedProcess.stderr}
|
||||
""")
|
||||
|
||||
|
||||
|
||||
def capacity_avaliable(self, additional_ram_bytes):
|
||||
my_args=[join(current_app.root_path, 'shell_scripts/capacity-avaliable.sh'), str(additional_ram_bytes)]
|
||||
completedProcess = run(my_args, capture_output=True)
|
||||
|
||||
if completedProcess.returncode != 0:
|
||||
current_app.logger.error(f"""
|
||||
capacity-avaliable.sh exited {completedProcess.returncode} with
|
||||
stdout:
|
||||
{completedProcess.stdout}
|
||||
stderr:
|
||||
{completedProcess.stderr}
|
||||
""")
|
||||
return False
|
||||
online_hosts = get_model().get_online_hosts()
|
||||
payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=""))
|
||||
get_model().create_operation(online_hosts, None, payload)
|
||||
|
||||
lines = completedProcess.stdout.splitlines()
|
||||
output = lines[len(lines)-1]
|
||||
if not output == b"yes":
|
||||
current_app.logger.error(f"capacity-avaliable.sh exited 0 and returned {output} but did not return \"yes\" ")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
def get(self, id):
|
||||
validate_capsul_id(id)
|
||||
@ -119,10 +131,10 @@ class ShellScriptVirtualization(VirtualizationInterface):
|
||||
raise ValueError(f"ssh_public_key \"{ssh_public_key}\" must match \"^(ssh|ecdsa)-[0-9A-Za-z+/_=@. -]+$\"")
|
||||
|
||||
if vcpus < 1 or vcpus > 8:
|
||||
raise ValueError(f"vcpus \"{vcpus}\" must match 1 <= vcpus <= 8")
|
||||
raise ValueError(f"vcpus ({vcpus}) must match 1 <= vcpus <= 8")
|
||||
|
||||
if memory_mb < 512 or memory_mb > 16384:
|
||||
raise ValueError(f"memory_mb \"{memory_mb}\" must match 512 <= memory_mb <= 16384")
|
||||
raise ValueError(f"memory_mb ({memory_mb}) must match 512 <= memory_mb <= 16384")
|
||||
|
||||
ssh_keys_string = "\n".join(ssh_public_keys)
|
||||
|
@ -3,6 +3,7 @@
|
||||
CREATE TABLE hosts (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
last_health_check TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
https_url TEXT NOT NULL,
|
||||
token TEXT NOT NULL
|
||||
);
|
||||
|
||||
@ -12,22 +13,21 @@ ALTER TABLE vms
|
||||
ADD COLUMN host TEXT REFERENCES hosts(id) ON DELETE RESTRICT DEFAULT 'baikal';
|
||||
|
||||
CREATE TABLE operations (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
id SERIAL PRIMARY KEY ,
|
||||
email TEXT REFERENCES accounts(email) ON DELETE RESTRICT,
|
||||
assigned_host TEXT NULL,
|
||||
host_status TEXT NULL,
|
||||
created TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
assigned TIMESTAMP NULL,
|
||||
completed TIMESTAMP NULL,
|
||||
payload TEXT NOT NULL,
|
||||
);
|
||||
|
||||
CREATE TABLE host_operation (
|
||||
host TEXT NOT NULL,
|
||||
operation TEXT NOT NULL,
|
||||
host TEXT NOT NULL REFERENCES hosts(id) ON DELETE RESTRICT,
|
||||
operation INTEGER NOT NULL REFERENCES operations(id) ON DELETE RESTRICT,
|
||||
assignment_status TEXT NULL,
|
||||
assignment_status_timestamp TIMESTAMP,
|
||||
assigned TIMESTAMP NULL,
|
||||
completed TIMESTAMP NULL,
|
||||
results TEXT NULL,
|
||||
PRIMARY KEY (host, operation)
|
||||
);
|
||||
|
||||
|
||||
UPDATE schemaversion SET version = 9;
|
Reference in New Issue
Block a user