diff --git a/tyop.py b/tyop.py index baba599..7230b86 100644 --- a/tyop.py +++ b/tyop.py @@ -1,24 +1,206 @@ """Automate your mass typo updates.""" +from argparse import ArgumentParser +from glob import glob +from logging import DEBUG, INFO, basicConfig, getLogger +from os.path import expanduser +from pathlib import Path +from shlex import split +from subprocess import check_output, run from sys import exit +from ruamel.yaml import YAML + class Migration: - def __init__(self): - self.validate() - self.run() - def validate(self): - if not hasattr(self, "GLOB"): - print("Define `GLOB` on your migration!") + DIFF_LIMIT = 3 + + def __init__(self): + self.args = self._parse() + self.log = self._init_logging() + self._validate() + + self.yaml = YAML(typ="rt") + self.yaml.preserve_quotes = True + self.yaml.indent(mapping=2, sequence=4, offset=2) + self.yaml.explicit_start = True + self.yaml.allow_duplicate_keys = True + + if self.args.validate: + self.log.info("Bailing out as requested...") + exit(0) + + self.matches = glob(expanduser(self.GLOB)) + self.log.info(f"Discovered '{self.GLOB}' as user defined glob") + self.log.info(f"Matched paths are: {[m for m in self.matches]}") + if not self._confirm(): + self.log.info("Bailing out on request...") exit(1) - def run(self): - pass - # TODO: - # For each path in the GLOB - # Load it and pass it into the migrate function - # Once it comes back, save the contents back to the FS - # Run a diff without pager and ask whether that looks ok - # After like 3 tries, ask if "this looks ok" + skip the rest - # Run through each repo and ask to commit + push + if self.args.reset: + self.log.info("Resetting all changes as requested...") + for match in self.matches: + self._clean(match) + exit(0) + + try: + self._run() + except Exception as exception: + self.log.error(f"Failed to run migration, saw: {exception}") + self.log.info("Resetting all changes...") + for match in self.matches: + self._clean(match) + exit(1) + + def _parse(self): + self.parser = ArgumentParser(description="Tyop: mass typo updates for all") + + self.parser.add_argument( + "-d", + "--debug", + help="enable verbose debug logs", + action="store_const", + dest="log", + const=DEBUG, + default=INFO, + ) + self.parser.add_argument( + "-v", + "--validate", + default=False, + action="store_true", + dest="validate", + help="Validate end-user defined migrationa and exit", + ) + self.parser.add_argument( + "-r", + "--reset", + default=False, + action="store_true", + dest="reset", + help="Reset changes without running migrations (git-checkout)", + ) + self.parser.add_argument( + "-y", + "--yaml", + default=False, + action="store_true", + dest="yaml", + help="Expect YAML and load for parsing", + ) + + self.args = self.parser.parse_args() + + return self.args + + def _init_logging(self): + basicConfig(level=self.args.log, format="%(levelname)-8s %(message)s") + self.log = getLogger(__name__) + return self.log + + def _shell(self, cmd, shell=False, check=True, **kwargs): + runner = check_output + args = [split(cmd)] + + if shell: + args = [cmd] + kwargs = {"shell": shell} + + if not check: + runner = run + + try: + output = runner(*args, **kwargs) + if check: + return output.decode("utf-8").strip() + except Exception as exception: + self.log.error(f"Failed to run {cmd}, saw {str(exception)}") + exit(1) + + def _confirm(self): + answer = "" + while answer not in ["y", "Y", "n", "N"]: + answer = input("Does this look good? [y/N]? ").lower() + return any((answer == y for y in ["y", "Y"])) + + def _message(self): + return input("Commit message?") + + def _commit(self, match): + root_path = Path(match).parent + self._shell("git --no-pager diff", check=False, cwd=root_path) + if self._confirm(): + message = self._message() + self._shell("git add .", check=False, cwd=root_path) + self._shell(f"git commit -vm '{message}'", check=False, cwd=root_path) + self._shell("git push", check=False, cwd=root_path) + + def _validate(self): + if not hasattr(self, "GLOB"): + self.log.error("Missing GLOB attribute!") + exit(1) + self.log.info("Validation succeeded!") + + def _diff(self, match, idx): + root_path = Path(match).parent + self.log.debug(f"Running git-diff in {root_path} ({idx+1}/{self.DIFF_LIMIT})") + self._shell("git --no-pager diff", check=False, cwd=root_path) + if not self._confirm(): + self._clean(match) + self.log.info("Bailing out on request...") + exit(1) + + def _clean(self, match, branch=False): + root_path = Path(match).parent + self.log.info(f"Cleaning {root_path} of local changes...") + + self._shell("git checkout .", check=False, cwd=root_path) + + if branch: + self.log.info("Checkout out the default branch...") + self._shell( + "git checkout main > /dev/null 2>&1 || git checkout master > /dev/null 2>&1", # noqa + check=False, + shell=True, + cwd=root_path, + ) + + def _run(self): + idx = 0 + for match in self.matches: + self._clean(match, branch=True) + + self.log.info(f"Processing {match}...") + + with open(match, "r") as handle: + contents = handle.read() + + if self.args.yaml: + self.log.debug("Attempting to load YAML...") + contents = self.yaml.load(contents) + + migrated = self.migrate(contents) + + self.log.debug(f"Migrated {match}...") + + with open(match, "w") as handle: + if self.args.yaml: + self.yaml.dump(migrated, handle) + else: + handle.write(migrated) + + self.log.debug(f"Saved {match} back to the file system...") + + if idx < self.DIFF_LIMIT and contents != migrated: + self._diff(match, idx=idx) + idx += 1 + + self.log.info("Finished migrating files...") + self.log.info("Commencing change commit run...") + + for match in self.matches: + self._commit(match) + + self.log.info("Finished committing changes...") + self.log.info("Finished! May your tyops be ever glorious!")