#!/usr/bin/env python3.7
# Command-line maintenance tool for a Hamster time-tracker SQLite database.
#
# Command groups:
#   categories  list / delete / rename / tidy categories
#   activities  list / delete / move activities and their facts
#   kimai       check a Hamster->Kimai mapping file and export facts as CSV
#
# NOTE: click uses each command's docstring as its --help text, so the
# docstrings below are kept short; implementation notes live in comments.
import csv
from datetime import datetime
from itertools import chain
from pathlib import Path
import sqlite3
import sys

import click
import requests

HAMSTER_DIR = Path.home() / '.local/share/hamster'
HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'

# Columns of the mapping CSV, in order:
#   FROM category, FROM activity, TO Customer, TO Project, TO Activity,
#   TO Tag, TO Note
_MAPPING_COLUMNS = 7
# A row is only usable once customer, project and activity are present.
_MAPPING_REQUIRED_COLUMNS = 5

# A single module-level connection/cursor is shared by every command.
conn = sqlite3.connect(HAMSTER_FILE)
c = conn.cursor()


# ---------------------------------------------------------------------------
# SQL helpers
# ---------------------------------------------------------------------------

def _in_clause(column, values):
    # Build "<column> IN (?,?,...)" with one placeholder per value.
    placeholders = ','.join('?' * len(values))
    return '{0} IN ({1})'.format(column, placeholders)


def _where_clause(conditions):
    # Join conditions with AND; return '' when there is nothing to filter on.
    if not conditions:
        return ''
    return ' WHERE ' + ' AND '.join(conditions)


def get_categories(ids=None, search=None):
    """Return ``(id, name)`` rows from the categories table.

    ids: optional iterable of category ids to restrict the result to.
    search: optional substring matched against the name with ``LIKE``.
    When both filters are given they are combined with ``AND``.
    """
    conditions = []
    args = []
    if ids is not None:
        ids = list(ids)
        conditions.append(_in_clause('id', ids))
        args.extend(ids)
    if search is not None:
        conditions.append('name LIKE ?')
        args.append('%{0}%'.format(search))

    sql = 'SELECT id, name FROM categories' + _where_clause(conditions)
    c.execute(sql, args)
    return c.fetchall()


def get_activities(ids=None, search=None, category_search=None):
    """Return activity rows joined with their category.

    Each row is ``(activity id, activity name, category name, category id)``;
    the category columns are ``None`` when the LEFT JOIN finds no category.

    ids: optional iterable of activity ids.
    search: optional substring matched against the activity name.
    category_search: optional substring matched against the category name.
    All given filters are combined with ``AND``.
    """
    conditions = []
    args = []
    if ids is not None:
        ids = list(ids)
        conditions.append(_in_clause('activities.id', ids))
        args.extend(ids)
    if search is not None:
        conditions.append('activities.name LIKE ?')
        args.append('%{0}%'.format(search))
    if category_search is not None:
        conditions.append('categories.name LIKE ?')
        args.append('%{0}%'.format(category_search))

    sql = '''
    SELECT activities.id, activities.name, categories.name, categories.id
    FROM activities
    LEFT JOIN categories ON activities.category_id = categories.id
    ''' + _where_clause(conditions)
    c.execute(sql, args)
    return c.fetchall()


def _require_category(id_):
    # Fetch exactly one category row or abort with a readable CLI error.
    rows = get_categories((id_,))
    if not rows:
        raise click.ClickException('No category with id {0}'.format(id_))
    return rows[0]


def _require_activity(id_):
    # Fetch exactly one activity row or abort with a readable CLI error.
    rows = get_activities((id_,))
    if not rows:
        raise click.ClickException('No activity with id {0}'.format(id_))
    return rows[0]


# ---------------------------------------------------------------------------
# Root group
# ---------------------------------------------------------------------------

@click.group()
def cli():
    pass


# ---------------------------------------------------------------------------
# categories
# ---------------------------------------------------------------------------

@cli.group()
def categories():
    pass


@categories.command('list')
@click.option('--search', help='Search string')
def list_categories(search):
    """ List / search categories """
    for row in get_categories(search=search):
        click.echo('@{0[0]}: {0[1]}'.format(row))


