#!/usr/bin/env python3.7 import os import csv import logging from datetime import datetime from itertools import chain from pathlib import Path import sys import tomllib import click from clockify_sdk import Clockify import requests from peewee import fn, JOIN from textual.logging import TextualHandler from xdg.BaseDirectory import xdg_config_home from .db import ( KimaiTag, db, HamsterCategory, HamsterActivity, HamsterFact, KimaiCustomer, KimaiProject, KimaiActivity, HamsterActivityKimaiMapping, HamsterFactKimaiImport, HamsterFactTag, ClockifyProject, HamsterClockifyMapping, HamsterFactClockifyImport ) from .kimaiapi import KimaiAPI, Timesheet from .sync import sync CONFIG_FILE = Path(xdg_config_home) / "hamstertools.toml" HAMSTER_DIR = Path.home() / ".local/share/hamster" HAMSTER_FILE = HAMSTER_DIR / "hamster.db" db.init(HAMSTER_FILE) @click.group(context_settings={"auto_envvar_prefix": "HAMSTERTOOL"}) @click.option("-d", "--debug", is_flag=True) @click.option("--config", default=CONFIG_FILE, type=click.Path()) @click.pass_context def cli(ctx, config, debug): file_config = {} if os.path.exists(config): with open(config, "rb") as f: file_config = tomllib.load(f) if debug: logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) peewee_logger = logging.getLogger("peewee") peewee_logger.addHandler(TextualHandler()) peewee_logger.setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True ctx.ensure_object(dict) ctx.obj['config'] = file_config @cli.group() def categories(): pass @categories.command("list") @click.option("--search", help="Search string") def list_categories(search): """List / search categories""" categories = HamsterCategory.select() if search is not None: categories = categories.where(HamsterCategory.name.contains(search)) for c in categories: click.echo(f"@{c.id}: {c.name}") @categories.command("delete") @click.argument("ids", nargs=-1) def delete_categories(ids): """Delete categories specified by IDS""" click.secho("Deleting:", fg="red") categories = ( HamsterCategory.select( HamsterCategory, fn.Count(HamsterActivity.id).alias("activities_count") ) .join(HamsterActivity, JOIN.LEFT_OUTER) .group_by(HamsterCategory) .where(HamsterCategory.id.in_(ids)) ) for c in categories: click.echo(f"@{c.id}: {c.name} ({c.activities_count} activities)") click.confirm("Do you want to continue?", abort=True) count = HamsterCategory.delete().where(HamsterCategory.id.in_(ids)).execute() click.secho("Deleted {0} categories".format(count), fg="green") @categories.command("rename") @click.argument("id_", metavar="ID") @click.argument("name") def rename_category(id_, name): """Rename a category""" category = HamsterCategory.get(id=id_) click.echo(f'Renaming @{category.id}: {category.name} to "{name}"') category.name = name category.save() @categories.command("activities") @click.argument("ids", nargs=-1) def list_category_activities(ids): """Show activities for categories specified by ids""" activities = ( HamsterActivity.select(HamsterActivity, HamsterCategory.name) .join(HamsterCategory, JOIN.LEFT_OUTER) .where(HamsterCategory.id.in_(ids)) ) for a in activities: click.echo(f"@{a.id}: {a.category.name} » {a.name}") @categories.command("tidy") def tidy_categories(): """Remove categories with no activities""" subquery = ( HamsterCategory.select( HamsterCategory, fn.COUNT(HamsterActivity.id).alias("activities_count") ) .join(HamsterActivity, JOIN.LEFT_OUTER) .group_by(HamsterCategory) .alias("subquery") ) categories = ( HamsterCategory.select() .join(subquery, on=(HamsterCategory.id == subquery.c.id)) .where(subquery.c.activities_count == 0) ) click.echo("Found {0} empty categories:".format(categories.count())) for cat in categories: click.echo(f"@{cat.id}: {cat.name}") click.confirm("Do you want to continue?", abort=True) [cat.delete_instance() for cat in categories] @cli.group() def activities(): pass @activities.command("list") @click.option("--search", help="Search string") @click.option("--csv/--no-csv", "csv_output", default=False, help="CSV output") def list_activities(search, csv_output): """List / search activities""" activities = ( HamsterActivity.select(HamsterActivity, HamsterCategory) .join(HamsterCategory, JOIN.LEFT_OUTER) .order_by(HamsterCategory.name, HamsterActivity.name) ) if search is not None: activities = activities.where(HamsterActivity.name.contains(search)) if csv_output: csv_writer = csv.writer(sys.stdout) for a in activities: category_name = a.category.name if a.category_id != -1 else "" if csv_output: csv_writer.writerow([a.category_id, category_name, a.id, a.name]) else: click.echo(f"@{a.category_id}: {category_name} » {a.id}: {a.name}") @activities.command("delete") @click.argument("ids", nargs=-1) def delete_activities(ids): """Delete activities specified by IDS""" activities = ( HamsterActivity.select( HamsterActivity, HamsterCategory.name, fn.Count(HamsterFact.id).alias("facts_count"), ) .join(HamsterCategory, JOIN.LEFT_OUTER) .switch(HamsterActivity) .join(HamsterFact, JOIN.LEFT_OUTER) .group_by(HamsterActivity) .where(HamsterActivity.id.in_(ids)) ) click.secho("Deleting:", fg="red") for a in activities: category_name = a.category.name if a.category_id != -1 else "" click.echo(f"@{a.id}: {category_name} » {a.name} ({a.facts_count} facts)") click.confirm("Do you want to continue?", abort=True) [a.delete_instance() for a in activities] click.secho("Deleted {0} activities".format(len(ids)), fg="green") @activities.command() @click.argument("category_id") @click.argument("ids", nargs=-1) def move(category_id, ids): """Move activities to another category""" category = HamsterCategory.get(id=category_id) activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids)) click.secho(f'Moving to "@{category.id}: {category.name}":', fg="green") for a in activities: category_name = a.category.name if a.category_id != -1 else "" click.secho(f"@{a.category_id}: {category_name} » @{a.id}: {a.name}", fg="blue") click.confirm("Do you want to continue?", abort=True) for a in activities: a.category = category a.save() click.secho("Moved {0} activities".format(len(ids)), fg="green") @activities.command() @click.argument("ids", nargs=-1) def list_facts(ids): """Show facts for activities""" activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids)) for a in activities: click.secho(f"@{a.id}: {a.name}", fg="green") for f in a.facts: click.secho(f"@{f.id}, {f.start_time}", fg="blue") @activities.command() @click.argument("from_id") @click.argument("to_id") def move_facts(from_id, to_id): """Move facts from one activity to another""" from_activity = HamsterActivity.get(id=from_id) to_activity = HamsterActivity.get(id=to_id) from_category_name = ( from_activity.category.name if from_activity.category_id != -1 else "" ) to_category_name = ( to_activity.category.name if to_activity.category_id != -1 else "" ) click.secho( f'Moving facts from "{from_category_name} » @{from_activity.id}: {from_activity.name}" to "@{to_category_name} » @{to_activity.id}: {to_activity.name}"', fg="green", ) for f in from_activity.facts: click.secho(f"@{f.id}, {f.start_time}", fg="blue") click.confirm("Do you want to continue?", abort=True) count = ( HamsterFact.update(activity_id=to_activity.id) .where(HamsterFact.activity == from_activity) .execute() ) click.secho("Moved {0} facts".format(count), fg="green") click.confirm( f'Would you like to delete "{from_category_name} » @{from_activity.id}: {from_activity.name}?', abort=True, ) from_activity.delete_instance() @activities.command() def find_duplicates(): """Show activities which are not unique in their categories""" non_unique_activities = ( HamsterActivity.select( HamsterActivity, HamsterCategory.id, fn.COALESCE(HamsterCategory.name, "None").alias("category_name"), ) .join(HamsterCategory, JOIN.LEFT_OUTER) .group_by(HamsterActivity.category_id, HamsterActivity.name) .having(fn.COUNT(HamsterActivity.id) > 1) ) for activity in non_unique_activities: click.secho( f"@{activity.category_id}: {activity.category_name} » @{activity.id}: {activity.name}", fg="blue", ) @cli.group() @click.option("--kimai-api-url", envvar="KIMAI_API_URL") @click.option("--kimai-username", envvar="KIMAI_USERNAME") @click.option("--kimai-api-key", envvar="KIMAI_API_KEY") @click.pass_context def kimai(ctx, kimai_api_url, kimai_username, kimai_api_key): file_config = ctx.obj['config'] ctx.obj['kimai'] = KimaiAPI( kimai_username if kimai_username is not None else file_config.get("kimai_username"), kimai_api_key if kimai_api_key is not None else file_config.get("kimai_api_key"), kimai_api_url if kimai_api_url is not None else file_config.get("kimai_api_url"), ) pass def _get_kimai_mapping_file(path, category_search=None): try: return open(path) except FileNotFoundError: click.confirm("Mapping file {} not found, create it?:".format(path), abort=True) mapping_file = open(path, "w") mapping_writer = csv.writer(mapping_file) mapping_writer.writerow( [ "FROM category", "FROM activity", "TO Customer", "TO Project", "TO Activity", "TO Tag", "TO Note", ] ) activities = HamsterActivity.select(HamsterActivity, HamsterCategory).join( HamsterCategory, JOIN.LEFT_OUTER ) for a in activities: mapping_writer.writerow( [a.category.name if a.category_id != -1 else "", a.name] ) mapping_file.close() return open(path) @kimai.command() @click.option( "--mapping-path", help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)", multiple=True, ) @click.argument("username") @click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors") @click.option("--ignore-activities", is_flag=True, help="Ignore missing activities") def check(username, api_key, just_errors, ignore_activities, mapping_path=None): """ Check customer / project / activity data from Kimai """ kimai_api_url = "https://kimai.autonomic.zone/api" if len(mapping_path) == 0: mapping_path = (HAMSTER_DIR / "mapping.kimai.csv",) mapping_files = [] for mapping_path_item in mapping_path: if not Path(mapping_path_item).exists(): raise click.UsageError(f"{mapping_path_item} does not exist") mapping_file = _get_kimai_mapping_file(mapping_path_item) next(mapping_file) mapping_files.append(mapping_file) mapping_reader = csv.reader(chain(*mapping_files)) next(mapping_reader) mapping_data = [[row[2], row[3], row[4]] for row in mapping_reader] mapping_file.close() auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key} customers = requests.get( f"{kimai_api_url}/customers?visible=3", headers=auth_headers ).json() projects = requests.get( f"{kimai_api_url}/projects?visible=3", headers=auth_headers ).json() activities = requests.get( f"{kimai_api_url}/activities?visible=3", headers=auth_headers ).json() found_customers = [] found_projects = [] found_activities = [] for row in mapping_data: # Check if each mapping still exists in Kimai matching_customers = list(filter(lambda x: x["name"] == row[0], customers)) if row[0] in found_customers: just_errors or click.secho( "Skipping existing customer '{0}'".format(row[0]), fg="green" ) else: if len(matching_customers) > 1: click.secho( "More than one match for customer '{0}'".format(row[0]), fg="red" ) continue elif len(matching_customers) < 1: click.secho("Missing customer '{0}'".format(row[0]), fg="yellow") continue else: just_errors or click.secho( "Found customer '{0}'".format(row[0]), fg="green" ) found_customers.append(row[0]) project_str = ":".join(row[0:2]) matching_projects = list( filter( lambda x: x["name"] == row[1] and x["customer"] == matching_customers[0]["id"], projects, ) ) if project_str in found_projects: just_errors or click.secho( "Skipping existing project '{0}'".format(project_str), fg="green" ) else: if len(matching_projects) > 1: click.secho( "More than one match for project '{0}'".format(project_str), fg="red", ) continue elif len(matching_projects) < 1: click.secho("Missing project '{0}'".format(project_str), fg="yellow") continue else: just_errors or click.secho( "Found project '{0}'".format(project_str), fg="green" ) found_projects.append(project_str) if ignore_activities: continue activity_str = ":".join(row) if activity_str in found_activities: just_errors or click.secho( "Skipping existing activity '{0}'".format(activity_str), fg="green" ) else: matching_activities = list( filter( lambda x: x["name"] == row[2] and x["project"] == matching_projects[0]["id"], activities, ) ) if len(matching_activities) > 1: click.secho( "More than one match for activity '{0}'".format(activity_str), fg="red", ) elif len(matching_activities) < 1: click.secho("Missing activity '{0}'".format(activity_str), fg="yellow") else: just_errors or click.secho( "Found activity '{0}'".format(activity_str), fg="green" ) found_activities.append(activity_str) @kimai.command("csv") @click.option( "--mapping-path", help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)", multiple=True, ) @click.option("--output", help="Output file (default kimai.csv)") @click.option("--category-search", help="Category search string") @click.option("--after", help="Only show time entries after this date") @click.option("--show-missing", help="Just report on the missing entries", is_flag=True) @click.argument("username") def _csv( username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False, ): """ Export time tracking data in Kimai format """ if mapping_path is None: mapping_path = HAMSTER_DIR / "mapping.kimai.csv" if output is None: timestamp = datetime.now().strftime("%F") output = f"kimai_{timestamp}.csv" if isinstance(mapping_path, tuple): mapping_files = [] for mapping_path_item in mapping_path: mapping_file = _get_kimai_mapping_file(mapping_path_item, category_search) next(mapping_file) mapping_files.append(mapping_file) mapping_reader = csv.reader(chain(*mapping_files)) else: mapping_file = _get_kimai_mapping_file(mapping_path, category_search) next(mapping_file) mapping_reader = csv.reader(mapping_file) mapping = { "{0}:{1}".format(row[0], row[1]): [row[2], row[3], row[4], row[5]] for row in mapping_reader } if isinstance(mapping_path, tuple): for mapping_file in mapping_files: mapping_file.close() else: mapping_file.close() output_file = open(output, "w") output_writer = csv.writer(output_file) facts = ( HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory) .join(HamsterActivity) .join(HamsterCategory, JOIN.LEFT_OUTER) ) if category_search is not None: facts = facts.where(HamsterCategory.name.contains(category_search)) if after is not None: facts = facts.where(HamsterFact.start_time >= after) if not show_missing: output_writer.writerow( [ "Date", "From", "To", "Duration", "Rate", "User", "Customer", "Project", "Activity", "Description", "Exported", "Tags", "HourlyRate", "FixedRate", "InternalRate", ] ) for fact in facts: k = f"{fact.activity.category.name}:{fact.activity.name}" try: mapping_ = mapping[k] except KeyError: if show_missing: output_writer.writerow( [fact.activity.category.name, fact.activity.name] ) click.secho("Can't find mapping for '{0}', skipping".format(k), fg="yellow") continue if show_missing: continue if fact.start_time is None or fact.end_time is None: click.secho( f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping", fg="yellow", ) continue if len(mapping_) < 5: mapping_.append(None) date_start, date_end = ( datetime.strptime(fact.start_time.split(".")[0], "%Y-%m-%d %H:%M:%S"), datetime.strptime(fact.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S"), ) duration = (date_start - date_end).seconds / 3600 output_writer.writerow( [ date_start.strftime("%Y-%m-%d"), date_start.strftime("%H:%M"), "", # To (time) duration, "", # Rate username, mapping_[0], mapping_[1], mapping_[2], fact.description or mapping_[4] or "", "0", # Exported mapping_[3], "", # Hourly rate "", # Fixed rate ] ) output_file.close() @kimai.command("import") @click.argument("search") @click.argument("after") @click.argument("before") @click.pass_context def _import(ctx, search, after, before): api = ctx.obj SEARCH = "auto" facts = ( HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory) .join(HamsterActivity, JOIN.LEFT_OUTER) .join(HamsterCategory, JOIN.LEFT_OUTER) .switch(HamsterFact) .join(HamsterFactTag, JOIN.LEFT_OUTER) .where( (HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d")) & (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d")) & HamsterCategory.name.contains(SEARCH) ) ) has_errors = False # check data for f in facts: mappings = f.activity.mappings if len(mappings) == 0: print( f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping" ) has_errors = True continue if len(mappings) > 1: print( f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings" ) has_errors = True continue if ( mappings[0].kimai_activity.project is None and not mappings[0].kimai_project.allow_global_activities ): click.secho( f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})", fg="red", ) has_errors = True continue if f.imports.count() > 0: click.secho( f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)", fg="yellow", ) continue if has_errors: sys.exit(1) # upload data for f in facts: try: mapping = f.activity.mappings[0] except IndexError: print( f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})" ) continue if f.imports.count() > 0: print( f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})" ) continue t = Timesheet( api, activity=mapping.kimai_activity, project=mapping.kimai_project, begin=f.start_time, end=f.end_time, description=f.description if f.description != "" else mapping.kimai_description, tags=",".join([t.tag.name for t in f.tags]) if len(f.tags) > 0 else mapping.kimai_tags ) r = t.upload().json() if len(f.tags) > 0 or mapping.kimai_tags: print(",".join([t.tag.name for t in f.tags]) if len(f.tags)> 0 else mapping.kimai_tags) print(r["tags"]) if r.get("code", 200) != 200: print(r) print(f"{f.id} ({f.activity.category.name} » {f.activity.name})") else: HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save() print(f'Created Kimai timesheet {r["id"]}') @kimai.group("db") def kimai_db(): pass @kimai_db.command() def init(): db.create_tables( [ KimaiCustomer, KimaiProject, KimaiActivity, KimaiTag, HamsterActivityKimaiMapping, HamsterFactKimaiImport, ] ) @kimai_db.command() def reset(): HamsterActivityKimaiMapping.delete().execute() @kimai_db.command("sync") @click.pass_context def kimai_db_sync(ctx): sync( ctx.obj['kimai'] ) @kimai_db.command() @click.option( "-g", "--global", "global_", help="Does this file contain mappings to global activties", is_flag=True, ) @click.option("--mapping-path", help="Mapping file") def mapping2db(mapping_path=None, global_=False): mapping_file = _get_kimai_mapping_file(mapping_path, None) next(mapping_file) mapping_reader = csv.reader(mapping_file) for row in mapping_reader: hamster_category = HamsterCategory.get(name=row[0]) hamster_activity = HamsterActivity.get( name=row[1], category_id=hamster_category.id ) kimai_customer = KimaiCustomer.get(name=row[2]) kimai_project = KimaiProject.get(name=row[3], customer_id=kimai_customer.id) try: kimai_activity = KimaiActivity.get(name=row[4], project_id=kimai_project.id) except KimaiActivity.DoesNotExist: kimai_activity = KimaiActivity.get(name=row[4], project_id=None) HamsterActivityKimaiMapping.create( hamster_activity=hamster_activity, kimai_customer=kimai_customer, kimai_project=kimai_project, kimai_activity=kimai_activity, kimai_description=row[6], kimai_tags=row[5], ) @cli.group() @click.option("--api-key", envvar="CLOCKIFY_API_KEY", help="Clockify API key") @click.option("--workspace-id", envvar="CLOCKIFY_WORKSPACE_ID", help="Clockify workspace ID") @click.pass_context def clockify(ctx, api_key, workspace_id): file_config = ctx.obj['config'] ctx.obj['clockify'] = Clockify( api_key=api_key if api_key is not None else file_config.get("clockify_api_key"), workspace_id=workspace_id if workspace_id is not None else file_config.get("clockify_workspace_id") ) @clockify.group("db") def clockify_db(): pass @clockify_db.command("sync") @click.pass_context def clockify_db_sync(ctx): from .clockify import sync_projects count = sync_projects(ctx.obj['clockify'], db) click.echo(f"Synced {count} Clockify projects") @clockify_db.command("init") def clockify_db_init(): db.create_tables( [ ClockifyProject, HamsterClockifyMapping, HamsterFactClockifyImport, ] ) @clockify.command("app") @click.pass_context def clockify_app(ctx): from .app import HamsterToolsAppClockify app = HamsterToolsAppClockify(ctx.obj['clockify']) app.run() @kimai.command("app") @click.pass_context def kimai_app(ctx): from .app import HamsterToolsAppKimai app = HamsterToolsAppKimai(ctx.obj['kimai']) app.run() @cli.command() def app(): from .app import HamsterToolsApp app = HamsterToolsApp() app.run() @cli.command() def hamster(): click.echo("🐹") if __name__ == "__main__": cli()