219 lines
9.0 KiB
Python
219 lines
9.0 KiB
Python
import logging
|
|
import re
|
|
import urllib.parse as urlparse
|
|
from urllib.parse import parse_qs
|
|
|
|
import requests
|
|
from rich.console import Console
|
|
from rich.logging import RichHandler
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
|
|
class BaseTester:
    """
    Base class for all test suites.

    Sets up rich logging and a Chrome Selenium session, and provides helpers
    for reporting test results, logging in and out of CiviCRM and waiting on
    page elements.

    One caveat: this class defines a placeholder method "_test" which is
    called by "_test_all". Subclasses define their own "_test" to be run by
    selenium after logging in, then call "_test_all", which runs that test in
    a loop over the given terms. This allows the test to be run multiple
    times with different terms to make sure the test covers all bases.
    """

    def __init__(self, user: str, passwd: str, dev: bool, show_browser: bool):
        """
        Configure logging and start the browser session.

        Args:
            user: Username typed into the login form by ``login``.
            passwd: Password typed into the login form by ``login``.
            dev: Selects the development site instead of the live one.
            show_browser: When false, Chrome is started headless.
        """
        self.console = Console()
        # pdfminer's logger is noisy and adds no value; 100 is above
        # logging.CRITICAL (50), so nothing it emits gets through.
        logging.getLogger("pdfminer").setLevel(100)
        logging.basicConfig(
            level="INFO",
            format="%(message)s",
            datefmt="[%X]",  # TODO: Change date format
            handlers=[RichHandler()],
        )
        self.log = logging.getLogger("civiCRM-tester")
        self.user = user
        self.passwd = passwd
        if dev:
            self.base_url = "https://crm.staging.caat.org.uk"
        else:
            # NOTE(review): this is the same staging URL as the dev branch, so
            # the `dev` flag currently has no effect. Confirm the intended
            # non-dev URL before pointing this anywhere else.
            self.base_url = "https://crm.staging.caat.org.uk"

        chrome_options = webdriver.ChromeOptions()
        if not show_browser:
            # Passing the flag explicitly works on every Selenium release; the
            # `options.headless` setter is deprecated in newer versions.
            chrome_options.add_argument("--headless")
        self.browser = webdriver.Chrome(options=chrome_options)
        # Shared explicit wait with a 20 second timeout.
        self.wait = WebDriverWait(self.browser, 20)

    def debug(self, msg: str):
        """Wrapper for logging debug levels for rich output"""
        return self.log.debug(msg, extra={"markup": True})

    def info(self, msg: str):
        """Wrapper for logging info levels for rich output"""
        return self.log.info(msg, extra={"markup": True})

    def warn(self, msg: str):
        """Wrapper for logging warn levels for rich output"""
        return self.log.warning(msg, extra={"markup": True})

    def error(self, msg: str):
        """Wrapper for logging error levels for rich output"""
        return self.log.error(msg, extra={"markup": True})

    def critical(self, msg: str):
        """Wrapper for logging critical levels for rich output"""
        return self.log.critical(msg, extra={"markup": True})

    def passed(self, msg: str):
        """Wrapper for logging passed tests"""
        return self.info("[reverse green]PASSED[/reverse green] %s" % msg)

    def issue(self, msg: str):
        """
        Wrapper for logging tests with issues

        This is used for tests where the result is inconclusive due to some
        issue that is neither a fail nor pass.
        Usually requires human interaction
        """
        return self.warn("[reverse yellow]ISSUE[/reverse yellow] %s" % msg)

    def failed(self, msg: str):
        """Wrapper for logging failed tests"""
        # Logged at error level so a failure is never less severe than an
        # inconclusive "issue" (warning) when output is filtered by level.
        return self.error("[reverse red]FAILED[/reverse red] %s" % msg)

    def title(self, test_name: str):
        """Create title header for new suite of tests"""
        return self.info("[cyan]%s[/cyan]" % test_name)

    def desc(self, test_desc: str):
        """Log test description of upcoming number of tests"""
        return self.info("%s" % test_desc)

    def login(self):
        """Login to civicrm so we can continue with the proper cookies"""
        self.browser.get(self.base_url)
        self.debug("Logging in as %s @ %s" % (self.user, self.base_url))
        # find_element(By.ID, ...) is used instead of find_element_by_id,
        # which no longer exists on the driver in current Selenium releases.
        username = self.browser.find_element(By.ID, "edit-name")
        password = self.browser.find_element(By.ID, "edit-pass")
        submit = self.browser.find_element(By.ID, "edit-submit")
        username.send_keys(self.user)
        password.send_keys(self.passwd)
        submit.click()

        # Wait for the js elements load so we know the cookies are good.
        # Waits for "Recent Items" part of sidebar which is unique when logged in
        self.wait.until(
            EC.visibility_of_element_located((By.ID, "block-civicrm-2"))
        )
        self.debug("Successfully logged in as %s" % self.user)

    def logout(self):
        """Log browser out of civicrm to reset our test environment"""
        self.browser.get(self.base_url + "/user/logout")
        # Wait for the next page to load to finish logging out
        self.wait.until(
            EC.visibility_of_element_located((By.ID, "tabs-wrapper"))
        )

    def _test(self, *args):
        """Placeholder to be overwritten by overloading classes"""

    def _test_all(self, test_strings: tuple[str, str, str]):
        """
        Log in, run ``_test`` once per term, then log out and end the browser.

        Args:
            test_strings: Terms passed one at a time to ``_test``.

        Any exception from ``login``, ``_test`` or ``logout`` propagates to
        the caller after the browser session has been shut down.
        """
        try:
            self.login()
            for term in test_strings:
                self._test(term)
        finally:
            # logout() can itself raise (e.g. a wait timeout after a failed
            # login); the nested finally guarantees the browser is still
            # shut down in that case.
            try:
                self.logout()
            finally:
                # quit() ends the whole driver session, whereas close() only
                # closes the current window.
                self.browser.quit()

    def find_element_by_id(self, *args, **kwargs):
        """
        Find a single element by its id on the current page.

        Kept under the legacy Selenium name for existing callers; delegates to
        ``browser.find_element(By.ID, ...)``. The legacy ``id_`` keyword is
        still accepted.
        """
        if "id_" in kwargs:
            kwargs["value"] = kwargs.pop("id_")
        return self.browser.find_element(By.ID, *args, **kwargs)

    def find_element_by_css_selector(self, *args, **kwargs):
        """
        Find a single element by CSS selector on the current page.

        Kept under the legacy Selenium name for existing callers; delegates to
        ``browser.find_element(By.CSS_SELECTOR, ...)``. The legacy
        ``css_selector`` keyword is still accepted.
        """
        if "css_selector" in kwargs:
            kwargs["value"] = kwargs.pop("css_selector")
        return self.browser.find_element(By.CSS_SELECTOR, *args, **kwargs)

    def wait_until_visible(self, locator):
        """Alias for using inbuilt wait object for wait.until(EC.visibility_of_element_located)"""
        return self.wait.until(EC.visibility_of_element_located(locator))

    def wait_until_clickable(self, locator):
        """Alias for using inbuilt wait object for wait.until(EC.element_to_be_clickable)"""
        return self.wait.until(EC.element_to_be_clickable(locator))