@categories.command('delete')
@click.argument('ids', nargs=-1)
def delete_categories(ids):
    """ Delete categories specified by IDS """
    click.secho('Deleting:', fg='red')

    results = get_categories(ids)
    count_sql = 'select count(id) from activities where category_id = ?'
    for row in results:
        count = c.execute(count_sql, (row[0],)).fetchone()[0]
        click.echo('@{0[0]}: {0[1]} ({1} activities)'.format(row, count))

    click.confirm('Do you want to continue?', abort=True)

    # Remove the activities of each category first, then the categories.
    for row in results:
        c.execute('DELETE FROM activities WHERE category_id = ?', (row[0],))
    c.execute('DELETE FROM categories WHERE ' + _in_clause('id', ids), ids)
    conn.commit()

    click.secho('Deleted {0} categories'.format(len(ids)), fg='green')


@categories.command('rename')
@click.argument('id_', metavar='ID')
@click.argument('name')
def rename_category(id_, name):
    """ Rename a category """
    category = _require_category(id_)

    click.echo('Renaming @{0[0]}: {0[1]} to "{1}"'.format(category, name))

    c.execute('UPDATE categories SET name = ? WHERE id = ?',
              (name, category[0]))
    conn.commit()


@categories.command('activities')
@click.argument('ids', nargs=-1)
def list_category_activities(ids):
    """ Show activities for categories specified by IDS """
    sql = '''
    SELECT activities.id, activities.name, categories.name
    FROM activities
    LEFT JOIN categories ON activities.category_id = categories.id
    WHERE ''' + _in_clause('categories.id', ids)

    for row in c.execute(sql, ids):
        click.echo('@{0[0]}: {0[2]} » {0[1]}'.format(row))


@categories.command('tidy')
def tidy_categories():
    """ Remove categories with no activities """
    sql = 'SELECT categories.id, categories.name FROM categories LEFT JOIN activities ON categories.id = activities.category_id WHERE activities.id IS NULL'
    empty = c.execute(sql).fetchall()

    click.echo('Found {0} empty categories:'.format(len(empty)))
    if not empty:
        # Nothing to delete, so do not ask for confirmation.
        return

    for cat in empty:
        click.echo('@{0[0]}: {0[1]}'.format(cat))

    click.confirm('Do you want to continue?', abort=True)

    empty_ids = [cat[0] for cat in empty]
    c.execute('DELETE FROM categories WHERE ' + _in_clause('id', empty_ids),
              empty_ids)
    conn.commit()


# ---------------------------------------------------------------------------
# activities
# ---------------------------------------------------------------------------

@cli.group()
def activities():
    pass


@activities.command('list')
@click.option('--search', help='Search string')
@click.option('--csv/--no-csv', 'csv_output', default=False, help='CSV output')
def list_activities(search, csv_output):
    """ List / search activities """
    results = get_activities(search=search)
    # Category name is None for uncategorised activities (LEFT JOIN);
    # fall back to '' so None is never compared with str while sorting.
    results.sort(key=lambda row: (row[2] or '', row[1] or ''))

    if csv_output:
        csv_writer = csv.writer(sys.stdout)
        for row in results:
            csv_writer.writerow([row[3], row[2], row[0], row[1]])
    else:
        for row in results:
            click.echo('@{0[3]}: {0[2]} » {0[0]}: {0[1]}'.format(row))


def _delete_activities(ids):
    # List the activities in `ids` with their fact counts, ask for
    # confirmation, then delete them. Shared by the `delete` command and
    # `move-facts`, which cannot call the click Command object directly
    # (calling a Command runs its main() and ends with sys.exit()).
    results = get_activities(ids)

    click.secho('Deleting:', fg='red')

    count_sql = "SELECT COUNT(id) FROM facts WHERE activity_id = ?"
    for row in results:
        count = c.execute(count_sql, (row[0],)).fetchone()[0]
        click.echo('@{0[0]}: {0[2]} » {0[1]} ({1} facts)'.format(row, count))

    click.confirm('Do you want to continue?', abort=True)

    c.execute('DELETE FROM activities WHERE ' + _in_clause('id', ids), ids)
    conn.commit()

    click.secho('Deleted {0} activities'.format(len(ids)), fg='green')


@activities.command('delete')
@click.argument('ids', nargs=-1)
def delete_activities(ids):
    """ Delete activities specified by IDS """
    _delete_activities(ids)


