hamster-tools/hamstertools/__init__.py

636 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3.7
import csv
from datetime import datetime
from pathlib import Path
import sys
import click
import requests
import sqlite3
HAMSTER_DIR = Path.home() / '.local/share/hamster'
HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'
conn = sqlite3.connect(HAMSTER_FILE)
c = conn.cursor()
def get_categories(ids=None, search=None):
sql = '''
SELECT
id, name
FROM
categories '''
args = []
if ids is not None:
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
args = args + list(ids)
if search is not None:
sql = sql + " WHERE name LIKE ?"
search = '%{0}%'.format(search)
args.append(search)
results = c.execute(sql, args)
results = c.fetchall()
return results
def get_activities(ids=None, search=None, category_search=None):
sql = '''
SELECT
activities.id, activities.name, categories.name, categories.id
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id '''
args = []
if ids is not None:
sql = sql + 'WHERE activities.id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
args = args + list(ids)
if search is not None:
sql = sql + " WHERE activities.name LIKE ?"
search = '%{0}%'.format(search)
args.append(search)
if category_search is not None:
sql = sql + " WHERE categories.name LIKE ?"
category_search = '%{0}%'.format(category_search)
args.append(category_search)
results = c.execute(sql, args)
results = c.fetchall()
return results
@click.group()
def cli():
pass
@cli.group()
def categories():
pass
@categories.command('list')
@click.option('--search', help='Search string')
def list_categories(search):
""" List / search categories """
results = get_categories(search=search)
for r in results:
click.echo('@{0[0]}: {0[1]}'.format(r))
@categories.command('delete')
@click.argument('ids', nargs=-1)
def delete_categories(ids):
""" Delete categories specified by IDS """
click.secho('Deleting:', fg='red')
results = get_categories(ids)
for r in results:
sql = 'SELECT COUNT(id) FROM activities WHERE category_id = ?'
count = c.execute(sql, (r[0],)).fetchone()[0]
click.echo('@{0[0]}: {0[1]} ({1} activities)'.format(r, count))
click.confirm('Do you want to continue?', abort=True)
for r in results:
sql = 'DELETE FROM activities WHERE category_id = ?'
c.execute(sql, (r[0],))
sql = 'DELETE FROM categories '
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
c.execute(sql, ids)
conn.commit()
click.secho('Deleted {0} categories'.format(len(ids)), fg='green')
@categories.command('rename')
@click.argument('id_', metavar='ID')
@click.argument('name')
def rename_category(id_, name):
""" Rename a category """
r = get_categories((id_,))[0]
click.echo('Renaming @{0[0]}: {0[1]} to "{1}"'.format(r, name))
sql = 'UPDATE categories SET name = ? WHERE id = ?'
c.execute(sql, (name, r[0]))
conn.commit()
@categories.command('activities')
@click.argument('ids', nargs=-1)
def list_category_activities(ids):
""" Show activities for categories specified by IDS """
sql = '''
SELECT
activities.id, activities.name, categories.name
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
WHERE
categories.id IN ({seq})
'''.format(
seq=','.join(['?'] * len(ids))
)
results = c.execute(sql, ids)
for r in results:
click.echo('@{0[0]}: {0[2]} » {0[1]}'.format(r))
@cli.group()
def activities():
pass
@activities.command('list')
@click.option('--search', help='Search string')
@click.option('--csv/--no-csv', 'csv_output', default=False, help='CSV output')
def list_activities(search, csv_output):
""" List / search activities """
results = get_activities(search=search)
if csv_output:
csv_writer = csv.writer(sys.stdout)
for r in results:
if csv_output:
csv_writer.writerow([r[3], r[2], r[0], r[1]])
else:
click.echo('@{0[3]}: {0[2]} » {0[0]}: {0[1]}'.format(r))
@activities.command('delete')
@click.argument('ids', nargs=-1)
def delete_activities(ids):
""" Delete activities specified by IDS """
results = get_activities(ids)
click.secho('Deleting:', fg='red')
for r in results:
sql = "SELECT COUNT(id) FROM facts WHERE activity_id = ?"
count = c.execute(sql, (r[0],)).fetchone()[0]
click.echo('@{0[0]}: {0[2]} » {0[1]} ({1} facts)'.format(r, count))
click.confirm('Do you want to continue?', abort=True)
sql = 'DELETE FROM activities '
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
c.execute(sql, ids)
conn.commit()
click.secho('Deleted {0} activities'.format(len(ids)), fg='green')
@activities.command()
@click.argument('category_id')
@click.argument('ids', nargs=-1)
def move(category_id, ids):
""" Move activities to another category """
category = get_categories((category_id,))[0]
results = get_activities(ids)
click.secho('Moving to "@{0[0]}: {0[1]}":'.format(category), fg='green')
for r in results:
click.secho('@{0[3]}: {0[2]} » @{0[0]}: {0[1]}'.format(r), fg='blue')
click.confirm('Do you want to continue?', abort=True)
sql = '''
UPDATE
activities
SET
category_id = ?
'''
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
c.execute(sql, (category[0], *ids))
conn.commit()
click.secho('Moved {0} activities'.format(len(ids)), fg='green')
@activities.command()
@click.argument('ids', nargs=-1)
def list_facts(ids):
""" Show facts for activities """
results = get_activities(ids)
for r in results:
click.secho(
'@{0[0]}: {0[1]}'.format(r), fg='green'
)
sql = '''
SELECT
start_time,
activities.name
FROM
facts
LEFT JOIN
activities
ON
facts.activity_id = activities.id
WHERE
activities.id = ?
'''
results = c.execute(sql, (r[0],))
for r in results:
click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue')
@activities.command()
@click.argument('from_id')
@click.argument('to_id')
def move_facts(from_id, to_id):
""" Move facts from one activity to another """
from_activity = get_activities((from_id,))[0]
to_activity = get_activities((to_id,))[0]
click.secho(
'Moving facts from "@{0[2]} » @{0[0]}: {0[1]}" to "@{1[2]} » @{1[0]}: {1[1]}"'.format(
from_activity, to_activity
), fg='green'
)
sql = '''
SELECT
start_time,
activities.name
FROM
facts
LEFT JOIN
activities
ON
facts.activity_id = activities.id
WHERE
activities.id = ?
'''
results = c.execute(sql, (from_id,))
for r in results:
click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue')
click.confirm('Do you want to continue?', abort=True)
c.execute(
'UPDATE facts SET activity_id = ? WHERE activity_id = ?',
(to_id, from_id)
)
conn.commit()
click.secho('Moved {0} facts'.format(results.rowcount), fg='green')
click.confirm(
'Would you like to delete @{0[2]} » @{0[0]}: {0[1]}?'.format(from_activity),
abort=True
)
delete_activities((from_id,))
@activities.command()
def find_duplicates():
""" Show activities which are not unique in their categories """
sql = '''
SELECT
categories.id,
categories.name,
activities.id,
activities.name,
COUNT(activities.id) c
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
GROUP BY
activities.name,
activities.category_id
HAVING c > 1
'''
results = c.execute(sql)
for r in results:
click.secho('@{0[0]}: {0[1]} » @{0[2]}: {0[3]} ({0[4]})'.format(r), fg='blue')
@cli.group()
def kimai():
pass
def _get_kimai_mapping_file(path):
try:
return open(path)
except FileNotFoundError:
click.confirm(
'Mapping file {} not found, create it?:'.format(path),
abort=True
)
mapping_file = open(path, 'w')
mapping_writer = csv.writer(mapping_file)
mapping_writer.writerow([
'FROM category',
'FROM activity',
'TO Customer',
'TO Project',
'TO Activity'
])
results = get_activities(category_search=category_search)
for r in results:
mapping_writer.writerow([
r[2], r[1]
])
mapping_file.close()
return open(path)
@kimai.command()
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)')
@click.argument('username')
@click.argument('api_key')
@click.option('--just-errors', 'just_errors', is_flag=True, help='Only display errors')
@click.option('--ignore-activities', is_flag=True, help='Ignore missing activities')
def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
"""
Download customer / project / activity data from Kimai
"""
kimai_api_url = 'https://kimai.autonomic.zone/api'
if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv'
mapping_file = _get_kimai_mapping_file(mapping_path)
mapping_reader = csv.reader(mapping_file)
next(mapping_reader)
mapping_data = [
[row[2], row[3], row[4]]
for row in mapping_reader
]
mapping_file.close()
auth_headers = {
'X-AUTH-USER': username,
'X-AUTH-TOKEN': api_key
}
customers = requests.get(f'{kimai_api_url}/customers?visible=3', headers=auth_headers).json()
projects = requests.get(f'{kimai_api_url}/projects?visible=3', headers=auth_headers).json()
activities = requests.get(f'{kimai_api_url}/activities?visible=3', headers=auth_headers).json()
found_customers = []
found_projects = []
found_activities = []
for row in mapping_data:
# Check if each mapping still exists in Kimai
matching_customers = list(filter(lambda x: x['name'] == row[0], customers))
if row[0] in found_customers:
just_errors or click.secho("Skipping existing customer '{0}'".format(row[0]), fg='green')
else:
if len(matching_customers) > 1:
click.secho("More than one match for customer '{0}'".format(row[0]), fg='red')
continue
elif len(matching_customers) < 1:
click.secho("Missing customer '{0}'".format(row[0]), fg='yellow')
continue
else:
just_errors or click.secho("Found customer '{0}'".format(row[0]), fg='green')
found_customers.append(row[0])
project_str = ':'.join(row[0:2])
matching_projects = list(filter(
lambda x: x['name'] == row[1] and
x['customer'] == matching_customers[0]['id'],
projects)
)
if project_str in found_projects:
just_errors or click.secho("Skipping existing project '{0}'".format(project_str), fg='green')
else:
if len(matching_projects) > 1:
click.secho("More than one match for project '{0}'".format(project_str), fg='red')
continue
elif len(matching_projects) < 1:
click.secho("Missing project '{0}'".format(project_str), fg='yellow')
continue
else:
just_errors or click.secho("Found project '{0}'".format(project_str), fg='green')
found_projects.append(project_str)
if ignore_activities:
continue
activity_str = ':'.join(row)
if activity_str in found_activities:
just_errors or click.secho("Skipping existing activity '{0}'".format(activity_str), fg='green')
else:
matching_activities = list(filter(
lambda x: x['name'] == row[2]
and x['project'] == matching_projects[0]['id'],
activities
))
if len(matching_activities) > 1:
click.secho("More than one match for activity '{0}'".format(activity_str), fg='red')
elif len(matching_activities) < 1:
click.secho("Missing activity '{0}'".format(activity_str), fg='yellow')
else:
just_errors or click.secho("Found activity '{0}'".format(activity_str), fg='green')
found_activities.append(activity_str)
@kimai.command('import')
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)')
@click.option('--output', help='Output file (default kimai.csv)')
@click.option('--category-search', help='Category search string')
@click.option('--after', help='Only show time entries after this date')
@click.option('--show-missing', help='Just report on the missing entries', is_flag=True)
@click.argument('username')
def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False):
"""
Export time tracking data in Kimai format
"""
if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv'
if output is None:
timestamp = datetime.now().strftime('%F')
output = f'kimai_{timestamp}.csv'
mapping_file = _get_kimai_mapping_file(mapping_path)
mapping_reader = csv.reader(mapping_file)
next(mapping_reader)
mapping = {
'{0}:{1}'.format(row[0], row[1]): [row[2], row[3], row[4]]
for row in mapping_reader
}
mapping_file.close()
output_file = open(output, 'w')
output_writer = csv.writer(output_file)
args = []
sql = '''
SELECT
facts.id, facts.start_time, facts.end_time, facts.description,
activities.id, activities.name,
categories.name, categories.id
FROM
facts
LEFT JOIN
activities ON facts.activity_id = activities.id
LEFT JOIN
categories ON activities.category_id = categories.id '''
if category_search is not None:
sql = sql + " WHERE categories.name LIKE ?"
category_search = '%{0}%'.format(category_search)
args.append(category_search)
if after is not None:
if category_search is not None:
sql = sql + ' AND '
else:
sql = sql + ' WHERE '
sql = sql + f"DATE(facts.start_time) > DATE(?)"
args.append(after)
results = c.execute(sql, args)
results = c.fetchall()
if not show_missing:
output_writer.writerow([
"Date",
"From",
"To",
"Duration",
"Rate",
"User",
"Customer",
"Project",
"Activity",
"Description",
"Exported",
"Tags",
"Hourly rate",
"Fixed rate"
])
for fact in results:
k = '{0}:{1}'.format(fact[6], fact[5])
try:
mapping_ = mapping[k]
except KeyError:
if show_missing:
output_writer.writerow([fact[6], fact[5]])
click.secho("Can't find mapping for '{0}', skipping".format(k), fg='yellow')
continue
if show_missing:
continue
if fact[1] is None or fact[2] is None:
click.secho("Missing duration data '{0}-{1}', skipping".format(
fact[1],
fact[2]
), fg='yellow')
continue
date_start, date_end = (
datetime.strptime(fact[2].split('.')[0], '%Y-%m-%d %H:%M:%S'),
datetime.strptime(fact[1].split('.')[0], '%Y-%m-%d %H:%M:%S')
)
duration = (
date_start - date_end
).seconds / 3600
output_writer.writerow([
date_start.strftime('%Y-%m-%d'),
date_start.strftime('%H:%M'),
'', # To (time)
duration,
'', # Rate
username,
mapping_[0],
mapping_[1],
mapping_[2],
fact[3] or '',
'0', # Exported
'', # Tags
'', # Hourly rate
'', # Fixed rate
])
output_file.close()
@cli.command()
def hamster():
click.echo('🐹')
if __name__ == "__main__":
cli()