diff --git a/hamstertools/__init__.py b/hamstertools/__init__.py index f65890f..cfcdb20 100755 --- a/hamstertools/__init__.py +++ b/hamstertools/__init__.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3.7 import csv +import logging from datetime import datetime from itertools import chain from pathlib import Path @@ -7,80 +8,43 @@ import sys import click import requests -import sqlite3 +from peewee import fn, JOIN +from textual.logging import TextualHandler -HAMSTER_DIR = Path.home() / '.local/share/hamster' -HAMSTER_FILE = HAMSTER_DIR / 'hamster.db' +from .db import ( + db, + HamsterCategory, + HamsterActivity, + HamsterFact, + KimaiCustomer, + KimaiProject, + KimaiActivity, + HamsterActivityKimaiMapping, + HamsterFactKimaiImport, +) +from .kimaiapi import KimaiAPI, Timesheet +from .sync import sync -conn = sqlite3.connect(HAMSTER_FILE) -c = conn.cursor() +HAMSTER_DIR = Path.home() / ".local/share/hamster" +HAMSTER_FILE = HAMSTER_DIR / "hamster.db" - -def get_categories(ids=None, search=None): - sql = ''' - SELECT - id, name - FROM - categories ''' - - args = [] - - if ids is not None: - sql = sql + 'WHERE id IN ({seq})'.format( - seq=','.join(['?'] * len(ids)) - ) - args = args + list(ids) - - if search is not None: - sql = sql + " WHERE name LIKE ?" - search = '%{0}%'.format(search) - args.append(search) - - results = c.execute(sql, args) - - results = c.fetchall() - - return results - - -def get_activities(ids=None, search=None, category_search=None): - sql = ''' - SELECT - activities.id, activities.name, categories.name, categories.id - FROM - activities - LEFT JOIN - categories - ON - activities.category_id = categories.id ''' - - args = [] - - if ids is not None: - sql = sql + 'WHERE activities.id IN ({seq})'.format( - seq=','.join(['?'] * len(ids)) - ) - args = args + list(ids) - - if search is not None: - sql = sql + " WHERE activities.name LIKE ?" - search = '%{0}%'.format(search) - args.append(search) - - if category_search is not None: - sql = sql + " WHERE categories.name LIKE ?" - category_search = '%{0}%'.format(category_search) - args.append(category_search) - - results = c.execute(sql, args) - results = c.fetchall() - - return results +db.init(HAMSTER_FILE) @click.group() -def cli(): - pass +@click.option("-d", "--debug", is_flag=True) +def cli(debug): + if debug: + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + + peewee_logger = logging.getLogger("peewee") + peewee_logger.addHandler(TextualHandler()) + peewee_logger.setLevel(logging.DEBUG) + + requests_log = logging.getLogger("requests.packages.urllib3") + requests_log.setLevel(logging.DEBUG) + requests_log.propagate = True @cli.group() @@ -88,110 +52,100 @@ def categories(): pass -@categories.command('list') -@click.option('--search', help='Search string') +@categories.command("list") +@click.option("--search", help="Search string") def list_categories(search): - """ List / search categories """ - results = get_categories(search=search) + """List / search categories""" + categories = HamsterCategory.select() - for r in results: - click.echo('@{0[0]}: {0[1]}'.format(r)) + if search is not None: + categories = categories.where(HamsterCategory.name.contains(search)) + + for c in categories: + click.echo(f"@{c.id}: {c.name}") -@categories.command('delete') -@click.argument('ids', nargs=-1) +@categories.command("delete") +@click.argument("ids", nargs=-1) def delete_categories(ids): - """ Delete categories specified by IDS """ - click.secho('Deleting:', fg='red') + """Delete categories specified by IDS""" + click.secho("Deleting:", fg="red") - results = get_categories(ids) - - for r in results: - sql = 'select count(id) from activities where category_id = ?' - count = c.execute(sql, (r[0],)).fetchone()[0] - click.echo('@{0[0]}: {0[1]} ({1} activities)'.format(r, count)) - - click.confirm('Do you want to continue?', abort=True) - - for r in results: - sql = 'DELETE FROM activities WHERE category_id = ?' - c.execute(sql, (r[0],)) - - sql = 'DELETE FROM categories ' - - sql = sql + 'WHERE id IN ({seq})'.format( - seq=','.join(['?'] * len(ids)) + categories = ( + HamsterCategory.select( + HamsterCategory, fn.Count(HamsterActivity.id).alias("activities_count") + ) + .join(HamsterActivity, JOIN.LEFT_OUTER) + .group_by(HamsterCategory) + .where(HamsterCategory.id.in_(ids)) ) - c.execute(sql, ids) - conn.commit() + for c in categories: + click.echo(f"@{c.id}: {c.name} ({c.activities_count} activities)") - click.secho('Deleted {0} categories'.format(len(ids)), fg='green') + click.confirm("Do you want to continue?", abort=True) + + count = HamsterCategory.delete().where(HamsterCategory.id.in_(ids)).execute() + + click.secho("Deleted {0} categories".format(count), fg="green") -@categories.command('rename') -@click.argument('id_', metavar='ID') -@click.argument('name') +@categories.command("rename") +@click.argument("id_", metavar="ID") +@click.argument("name") def rename_category(id_, name): - """ Rename a category """ + """Rename a category""" - r = get_categories((id_,))[0] + category = HamsterCategory.get(id=id_) - click.echo('Renaming @{0[0]}: {0[1]} to "{1}"'.format(r, name)) + click.echo(f'Renaming @{category.id}: {category.name} to "{name}"') - sql = 'UPDATE categories SET name = ? WHERE id = ?' - - c.execute(sql, (name, r[0])) - conn.commit() + category.name = name + category.save() -@categories.command('activities') -@click.argument('ids', nargs=-1) +@categories.command("activities") +@click.argument("ids", nargs=-1) def list_category_activities(ids): - """ Show activities for categories specified by IDS """ - sql = ''' - SELECT - activities.id, activities.name, categories.name - FROM - activities - LEFT JOIN - categories - ON - activities.category_id = categories.id - WHERE - categories.id IN ({seq}) - '''.format( - seq=','.join(['?'] * len(ids)) + """Show activities for categories specified by ids""" + + activities = ( + HamsterActivity.select(HamsterActivity, HamsterCategory.name) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .where(HamsterCategory.id.in_(ids)) ) - results = c.execute(sql, ids) - - for r in results: - click.echo('@{0[0]}: {0[2]} » {0[1]}'.format(r)) + for a in activities: + click.echo(f"@{a.id}: {a.category.name} » {a.name}") -@categories.command('tidy') +@categories.command("tidy") def tidy_categories(): - """ Remove categories with no activities """ + """Remove categories with no activities""" - sql = 'SELECT categories.id, categories.name FROM categories LEFT JOIN activities ON categories.id = activities.category_id WHERE activities.id IS NULL' - categories = c.execute(sql).fetchall() + subquery = ( + HamsterCategory.select( + HamsterCategory, fn.COUNT(HamsterActivity.id).alias("activities_count") + ) + .join(HamsterActivity, JOIN.LEFT_OUTER) + .group_by(HamsterCategory) + .alias("subquery") + ) - click.echo('Found {0} empty categories:'.format(len(categories))) + categories = ( + HamsterCategory.select() + .join(subquery, on=(HamsterCategory.id == subquery.c.id)) + .where(subquery.c.activities_count == 0) + ) + + click.echo("Found {0} empty categories:".format(categories.count())) for cat in categories: - click.echo('@{0[0]}: {0[1]}'.format(cat)) + click.echo(f"@{cat.id}: {cat.name}") - click.confirm('Do you want to continue?', abort=True) + click.confirm("Do you want to continue?", abort=True) - sql = 'DELETE FROM categories ' - - sql = sql + 'WHERE id IN ({seq})'.format( - seq=','.join(['?'] * len(categories)) - ) - - c.execute(sql, [cat[0] for cat in categories]) - conn.commit() + [cat.delete_instance() for cat in categories] @cli.group() @@ -199,199 +153,160 @@ def activities(): pass -@activities.command('list') -@click.option('--search', help='Search string') -@click.option('--csv/--no-csv', 'csv_output', default=False, help='CSV output') +@activities.command("list") +@click.option("--search", help="Search string") +@click.option("--csv/--no-csv", "csv_output", default=False, help="CSV output") def list_activities(search, csv_output): - """ List / search activities """ + """List / search activities""" - results = get_activities(search=search) + activities = ( + HamsterActivity.select(HamsterActivity, HamsterCategory) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .order_by(HamsterCategory.name, HamsterActivity.name) + ) - results.sort(key=lambda t: (t[2], t[1])) + if search is not None: + activities = activities.where(HamsterActivity.name.contains(search)) if csv_output: csv_writer = csv.writer(sys.stdout) - for r in results: + for a in activities: + category_name = a.category.name if a.category_id != -1 else "" if csv_output: - csv_writer.writerow([r[3], r[2], r[0], r[1]]) + csv_writer.writerow([a.category_id, category_name, a.id, a.name]) else: - click.echo('@{0[3]}: {0[2]} » {0[0]}: {0[1]}'.format(r)) + click.echo(f"@{a.category_id}: {category_name} » {a.id}: {a.name}") -@activities.command('delete') -@click.argument('ids', nargs=-1) +@activities.command("delete") +@click.argument("ids", nargs=-1) def delete_activities(ids): - """ Delete activities specified by IDS """ + """Delete activities specified by IDS""" - results = get_activities(ids) - - click.secho('Deleting:', fg='red') - - for r in results: - sql = "SELECT COUNT(id) FROM facts WHERE activity_id = ?" - count = c.execute(sql, (r[0],)).fetchone()[0] - click.echo('@{0[0]}: {0[2]} » {0[1]} ({1} facts)'.format(r, count)) - - click.confirm('Do you want to continue?', abort=True) - - sql = 'DELETE FROM activities ' - - sql = sql + 'WHERE id IN ({seq})'.format( - seq=','.join(['?'] * len(ids)) - ) - - c.execute(sql, ids) - conn.commit() - - click.secho('Deleted {0} activities'.format(len(ids)), fg='green') - - -@activities.command() -@click.argument('category_id') -@click.argument('ids', nargs=-1) -def move(category_id, ids): - """ Move activities to another category """ - category = get_categories((category_id,))[0] - results = get_activities(ids) - - click.secho('Moving to "@{0[0]}: {0[1]}":'.format(category), fg='green') - - for r in results: - click.secho('@{0[3]}: {0[2]} » @{0[0]}: {0[1]}'.format(r), fg='blue') - - click.confirm('Do you want to continue?', abort=True) - - sql = ''' - UPDATE - activities - SET - category_id = ? - ''' - - sql = sql + 'WHERE id IN ({seq})'.format( - seq=','.join(['?'] * len(ids)) - ) - - c.execute(sql, (category[0], *ids)) - conn.commit() - - click.secho('Moved {0} activities'.format(len(ids)), fg='green') - - -@activities.command() -@click.argument('ids', nargs=-1) -def list_facts(ids): - """ Show facts for activities """ - - results = get_activities(ids) - - for r in results: - click.secho( - '@{0[0]}: {0[1]}'.format(r), fg='green' + activities = ( + HamsterActivity.select( + HamsterActivity, + HamsterCategory.name, + fn.Count(HamsterFact.id).alias("facts_count"), ) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .switch(HamsterActivity) + .join(HamsterFact, JOIN.LEFT_OUTER) + .group_by(HamsterActivity) + .where(HamsterActivity.id.in_(ids)) + ) - sql = ''' - SELECT - start_time, - activities.name - FROM - facts - LEFT JOIN - activities - ON - facts.activity_id = activities.id - WHERE - activities.id = ? - ''' + click.secho("Deleting:", fg="red") - results = c.execute(sql, (r[0],)) + for a in activities: + category_name = a.category.name if a.category_id != -1 else "" + click.echo(f"@{a.id}: {category_name} » {a.name} ({a.facts_count} facts)") - for r in results: - click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue') + click.confirm("Do you want to continue?", abort=True) + + [a.delete_instance() for a in activities] + + click.secho("Deleted {0} activities".format(len(ids)), fg="green") @activities.command() -@click.argument('from_id') -@click.argument('to_id') +@click.argument("category_id") +@click.argument("ids", nargs=-1) +def move(category_id, ids): + """Move activities to another category""" + category = HamsterCategory.get(id=category_id) + activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids)) + + click.secho(f'Moving to "@{category.id}: {category.name}":', fg="green") + + for a in activities: + category_name = a.category.name if a.category_id != -1 else "" + click.secho(f"@{a.category_id}: {category_name} » @{a.id}: {a.name}", fg="blue") + + click.confirm("Do you want to continue?", abort=True) + + for a in activities: + a.category = category + a.save() + + click.secho("Moved {0} activities".format(len(ids)), fg="green") + + +@activities.command() +@click.argument("ids", nargs=-1) +def list_facts(ids): + """Show facts for activities""" + activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids)) + + for a in activities: + click.secho(f"@{a.id}: {a.name}", fg="green") + + for f in a.facts: + click.secho(f"@{f.id}, {f.start_time}", fg="blue") + + +@activities.command() +@click.argument("from_id") +@click.argument("to_id") def move_facts(from_id, to_id): - """ Move facts from one activity to another """ - from_activity = get_activities((from_id,))[0] - to_activity = get_activities((to_id,))[0] + """Move facts from one activity to another""" + from_activity = HamsterActivity.get(id=from_id) + to_activity = HamsterActivity.get(id=to_id) + + from_category_name = ( + from_activity.category.name if from_activity.category_id != -1 else "" + ) + to_category_name = ( + to_activity.category.name if to_activity.category_id != -1 else "" + ) click.secho( - 'Moving facts from "@{0[2]} » @{0[0]}: {0[1]}" to "@{1[2]} » @{1[0]}: {1[1]}"'.format( - from_activity, to_activity - ), fg='green' + f'Moving facts from "{from_category_name} » @{from_activity.id}: {from_activity.name}" to "@{to_category_name} » @{to_activity.id}: {to_activity.name}"', + fg="green", ) - sql = ''' - SELECT - start_time, - activities.name - FROM - facts - LEFT JOIN - activities - ON - facts.activity_id = activities.id - WHERE - activities.id = ? - ''' + for f in from_activity.facts: + click.secho(f"@{f.id}, {f.start_time}", fg="blue") - results = c.execute(sql, (from_id,)) + click.confirm("Do you want to continue?", abort=True) - for r in results: - click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue') - - click.confirm('Do you want to continue?', abort=True) - - c.execute( - 'UPDATE facts SET activity_id = ? WHERE activity_id = ?', - (to_id, from_id) + count = ( + HamsterFact.update(activity_id=to_activity.id) + .where(HamsterFact.activity == from_activity) + .execute() ) - conn.commit() - - click.secho('Moved {0} facts'.format(results.rowcount), fg='green') + click.secho("Moved {0} facts".format(count), fg="green") click.confirm( - 'Would you like to delete @{0[2]} » @{0[0]}: {0[1]}?'.format( - from_activity), - abort=True + f'Would you like to delete "{from_category_name} » @{from_activity.id}: {from_activity.name}?', + abort=True, ) - delete_activities((from_id,)) + from_activity.delete_instance() @activities.command() def find_duplicates(): - """ Show activities which are not unique in their categories """ + """Show activities which are not unique in their categories""" - sql = ''' - SELECT - categories.id, - categories.name, - activities.id, - activities.name, - COUNT(activities.id) c - FROM - activities - LEFT JOIN - categories - ON - activities.category_id = categories.id - GROUP BY - activities.name, - activities.category_id - HAVING c > 1 - ''' + non_unique_activities = ( + HamsterActivity.select( + HamsterActivity, + HamsterCategory.id, + fn.COALESCE(HamsterCategory.name, "None").alias("category_name"), + ) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .group_by(HamsterActivity.category_id, HamsterActivity.name) + .having(fn.COUNT(HamsterActivity.id) > 1) + ) - results = c.execute(sql) - - for r in results: + for activity in non_unique_activities: click.secho( - '@{0[0]}: {0[1]} » @{0[2]}: {0[3]} ({0[4]})'.format(r), fg='blue') + f"@{activity.category_id}: {activity.category_name} » @{activity.id}: {activity.name}", + fg="blue", + ) @cli.group() @@ -403,80 +318,83 @@ def _get_kimai_mapping_file(path, category_search=None): try: return open(path) except FileNotFoundError: - click.confirm( - 'Mapping file {} not found, create it?:'.format(path), - abort=True - ) - mapping_file = open(path, 'w') + click.confirm("Mapping file {} not found, create it?:".format(path), abort=True) + mapping_file = open(path, "w") mapping_writer = csv.writer(mapping_file) - mapping_writer.writerow([ - 'FROM category', - 'FROM activity', - 'TO Customer', - 'TO Project', - 'TO Activity', - 'TO Tag', - 'TO Note' - ]) + mapping_writer.writerow( + [ + "FROM category", + "FROM activity", + "TO Customer", + "TO Project", + "TO Activity", + "TO Tag", + "TO Note", + ] + ) - results = get_activities(category_search=category_search) + activities = HamsterActivity.select(HamsterActivity, HamsterCategory).join( + HamsterCategory, JOIN.LEFT_OUTER + ) - for r in results: - mapping_writer.writerow([ - r[2], r[1] - ]) + for a in activities: + mapping_writer.writerow( + [a.category.name if a.category_id != -1 else "", a.name] + ) mapping_file.close() return open(path) @kimai.command() -@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True) -@click.argument('username') -@click.argument('api_key') -@click.option('--just-errors', 'just_errors', is_flag=True, help='Only display errors') -@click.option('--ignore-activities', is_flag=True, help='Ignore missing activities') -def sync(username, api_key, just_errors, ignore_activities, mapping_path=None): +@click.option( + "--mapping-path", + help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)", + multiple=True, +) +@click.argument("username") +@click.argument("api_key", envvar="KIMAI_API_KEY") +@click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors") +@click.option("--ignore-activities", is_flag=True, help="Ignore missing activities") +def check(username, api_key, just_errors, ignore_activities, mapping_path=None): """ - Download customer / project / activity data from Kimai + Check customer / project / activity data from Kimai """ - kimai_api_url = 'https://kimai.autonomic.zone/api' + kimai_api_url = "https://kimai.autonomic.zone/api" - if type(mapping_path) == tuple: - mapping_files = [] - for mapping_path_item in mapping_path: - mapping_file = _get_kimai_mapping_file(mapping_path_item) - next(mapping_file) - mapping_files.append(mapping_file) - mapping_reader = csv.reader(chain(*mapping_files)) - else: - if mapping_path is None: - mapping_path = HAMSTER_DIR / 'mapping.kimai.csv' - mapping_file = _get_kimai_mapping_file(mapping_path) - mapping_reader = csv.reader(mapping_file) + if len(mapping_path) == 0: + mapping_path = (HAMSTER_DIR / "mapping.kimai.csv",) + + mapping_files = [] + for mapping_path_item in mapping_path: + if not Path(mapping_path_item).exists(): + raise click.UsageError(f"{mapping_path_item} does not exist") + + mapping_file = _get_kimai_mapping_file(mapping_path_item) + next(mapping_file) + mapping_files.append(mapping_file) + + mapping_reader = csv.reader(chain(*mapping_files)) next(mapping_reader) - mapping_data = [ - [row[2], row[3], row[4]] - for row in mapping_reader - ] + mapping_data = [[row[2], row[3], row[4]] for row in mapping_reader] mapping_file.close() - auth_headers = { - 'X-AUTH-USER': username, - 'X-AUTH-TOKEN': api_key - } + auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key} customers = requests.get( - f'{kimai_api_url}/customers?visible=3', headers=auth_headers).json() + f"{kimai_api_url}/customers?visible=3", headers=auth_headers + ).json() projects = requests.get( - f'{kimai_api_url}/projects?visible=3', headers=auth_headers).json() + f"{kimai_api_url}/projects?visible=3", headers=auth_headers + ).json() activities = requests.get( - f'{kimai_api_url}/activities?visible=3', headers=auth_headers).json() + f"{kimai_api_url}/activities?visible=3", headers=auth_headers + ).json() found_customers = [] found_projects = [] @@ -484,94 +402,115 @@ def sync(username, api_key, just_errors, ignore_activities, mapping_path=None): for row in mapping_data: # Check if each mapping still exists in Kimai - - matching_customers = list( - filter(lambda x: x['name'] == row[0], customers)) + matching_customers = list(filter(lambda x: x["name"] == row[0], customers)) if row[0] in found_customers: just_errors or click.secho( - "Skipping existing customer '{0}'".format(row[0]), fg='green') + "Skipping existing customer '{0}'".format(row[0]), fg="green" + ) else: if len(matching_customers) > 1: click.secho( - "More than one match for customer '{0}'".format(row[0]), fg='red') + "More than one match for customer '{0}'".format(row[0]), fg="red" + ) continue elif len(matching_customers) < 1: - click.secho("Missing customer '{0}'".format( - row[0]), fg='yellow') + click.secho("Missing customer '{0}'".format(row[0]), fg="yellow") continue else: just_errors or click.secho( - "Found customer '{0}'".format(row[0]), fg='green') + "Found customer '{0}'".format(row[0]), fg="green" + ) found_customers.append(row[0]) - project_str = ':'.join(row[0:2]) - matching_projects = list(filter( - lambda x: x['name'] == row[1] and - x['customer'] == matching_customers[0]['id'], - projects) + project_str = ":".join(row[0:2]) + matching_projects = list( + filter( + lambda x: x["name"] == row[1] + and x["customer"] == matching_customers[0]["id"], + projects, + ) ) if project_str in found_projects: just_errors or click.secho( - "Skipping existing project '{0}'".format(project_str), fg='green') + "Skipping existing project '{0}'".format(project_str), fg="green" + ) else: if len(matching_projects) > 1: - click.secho("More than one match for project '{0}'".format( - project_str), fg='red') + click.secho( + "More than one match for project '{0}'".format(project_str), + fg="red", + ) continue elif len(matching_projects) < 1: - click.secho("Missing project '{0}'".format( - project_str), fg='yellow') + click.secho("Missing project '{0}'".format(project_str), fg="yellow") continue else: just_errors or click.secho( - "Found project '{0}'".format(project_str), fg='green') + "Found project '{0}'".format(project_str), fg="green" + ) found_projects.append(project_str) if ignore_activities: continue - activity_str = ':'.join(row) + activity_str = ":".join(row) if activity_str in found_activities: just_errors or click.secho( - "Skipping existing activity '{0}'".format(activity_str), fg='green') + "Skipping existing activity '{0}'".format(activity_str), fg="green" + ) else: - matching_activities = list(filter( - lambda x: x['name'] == row[2] - and x['project'] == matching_projects[0]['id'], - activities - )) + matching_activities = list( + filter( + lambda x: x["name"] == row[2] + and x["project"] == matching_projects[0]["id"], + activities, + ) + ) if len(matching_activities) > 1: - click.secho("More than one match for activity '{0}'".format( - activity_str), fg='red') + click.secho( + "More than one match for activity '{0}'".format(activity_str), + fg="red", + ) elif len(matching_activities) < 1: - click.secho("Missing activity '{0}'".format( - activity_str), fg='yellow') + click.secho("Missing activity '{0}'".format(activity_str), fg="yellow") else: just_errors or click.secho( - "Found activity '{0}'".format(activity_str), fg='green') + "Found activity '{0}'".format(activity_str), fg="green" + ) found_activities.append(activity_str) -@kimai.command('import') -@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True) -@click.option('--output', help='Output file (default kimai.csv)') -@click.option('--category-search', help='Category search string') -@click.option('--after', help='Only show time entries after this date') -@click.option('--show-missing', help='Just report on the missing entries', is_flag=True) -@click.argument('username') -def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False): +@kimai.command("csv") +@click.option( + "--mapping-path", + help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)", + multiple=True, +) +@click.option("--output", help="Output file (default kimai.csv)") +@click.option("--category-search", help="Category search string") +@click.option("--after", help="Only show time entries after this date") +@click.option("--show-missing", help="Just report on the missing entries", is_flag=True) +@click.argument("username") +def _csv( + username, + mapping_path=None, + output=None, + category_search=None, + after=None, + show_missing=False, +): """ Export time tracking data in Kimai format """ if mapping_path is None: - mapping_path = HAMSTER_DIR / 'mapping.kimai.csv' + mapping_path = HAMSTER_DIR / "mapping.kimai.csv" if output is None: - timestamp = datetime.now().strftime('%F') - output = f'kimai_{timestamp}.csv' + timestamp = datetime.now().strftime("%F") + output = f"kimai_{timestamp}.csv" if type(mapping_path) == tuple: mapping_files = [] @@ -586,7 +525,7 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte mapping_reader = csv.reader(mapping_file) mapping = { - '{0}:{1}'.format(row[0], row[1]): [row[2], row[3], row[4], row[5]] + "{0}:{1}".format(row[0], row[1]): [row[2], row[3], row[4], row[5]] for row in mapping_reader } @@ -596,12 +535,18 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte else: mapping_file.close() - output_file = open(output, 'w') + output_file = open(output, "w") output_writer = csv.writer(output_file) args = [] - sql = ''' + facts = ( + HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory) + .join(HamsterActivity) + .join(HamsterCategory, JOIN.LEFT_OUTER) + ) + + sql = """ SELECT facts.id, facts.start_time, facts.end_time, facts.description, activities.id, activities.name, @@ -611,98 +556,260 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte LEFT JOIN activities ON facts.activity_id = activities.id LEFT JOIN - categories ON activities.category_id = categories.id ''' + categories ON activities.category_id = categories.id """ if category_search is not None: - sql = sql + " WHERE categories.name LIKE ?" - category_search = '%{0}%'.format(category_search) - args.append(category_search) + facts = facts.where(HamsterCategory.name.contains(category_search)) if after is not None: - if category_search is not None: - sql = sql + ' AND ' - else: - sql = sql + ' WHERE ' - sql = sql + f"DATE(facts.start_time) > DATE(?)" - args.append(after) - - results = c.execute(sql, args) - results = c.fetchall() + facts = facts.where(HamsterFact.start_time >= after) if not show_missing: - output_writer.writerow([ - "Date", - "From", - "To", - "Duration", - "Rate", - "User", - "Customer", - "Project", - "Activity", - "Description", - "Exported", - "Tags", - "HourlyRate", - "FixedRate", - "InternalRate" - ]) + output_writer.writerow( + [ + "Date", + "From", + "To", + "Duration", + "Rate", + "User", + "Customer", + "Project", + "Activity", + "Description", + "Exported", + "Tags", + "HourlyRate", + "FixedRate", + "InternalRate", + ] + ) - for fact in results: - k = '{0}:{1}'.format(fact[6], fact[5]) + for fact in facts: + k = f"{fact.activity.category.name}:{fact.activity.name}" try: mapping_ = mapping[k] except KeyError: if show_missing: - output_writer.writerow([fact[6], fact[5]]) - click.secho( - "Can't find mapping for '{0}', skipping".format(k), fg='yellow') + output_writer.writerow( + [fact.activity.category.name, fact.activity.name] + ) + click.secho("Can't find mapping for '{0}', skipping".format(k), fg="yellow") continue if show_missing: continue - if fact[1] is None or fact[2] is None: - click.secho("Missing duration data '{0}-{1}', skipping".format( - fact[1], - fact[2] - ), fg='yellow') + if fact.start_time is None or fact.end_time is None: + click.secho( + f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping", + fg="yellow", + ) continue if len(mapping_) < 5: mapping_.append(None) date_start, date_end = ( - datetime.strptime(fact[2].split('.')[0], '%Y-%m-%d %H:%M:%S'), - datetime.strptime(fact[1].split('.')[0], '%Y-%m-%d %H:%M:%S') + datetime.strptime(fact.start_time.split(".")[0], "%Y-%m-%d %H:%M:%S"), + datetime.strptime(fact.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S"), ) - duration = ( - date_start - date_end - ).seconds / 3600 + duration = (date_start - date_end).seconds / 3600 - output_writer.writerow([ - date_start.strftime('%Y-%m-%d'), - date_start.strftime('%H:%M'), - '', # To (time) - duration, - '', # Rate - username, - mapping_[0], - mapping_[1], - mapping_[2], - fact[3] or mapping_[4] or '', - '0', # Exported - mapping_[3], - '', # Hourly rate - '', # Fixed rate - ]) + output_writer.writerow( + [ + date_start.strftime("%Y-%m-%d"), + date_start.strftime("%H:%M"), + "", # To (time) + duration, + "", # Rate + username, + mapping_[0], + mapping_[1], + mapping_[2], + fact.description or mapping_[4] or "", + "0", # Exported + mapping_[3], + "", # Hourly rate + "", # Fixed rate + ] + ) output_file.close() +@kimai.command("import") +@click.argument("search") +@click.argument("after") +@click.argument("before") +def _import(search, after, before): + api = KimaiAPI() + + SEARCH = "auto" + + facts = ( + HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory) + .join(HamsterActivity, JOIN.LEFT_OUTER) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .where( + (HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d")) + & (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d")) + & HamsterCategory.name.contains(SEARCH) + ) + ) + + has_errors = False + + # check data + for f in facts: + mappings = f.activity.mappings + if len(mappings) == 0: + print( + f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping" + ) + has_errors = True + continue + if len(mappings) > 1: + print( + f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings" + ) + has_errors = True + continue + if ( + mappings[0].kimai_activity.project is None + and not mappings[0].kimai_project.allow_global_activities + ): + click.secho( + f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})", fg="red", + ) + has_errors = True + continue + if f.imports.count() > 0: + click.secho( + f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)", + fg="yellow", + ) + continue + + if has_errors: + sys.exit(1) + + # upload data + for f in facts: + try: + mapping = f.activity.mappings[0] + except IndexError: + print( + f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})" + ) + continue + + if f.imports.count() > 0: + print( + f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})" + ) + continue + + t = Timesheet( + api, + activity=mapping.kimai_activity, + project=mapping.kimai_project, + begin=f.start_time, + end=f.end_time, + description=f.description + if f.description != "" + else mapping.kimai_description, + # tags=f.tags if f.tags != '' else mapping.kimai_tags + ) + r = t.upload().json() + if r.get("code", 200) != 200: + print(r) + print(f"{f.id} ({f.activity.category.name} » {f.activity.name})") + from pdb import set_trace + + set_trace() + + else: + HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save() + print(f'Created Kimai timesheet {r["id"]}') + + +@kimai.group("db") +def db_(): + pass + + +@db_.command() +def init(): + db.create_tables( + [ + KimaiCustomer, + KimaiProject, + KimaiActivity, + HamsterActivityKimaiMapping, + HamsterFactKimaiImport, + ] + ) + + +@db_.command() +def reset(): + HamsterActivityKimaiMapping.delete().execute() + + +@db_.command("sync") +def kimai_db_sync(): + sync() + + +@db_.command() +@click.option( + "-g", + "--global", + "global_", + help="Does this file contain mappings to global activties", + is_flag=True, +) +@click.option("--mapping-path", help="Mapping file") +def mapping2db(mapping_path=None, global_=False): + mapping_file = _get_kimai_mapping_file(mapping_path, None) + next(mapping_file) + mapping_reader = csv.reader(mapping_file) + + for row in mapping_reader: + hamster_category = HamsterCategory.get(name=row[0]) + hamster_activity = HamsterActivity.get( + name=row[1], category_id=hamster_category.id + ) + kimai_customer = KimaiCustomer.get(name=row[2]) + kimai_project = KimaiProject.get(name=row[3], customer_id=kimai_customer.id) + + try: + kimai_activity = KimaiActivity.get(name=row[4], project_id=kimai_project.id) + except KimaiActivity.DoesNotExist: + kimai_activity = KimaiActivity.get(name=row[4], project_id=None) + + HamsterActivityKimaiMapping.create( + hamster_activity=hamster_activity, + kimai_customer=kimai_customer, + kimai_project=kimai_project, + kimai_activity=kimai_activity, + kimai_description=row[6], + kimai_tags=row[5], + ) + + +@cli.command() +def app(): + from .app import HamsterToolsApp + + app = HamsterToolsApp() + app.run() + + @cli.command() def hamster(): - click.echo('🐹') + click.echo("🐹") if __name__ == "__main__": diff --git a/hamstertools/app.py b/hamstertools/app.py new file mode 100644 index 0000000..3e036e7 --- /dev/null +++ b/hamstertools/app.py @@ -0,0 +1,40 @@ +from textual.app import App + +from .db import db +from .kimaiapi import KimaiAPI + +from .screens.hamster import HamsterScreen +from .screens.kimai import KimaiScreen + + +class HamsterToolsApp(App): + CSS_PATH = "app.tcss" + + BINDINGS = [ + ("h", "switch_mode('hamster')", "Hamster"), + ("k", "switch_mode('kimai')", "Kimai"), + ("q", "quit", "Quit"), + ] + + api_ = None + + @property + def api(self) -> KimaiAPI: + if self.api_ is None: + self.api_ = KimaiAPI() + return self.api_ + + def __init__(self): + self.MODES = { + "hamster": HamsterScreen(), + "kimai": KimaiScreen(), + } + + super().__init__() + + def on_mount(self) -> None: + self.switch_mode("hamster") + + def action_quit(self) -> None: + db.close() + self.exit() diff --git a/hamstertools/app.tcss b/hamstertools/app.tcss new file mode 100644 index 0000000..43f9aca --- /dev/null +++ b/hamstertools/app.tcss @@ -0,0 +1,86 @@ +DataTable { + height: 90%; +} + +DataTable .datatable--cursor { + background: grey; +} + +DataTable:focus .datatable--cursor { + background: orange; +} + +#filter { + display: none; +} + +#filter Input { + width: 50%; +} + +ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen { + align: center middle; +} + +ActivityEditScreen > Vertical, +ActivityMappingScreen > Vertical { + padding: 0 1; + width: 80; + height: 30; + border: thick $background 80%; + background: $surface; +} + +ActivityEditScreen > Vertical { + height: 10; +} + +ActivityMappingScreen Horizontal { + align: left middle; + width: auto; +} + +ActivityEditScreen Horizontal { + width: 80; +} + +ActivityEditScreen Label, +ActivityMappingScreen Label { + padding: 0 1; + width: auto; + border: blank; +} + +ActivityMappingScreen AutoComplete { + width: 80; +} + +#description, #tags { + width: 30; +} + +ActivityEditScreen Input { + width: 60; +} + +#dialog { + grid-size: 2; + grid-gutter: 1 2; + grid-rows: 1fr 3; + padding: 0 1; + width: 60; + height: 11; + border: thick $background 80%; + background: $surface; +} + +#question { + column-span: 2; + height: 1fr; + width: 1fr; + content-align: center middle; +} + +Button { + width: 100%; +} diff --git a/hamstertools/db.py b/hamstertools/db.py new file mode 100644 index 0000000..b0bc465 --- /dev/null +++ b/hamstertools/db.py @@ -0,0 +1,95 @@ +from datetime import datetime + +from peewee import ( + SqliteDatabase, + Model, + CharField, + ForeignKeyField, + DateTimeField, + SmallIntegerField, + BooleanField, +) + + +db = SqliteDatabase(None) + + +class HamsterCategory(Model): + name = CharField() + + class Meta: + database = db + table_name = "categories" + + +class HamsterActivity(Model): + name = CharField() + category = ForeignKeyField(HamsterCategory, backref="activities") + + class Meta: + database = db + table_name = "activities" + + +class HamsterFact(Model): + activity = ForeignKeyField(HamsterActivity, backref="facts") + start_time = DateTimeField() + end_time = DateTimeField(null=True) + description = CharField() + + class Meta: + database = db + table_name = "facts" + + +class KimaiCustomer(Model): + visible = BooleanField(default=True) + name = CharField() + + class Meta: + database = db + table_name = "kimai_customers" + + +class KimaiProject(Model): + name = CharField() + customer = ForeignKeyField(KimaiCustomer, backref="projects") + visible = BooleanField(default=True) + allow_global_activities = BooleanField(default=True) + + class Meta: + database = db + table_name = "kimai_projects" + + +class KimaiActivity(Model): + name = CharField() + project = ForeignKeyField(KimaiProject, backref="activities", null=True) + visible = BooleanField(default=True) + + class Meta: + database = db + table_name = "kimai_activities" + + +class HamsterActivityKimaiMapping(Model): + hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings") + kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings") + kimai_project = ForeignKeyField(KimaiProject, backref="mappings") + kimai_activity = ForeignKeyField(KimaiActivity, backref="mappings") + kimai_description = CharField() + kimai_tags = CharField() + + class Meta: + database = db + table_name = "hamster_kimai_mappings" + + +class HamsterFactKimaiImport(Model): + hamster_fact = ForeignKeyField(HamsterFact, backref="imports") + kimai_id = SmallIntegerField() + imported = DateTimeField(default=datetime.now) + + class Meta: + database = db + table_name = "hamster_fact_kimai_imports" diff --git a/hamstertools/kimaiapi.py b/hamstertools/kimaiapi.py new file mode 100644 index 0000000..2b49900 --- /dev/null +++ b/hamstertools/kimaiapi.py @@ -0,0 +1,224 @@ +from datetime import datetime +import requests +import requests_cache +import os + +from dataclasses import dataclass, field + + +class NotFound(Exception): + pass + + +class KimaiAPI(object): + # temporary hardcoded config + KIMAI_API_URL = "https://kimai.autonomic.zone/api" + # KIMAI_API_URL = "https://kimaitest.autonomic.zone/api" + + KIMAI_USERNAME = "3wordchant" + KIMAI_API_KEY = os.environ["KIMAI_API_KEY"] + + auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY} + + def __init__(self): + # requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800) + self.customers_json = self.get("customers", {"visible": 3}) + self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1}) + self.activities_json = self.get("activities", {"visible": 3}) + self.user_json = self.get("users/me") + + def get(self, endpoint, params=None): + result = requests.get( + f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers + ).json() + try: + if result["code"] != 200: + raise NotFound() + except (KeyError, TypeError): + pass + return result + + def post(self, endpoint, data): + return requests.post( + f"{self.KIMAI_API_URL}/{endpoint}", json=data, headers=self.auth_headers + ) + + +class BaseAPI(object): + def __init__(self, api, **kwargs): + for key, value in kwargs.items(): + setattr(self, key, value) + + +@dataclass +class Customer(BaseAPI): + api: KimaiAPI = field(repr=False) + id: int + name: str + visible: bool = field(default=True) + + @staticmethod + def list(api): + return [ + Customer(api, c["id"], c["name"], c["visible"]) for c in api.customers_json + ] + + @staticmethod + def get_by_id(api, id): + for c in api.customers_json: + if c["id"] == id: + return Customer(api, c["id"], c["name"], c["visible"]) + raise NotFound() + + +@dataclass +class Project(BaseAPI): + api: KimaiAPI = field(repr=False) + id: int + name: str + customer: Customer + allow_global_activities: bool = field(default=True) + visible: bool = field(default=True) + + @staticmethod + def list(api): + return [ + Project( + api, + p["id"], + p["name"], + Customer.get_by_id(api, p["customer"]), + p["globalActivities"], + p["visible"], + ) + for p in api.projects_json + ] + + @staticmethod + def get_by_id(api, id, none=False): + for p in api.projects_json: + if p["id"] == id: + return Project( + api, + p["id"], + p["name"], + Customer.get_by_id(api, p["customer"]), + p["globalActivities"], + p["visible"], + ) + if not none: + raise NotFound() + + +@dataclass +class Activity(BaseAPI): + api: KimaiAPI = field(repr=False) + id: int + name: str + project: Project + visible: bool = field(default=True) + + @staticmethod + def list(api): + return [ + Activity( + api, + a["id"], + a["name"], + Project.get_by_id(api, a["project"], none=True), + a["visible"], + ) + for a in api.activities_json + ] + + @staticmethod + def get_by_id(api, id, none=False): + for a in api.activities_json: + if a["id"] == id: + return Activity( + api, + a["id"], + a["name"], + Project.get_by_id(api, a["project"], none), + a["visible"], + ) + if not none: + raise NotFound() + + +@dataclass +class Timesheet(BaseAPI): + api: KimaiAPI = field(repr=False) + activity: Activity + project: Project + begin: datetime + end: datetime + id: int = field(default=None) + description: str = field(default="") + tags: str = field(default="") + + @staticmethod + def list(api): + return [ + Timesheet( + api, + Activity.get_by_id(api, t["activity"], none=True), + Project.get_by_id(api, t["project"], none=True), + t["begin"], + t["end"], + t["id"], + t["description"], + t["tags"], + ) + for t in api.get( + "timesheets", + ) + ] + + @staticmethod + def list_by(api, **kwargs): + kwargs["size"] = 10000 + return [ + Timesheet( + api, + Activity.get_by_id(api, t["activity"], none=True), + Project.get_by_id(api, t["project"], none=True), + t["begin"], + t["end"], + t["id"], + t["description"], + t["tags"], + ) + for t in api.get("timesheets", params=kwargs) + ] + + @staticmethod + def get_by_id(api, id, none=False): + t = api.get( + f"timesheets/{id}", + ) + return Timesheet( + api, + Activity.get_by_id(api, t["activity"], none=True), + Project.get_by_id(api, t["project"], none=True), + t["begin"], + t["end"], + t["id"], + t["description"], + t["tags"], + ) + + def upload(self): + return self.api.post( + "timesheets", + { + "begin": self.begin.isoformat(), + "end": self.end.isoformat(), + "project": self.project.id, + "activity": self.activity.id, + "description": self.description, + # FIXME: support multiple users + # "user": self., + "tags": self.tags, + }, + ) diff --git a/hamstertools/screens/__init__.py b/hamstertools/screens/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hamstertools/screens/hamster.py b/hamstertools/screens/hamster.py new file mode 100644 index 0000000..313bd8b --- /dev/null +++ b/hamstertools/screens/hamster.py @@ -0,0 +1,610 @@ +from datetime import datetime + +from textual import on +from textual.app import ComposeResult +from textual.binding import Binding +from textual.coordinate import Coordinate +from textual.containers import Horizontal, Vertical, Grid +from textual.events import DescendantBlur +from textual.screen import Screen, ModalScreen +from textual.widgets import ( + Header, + Footer, + DataTable, + Input, + Label, + Checkbox, + TabbedContent, + TabPane, + Button +) + +from peewee import fn, JOIN + +from textual_autocomplete import AutoComplete, Dropdown, DropdownItem + +from ..db import ( + HamsterCategory, + HamsterActivity, + HamsterFact, + KimaiProject, + KimaiCustomer, + KimaiActivity, + HamsterActivityKimaiMapping, +) + +from .list import ListPane + + +class ActivityEditScreen(ModalScreen): + BINDINGS = [ + ("escape", "cancel", "Cancel"), + ("ctrl+s", "save", "Save"), + ] + + category_id = None + category_name = "" + + def _get_categories(self, input_state): + categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()] + return ActivityMappingScreen._filter_dropdowns(categories, input_state.value) + + def __init__(self, category, activity): + if category is not None: + self.category_id = category.id + self.category_name = category.name + self.activity_name = activity.name + super().__init__() + + def compose(self) -> ComposeResult: + yield Vertical( + Horizontal( + Label("Category:"), + AutoComplete( + Input( + placeholder="Type to search...", + id="category", + value=self.category_name, + ), + Dropdown(items=self._get_categories), + ), + ), + Horizontal( + Label("Activity:"), Input(value=self.activity_name, id="activity") + ), + ) + + @on(Input.Submitted, "#category") + def category_submitted(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.category_id = str( + event.control.parent.dropdown.selected_item.left_meta + ) + self.query_one("#activity").focus() + + @on(DescendantBlur, "#category") + def category_blur(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.category_id = str( + event.control.parent.dropdown.selected_item.left_meta + ) + + def action_cancel(self): + self.dismiss(None) + + def action_save(self): + self.dismiss( + { + "category": self.category_id, + "activity": self.query_one("#activity").value, + } + ) + + +class ActivityMappingScreen(ModalScreen): + BINDINGS = [ + ("ctrl+g", "global", "Toggle global"), + ("ctrl+s", "save", "Save"), + ("escape", "cancel", "Cancel"), + ] + + customer_id = None + project_id = None + activity_id = None + customer = "" + project = "" + activity = "" + description = "" + tags = "" + + def __init__(self, category, activity, mapping=None): + self.hamster_category = category + self.hamster_activity = activity + + if mapping is not None: + self.customer_id = mapping.kimai_customer_id + self.customer = mapping.kimai_customer.name + self.project_id = mapping.kimai_project_id + self.project = mapping.kimai_project.name + self.activity_id = mapping.kimai_activity_id + self.activity = mapping.kimai_activity.name + self.description = mapping.kimai_description + self.tags = mapping.kimai_tags + + super().__init__() + + @staticmethod + def _filter_dropdowns(options, value): + matches = [c for c in options if value.lower() in c.main.plain.lower()] + return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower())) + + def _get_customers(self, input_state): + customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()] + return ActivityMappingScreen._filter_dropdowns(customers, input_state.value) + + def _get_projects(self, input_state): + projects = [ + DropdownItem(p.name, str(p.id)) + for p in KimaiProject.select().where( + KimaiProject.customer_id == self.customer_id + ) + ] + return ActivityMappingScreen._filter_dropdowns(projects, input_state.value) + + def _get_activities(self, input_state): + activities = KimaiActivity.select() + + if self.query_one("#global").value: + activities = activities.where( + KimaiActivity.project_id.is_null(), + ) + else: + activities = activities.where(KimaiActivity.project_id == self.project_id) + + return ActivityMappingScreen._filter_dropdowns( + [DropdownItem(a.name, str(a.id)) for a in activities], input_state.value + ) + + def compose(self) -> ComposeResult: + yield Vertical( + Horizontal( + Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"), + ), + Horizontal( + Label("Customer"), + AutoComplete( + Input( + placeholder="Type to search...", + id="customer", + value=self.customer, + ), + Dropdown(items=self._get_customers), + ), + ), + Horizontal( + Label("Project"), + AutoComplete( + Input( + placeholder="Type to search...", + id="project", + value=self.project, + ), + Dropdown(items=self._get_projects), + ), + ), + Horizontal( + Label("Activity"), + AutoComplete( + Input( + placeholder="Type to search...", + id="activity", + value=self.activity, + ), + Dropdown(items=self._get_activities), + ), + ), + Horizontal( + Label("Description"), + Input(id="description", value=self.description), + ), + Horizontal( + Label("Tags"), + Input(id="tags", value=self.tags), + ), + Horizontal(Checkbox("Global", id="global")), + ) + + @on(Input.Submitted, "#customer") + def customer_submitted(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.customer_id = str( + event.control.parent.dropdown.selected_item.left_meta + ) + self.query_one("#project").focus() + + @on(DescendantBlur, "#customer") + def customer_blur(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.customer_id = str( + event.control.parent.dropdown.selected_item.left_meta + ) + + @on(Input.Submitted, "#project") + def project_submitted(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.project_id = str(event.control.parent.dropdown.selected_item.left_meta) + self.query_one("#activity").focus() + + @on(DescendantBlur, "#project") + def project_blur(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.project_id = str(event.control.parent.dropdown.selected_item.left_meta) + + @on(Input.Submitted, "#activity") + def activity_submitted(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.activity_id = str( + event.control.parent.dropdown.selected_item.left_meta + ) + self.query_one("#activity").focus() + + @on(DescendantBlur, "#activity") + def activity_blur(self, event): + if event.control.parent.dropdown.selected_item is not None: + self.activity_id = str( + event.control.parent.dropdown.selected_item.left_meta + ) + + def action_global(self): + self.query_one("#global").value = not self.query_one("#global").value + + def action_save(self): + self.dismiss( + { + "kimai_customer_id": self.customer_id, + "kimai_project_id": self.project_id, + "kimai_activity_id": self.activity_id, + "kimai_description": self.query_one("#description").value, + "kimai_tags": self.query_one("#tags").value, + "global": self.query_one("#global").value, + } + ) + + def action_cancel(self): + self.dismiss(None) + + +class ActivityDeleteConfirmScreen(ModalScreen): + BINDINGS = [ + ("escape", "cancel", "Cancel"), + ] + + def compose(self) -> ComposeResult: + yield Grid( + Label("Are you sure you want to delete this activity?", id="question"), + Button("Confirm", variant="error", id="confirm"), + Button("Cancel", variant="primary", id="cancel"), + id="dialog", + ) + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "quit": + self.dismiss(True) + else: + self.dismiss(False) + + def action_cancel(self): + self.dismiss(False) + + +class ActivityList(ListPane): + BINDINGS = [ + ("s", "sort", "Sort"), + ("r", "refresh", "Refresh"), + ("/", "filter", "Search"), + ("d", "delete", "Delete"), + ("f", "move_facts", "Move"), + ("e", "edit", "Edit"), + ("m", "mapping", "Mapping"), + Binding(key="escape", action="cancelfilter", show=False), + ] + + def _refresh(self): + self.table.clear() + + facts_count_query = ( + HamsterFact.select( + HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count") + ) + .group_by(HamsterFact.activity_id) + .alias("facts_count_query") + ) + + mappings_count_query = ( + HamsterActivityKimaiMapping.select( + HamsterActivityKimaiMapping.hamster_activity_id, + fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"), + ) + .group_by(HamsterActivityKimaiMapping.hamster_activity_id) + .alias("mappings_count_query") + ) + + activities = ( + HamsterActivity.select( + HamsterActivity, + HamsterCategory.id, + HamsterFact.start_time, + fn.COALESCE(HamsterCategory.name, "None").alias("category_name"), + fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"), + fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias( + "mappings_count" + ), + ) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .switch(HamsterActivity) + .join(HamsterFact, JOIN.LEFT_OUTER) + .switch(HamsterActivity) + .join( + facts_count_query, + JOIN.LEFT_OUTER, + on=(HamsterActivity.id == facts_count_query.c.activity_id), + ) + .switch(HamsterActivity) + .join( + mappings_count_query, + JOIN.LEFT_OUTER, + on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id), + ) + .group_by(HamsterActivity) + ) + + filter_search = self.query_one("#filter #search").value + if filter_search is not None: + activities = activities.where( + HamsterActivity.name.contains(filter_search) + | HamsterCategory.name.contains(filter_search) + ) + + filter_date = self.query_one("#filter #date").value + if filter_date is not None: + try: + activities = activities.where( + HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d") + ) + except ValueError: + pass + + self.table.add_rows( + [ + [ + activity.category_id, + activity.category_name, + activity.id, + activity.name, + activity.facts_count, + activity.mappings_count, + ] + for activity in activities + ] + ) + + self.table.sort(*self.sort) + + def on_mount(self) -> None: + self.table = self.query_one(DataTable) + self.table.cursor_type = "row" + self.columns = self.table.add_columns( + "category id", "category", "activity id", "activity", "entries", "mappings" + ) + self.sort = (self.columns[1], self.columns[3]) + self._refresh() + + def action_delete(self) -> None: + # get the keys for the row and column under the cursor. + row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate) + + activity_id = self.table.get_cell_at( + Coordinate(self.table.cursor_coordinate.row, 2), + ) + + activity = HamsterActivity.get(id=activity_id) + + def check_delete(delete: bool) -> None: + """Called when QuitScreen is dismissed.""" + if delete: + activity.delete_instance() + + # supply the row key to `remove_row` to delete the row. + self.table.remove_row(row_key) + + if activity.facts.count() > 0: + self.app.push_screen(ActivityDeleteConfirmScreen(), check_delete) + else: + check_delete(True) + + def action_move_facts(self) -> None: + row_idx: int = self.table.cursor_row + row_cells = self.table.get_row_at(row_idx) + self.move_from_activity = HamsterActivity.get(id=row_cells[2]) + for col_idx, cell_value in enumerate(row_cells): + cell_coordinate = Coordinate(row_idx, col_idx) + self.table.update_cell_at( + cell_coordinate, + f"[red]{cell_value}[/red]", + ) + self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1) + + def on_data_table_row_selected(self, event): + if getattr(self, "move_from_activity", None) is not None: + move_to_activity = HamsterActivity.get( + self.table.get_cell_at(Coordinate(event.cursor_row, 2)) + ) + HamsterFact.update({HamsterFact.activity: move_to_activity}).where( + HamsterFact.activity == self.move_from_activity + ).execute() + self._refresh() + del self.move_from_activity + + def action_edit(self): + row_idx: int = self.table.cursor_row + row_cells = self.table.get_row_at(row_idx) + + try: + category = HamsterCategory.get(id=row_cells[0]) + except HamsterCategory.DoesNotExist: + category = None + activity = HamsterActivity.get(id=row_cells[2]) + + def handle_edit(properties): + if properties is None: + return + activity.name = properties["activity"] + activity.category_id = properties["category"] + activity.save() + self._refresh() + + self.app.push_screen( + ActivityEditScreen(category=category, activity=activity), handle_edit + ) + + def action_mapping(self): + selected_activity = ( + HamsterActivity.select( + HamsterActivity, + fn.COALESCE(HamsterCategory.name, "None").alias("category_name"), + ) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .where( + HamsterActivity.id + == self.table.get_cell_at( + Coordinate(self.table.cursor_coordinate.row, 2), + ) + ) + .get() + ) + + mapping = None + try: + mapping = HamsterActivityKimaiMapping.get( + hamster_activity=selected_activity + ) + except HamsterActivityKimaiMapping.DoesNotExist: + pass + + def handle_mapping(mapping_data): + if mapping_data is None: + return + if mapping is not None: + mapping_ = mapping + for key, value in mapping_data.items(): + setattr(mapping_, key, value) + else: + mapping_ = HamsterActivityKimaiMapping.create( + hamster_activity=selected_activity, **mapping_data + ) + mapping_.save() + self._refresh() + + self.app.push_screen( + ActivityMappingScreen( + category=selected_activity.category_name, + activity=selected_activity.name, + mapping=mapping, + ), + handle_mapping, + ) + + +class CategoryList(ListPane): + BINDINGS = [ + ("s", "sort", "Sort"), + ("r", "refresh", "Refresh"), + ("/", "filter", "Search"), + ("d", "delete", "Delete category"), + Binding(key="escape", action="cancelfilter", show=False), + ] + + def _refresh(self): + self.table.clear() + + categories = ( + HamsterCategory.select( + HamsterCategory, + fn.Count(HamsterActivity.id).alias("activities_count"), + HamsterFact.start_time, + ) + .join(HamsterActivity, JOIN.LEFT_OUTER) + .join(HamsterFact, JOIN.LEFT_OUTER) + .group_by(HamsterCategory) + ) + + filter_search = self.query_one("#filter #search").value + if filter_search is not None: + categories = categories.where(HamsterCategory.name.contains(filter_search)) + + filter_date = self.query_one("#filter #date").value + if filter_date is not None: + try: + categories = categories.where( + HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d") + ) + except ValueError: + pass + + self.table.add_rows( + [ + [ + category.id, + category.name, + category.activities_count, + ] + for category in categories + ] + ) + + self.table.sort(self.sort) + + def on_mount(self) -> None: + self.table = self.query_one(DataTable) + self.table.cursor_type = "row" + self.columns = self.table.add_columns("category id", "category", "activities") + self.sort = self.columns[1] + self._refresh() + + def action_delete(self) -> None: + row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate) + + category_id = self.table.get_cell_at( + Coordinate(self.table.cursor_coordinate.row, 0), + ) + category = HamsterCategory.get(id=category_id) + category.delete_instance() + + self.table.remove_row(row_key) + + +class HamsterScreen(Screen): + BINDINGS = [ + ("c", "show_tab('categories')", "Categories"), + ("a", "show_tab('activities')", "Activities"), + ] + + SUB_TITLE = "Hamster" + + def compose(self) -> ComposeResult: + yield Header() + with TabbedContent(initial="activities"): + with TabPane("Categories", id="categories"): + yield CategoryList() + with TabPane("Activities", id="activities"): + yield ActivityList() + yield Footer() + + def on_mount(self) -> None: + self.query_one("TabbedContent Tabs").can_focus = False + self.query_one("#activities DataTable").focus() + + def action_show_tab(self, tab: str) -> None: + """Switch to a new tab.""" + self.get_child_by_type(TabbedContent).active = tab + self.query_one(f"#{tab} DataTable").focus() diff --git a/hamstertools/screens/kimai.py b/hamstertools/screens/kimai.py new file mode 100644 index 0000000..fb09f00 --- /dev/null +++ b/hamstertools/screens/kimai.py @@ -0,0 +1,193 @@ +from textual.app import ComposeResult +from textual.coordinate import Coordinate +from textual.binding import Binding +from textual.screen import Screen +from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer + +from peewee import fn, JOIN + +from ..utils import truncate +from ..sync import sync +from ..db import ( + KimaiProject, + KimaiCustomer, + KimaiActivity, +) +from ..kimaiapi import Timesheet as KimaiAPITimesheet + +from .list import ListPane + + +class KimaiCustomerList(ListPane): + pass + + +class KimaiProjectList(ListPane): + BINDINGS = [ + ("s", "sort", "Sort"), + ("r", "refresh", "Refresh"), + ("g", "get", "Get data"), + ("/", "filter", "Search"), + Binding(key="escape", action="cancelfilter", show=False), + ] + + def _refresh(self): + self.table.clear() + + filter_search = self.query_one("#filter #search").value + + projects = ( + KimaiProject.select( + KimaiProject, + KimaiCustomer.id, + fn.COALESCE(KimaiCustomer.name, "").alias("customer_name"), + fn.Count(KimaiActivity.id).alias("activities_count"), + ) + .join(KimaiCustomer, JOIN.LEFT_OUTER) + .switch(KimaiProject) + .join(KimaiActivity, JOIN.LEFT_OUTER) + .where(KimaiActivity.project.is_null(False)) + .group_by(KimaiProject) + ) + + if filter_search: + projects = projects.where( + KimaiProject.name.contains(filter_search) + | KimaiCustomer.name.contains(filter_search) + ) + + self.table.add_rows( + [ + [ + str(project.customer_id) if project.customer_id is not None else "", + project.customer_name, + project.id, + project.name, + project.activities_count, + project.visible, + ] + for project in projects + ] + ) + + self.table.sort(*self.sort) + + def action_get(self) -> None: + sync() + self._refresh() + + def on_mount(self) -> None: + self.table = self.query_one(DataTable) + self.table.cursor_type = "row" + self.columns = self.table.add_columns( + "customer id", "customer", "project id", "project", "activities", "visible" + ) + self.sort = (self.columns[1], self.columns[3]) + self._refresh() + + +class KimaiActivityList(ListPane): + BINDINGS = [ + ("s", "sort", "Sort"), + ("r", "refresh", "Refresh"), + ("g", "get", "Get data"), + ("#", "count", "Count"), + ("/", "filter", "Search"), + Binding(key="escape", action="cancelfilter", show=False), + ] + + def _refresh(self): + self.table.clear() + + filter_search = self.query_one("#filter #search").value + + activities = ( + KimaiActivity.select( + KimaiActivity, + fn.COALESCE(KimaiProject.name, "").alias("project_name"), + fn.COALESCE(KimaiCustomer.name, "").alias("customer_name"), + ) + .join(KimaiProject, JOIN.LEFT_OUTER) + .join(KimaiCustomer, JOIN.LEFT_OUTER) + .group_by(KimaiActivity) + ) + + if filter_search: + activities = activities.where(KimaiActivity.name.contains(filter_search)) + + self.table.add_rows( + [ + [ + # activity.project.customer_id if activity.project is not None else '', + # activity.customer_name, + str(activity.project_id) if activity.project_id is not None else "", + truncate(activity.project_name, 40), + activity.id, + truncate(activity.name, 40), + activity.visible, + "?", + ] + for activity in activities + ] + ) + + self.table.sort(*self.sort) + + def action_get(self) -> None: + sync() + self._refresh() + + def on_mount(self) -> None: + self.table = self.query_one(DataTable) + self.table.cursor_type = "row" + self.columns = self.table.add_columns( + # "customer id", + # "customer", + "project id", + "project", + "id", + "name", + "visible", + "times", + ) + self.sort = (self.columns[1], self.columns[3]) + self._refresh() + + def action_count(self) -> None: + row_idx: int = self.table.cursor_row + row_cells = self.table.get_row_at(row_idx) + + activity_id = row_cells[2] + count = len(KimaiAPITimesheet.list_by(self.app.api, activity=activity_id)) + + self.table.update_cell_at(Coordinate(row_idx, 5), count) + + +class KimaiScreen(Screen): + BINDINGS = [ + ("c", "show_tab('customers')", "Customers"), + ("p", "show_tab('projects')", "Projects"), + ("a", "show_tab('activities')", "Activities"), + ] + + SUB_TITLE = "Kimai" + + def compose(self) -> ComposeResult: + yield Header() + with TabbedContent(initial="projects"): + with TabPane("Customers", id="customers"): + yield KimaiCustomerList() + with TabPane("Projects", id="projects"): + yield KimaiProjectList() + with TabPane("Activities", id="activities"): + yield KimaiActivityList() + yield Footer() + + def on_mount(self) -> None: + self.query_one("TabbedContent Tabs").can_focus = False + self.query_one("#projects DataTable").focus() + + def action_show_tab(self, tab: str) -> None: + """Switch to a new tab.""" + self.get_child_by_type(TabbedContent).active = tab + self.query_one(f"#{tab} DataTable").focus() diff --git a/hamstertools/screens/list.py b/hamstertools/screens/list.py new file mode 100644 index 0000000..0ba6d6c --- /dev/null +++ b/hamstertools/screens/list.py @@ -0,0 +1,52 @@ +from datetime import datetime + +from textual import on +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical, Container +from textual.widgets import DataTable, Input + + +class ListPane(Container): + def compose(self) -> ComposeResult: + with Vertical(): + yield DataTable() + with Horizontal(id="filter"): + yield Input( + id="search", placeholder="Category/activity name contains text" + ) + yield Input( + id="date", + placeholder="After date, in {0} format".format( + datetime.now().strftime("%Y-%m-%d") + ), + ) + + def action_refresh(self) -> None: + self._refresh() + + def action_sort(self) -> None: + self.table.cursor_type = "column" + + def on_data_table_column_selected(self, event): + self.sort = (event.column_key,) + event.data_table.sort(*self.sort) + event.data_table.cursor_type = "row" + + def action_filter(self) -> None: + self.query_one("#filter").display = True + self._refresh() + self.query_one("#filter #search").focus() + + def on_input_submitted(self, event): + self.table.focus() + + def action_cancelfilter(self) -> None: + self.query_one("#filter").display = False + self.query_one("#filter #search").clear() + self.query_one("#filter #date").clear() + self.table.focus() + self._refresh() + + @on(Input.Changed, "#filter Input") + def filter(self, event): + self._refresh() diff --git a/hamstertools/sync.py b/hamstertools/sync.py new file mode 100644 index 0000000..fad0b42 --- /dev/null +++ b/hamstertools/sync.py @@ -0,0 +1,62 @@ +from .kimaiapi import ( + KimaiAPI, + Customer as KimaiAPICustomer, + Project as KimaiAPIProject, + Activity as KimaiAPIActivity, +) +from .db import ( + db, + KimaiProject, + KimaiCustomer, + KimaiActivity, +) + + +def sync() -> None: + api = KimaiAPI() + + KimaiCustomer.delete().execute() + KimaiProject.delete().execute() + KimaiActivity.delete().execute() + + customers = KimaiAPICustomer.list(api) + with db.atomic(): + KimaiCustomer.insert_many( + [ + { + "id": customer.id, + "name": customer.name, + "visible": customer.visible, + } + for customer in customers + ] + ).execute() + + projects = KimaiAPIProject.list(api) + with db.atomic(): + KimaiProject.insert_many( + [ + { + "id": project.id, + "name": project.name, + "customer_id": project.customer.id, + "allow_global_activities": project.allow_global_activities, + "visible": project.visible, + } + for project in projects + ] + ).execute() + + activities = KimaiAPIActivity.list(api) + with db.atomic(): + KimaiActivity.insert_many( + [ + { + "id": activity.id, + "name": activity.name, + "project_id": (activity.project.id if activity.project else None), + "visible": activity.visible, + } + for activity in activities + ] + ).execute() diff --git a/hamstertools/utils.py b/hamstertools/utils.py new file mode 100644 index 0000000..40e57cc --- /dev/null +++ b/hamstertools/utils.py @@ -0,0 +1,2 @@ +def truncate(string: str, length: int) -> str: + return string[: length - 2] + ".." if len(string) > 52 else string diff --git a/requirements.txt b/requirements.txt index 4525569..c66cd2b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,6 @@ -click==8.0.3 requests==2.26.0 +peewee==3.17.0 +requests-cache==1.1.1 +textual==0.44.1 +textual-autocomplete==2.1.0b0 +textual-dev==1.2.1 diff --git a/scripts/apitest.py b/scripts/apitest.py new file mode 100644 index 0000000..99624e4 --- /dev/null +++ b/scripts/apitest.py @@ -0,0 +1,25 @@ +import os +import sys + +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) + +from datetime import datetime, timedelta + +from hamstertools.kimai import KimaiAPI, Timesheet, Project, Activity + +api = KimaiAPI() + +# print(Timesheet.list(api)) + +t = Timesheet( + api, + activity=Activity.get_by_id(api, 613), + project=Project.get_by_id(api, 233), + begin=datetime.now() - timedelta(minutes=10), + end=datetime.now(), +) + +# r = t.upload() +# from pdb import set_trace; set_trace() + +print(Timesheet.get_by_id(api, 30683))