@activities.command()
@click.argument('category_id')
@click.argument('ids', nargs=-1)
def move(category_id, ids):
    """ Move activities to another category """
    category = _require_category(category_id)
    results = get_activities(ids)

    click.secho('Moving to "@{0[0]}: {0[1]}":'.format(category), fg='green')
    for row in results:
        click.secho('@{0[3]}: {0[2]} » @{0[0]}: {0[1]}'.format(row), fg='blue')

    click.confirm('Do you want to continue?', abort=True)

    sql = 'UPDATE activities SET category_id = ? WHERE ' + _in_clause('id', ids)
    c.execute(sql, (category[0], *ids))
    conn.commit()

    click.secho('Moved {0} activities'.format(len(ids)), fg='green')


# Facts of one activity, used by `list-facts` and `move-facts`.
_FACTS_FOR_ACTIVITY_SQL = '''
SELECT start_time, activities.name
FROM facts
LEFT JOIN activities ON facts.activity_id = activities.id
WHERE activities.id = ?
'''


@activities.command()
@click.argument('ids', nargs=-1)
def list_facts(ids):
    """ Show facts for activities """
    for activity in get_activities(ids):
        click.secho('@{0[0]}: {0[1]}'.format(activity), fg='green')

        # Materialise the rows: the cursor is shared module-wide.
        facts = c.execute(_FACTS_FOR_ACTIVITY_SQL, (activity[0],)).fetchall()
        for fact in facts:
            click.secho('@{0[0]}, {0[1]}'.format(fact), fg='blue')


@activities.command()
@click.argument('from_id')
@click.argument('to_id')
def move_facts(from_id, to_id):
    """ Move facts from one activity to another """
    from_activity = _require_activity(from_id)
    to_activity = _require_activity(to_id)

    click.secho(
        'Moving facts from "@{0[2]} » @{0[0]}: {0[1]}" to "@{1[2]} » @{1[0]}: {1[1]}"'.format(
            from_activity, to_activity
        ), fg='green'
    )

    facts = c.execute(_FACTS_FOR_ACTIVITY_SQL, (from_id,)).fetchall()
    for fact in facts:
        click.secho('@{0[0]}, {0[1]}'.format(fact), fg='blue')

    click.confirm('Do you want to continue?', abort=True)

    # rowcount after the UPDATE is the number of facts that were re-pointed.
    moved = c.execute(
        'UPDATE facts SET activity_id = ? WHERE activity_id = ?',
        (to_id, from_id)
    ).rowcount
    conn.commit()

    click.secho('Moved {0} facts'.format(moved), fg='green')

    click.confirm(
        'Would you like to delete @{0[2]} » @{0[0]}: {0[1]}?'.format(
            from_activity),
        abort=True
    )

    _delete_activities((from_id,))


@activities.command()
def find_duplicates():
    """ Show activities which are not unique in their categories """
    sql = '''
    SELECT categories.id, categories.name, activities.id, activities.name,
           COUNT(activities.id) c
    FROM activities
    LEFT JOIN categories ON activities.category_id = categories.id
    GROUP BY activities.name, activities.category_id
    HAVING c > 1
    '''

    for row in c.execute(sql):
        click.secho(
            '@{0[0]}: {0[1]} » @{0[2]}: {0[3]} ({0[4]})'.format(row),
            fg='blue')


# ---------------------------------------------------------------------------
# kimai
# ---------------------------------------------------------------------------

@cli.group()
def kimai():
    pass


def _get_kimai_mapping_file(path, category_search=None):
    """Open the mapping CSV at `path` for reading, creating it on request.

    If the file does not exist the user is asked whether to create it
    (declining aborts). The new file gets the header row plus one
    ``[category name, activity name]`` row per activity returned by
    ``get_activities(category_search=category_search)``; the TO columns are
    left for the user to fill in.

    Returns an open text file object; the caller must close it.
    """
    try:
        return open(path, newline='')
    except FileNotFoundError:
        click.confirm(
            'Mapping file {} not found, create it?:'.format(path),
            abort=True
        )

    with open(path, 'w', newline='') as mapping_file:
        mapping_writer = csv.writer(mapping_file)
        mapping_writer.writerow([
            'FROM category',
            'FROM activity',
            'TO Customer',
            'TO Project',
            'TO Activity',
            'TO Tag',
            'TO Note'
        ])
        for row in get_activities(category_search=category_search):
            mapping_writer.writerow([row[2], row[1]])

    return open(path, newline='')


