forked from 3wordchant/capsul-flask
		
	Basic testing using flask-testing
This commit makes it possible to override settings during tests, by switching capsulflask/__init__.py to a "create_app" pattern, and using `dotenv_values` instead of `load_dotenv`. The create_app() method returns a Flask app instance, to give more control over when to initialise the app. This allows setting environment variables in test files. Then, use dotenv_values to override loaded .env variables with ones from the environment, so that tests can set `POSTGRES_CONNECTION_PARAMETERS` and `SPOKE_MODEL` (possibly others in future..). Inital tests for the "landing" pages, and login / activation, are included.
This commit is contained in:
		
							
								
								
									
										1
									
								
								Pipfile
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								Pipfile
									
									
									
									
									
								
							| @ -9,6 +9,7 @@ blinker = "==1.4" | ||||
| click = "==7.1.2" | ||||
| Flask = "==1.1.2" | ||||
| Flask-Mail = "==0.9.1" | ||||
| Flask-Testing = "==0.8.1" | ||||
| gunicorn = "==20.0.4" | ||||
| isort = "==4.3.21" | ||||
| itsdangerous = "==1.1.0" | ||||
|  | ||||
							
								
								
									
										4
									
								
								app.py
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								app.py
									
									
									
									
									
								
							| @ -1,2 +1,4 @@ | ||||