|
|
|
|
|
|
class SearchExportTester(BaseTester):
    """
    Helpers for tests that run a CiviCRM contact search and export the results.

    Builds on BaseTester with the contact search URL, the element ids used on
    the search results page, and a requests-based download of the export.
    """

    def __init__(self, *args, **kwargs):
        """Accepts the same arguments as ``BaseTester.__init__``."""
        super().__init__(*args, **kwargs)
        self.search_url = self.base_url + "/civicrm/contact/search"
        # Element ids clicked by _select_option_from_magic_dropdown.
        self.contact_selectall_id = "CIVICRM_QFID_ts_all_4"
        self.contact_dropdown_id = "select2-chosen-4"

    def _get_export_id(self) -> list[str]:
        """
        Parses url to get the param used to ID what search we are currently doing

        Returns:
            The values of the ``qfKey`` query parameter of the current page
            URL, as the list produced by ``parse_qs``.

        Raises:
            KeyError: If the current URL has no ``qfKey`` parameter.
        """
        export_page_url = self.browser.current_url
        parsed = urlparse.urlparse(export_page_url)
        qf_key = parse_qs(parsed.query)['qfKey']
        self.debug("got qf_key '%s' from url" % qf_key)
        return qf_key

    def _get_contact_search_number(self) -> int:
        """
        Grabs the number of reported matches in a search on civicrm page

        Returns:
            The first run of digits found in the result-count label.

        Raises:
            IndexError: If the label contains no digits.
        """
        results = self.find_element_by_css_selector(
            ".form-layout-compressed > tbody:nth-child(1) > tr:nth-child(2) > td:nth-child(2) > label:nth-child(2)"
        )
        # Should just be one match in normal cases
        # NOTE(review): a count rendered with thousands separators ("1,234")
        # would be read as 1 -- confirm how the page formats large counts.
        match = re.search(r"\d+", results.text)
        if match is None:
            # Same exception type as the previous `matches[0]` lookup, but
            # with a message that says what was being parsed.
            raise IndexError(
                "no result count found in label text %r" % results.text
            )
        return int(match.group())

    def _wait_for_search_to_load(self):
        """Wrapper to wait for a contact search to load

        MUST BE CALLED AFTER YOU HAVE SEARCHED"""
        # "alpha-filter" is element of the table
        self.wait_until_visible((By.ID, "alpha-filter"))
        self.debug("table of results has loaded")

    def _select_option_from_magic_dropdown(self, option_id: str):
        """
        Wrapper to click an option from the dropdown menu within the search on civicrm.

        Magic dropdown because it literally is not how dropdowns should work at all
        All options have an ID but this can change depending on the context of how you get to the search page
        MUST BE CALLED WHEN ON THE SEARCH RESULTS PAGE

        Args:
            option_id: Element id of the dropdown option to click.
        """
        self.debug("exporting results using the magic dropdown")
        self.find_element_by_id(self.contact_selectall_id).click()
        self.find_element_by_id(self.contact_dropdown_id).click()
        self.find_element_by_id(option_id).click()
        # Block until the page that follows the click has rendered.
        self.wait_until_visible((By.CSS_SELECTOR, ".crm-block"))

    def download_export(self, data: dict) -> requests.Response:
        """
        Download exports from searches manually using requests

        This is because managing downloads within Selenium is a nightmare
        Function tries to replicate the download within the browser as much as possible

        Args:
            data: Extra form fields required for the specific export request;
                these override the default ``qfKey`` / ``entryURL`` fields.

        Returns:
            The response to the POST made against the search URL.

        Raises:
            RuntimeError: If the browser holds no cookie whose name starts
                with "SSESS".
        """
        session_cookie = {}
        for cookie in self.browser.get_cookies():
            # If several cookies match, the last one wins.
            if (cookie.get("name") or "").startswith("SSESS"):
                session_cookie = cookie
        self.debug("session cookie: %s" % session_cookie)
        if not session_cookie:
            self.critical("NO SESSION COOKIE FOUND. Are you logged in?")
            raise RuntimeError("No session cookie found.")

        # Most of this data is replicating the export settings so that the response is the csv data
        payload = {
            "qfKey": self._get_export_id(),
            "entryURL": self.search_url,
            **(data or {}),  # Add provided export data required for specific export requests
        }
        # The context manager releases the session's connections; the response
        # body is read before the block exits because the request is not
        # streamed.
        with requests.Session() as session:
            session.cookies.update(
                {session_cookie["name"]: session_cookie["value"]}
            )
            return session.request("POST", self.search_url, data=payload)
|