import logging
from copy import deepcopy
from unittest.mock import patch

from flask import url_for

from capsulflask.db import get_model
from capsulflask.tests_base import BaseTestCase
from capsulflask.spoke_model import MockSpoke


class ConsoleTests(BaseTestCase):
    """Tests for the ``console`` views: index, capsul creation and SSH keys.

    ``capsul_data`` and ``ssh_key_data`` are class-level form templates.
    Tests must work on a ``deepcopy`` of them and never mutate them in
    place, otherwise edits leak into every test that runs afterwards.
    """

    # Template form payload posted to ``console.create``.
    capsul_data = {
        "size": "f1-xs",
        "os": "debian10",
        "ssh_authorized_key_count": 1,
        "ssh_key_0": "key"
    }

    # Template form payload posted to ``console.ssh_public_keys``.
    ssh_key_data = {
        "name": "key2",
        "method": "POST",
        "content": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDntq1t8Ddsa2q4p+PM7W4CLYYmxakokRRVLlf7AQlsTJFPsgBe9u0zuoOaKDMkBr0dlnuLm4Eub1Mj+BrdqAokto0YDiAnxUKRuYQKuHySKK8bLkisi2k47jGBDikx/jihgiuFTawo1mYsJJepC7PPwZGsoCImJEgq1L+ug0p3Zrj3QkUx4h25MpCSs2yvfgWjDyN8hEC76O42P+4ETezYrzrd1Kj26hdzHRnrxygvIUOtfau+5ydlaz8xQBEPrEY6/+pKDuwtXg1pBL7GmoUxBXVfHQSgq5s9jIJH+G0CR0ZoHMB25Ln4X/bsCQbLOu21+IGYKSDVM5TIMLtkKUkERQMVWvnpOp1LZKir4dC0m7SW74wpA8+2b1IsURIr9ARYGJpCEv1Q1Wz/X3yTf6Mfey7992MjUc9HcgjgU01/+kYomoXHprzolk+22Gjfgo3a4dRIoTY82GO8kkUKiaWHvDkkVURCY5dpteLA05sk3Z9aRMYsNXPLeOOPfzTlDA0="
    }

    def test_index(self):
        """The console index page responds with HTTP 200."""
        self._login('test@example.com')
        with self.client as client:
            response = client.get(url_for("console.index"))
            self.assert_200(response)

    def test_create_loads(self):
        """The capsul creation form responds with HTTP 200."""
        self._login('test@example.com')
        with self.client as client:
            response = client.get(url_for("console.create"))
            self.assert_200(response)

    def test_create_fails_credit(self):
        """Posting the create form without any payment flashes the credit
        message and creates no VM."""
        with self.client as client:
            # The GET populates the template context that holds the token.
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            # Copy the template so the token never lands in the class dict.
            data = deepcopy(self.capsul_data)
            data['csrf-token'] = csrf_token
            client.post(url_for("console.create"), data=data)

            capacity_message = (
                'Your account must have enough credit to run an f1-xs for 1 month before you will be allowed to create it'
            )
            self.assert_message_flashed(capacity_message, category='message')

            self.assertEqual(
                len(get_model().list_vms_for_account('test@example.com')),
                0
            )

    def test_create_fails_capacity(self):
        """With credit available but the spoke reporting no capacity, the
        capacity message is flashed and no VM is created."""
        with self.client as client:
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            # Copy the template so the token never lands in the class dict.
            data = deepcopy(self.capsul_data)
            data['csrf-token'] = csrf_token

            get_model().create_payment_session('fake', 'test', 'test@example.com', 20)
            get_model().consume_payment_session('fake', 'test', 20)

            # NOTE: 'capacity_avaliable' (sic) has to match the attribute name
            # on MockSpoke, so the spelling is intentionally left untouched.
            with patch.object(MockSpoke, 'capacity_avaliable', return_value=False) as mock_method:
                response = client.post(url_for("console.create"), data=data)

            self.assert200(response)
            mock_method.assert_called()

            capacity_message = (
                '\n host(s) at capacity. no capsuls can be created at this time. sorry. \n '
            )
            self.assert_message_flashed(capacity_message, category='message')

            self.assertEqual(
                len(get_model().list_vms_for_account('test@example.com')),
                0
            )

            # Append the captured log output; the context manager guarantees
            # the handle is closed even if the write raises.
            with open('unittest-output.log', 'a') as file_object:
                file_object.write(f"{self.id()} captured output:\n{self.logs_from_test.getvalue()}\n")

    def test_create_fails_invalid(self):
        """Posting the create form with an empty ``os`` flashes a validation
        message and creates no VM."""
        with self.client as client:
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            data = deepcopy(self.capsul_data)
            data['csrf-token'] = csrf_token
            data['os'] = ''
            client.post(url_for("console.create"), data=data)

            self.assert_message_flashed(
                'OS is required',
                category='message'
            )

            self.assertEqual(
                len(get_model().list_vms_for_account('test@example.com')),
                0
            )

    def test_create_succeeds(self):
        """With credit available, posting the create form records one
        operation and one VM, and redirects to the index with ``?created=``."""
        with self.client as client:
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            data = deepcopy(self.capsul_data)
            data['csrf-token'] = csrf_token

            get_model().create_payment_session('fake', 'test', 'test@example.com', 20)
            get_model().consume_payment_session('fake', 'test', 20)

            response = client.post(url_for("console.create"), data=data)

            self.assertEqual(
                len(get_model().list_all_operations()),
                1
            )

            vms = get_model().list_vms_for_account('test@example.com')
            self.assertEqual(
                len(vms),
                1
            )

            vm_id = vms[0]['id']
            self.assertRedirects(
                response,
                url_for("console.index") + f'?created={vm_id}'
            )

    def test_keys_loads(self):
        """The SSH keys page responds with HTTP 200 and lists the key that
        ``setUp`` created."""
        self._login('test@example.com')
        with self.client as client:
            response = client.get(url_for("console.ssh_public_keys"))
            self.assert_200(response)

            keys = self.get_context_variable('ssh_public_keys')
            self.assertEqual(keys[0]['name'], 'key')

    def test_keys_add_fails_invalid(self):
        """Malformed and empty key content are each rejected with their own
        flashed validation message."""
        self._login('test@example.com')
        with self.client as client:
            client.get(url_for("console.ssh_public_keys"))
            csrf_token = self.get_context_variable('csrf_token')

            # Copy the template so edits stay local to this test.
            data = deepcopy(self.ssh_key_data)
            data['csrf-token'] = csrf_token

            # Each request gets its own payload instead of an alias of `data`.
            data_invalid_content = {**data, 'content': 'foo'}
            client.post(
                url_for("console.ssh_public_keys"),
                data=data_invalid_content
            )

            self.assert_message_flashed(
                'Content must match "^(ssh|ecdsa)-[0-9A-Za-z+/_=@:. -]+$"',
                category='message'
            )

            data_missing_content = {**data, 'content': ''}
            client.post(url_for("console.ssh_public_keys"), data=data_missing_content)

            self.assert_message_flashed(
                'Content is required',
                category='message'
            )

    def test_keys_add_fails_duplicate(self):
        """Adding a key whose name matches the one created in ``setUp``
        flashes the duplicate-name message."""
        self._login('test@example.com')
        with self.client as client:
            client.get(url_for("console.ssh_public_keys"))
            csrf_token = self.get_context_variable('csrf_token')

            # Copy the template so the renamed key does not leak to other tests.
            data = deepcopy(self.ssh_key_data)
            data['csrf-token'] = csrf_token
            data['name'] = 'key'
            client.post(url_for("console.ssh_public_keys"), data=data)

            self.assert_message_flashed(
                'A key with that name already exists',
                category='message'
            )

    def setUp(self):
        """Log in the test account and give it one SSH public key named
        ``key``."""
        super().setUp()

        self._login('test@example.com')
        get_model().create_ssh_public_key('test@example.com', 'key', 'foo')

    def tearDown(self):
        """Delete the rows the tests may have created and commit."""
        super().tearDown()

        get_model().cursor.execute("DELETE FROM ssh_public_keys")
        get_model().cursor.execute("DELETE FROM login_tokens")
        get_model().cursor.execute("DELETE FROM vms")
        get_model().cursor.execute("DELETE FROM payments")
        get_model().cursor.connection.commit()