import re
import sys
import json
import ipaddress
from datetime import datetime, timedelta

from flask import Blueprint, current_app, render_template, make_response
from werkzeug.exceptions import abort
from nanoid import generate

from capsulflask.metrics import durations as metric_durations
from capsulflask.auth import admin_account_required
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message

bp = Blueprint("admin", __name__, url_prefix="/admin")


def _network_allocation_styles(host_id, network, vms, display_width_px):
    """Compute the CSS rules that position each VM inside one network's bar.

    Side effect: sets ``network['allocations']`` to the list of generated CSS
    class names, one per VM whose address lies inside the network's usable
    range (presumably consumed by the admin template -- the template is not
    visible from this module).

    Args:
        host_id: identifier of the host that owns ``network``.
        network: dict with at least ``public_ipv4_first_usable_ip``,
            ``public_ipv4_last_usable_ip``, ``network_name`` and
            ``public_ipv4_cidr_block`` keys.
        vms: iterable of dicts with ``id`` and ``public_ipv4`` keys.
        display_width_px: total width in pixels of the network bar.

    Returns:
        A list of CSS rule strings, one per entry in
        ``network['allocations']``.

    Raises:
        ValueError: if the network's own first/last usable IP cannot be
            parsed. Unparsable VM addresses are logged and skipped instead.
    """
    first_usable = int(ipaddress.ip_address(network["public_ipv4_first_usable_ip"]))
    last_usable = int(ipaddress.ip_address(network["public_ipv4_last_usable_ip"]))

    network['allocations'] = []
    address_count = float((last_usable - first_usable) + 1)

    styles = []
    for vm in vms:
        try:
            vm_ip = int(ipaddress.ip_address(vm['public_ipv4']))
        except ValueError:
            # A single bad row (e.g. NULL or malformed address) must not take
            # down the whole admin page; report it and keep going.
            current_app.logger.warning(
                f"/admin: capsul {vm['id']} has public_ipv4 {vm['public_ipv4']!r} "
                "which is not a valid IP address"
            )
            continue

        if not first_usable <= vm_ip <= last_usable:
            current_app.logger.warning(f"/admin: capsul {vm['id']} has public_ipv4 {vm['public_ipv4']} which is out of range for its host network {host_id} {network['network_name']} {network['public_ipv4_cidr_block']}")
            continue

        # The running index keeps class names unique within this network.
        allocation = f"{host_id}_{network['network_name']}_{len(network['allocations'])}"
        # address_count >= 1 here: the range check above guarantees
        # last_usable >= first_usable.
        left_px = (float(vm_ip - first_usable) / address_count) * display_width_px
        width_px = display_width_px / address_count
        styles.append(
            f"""
            .{allocation} {{
              left: {left_px}px;
              width: {width_px}px;
            }}
            """
        )
        network['allocations'].append(allocation)

    return styles


@bp.route("/")
@admin_account_required
def index():
    """Render the admin dashboard showing per-host network IP allocations.

    For every host/network pair, each non-deleted VM is mapped to a
    proportional slice of a fixed-width bar. The generated CSS is passed to
    the template as a single inline stylesheet, and the response carries a
    Content-Security-Policy header that only permits inline styles bearing
    the per-response nonce.

    Returns:
        A Flask response containing the rendered ``admin.html`` page.
    """
    hosts = get_model().list_hosts_with_networks(None)
    vms_by_host_and_network = get_model().non_deleted_vms_by_host_and_network(None)
    network_display_width_px = float(270)

    display_hosts = []
    inline_styles = [f"""
        .network-display {{
          width: {network_display_width_px}px;
        }}
    """]

    for host_id, host in hosts.items():
        display_host = dict(name=host_id, networks=host['networks'])
        vms_by_network = vms_by_host_and_network.get(host_id, {})

        for network in display_host['networks']:
            inline_styles.extend(
                _network_allocation_styles(
                    host_id,
                    network,
                    vms_by_network.get(network['network_name'], []),
                    network_display_width_px,
                )
            )

        display_hosts.append(display_host)

    # Fresh nonce per response so only the stylesheet rendered for this
    # request satisfies the CSP below.
    csp_inline_style_nonce = generate(alphabet="1234567890qwertyuiopasdfghjklzxcvbnm", size=10)
    response_text = render_template(
        "admin.html",
        display_hosts=display_hosts,
        network_display_width_px=network_display_width_px,
        csp_inline_style_nonce=csp_inline_style_nonce,
        inline_style='\n'.join(inline_styles)
    )

    response = make_response(response_text)
    response.headers.set('Content-Type', 'text/html')
    response.headers.set('Content-Security-Policy', f"default-src 'self'; style-src 'self' 'nonce-{csp_inline_style_nonce}'")
    return response