def _resolve_mapping_paths(mapping_path):
    # Normalise the --mapping-path value to a non-empty tuple of paths.
    # click passes an empty tuple (not None) for an unused `multiple=True`
    # option, so emptiness -- not None-ness -- selects the default file.
    if not mapping_path:
        return (HAMSTER_DIR / 'mapping.kimai.csv',)
    if isinstance(mapping_path, (str, Path)):
        return (mapping_path,)
    return tuple(mapping_path)


def _read_kimai_mapping(mapping_paths, category_search=None):
    # Read every mapping file (skipping each header line) and return the
    # usable rows, each padded with '' to _MAPPING_COLUMNS cells.
    # Rows that do not yet carry customer/project/activity -- e.g. the
    # two-column rows of a freshly generated template -- are dropped, so the
    # corresponding activities are simply treated as unmapped.
    rows = []
    for path in mapping_paths:
        with _get_kimai_mapping_file(path, category_search) as mapping_file:
            reader = csv.reader(mapping_file)
            next(reader, None)  # header row; tolerate an empty file
            for row in chain(reader):
                if len(row) < _MAPPING_REQUIRED_COLUMNS:
                    continue
                rows.append(row + [''] * (_MAPPING_COLUMNS - len(row)))
    return rows


def _kimai_get(url, headers):
    # GET a Kimai API collection and return the decoded JSON body, failing
    # loudly on an HTTP error instead of trying to parse an error payload.
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()


# NOTE: despite its help text, this command does not store anything; it
# fetches the Kimai customers/projects/activities and reports which mapping
# targets exist, are missing, or are ambiguous.
@kimai.command()
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True)
@click.argument('username')
@click.argument('api_key')
@click.option('--just-errors', 'just_errors', is_flag=True, help='Only display errors')
@click.option('--ignore-activities', is_flag=True, help='Ignore missing activities')
def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
    """
    Download customer / project / activity data from Kimai
    """
    kimai_api_url = 'https://kimai.autonomic.zone/api'

    # Each entry is [customer, project, activity].
    mapping_data = [
        row[2:5]
        for row in _read_kimai_mapping(_resolve_mapping_paths(mapping_path))
    ]

    auth_headers = {
        'X-AUTH-USER': username,
        'X-AUTH-TOKEN': api_key
    }

    customers = _kimai_get(
        f'{kimai_api_url}/customers?visible=3', auth_headers)
    projects = _kimai_get(
        f'{kimai_api_url}/projects?visible=3', auth_headers)
    kimai_activities = _kimai_get(
        f'{kimai_api_url}/activities?visible=3', auth_headers)

    def info(message):
        # Success/skip messages are suppressed by --just-errors.
        if not just_errors:
            click.secho(message, fg='green')

    found_customers = set()
    found_projects = set()
    found_activities = set()

    for row in mapping_data:
        customer_name, project_name, activity_name = row

        # Check if each mapping still exists in Kimai
        matching_customers = [
            item for item in customers if item['name'] == customer_name
        ]
        if customer_name in found_customers:
            info("Skipping existing customer '{0}'".format(customer_name))
        elif len(matching_customers) > 1:
            click.secho(
                "More than one match for customer '{0}'".format(customer_name),
                fg='red')
            continue
        elif len(matching_customers) < 1:
            click.secho("Missing customer '{0}'".format(customer_name),
                        fg='yellow')
            continue
        else:
            info("Found customer '{0}'".format(customer_name))
            found_customers.add(customer_name)

        # From here on exactly one customer matched.
        customer_id = matching_customers[0]['id']
        project_str = ':'.join(row[0:2])
        matching_projects = [
            item for item in projects
            if item['name'] == project_name and item['customer'] == customer_id
        ]
        if project_str in found_projects:
            info("Skipping existing project '{0}'".format(project_str))
        elif len(matching_projects) > 1:
            click.secho("More than one match for project '{0}'".format(
                project_str), fg='red')
            continue
        elif len(matching_projects) < 1:
            click.secho("Missing project '{0}'".format(project_str),
                        fg='yellow')
            continue
        else:
            info("Found project '{0}'".format(project_str))
            found_projects.add(project_str)

        if ignore_activities:
            continue

        activity_str = ':'.join(row)
        if activity_str in found_activities:
            info("Skipping existing activity '{0}'".format(activity_str))
            continue

        project_id = matching_projects[0]['id']
        matching_activities = [
            item for item in kimai_activities
            if item['name'] == activity_name and item['project'] == project_id
        ]
        if len(matching_activities) > 1:
            click.secho("More than one match for activity '{0}'".format(
                activity_str), fg='red')
        elif len(matching_activities) < 1:
            click.secho("Missing activity '{0}'".format(activity_str),
                        fg='yellow')
        else:
            info("Found activity '{0}'".format(activity_str))
            found_activities.add(activity_str)


