from flask import url_for

from capsulflask.db import get_model
from capsulflask.tests_base import BaseTestCase


class ConsoleTests(BaseTestCase):
    """Tests for the console views: index, capsul creation and SSH keys.

    ``capsul_data`` and ``ssh_key_data`` are shared class-level form
    templates. Tests must never mutate them in place; each test builds its
    own dict from them so that results do not depend on execution order.
    """

    # Base form payload for POSTs to ``console.create``.
    capsul_data = {
        "size": "f1-xs",
        "os": "debian10",
        "ssh_authorized_key_count": 1,
        "ssh_key_0": "key",
    }

    # Base form payload for POSTs to ``console.ssh_public_keys``.
    ssh_key_data = {
        "name": "key2",
        "method": "POST",
        "content": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDntq1t8Ddsa2q4p+PM7W4CLYYmxakokRRVLlf7AQlsTJFPsgBe9u0zuoOaKDMkBr0dlnuLm4Eub1Mj+BrdqAokto0YDiAnxUKRuYQKuHySKK8bLkisi2k47jGBDikx/jihgiuFTawo1mYsJJepC7PPwZGsoCImJEgq1L+ug0p3Zrj3QkUx4h25MpCSs2yvfgWjDyN8hEC76O42P+4ETezYrzrd1Kj26hdzHRnrxygvIUOtfau+5ydlaz8xQBEPrEY6/+pKDuwtXg1pBL7GmoUxBXVfHQSgq5s9jIJH+G0CR0ZoHMB25Ln4X/bsCQbLOu21+IGYKSDVM5TIMLtkKUkERQMVWvnpOp1LZKir4dC0m7SW74wpA8+2b1IsURIr9ARYGJpCEv1Q1Wz/X3yTf6Mfey7992MjUc9HcgjgU01/+kYomoXHprzolk+22Gjfgo3a4dRIoTY82GO8kkUKiaWHvDkkVURCY5dpteLA05sk3Z9aRMYsNXPLeOOPfzTlDA0=",
    }

    def test_index(self):
        """GET ``console.index`` responds with HTTP 200."""
        self._login('test@example.com')
        with self.client as client:
            response = client.get(url_for("console.index"))
            self.assert_200(response)

    def test_create_loads(self):
        """GET ``console.create`` responds with HTTP 200."""
        self._login('test@example.com')
        with self.client as client:
            response = client.get(url_for("console.create"))
            self.assert_200(response)

    def test_create_fails_capacity(self):
        """POSTing the create form flashes the capacity message; no VM exists."""
        with self.client as client:
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            # Fresh dict: never write into the shared class-level template.
            data = {**self.capsul_data, 'csrf-token': csrf_token}
            client.post(url_for("console.create"), data=data)

            # NOTE(review): the whitespace inside this literal must match the
            # flashed message exactly -- confirm against the view/template.
            capacity_message = (
                '\n host(s) at capacity. no capsuls can be created at this time. sorry. \n '
            )
            self.assert_message_flashed(capacity_message, category='message')

            self.assertEqual(
                len(get_model().list_vms_for_account('test@example.com')),
                0
            )

    def test_create_fails_invalid(self):
        """POSTing the create form with an empty ``os`` flashes an error."""
        with self.client as client:
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            # Override ``os`` on a copy so other tests still see "debian10".
            data = {**self.capsul_data, 'csrf-token': csrf_token, 'os': ''}
            client.post(url_for("console.create"), data=data)

            self.assert_message_flashed(
                'OS is required',
                category='message'
            )

            self.assertEqual(
                len(get_model().list_vms_for_account('test@example.com')),
                0
            )

    def test_create_succeeds(self):
        """POSTing a valid create form results in exactly one VM."""
        with self.client as client:
            client.get(url_for("console.create"))
            csrf_token = self.get_context_variable('csrf_token')

            data = {**self.capsul_data, 'csrf-token': csrf_token}
            response = client.post(url_for("console.create"), data=data)

            vms = get_model().list_vms_for_account('test@example.com')

            self.assertEqual(
                len(vms),
                1  # FIXME: mock create doesn't create, see #83
            )

            # NOTE(review): this early return makes the redirect assertion
            # below unreachable; presumably it is tied to the FIXME above and
            # should be removed once that issue is resolved.
            return

            vm_id = vms[0].id

            self.assertRedirects(
                response,
                url_for("console.index") + f'?{vm_id}'
            )

    def test_keys_loads(self):
        """GET ``console.ssh_public_keys`` lists the key created in setUp."""
        self._login('test@example.com')
        with self.client as client:
            response = client.get(url_for("console.ssh_public_keys"))
            self.assert_200(response)

            keys = self.get_context_variable('ssh_public_keys')
            self.assertEqual(keys[0]['name'], 'key')

    def test_keys_add_fails_invalid(self):
        """Malformed or empty key content is rejected with a flashed message."""
        self._login('test@example.com')
        with self.client as client:
            client.get(url_for("console.ssh_public_keys"))
            csrf_token = self.get_context_variable('csrf_token')

            data = {**self.ssh_key_data, 'csrf-token': csrf_token}

            # Each request gets its own payload instead of aliasing ``data``.
            data_invalid_content = {**data, 'content': 'foo'}
            client.post(
                url_for("console.ssh_public_keys"),
                data=data_invalid_content
            )

            self.assert_message_flashed(
                'Content must match "^(ssh|ecdsa)-[0-9A-Za-z+/_=@:. -]+$"',
                category='message'
            )

            data_missing_content = {**data, 'content': ''}
            client.post(
                url_for("console.ssh_public_keys"),
                data=data_missing_content
            )

            self.assert_message_flashed(
                'Content is required', category='message'
            )

    def test_keys_add_fails_duplicate(self):
        """Adding a key whose name is already in use flashes an error."""
        self._login('test@example.com')
        with self.client as client:
            client.get(url_for("console.ssh_public_keys"))
            csrf_token = self.get_context_variable('csrf_token')

            # 'key' is the name registered in setUp; set it on a copy only.
            data = {
                **self.ssh_key_data,
                'csrf-token': csrf_token,
                'name': 'key',
            }
            client.post(url_for("console.ssh_public_keys"), data=data)

            self.assert_message_flashed(
                'A key with that name already exists',
                category='message'
            )

    def setUp(self):
        """Log in as the test account and register an SSH key named 'key'."""
        self._login('test@example.com')
        # Third argument is presumably the key content -- verify against
        # the model's create_ssh_public_key signature.
        get_model().create_ssh_public_key('test@example.com', 'key', 'foo')

    def tearDown(self):
        """Remove the SSH key named 'key' that setUp created."""
        get_model().delete_ssh_public_key('test@example.com', 'key')