civicrm-update-tester/civicrm_tester/base.py

227 lines
9.4 KiB
Python

import logging
import re
import urllib.parse as urlparse
from urllib.parse import parse_qs
import requests
from rich.console import Console
from rich.logging import RichHandler
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.action_chains import ActionChains
from typing import Tuple, List
class BaseTester:
"""
Base class for all test suites
One caveat: This class defines a pseudo function "_test" which is called by "_test_all"
Overloading classes are to define their own tests to be run by selenium after logging in
Then they are to call "_test_all" which will run these tests in a loop with given variables
This allows the test to be run multiple times with different terms to make sure the test covers all bases.
"""
def __init__(self, user: str, passwd: str, dev: bool, show_browser: bool):
self.console = Console()
# Disable pdfminers logger cause its annoying as fuck and provides no value
logging.getLogger("pdfminer").setLevel(100)
logging.basicConfig(
level="INFO",
format="%(message)s",
datefmt="[%X]", # TODO: Change date format
handlers=[RichHandler()]
)
self.log = logging.getLogger("civiCRM-tester")
self.user = user
self.passwd = passwd
if dev:
self.base_url = "https://crm.dev.caat.org.uk"
else:
self.base_url = "https://crm.staging.caat.org.uk"
firefox_options = webdriver.ChromeOptions()
firefox_options.headless = not show_browser
self.browser = webdriver.Chrome(options=firefox_options)
self.wait = WebDriverWait(self.browser, 20)
def debug(self, msg: str):
"""Wrapper for logging debug levels for rich output"""
return self.log.debug(msg, extra={"markup": True})
def info(self, msg: str):
"""Wrapper for logging info levels for rich output"""
return self.log.info(msg, extra={"markup": True})
def warn(self, msg: str):
"""Wrapper for logging warn levels for rich output"""
return self.log.warning(msg, extra={"markup": True})
def error(self, msg: str):
"""Wrapper for logging error levels for rich output"""
return self.log.error(msg, extra={"markup": True})
def critical(self, msg: str):
"""Wrapper for logging critical levels for rich output"""
return self.log.critical(msg, extra={"markup": True})
def passed(self, msg: str):
"""Wrapper for logging passed tests"""
return self.info("[reverse green]PASSED[/reverse green] %s" % msg)
def issue(self, msg: str):
"""
Wrapper for logging tests with issues
This is used for tests where the result is inconclusive due to some issue that is neither a fail nor pass.
Usually requires human interaction
"""
return self.warn("[reverse yellow]ISSUE[/reverse yellow] %s" % msg)
def failed(self, msg: str):
"""Wrapper for logging failed tests"""
return self.info("[reverse red]FAILED[/reverse red] %s" % msg)
def title(self, test_name: str):
"""Create title header for new suite of tests"""
return self.info("[cyan]%s[/cyan]" % test_name)
def desc(self, test_desc: str):
"""Log test description of upcoming number of tests"""
return self.info("%s" % test_desc)
def login(self):
""" Login to civicrm so we can continue with the proper cookies """
self.browser.get(self.base_url)
self.debug("Logging in as %s @ %s" % (self.user, self.base_url))
username = self.browser.find_element_by_id("edit-name")
password = self.browser.find_element_by_id("edit-pass")
submit = self.browser.find_element_by_id("edit-submit")
username.send_keys(self.user)
password.send_keys(self.passwd)
submit.click()
# Wait for the js elements load so we know the cookies are good.
# Waits for "Recent Items" part of sidebar which is unique when logged in
self.wait.until(
EC.visibility_of_element_located((By.ID, "block-civicrm-2"))
)
self.debug("Successfully logged in as %s" % self.user)
def logout(self):
"""Log browser out of civicrm to reset our test environment"""
self.browser.get(self.base_url + "/user/logout")
# Wait for the next page to load to finish logging out
self.wait.until(
EC.visibility_of_element_located((By.ID, "tabs-wrapper"))
)
def _test(self, *args):
"""Placeholder to be overwritten by overloading classes"""
def _test_all(self, test_strings: Tuple[str, str, str]):
"""Loops testing over the given terms"""
try:
self.login()
for term in test_strings:
self._test(term)
finally:
self.logout()
self.browser.close()
def find_element(self, *args, **kwargs):
"""Alias for browser.find_element"""
return self.browser.find_element(*args, **kwargs)
def find_element_by_id(self, *args, **kwargs):
"""Alias for browser.find_element_by_id"""
return self.browser.find_element_by_id(*args, **kwargs)
def find_element_by_css_selector(self, *args, **kwargs):
"""Alias for browser.find_element_by_css_selector"""
return self.browser.find_element_by_css_selector(*args, **kwargs)
def wait_until_visible(self, locator):
"""Alias for using inbuilt wait object for wait.until(EC.visibility_of_element_located)"""
return self.wait.until(EC.visibility_of_element_located(locator))
def wait_until_clickable(self, locator):
"""Alias for using inbuilt wait object for wait.until(EC.element_to_be_clickable)"""
return self.wait.until(EC.element_to_be_clickable(locator))
def get_tab_selector(self, tabtitle):
"""Return an XPATH string to the tab labelled `tablabel`."""
return "//li/a[@title='{label}']/..".format(label=tabtitle)
def get_actions_dropdown_option_selector(self, optionlabel):
"""Return an XPATH string to the option labelled `optionlabel` (This is not a normal dropdown)."""
return "//div[text()='{label}']".format(label=optionlabel)
class SearchExportTester(BaseTester):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.search_url = self.base_url + "/civicrm/contact/search"
self.contact_selectall_id = "CIVICRM_QFID_ts_all_4"
self.contact_dropdown_id = "select2-chosen-4"
def _get_export_id(self) -> List[str]:
"""Parses url to get the param used to ID what search we are currently doing"""
export_page_url = self.browser.current_url
parsed = urlparse.urlparse(export_page_url)
qf_key = parse_qs(parsed.query)['qfKey']
self.debug("got qf_key '%s' from url" % qf_key)
return qf_key
def _get_contact_search_number(self) -> int:
"""Grabs the number of reported matches in a search on civicrm page"""
results = self.find_element_by_css_selector(
".form-layout-compressed > tbody:nth-child(1) > tr:nth-child(2) > td:nth-child(2) > label:nth-child(2)"
)
matches = re.findall(r"(\d+)", results.text)
# Should just be one match in normal cases
result_no = int(matches[0])
return result_no
def _wait_for_search_to_load(self):
"""Wrapper to wait for a contact search to load
MUST BE CALLED AFTER YOU HAVE SEARCHED"""
# "alpha-filter" is element of the table
self.wait_until_visible((By.ID, "alpha-filter"))
self.debug("table of results has loaded")
def _select_option_from_magic_dropdown_label(self, option_label: str):
self.find_element_by_id(self.contact_selectall_id).click()
self.find_element_by_id(self.contact_dropdown_id).click()
self.wait_until_visible((By.XPATH, self.get_actions_dropdown_option_selector(option_label)))
option = self.find_element(By.XPATH, self.get_actions_dropdown_option_selector(option_label))
ActionChains(self.browser).move_to_element(option).perform()
option.click()
def download_export(self, data: dict) -> requests.Response:
"""
Download exports from searches manually using requests
This is because managing downloads within Selenium is a nightmare
Function tries to replicate the download within the browser as much as possible
"""
session_cookie = {}
for cookie in self.browser.get_cookies():
if re.findall(r"^SSESS.*", cookie.get("name")):
session_cookie = cookie
self.debug("session cookie: %s" % session_cookie)
if not session_cookie:
self.critical("NO SESSION COOKIE FOUND. Are you logged in?")
raise RuntimeError("No session cookie found.")
session = requests.Session()
session.cookies.update(
{session_cookie["name"]: session_cookie["value"]}
)
# Most of this data is replicating the export settings so that the response is the csv data
data = {
"qfKey": self._get_export_id(),
"entryURL": self.search_url,
**data # Add provided export data required for specific export requests
}
res = session.request("POST", self.search_url, data=data)
return res