|  | ||||
| from capsulflask import app | ||||
| from capsulflask import create_app | ||||
|  | ||||
| create_app() | ||||
|  | ||||
| @ -9,7 +9,7 @@ import requests | ||||
| import sys | ||||
|  | ||||
| import stripe | ||||
| from dotenv import load_dotenv, find_dotenv | ||||
| from dotenv import find_dotenv, dotenv_values | ||||
| from flask import Flask | ||||
| from flask_mail import Mail, Message | ||||
| from flask import render_template | ||||
| @ -23,124 +23,128 @@ from capsulflask import hub_model, spoke_model, cli | ||||
| from capsulflask.btcpay import client as btcpay | ||||
| from capsulflask.http_client import MyHTTPClient | ||||
|  | ||||
|  | ||||
| class StdoutMockFlaskMail: | ||||
|     def send(self, message: Message): | ||||
|       current_app.logger.info(f"Email would have been sent if configured:\n\nto: {','.join(message.recipients)}\nsubject: {message.subject}\nbody:\n\n{message.body}\n\n") | ||||
|  | ||||
| def create_app(): | ||||
|  | ||||
| load_dotenv(find_dotenv()) | ||||
|   for var_name in [ | ||||
|     "SPOKE_HOST_TOKEN", "HUB_TOKEN", "STRIPE_SECRET_KEY", | ||||
|     "BTCPAY_PRIVATE_KEY", "MAIL_PASSWORD" | ||||
|   ]: | ||||
|     var = os.environ.get(f"{var_name}_FILE") | ||||
|     if not var: | ||||
|       continue | ||||
|  | ||||
| for var_name in [ | ||||
|   "SPOKE_HOST_TOKEN", "HUB_TOKEN", "STRIPE_SECRET_KEY", | ||||
|   "BTCPAY_PRIVATE_KEY", "MAIL_PASSWORD" | ||||
| ]: | ||||
|   var = os.environ.get(f"{var_name}_FILE") | ||||
|   if not var: | ||||
|     continue | ||||
|     if not os.path.isfile(var): | ||||
|       continue | ||||
|  | ||||
|   if not os.path.isfile(var): | ||||
|     continue | ||||
|     with open(var) as secret_file: | ||||
|       os.environ[var_name] = secret_file.read().rstrip('\n') | ||||
|     del os.environ[f"{var_name}_FILE"] | ||||
|  | ||||
|   with open(var) as secret_file: | ||||
|     os.environ[var_name] = secret_file.read().rstrip('\n') | ||||
|   del os.environ[f"{var_name}_FILE"] | ||||
|  | ||||
| app = Flask(__name__) | ||||
|   config = { | ||||
|       **dotenv_values(find_dotenv()), | ||||
|       **os.environ,  # override loaded values with environment variables | ||||
|   } | ||||
|  | ||||
| app.config.from_mapping( | ||||
|   BASE_URL=os.environ.get("BASE_URL", default="http://localhost:5000"), | ||||
|   SECRET_KEY=os.environ.get("SECRET_KEY", default="dev"), | ||||
|   HUB_MODE_ENABLED=os.environ.get("HUB_MODE_ENABLED", default="True").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|   SPOKE_MODE_ENABLED=os.environ.get("SPOKE_MODE_ENABLED", default="True").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|   INTERNAL_HTTP_TIMEOUT_SECONDS=os.environ.get("INTERNAL_HTTP_TIMEOUT_SECONDS", default="300"), | ||||
|   HUB_MODEL=os.environ.get("HUB_MODEL", default="capsul-flask"), | ||||
|   SPOKE_MODEL=os.environ.get("SPOKE_MODEL", default="mock"), | ||||
|   LOG_LEVEL=os.environ.get("LOG_LEVEL", default="INFO"), | ||||
|   SPOKE_HOST_ID=os.environ.get("SPOKE_HOST_ID", default="baikal"), | ||||
|   SPOKE_HOST_TOKEN=os.environ.get("SPOKE_HOST_TOKEN", default="changeme"), | ||||
|   SSH_USERNAME=os.environ.get("SSH_USERNAME", default="cyberian"), | ||||
|   HUB_TOKEN=os.environ.get("HUB_TOKEN", default="changeme"), | ||||
|   app = Flask(__name__) | ||||
|  | ||||
|   # https://www.postgresql.org/docs/9.1/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS | ||||
|   # https://stackoverflow.com/questions/56332906/where-to-put-ssl-certificates-when-trying-to-connect-to-a-remote-database-using | ||||
|   # TLS example: sslmode=verify-full sslrootcert=letsencrypt-root-ca.crt host=db.example.com port=5432 user=postgres password=dev dbname=postgres | ||||
|   POSTGRES_CONNECTION_PARAMETERS=os.environ.get( | ||||
|     "POSTGRES_CONNECTION_PARAMETERS",  | ||||
|     default="host=localhost port=5432 user=postgres password=dev dbname=postgres" | ||||
|   ), | ||||
|   app.config.from_mapping( | ||||
|     TESTING=config.get("TESTING", False), | ||||
|     BASE_URL=config.get("BASE_URL", "http://localhost:5000"), | ||||
|     SECRET_KEY=config.get("SECRET_KEY", "dev"), | ||||
|     HUB_MODE_ENABLED=config.get("HUB_MODE_ENABLED", "True").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|     SPOKE_MODE_ENABLED=config.get("SPOKE_MODE_ENABLED", "True").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|     INTERNAL_HTTP_TIMEOUT_SECONDS=config.get("INTERNAL_HTTP_TIMEOUT_SECONDS", "300"), | ||||
|     HUB_MODEL=config.get("HUB_MODEL", "capsul-flask"), | ||||
|     SPOKE_MODEL=config.get("SPOKE_MODEL", "mock"), | ||||
|     LOG_LEVEL=config.get("LOG_LEVEL", "INFO"), | ||||
|     SPOKE_HOST_ID=config.get("SPOKE_HOST_ID", "baikal"), | ||||
|     SPOKE_HOST_TOKEN=config.get("SPOKE_HOST_TOKEN", "changeme"), | ||||
|     HUB_TOKEN=config.get("HUB_TOKEN", "changeme"), | ||||
|  | ||||
|   DATABASE_SCHEMA=os.environ.get("DATABASE_SCHEMA", default="public"), | ||||
|     # https://www.postgresql.org/docs/9.1/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS | ||||
|     # https://stackoverflow.com/questions/56332906/where-to-put-ssl-certificates-when-trying-to-connect-to-a-remote-database-using | ||||
|     # TLS example: sslmode=verify-full sslrootcert=letsencrypt-root-ca.crt host=db.example.com port=5432 user=postgres password=dev dbname=postgres | ||||
|     POSTGRES_CONNECTION_PARAMETERS=config.get( | ||||
|       "POSTGRES_CONNECTION_PARAMETERS",  | ||||
|       "host=localhost port=5432 user=postgres password=dev dbname=postgres" | ||||
|     ), | ||||
|  | ||||
|   MAIL_SERVER=os.environ.get("MAIL_SERVER", default=""), | ||||
|   MAIL_PORT=os.environ.get("MAIL_PORT", default="465"), | ||||
|   MAIL_USE_TLS=os.environ.get("MAIL_USE_TLS", default="False").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|   MAIL_USE_SSL=os.environ.get("MAIL_USE_SSL", default="True").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|   MAIL_USERNAME=os.environ.get("MAIL_USERNAME", default=""), | ||||
|   MAIL_PASSWORD=os.environ.get("MAIL_PASSWORD", default=""), | ||||
|   MAIL_DEFAULT_SENDER=os.environ.get("MAIL_DEFAULT_SENDER", default="no-reply@capsul.org"), | ||||
|   ADMIN_EMAIL_ADDRESSES=os.environ.get("ADMIN_EMAIL_ADDRESSES", default="ops@cyberia.club"), | ||||
|   ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES=os.environ.get("ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES", default="forest.n.johnson@gmail.com,capsul@cyberia.club"), | ||||
|     DATABASE_SCHEMA=config.get("DATABASE_SCHEMA", "public"), | ||||
|  | ||||
|   PROMETHEUS_URL=os.environ.get("PROMETHEUS_URL", default="https://prometheus.cyberia.club"), | ||||
|     MAIL_SERVER=config.get("MAIL_SERVER", ""), | ||||
|     MAIL_PORT=config.get("MAIL_PORT", "465"), | ||||
|     MAIL_USE_TLS=config.get("MAIL_USE_TLS", "False").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|     MAIL_USE_SSL=config.get("MAIL_USE_SSL", "True").lower() in ['true', '1', 't', 'y', 'yes'], | ||||
|     MAIL_USERNAME=config.get("MAIL_USERNAME", ""), | ||||
|     MAIL_PASSWORD=config.get("MAIL_PASSWORD", ""), | ||||
|     MAIL_DEFAULT_SENDER=config.get("MAIL_DEFAULT_SENDER", "no-reply@capsul.org"), | ||||
|     ADMIN_EMAIL_ADDRESSES=config.get("ADMIN_EMAIL_ADDRESSES", "ops@cyberia.club"), | ||||
|     ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES=config.get("ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES", "forest.n.johnson@gmail.com,capsul@cyberia.club"), | ||||
|  | ||||
|   STRIPE_API_VERSION=os.environ.get("STRIPE_API_VERSION", default="2020-03-02"), | ||||
|   STRIPE_SECRET_KEY=os.environ.get("STRIPE_SECRET_KEY", default=""), | ||||
|   STRIPE_PUBLISHABLE_KEY=os.environ.get("STRIPE_PUBLISHABLE_KEY", default=""), | ||||
|   #STRIPE_WEBHOOK_SECRET=os.environ.get("STRIPE_WEBHOOK_SECRET", default="") | ||||
|     PROMETHEUS_URL=config.get("PROMETHEUS_URL", "https://prometheus.cyberia.club"), | ||||
|  | ||||
|   BTCPAY_PRIVATE_KEY=os.environ.get("BTCPAY_PRIVATE_KEY", default="").replace("\\n", "\n"), | ||||
|   BTCPAY_URL=os.environ.get("BTCPAY_URL", default=""), | ||||
|     STRIPE_API_VERSION=config.get("STRIPE_API_VERSION", "2020-03-02"), | ||||
|     STRIPE_SECRET_KEY=config.get("STRIPE_SECRET_KEY", ""), | ||||
|     STRIPE_PUBLISHABLE_KEY=config.get("STRIPE_PUBLISHABLE_KEY", ""), | ||||
|     #STRIPE_WEBHOOK_SECRET=config.get("STRIPE_WEBHOOK_SECRET", "") | ||||
|  | ||||
|   THEME=os.environ.get("THEME", default="") | ||||
| ) | ||||
|     BTCPAY_PRIVATE_KEY=config.get("BTCPAY_PRIVATE_KEY", "").replace("\\n", "\n"), | ||||
|     BTCPAY_URL=config.get("BTCPAY_URL", "https://btcpay.cyberia.club") | ||||
|   ) | ||||
|  | ||||
| app.config['HUB_URL'] = os.environ.get("HUB_URL", default=app.config['BASE_URL']) | ||||
|   app.config['HUB_URL'] = config.get("HUB_URL", app.config['BASE_URL']) | ||||
|  | ||||
| class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter): | ||||
|   def isHeartbeatRelatedString(self, thing): | ||||
|     # thing_string = "<error>" | ||||
|     is_in_string = False | ||||
|     try: | ||||
|       thing_string = "%s" % thing | ||||
|       is_in_string = 'heartbeat-task' in thing_string or 'hub/heartbeat' in thing_string or 'spoke/heartbeat' in thing_string | ||||
|     except: | ||||
|       pass | ||||
|     # self.warning("isHeartbeatRelatedString(%s): %s", thing_string, is_in_string ) | ||||
|     return is_in_string | ||||
|   class SetLogLevelToDebugForHeartbeatRelatedMessagesFilter(logging.Filter): | ||||
|     def isHeartbeatRelatedString(self, thing): | ||||
|       # thing_string = "<error>" | ||||
|       is_in_string = False | ||||
|       try: | ||||
|         thing_string = "%s" % thing | ||||
|         is_in_string = 'heartbeat-task' in thing_string or 'hub/heartbeat' in thing_string or 'spoke/heartbeat' in thing_string | ||||
|       except: | ||||
|         pass | ||||
|       # self.warning("isHeartbeatRelatedString(%s): %s", thing_string, is_in_string ) | ||||
|       return is_in_string | ||||
|  | ||||
|     def filter(self, record): | ||||
|       if app.config['LOG_LEVEL'] == "DEBUG": | ||||
|         return True | ||||
|  | ||||
|       if self.isHeartbeatRelatedString(record.msg): | ||||
|         return False | ||||
|       for arg in record.args: | ||||
|         if self.isHeartbeatRelatedString(arg): | ||||
|           return False | ||||
|  | ||||
|   def filter(self, record): | ||||
|     if app.config['LOG_LEVEL'] == "DEBUG": | ||||
|       return True | ||||
|  | ||||
|     if self.isHeartbeatRelatedString(record.msg): | ||||
|       return False | ||||
|     for arg in record.args: | ||||
|       if self.isHeartbeatRelatedString(arg): | ||||
|         return False | ||||
|  | ||||
|     return True | ||||
|  | ||||
| logging_dict_config({ | ||||
|   'version': 1, | ||||
|   'formatters': {'default': { | ||||
|     'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', | ||||
|   }}, | ||||
|   'filters': { | ||||
|     'setLogLevelToDebugForHeartbeatRelatedMessages': { | ||||
|         '()': SetLogLevelToDebugForHeartbeatRelatedMessagesFilter, | ||||
|   logging_dict_config({ | ||||
|     'version': 1, | ||||
|     'formatters': {'default': { | ||||
|       'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', | ||||
|     }}, | ||||
|     'filters': { | ||||
|       'setLogLevelToDebugForHeartbeatRelatedMessages': { | ||||
|           '()': SetLogLevelToDebugForHeartbeatRelatedMessagesFilter, | ||||
|       } | ||||
|     }, | ||||
|     'handlers': {'wsgi': { | ||||
|       'class': 'logging.StreamHandler', | ||||
|       'stream': 'ext://flask.logging.wsgi_errors_stream', | ||||
|       'formatter': 'default', | ||||
|       'filters': ['setLogLevelToDebugForHeartbeatRelatedMessages'] | ||||
|     }}, | ||||
|     'root': { | ||||
|       'level': app.config['LOG_LEVEL'], | ||||
|       'handlers': ['wsgi'] | ||||
|     } | ||||
|   }, | ||||
|   'handlers': {'wsgi': { | ||||
|     'class': 'logging.StreamHandler', | ||||
|     'stream': 'ext://flask.logging.wsgi_errors_stream', | ||||
|     'formatter': 'default', | ||||
|     'filters': ['setLogLevelToDebugForHeartbeatRelatedMessages'] | ||||
|   }}, | ||||
|   'root': { | ||||
|     'level': app.config['LOG_LEVEL'], | ||||
|     'handlers': ['wsgi'] | ||||
|   } | ||||
| }) | ||||
|   }) | ||||
|  | ||||
| # app.logger.critical("critical") | ||||
| # app.logger.error("error") | ||||
| @ -148,16 +152,16 @@ logging_dict_config({ | ||||
| # app.logger.info("info") | ||||
| # app.logger.debug("debug") | ||||
|  | ||||
| stripe.api_key = app.config['STRIPE_SECRET_KEY'] | ||||
| stripe.api_version = app.config['STRIPE_API_VERSION'] | ||||
|   stripe.api_key = app.config['STRIPE_SECRET_KEY'] | ||||
|   stripe.api_version = app.config['STRIPE_API_VERSION'] | ||||
|  | ||||
| if app.config['MAIL_SERVER'] != "": | ||||
|   app.config['FLASK_MAIL_INSTANCE'] = Mail(app) | ||||
| else: | ||||
|   app.logger.warning("No MAIL_SERVER configured. capsul will simply print emails to stdout.") | ||||
|   app.config['FLASK_MAIL_INSTANCE'] = StdoutMockFlaskMail() | ||||
|   if app.config['MAIL_SERVER'] != "": | ||||
|     app.config['FLASK_MAIL_INSTANCE'] = Mail(app) | ||||
|   else: | ||||
|     app.logger.warning("No MAIL_SERVER configured. capsul will simply print emails to stdout.") | ||||
|     app.config['FLASK_MAIL_INSTANCE'] = StdoutMockFlaskMail() | ||||
|  | ||||
| app.config['HTTP_CLIENT'] = MyHTTPClient(timeout_seconds=int(app.config['INTERNAL_HTTP_TIMEOUT_SECONDS'])) | ||||
|   app.config['HTTP_CLIENT'] = MyHTTPClient(timeout_seconds=int(app.config['INTERNAL_HTTP_TIMEOUT_SECONDS'])) | ||||
|  | ||||
| app.config['BTCPAY_ENABLED'] = False | ||||
| if app.config['BTCPAY_URL'] != "": | ||||
| @ -169,107 +173,101 @@ if app.config['BTCPAY_URL'] != "": | ||||
|  | ||||
| # only start the scheduler and attempt to migrate the database if we are running the app. | ||||
| # otherwise we are running a CLI command. | ||||
| command_line = ' '.join(sys.argv) | ||||
| is_running_server = ('flask run' in command_line) or ('gunicorn' in command_line) | ||||
|   command_line = ' '.join(sys.argv) | ||||
|   is_running_server = ( | ||||
|     ('flask run' in command_line) or | ||||
|     ('gunicorn' in command_line) or | ||||
|     ('test' in command_line) | ||||
|   ) | ||||
|  | ||||
| app.logger.info(f"is_running_server: {is_running_server}") | ||||
|   app.logger.info(f"is_running_server: {is_running_server}") | ||||
|  | ||||
| if app.config['THEME'] != "": | ||||
|   my_loader = jinja2.ChoiceLoader([ | ||||
|     jinja2.FileSystemLoader( | ||||
|       [os.path.join('capsulflask', 'theme', app.config['THEME']), | ||||
|        'capsulflask/templates']), | ||||
|   ]) | ||||
|   app.jinja_loader = my_loader | ||||
|   if app.config['HUB_MODE_ENABLED']: | ||||
|     if app.config['HUB_MODEL'] == "capsul-flask": | ||||
|       app.config['HUB_MODEL'] = hub_model.CapsulFlaskHub() | ||||
|  | ||||
| if app.config['HUB_MODE_ENABLED']: | ||||
|       # debug mode (flask reloader) runs two copies of the app. When running in debug mode, | ||||
|       # we only want to start the scheduler one time. | ||||
|       if is_running_server and (not app.debug or config.get('WERKZEUG_RUN_MAIN') == 'true'): | ||||
|         scheduler = BackgroundScheduler() | ||||
|         heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task" | ||||
|         heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"} | ||||
|         heartbeat_task = lambda: requests.post(heartbeat_task_url, headers=heartbeat_task_headers) | ||||
|         scheduler.add_job(name="heartbeat-task", func=heartbeat_task, trigger="interval", seconds=5) | ||||
|         scheduler.start() | ||||
|  | ||||
|   if app.config['HUB_MODEL'] == "capsul-flask": | ||||
|     app.config['HUB_MODEL'] = hub_model.CapsulFlaskHub() | ||||
|         atexit.register(lambda: scheduler.shutdown()) | ||||
|  | ||||
|     # debug mode (flask reloader) runs two copies of the app. When running in debug mode, | ||||
|     # we only want to start the scheduler one time. | ||||
|     if is_running_server and (not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true'): | ||||
|       scheduler = BackgroundScheduler() | ||||
|       heartbeat_task_url = f"{app.config['HUB_URL']}/hub/heartbeat-task" | ||||
|       heartbeat_task_headers = {'Authorization': f"Bearer {app.config['HUB_TOKEN']}"} | ||||
|       heartbeat_task = lambda: requests.post(heartbeat_task_url, headers=heartbeat_task_headers) | ||||
|       scheduler.add_job(name="heartbeat-task", func=heartbeat_task, trigger="interval", seconds=5) | ||||
|       scheduler.start() | ||||
|     else: | ||||
|       app.config['HUB_MODEL'] = hub_model.MockHub() | ||||
|  | ||||
|       atexit.register(lambda: scheduler.shutdown()) | ||||
|     from capsulflask import db | ||||
|     db.init_app(app, is_running_server) | ||||
|  | ||||
|   else: | ||||
|     app.config['HUB_MODEL'] = hub_model.MockHub() | ||||
|     from capsulflask import auth, landing, console, payment, metrics, cli, hub_api, admin | ||||
|  | ||||
|   from capsulflask import db | ||||
|   db.init_app(app, is_running_server) | ||||
|     app.register_blueprint(landing.bp) | ||||
|     app.register_blueprint(auth.bp) | ||||
|     app.register_blueprint(console.bp) | ||||
|     app.register_blueprint(payment.bp) | ||||
|     app.register_blueprint(metrics.bp) | ||||
|     app.register_blueprint(cli.bp) | ||||
|     app.register_blueprint(hub_api.bp) | ||||
|     app.register_blueprint(admin.bp) | ||||
|  | ||||
|   from capsulflask import auth, landing, console, payment, metrics, cli, hub_api, admin | ||||
|     app.add_url_rule("/", endpoint="index") | ||||
|  | ||||
|   app.register_blueprint(landing.bp) | ||||
|   app.register_blueprint(auth.bp) | ||||
|   app.register_blueprint(console.bp) | ||||
|   app.register_blueprint(payment.bp) | ||||
|   app.register_blueprint(metrics.bp) | ||||
|   app.register_blueprint(cli.bp) | ||||
|   app.register_blueprint(hub_api.bp) | ||||
|   app.register_blueprint(admin.bp) | ||||
|   if app.config['SPOKE_MODE_ENABLED']: | ||||
|     if app.config['SPOKE_MODEL'] == "shell-scripts": | ||||
|       app.config['SPOKE_MODEL'] = spoke_model.ShellScriptSpoke() | ||||
|     else: | ||||
|       app.config['SPOKE_MODEL'] = spoke_model.MockSpoke() | ||||
|  | ||||
|   app.add_url_rule("/", endpoint="index") | ||||
|     from capsulflask import spoke_api | ||||
|  | ||||
|     app.register_blueprint(spoke_api.bp) | ||||
|  | ||||
|   @app.after_request | ||||
|   def security_headers(response): | ||||
|     response.headers['X-Frame-Options'] = 'SAMEORIGIN' | ||||
|     if 'Content-Security-Policy' not in response.headers: | ||||
|       response.headers['Content-Security-Policy'] = "default-src 'self'" | ||||
|     response.headers['X-Content-Type-Options'] = 'nosniff' | ||||
|     return response | ||||
|  | ||||
|  | ||||
|  | ||||
| if app.config['SPOKE_MODE_ENABLED']: | ||||
|  | ||||
|   if app.config['SPOKE_MODEL'] == "shell-scripts": | ||||
|     app.config['SPOKE_MODEL'] = spoke_model.ShellScriptSpoke() | ||||
|   else: | ||||
|     app.config['SPOKE_MODEL'] = spoke_model.MockSpoke() | ||||
|  | ||||
|   from capsulflask import spoke_api | ||||
|  | ||||
|   app.register_blueprint(spoke_api.bp) | ||||
|  | ||||
| @app.after_request | ||||
| def security_headers(response): | ||||
|   response.headers['X-Frame-Options'] = 'SAMEORIGIN' | ||||
|   if 'Content-Security-Policy' not in response.headers: | ||||
|     response.headers['Content-Security-Policy'] = "default-src 'self'" | ||||
|   response.headers['X-Content-Type-Options'] = 'nosniff' | ||||
|   return response | ||||
|   @app.context_processor | ||||
|   def override_url_for(): | ||||
|     """ | ||||
|     override the url_for function built into flask  | ||||
|     with our own custom implementation that busts the cache correctly when files change  | ||||
|     """ | ||||
|     return dict(url_for=url_for_with_cache_bust) | ||||
|  | ||||
|  | ||||
| @app.context_processor | ||||
| def override_url_for(): | ||||
|   """ | ||||
|   override the url_for function built into flask  | ||||
|   with our own custom implementation that busts the cache correctly when files change  | ||||
|   """ | ||||
|   return dict(url_for=url_for_with_cache_bust) | ||||
|  | ||||
|  | ||||
| def url_for_with_cache_bust(endpoint, **values): | ||||
|   """ | ||||
|   Add a query parameter based on the hash of the file, this acts as a cache bust | ||||
|   """ | ||||
|    | ||||
|   if endpoint == 'static': | ||||
|     filename = values.get('filename', None) | ||||
|     if filename: | ||||
|       if 'STATIC_FILE_HASH_CACHE' not in current_app.config: | ||||
|         current_app.config['STATIC_FILE_HASH_CACHE'] = dict() | ||||
|        | ||||
|       if filename not in current_app.config['STATIC_FILE_HASH_CACHE']: | ||||
|         filepath = os.path.join(current_app.root_path, endpoint, filename) | ||||
|         #print(filepath) | ||||
|         if os.path.isfile(filepath) and os.access(filepath, os.R_OK): | ||||
|            | ||||
|           with open(filepath, 'rb') as file: | ||||
|             hasher = hashlib.md5() | ||||
|             hasher.update(file.read()) | ||||
|             current_app.config['STATIC_FILE_HASH_CACHE'][filename] = hasher.hexdigest()[-6:] | ||||
|   def url_for_with_cache_bust(endpoint, **values): | ||||
|     """ | ||||
|     Add a query parameter based on the hash of the file, this acts as a cache bust | ||||
|     """ | ||||
|      | ||||
|     if endpoint == 'static': | ||||
|       filename = values.get('filename', None) | ||||
|       if filename: | ||||
|         if 'STATIC_FILE_HASH_CACHE' not in current_app.config: | ||||
|           current_app.config['STATIC_FILE_HASH_CACHE'] = dict() | ||||
|          | ||||
|         if filename not in current_app.config['STATIC_FILE_HASH_CACHE']: | ||||
|           filepath = os.path.join(current_app.root_path, endpoint, filename) | ||||
|           #print(filepath) | ||||
|           if os.path.isfile(filepath) and os.access(filepath, os.R_OK): | ||||
|              | ||||
|       values['q'] = current_app.config['STATIC_FILE_HASH_CACHE'][filename] | ||||
|             with open(filepath, 'rb') as file: | ||||
|               hasher = hashlib.md5() | ||||
|               hasher.update(file.read()) | ||||
|               current_app.config['STATIC_FILE_HASH_CACHE'][filename] = hasher.hexdigest()[-6:] | ||||
|                | ||||
|         values['q'] = current_app.config['STATIC_FILE_HASH_CACHE'][filename] | ||||
|  | ||||
|   return url_for(endpoint, **values) | ||||
|     return url_for(endpoint, **values) | ||||
|  | ||||
|   return app | ||||
|  | ||||
| @ -31,7 +31,6 @@ def account_required(view): | ||||
|  | ||||
|     return wrapped_view | ||||
|  | ||||
|  | ||||
| def admin_account_required(view): | ||||
|     """View decorator that redirects non-admin users to the login page.""" | ||||
|  | ||||
|  | ||||
							
								
								
									
										0
									
								
								capsulflask/tests/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								capsulflask/tests/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										23
									
								
								capsulflask/tests/test_auth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								capsulflask/tests/test_auth.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,23 @@ | ||||
| from flask import url_for, session | ||||
|  | ||||
| from capsulflask.db import get_model | ||||
| from capsulflask.tests_base import BaseTestCase | ||||
|  | ||||
|  | ||||
| class LoginTests(BaseTestCase): | ||||
|     render_templates = False | ||||
|  | ||||
|     def test_login_request(self): | ||||
|         with self.client as client: | ||||
|             response = client.get(url_for("auth.login")) | ||||
|             self.assert_200(response) | ||||
|  | ||||
|             # FIXME test generated login link | ||||
|  | ||||
|     def test_login_magiclink(self): | ||||
|         token, ignoreCaseMatches = get_model().login('test@example.com') | ||||
|  | ||||
|         with self.client as client: | ||||
|             response = client.get(url_for("auth.magiclink", token=token)) | ||||
|             self.assertRedirects(response, url_for("console.index")) | ||||
|             self.assertEqual(session['account'], 'test@example.com') | ||||
							
								
								
									
										14
									
								
								capsulflask/tests/test_landing.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										14
									
								
								capsulflask/tests/test_landing.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,14 @@ | ||||
| from capsulflask.tests_base import BaseTestCase | ||||
|  | ||||
|  | ||||
| class LandingTests(BaseTestCase): | ||||
|     #: Do not render templates, we're only testing logic here. | ||||
|     render_templates = False | ||||
|  | ||||
|     def test_landing(self): | ||||
|         pages = ['/', 'pricing', 'faq', 'about-ssh', 'changelog', 'support'] | ||||
|  | ||||
|         with self.client as client: | ||||
|             for page in pages: | ||||
|                 response = client.get(page) | ||||
|                 self.assert_200(response) | ||||
							
								
								
									
										25
									
								
								capsulflask/tests_base.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								capsulflask/tests_base.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,25 @@ | ||||
| import os | ||||
| from nanoid import generate | ||||
|  | ||||
| from flask_testing import TestCase | ||||
|  | ||||
| from capsulflask import create_app | ||||
|  | ||||
| class BaseTestCase(TestCase): | ||||
|     def create_app(self): | ||||
|         # Use default connection paramaters | ||||
|         os.environ['POSTGRES_CONNECTION_PARAMETERS'] = "host=localhost port=5432 user=postgres password=dev dbname=capsulflask_test" | ||||
|         os.environ['TESTING'] = '1' | ||||
|         os.environ['SPOKE_MODEL'] = 'mock' | ||||
|         return create_app() | ||||
|  | ||||
|     def setUp(self): | ||||
|         pass | ||||
|  | ||||
|     def tearDown(self): | ||||
|         pass | ||||
|  | ||||
|     def _login(self, user_email): | ||||
|         with self.client.session_transaction() as session: | ||||
|             session['account'] = user_email | ||||
|             session['csrf-token'] = generate() | ||||
		Reference in New Issue
	
	Block a user