def _parse_fact_time(value):
    # Hamster timestamps look like 'YYYY-MM-DD HH:MM:SS[.ffffff]';
    # any fractional part is discarded before parsing.
    return datetime.strptime(value.split('.')[0], '%Y-%m-%d %H:%M:%S')


# Writes the facts selected by --category-search / --after to a CSV file in
# Kimai's import layout, translating category:activity through the mapping
# file(s). With --show-missing only the unmapped category/activity pairs are
# written.
@kimai.command('import')
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True)
@click.option('--output', help='Output file (default kimai.csv)')
@click.option('--category-search', help='Category search string')
@click.option('--after', help='Only show time entries after this date')
@click.option('--show-missing', help='Just report on the missing entries', is_flag=True)
@click.argument('username')
def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False):
    """
    Export time tracking data in Kimai format
    """
    if output is None:
        timestamp = datetime.now().strftime('%Y-%m-%d')
        output = f'kimai_{timestamp}.csv'

    # 'category:activity' -> [customer, project, activity, tag, note]
    mapping = {
        '{0}:{1}'.format(row[0], row[1]): row[2:7]
        for row in _read_kimai_mapping(
            _resolve_mapping_paths(mapping_path), category_search)
    }

    conditions = []
    args = []
    if category_search is not None:
        conditions.append('categories.name LIKE ?')
        args.append('%{0}%'.format(category_search))
    if after is not None:
        conditions.append('DATE(facts.start_time) > DATE(?)')
        args.append(after)

    sql = '''
    SELECT
        facts.id, facts.start_time, facts.end_time, facts.description,
        activities.id, activities.name,
        categories.name, categories.id
    FROM
        facts
        LEFT JOIN activities ON facts.activity_id = activities.id
        LEFT JOIN categories ON activities.category_id = categories.id
    ''' + _where_clause(conditions)

    c.execute(sql, args)
    facts = c.fetchall()

    with open(output, 'w', newline='') as output_file:
        output_writer = csv.writer(output_file)

        if not show_missing:
            output_writer.writerow([
                "Date",
                "From",
                "To",
                "Duration",
                "Rate",
                "User",
                "Customer",
                "Project",
                "Activity",
                "Description",
                "Exported",
                "Tags",
                "HourlyRate",
                "FixedRate",
                "InternalRate"
            ])

        for fact in facts:
            key = '{0}:{1}'.format(fact[6], fact[5])
            try:
                customer, project, activity, tag, note = mapping[key]
            except KeyError:
                if show_missing:
                    output_writer.writerow([fact[6], fact[5]])
                click.secho(
                    "Can't find mapping for '{0}', skipping".format(key),
                    fg='yellow')
                continue

            if show_missing:
                continue

            if fact[1] is None or fact[2] is None:
                click.secho("Missing duration data '{0}-{1}', skipping".format(
                    fact[1],
                    fact[2]
                ), fg='yellow')
                continue

            started = _parse_fact_time(fact[1])
            ended = _parse_fact_time(fact[2])
            # total_seconds() keeps whole days; .seconds would drop them.
            duration = (ended - started).total_seconds() / 3600

            output_writer.writerow([
                started.strftime('%Y-%m-%d'),
                started.strftime('%H:%M'),
                '',  # To (time)
                duration,
                '',  # Rate
                username,
                customer,
                project,
                activity,
                # The fact's own description wins; the mapping note is the
                # fallback.
                fact[3] or note or '',
                '0',  # Exported
                tag,
                '',  # Hourly rate
                '',  # Fixed rate
                '',  # Internal rate
            ])


@cli.command()
def hamster():
    click.echo('🐹')


if __name__ == "__main__":
    cli()