hamster-tools/hamstertools/__init__.py

694 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3.7
import csv
from datetime import datetime
from itertools import chain
from pathlib import Path
import sys
import click
import requests
from peewee import fn, JOIN
from .db import db, HamsterCategory, HamsterActivity, HamsterFact, KimaiCustomer, KimaiProject, KimaiActivity, HamsterKimaiMapping
# HAMSTER_DIR = Path.home() / '.local/share/hamster'
# HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'
HAMSTER_FILE = 'hamster-testing.db'
db.init(HAMSTER_FILE)
@click.group()
def cli():
pass
@cli.group()
def categories():
pass
@categories.command('list')
@click.option('--search', help='Search string')
def list_categories(search):
""" List / search categories """
categories = HamsterCategory.select()
if search is not None:
categories = categories.where(HamsterCategory.name.contains(search))
for c in categories:
click.echo(f'@{c.id}: {c.name}')
@categories.command('delete')
@click.argument('ids', nargs=-1)
def delete_categories(ids):
""" Delete categories specified by IDS """
click.secho('Deleting:', fg='red')
categories = HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count")
).join(HamsterActivity, JOIN.LEFT_OUTER).group_by(HamsterActivity).where(HamsterCategory.id.in_(ids))
for c in categories:
click.echo(f'@{c.id}: {c.name} ({c.activities_count} activities)')
click.confirm('Do you want to continue?', abort=True)
count = HamsterCategory.delete().where(HamsterCategory.id.in_(ids)).execute()
click.secho('Deleted {0} categories'.format(count), fg='green')
@categories.command('rename')
@click.argument('id_', metavar='ID')
@click.argument('name')
def rename_category(id_, name):
""" Rename a category """
r = get_categories((id_,))[0]
click.echo('Renaming @{0[0]}: {0[1]} to "{1}"'.format(r, name))
sql = 'UPDATE categories SET name = ? WHERE id = ?'
c.execute(sql, (name, r[0]))
conn.commit()
@categories.command('activities')
@click.argument('ids', nargs=-1)
def list_category_activities(ids):
""" Show activities for categories specified by IDS """
sql = '''
SELECT
activities.id, activities.name, categories.name
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
WHERE
categories.id IN ({seq})
'''.format(
seq=','.join(['?'] * len(ids))
)
results = c.execute(sql, ids)
for r in results:
click.echo('@{0[0]}: {0[2]} » {0[1]}'.format(r))
@categories.command('tidy')
def tidy_categories():
""" Remove categories with no activities """
sql = 'SELECT categories.id, categories.name FROM categories LEFT JOIN activities ON categories.id = activities.category_id WHERE activities.id IS NULL'
categories = c.execute(sql).fetchall()
click.echo('Found {0} empty categories:'.format(len(categories)))
for cat in categories:
click.echo('@{0[0]}: {0[1]}'.format(cat))
click.confirm('Do you want to continue?', abort=True)
sql = 'DELETE FROM categories '
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(categories))
)
c.execute(sql, [cat[0] for cat in categories])
conn.commit()
@cli.group()
def activities():
pass
@activities.command('list')
@click.option('--search', help='Search string')
@click.option('--csv/--no-csv', 'csv_output', default=False, help='CSV output')
def list_activities(search, csv_output):
""" List / search activities """
results = get_activities(search=search)
results.sort(key=lambda t: (t[2], t[1]))
if csv_output:
csv_writer = csv.writer(sys.stdout)
for r in results:
if csv_output:
csv_writer.writerow([r[3], r[2], r[0], r[1]])
else:
click.echo('@{0[3]}: {0[2]} » {0[0]}: {0[1]}'.format(r))
@activities.command('delete')
@click.argument('ids', nargs=-1)
def delete_activities(ids):
""" Delete activities specified by IDS """
results = get_activities(ids)
click.secho('Deleting:', fg='red')
for r in results:
sql = "SELECT COUNT(id) FROM facts WHERE activity_id = ?"
count = c.execute(sql, (r[0],)).fetchone()[0]
click.echo('@{0[0]}: {0[2]} » {0[1]} ({1} facts)'.format(r, count))
click.confirm('Do you want to continue?', abort=True)
sql = 'DELETE FROM activities '
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
c.execute(sql, ids)
conn.commit()
click.secho('Deleted {0} activities'.format(len(ids)), fg='green')
@activities.command()
@click.argument('category_id')
@click.argument('ids', nargs=-1)
def move(category_id, ids):
""" Move activities to another category """
category = get_categories((category_id,))[0]
results = get_activities(ids)
click.secho('Moving to "@{0[0]}: {0[1]}":'.format(category), fg='green')
for r in results:
click.secho('@{0[3]}: {0[2]} » @{0[0]}: {0[1]}'.format(r), fg='blue')
click.confirm('Do you want to continue?', abort=True)
sql = '''
UPDATE
activities
SET
category_id = ?
'''
sql = sql + 'WHERE id IN ({seq})'.format(
seq=','.join(['?'] * len(ids))
)
c.execute(sql, (category[0], *ids))
conn.commit()
click.secho('Moved {0} activities'.format(len(ids)), fg='green')
@activities.command()
@click.argument('ids', nargs=-1)
def list_facts(ids):
""" Show facts for activities """
results = get_activities(ids)
for r in results:
click.secho(
'@{0[0]}: {0[1]}'.format(r), fg='green'
)
sql = '''
SELECT
start_time,
activities.name
FROM
facts
LEFT JOIN
activities
ON
facts.activity_id = activities.id
WHERE
activities.id = ?
'''
results = c.execute(sql, (r[0],))
for r in results:
click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue')
@activities.command()
@click.argument('from_id')
@click.argument('to_id')
def move_facts(from_id, to_id):
""" Move facts from one activity to another """
from_activity = get_activities((from_id,))[0]
to_activity = get_activities((to_id,))[0]
click.secho(
'Moving facts from "@{0[2]} » @{0[0]}: {0[1]}" to "@{1[2]} » @{1[0]}: {1[1]}"'.format(
from_activity, to_activity
), fg='green'
)
sql = '''
SELECT
start_time,
activities.name
FROM
facts
LEFT JOIN
activities
ON
facts.activity_id = activities.id
WHERE
activities.id = ?
'''
results = c.execute(sql, (from_id,))
for r in results:
click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue')
click.confirm('Do you want to continue?', abort=True)
c.execute(
'UPDATE facts SET activity_id = ? WHERE activity_id = ?',
(to_id, from_id)
)
conn.commit()
click.secho('Moved {0} facts'.format(results.rowcount), fg='green')
click.confirm(
'Would you like to delete @{0[2]} » @{0[0]}: {0[1]}?'.format(
from_activity),
abort=True
)
delete_activities((from_id,))
@activities.command()
def find_duplicates():
""" Show activities which are not unique in their categories """
sql = '''
SELECT
categories.id,
categories.name,
activities.id,
activities.name,
COUNT(activities.id) c
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
GROUP BY
activities.name,
activities.category_id
HAVING c > 1
'''
results = c.execute(sql)
for r in results:
click.secho(
'@{0[0]}: {0[1]} » @{0[2]}: {0[3]} ({0[4]})'.format(r), fg='blue')
@cli.group()
def kimai():
pass
def _get_kimai_mapping_file(path, category_search=None):
try:
return open(path)
except FileNotFoundError:
click.confirm(
'Mapping file {} not found, create it?:'.format(path),
abort=True
)
mapping_file = open(path, 'w')
mapping_writer = csv.writer(mapping_file)
mapping_writer.writerow([
'FROM category',
'FROM activity',
'TO Customer',
'TO Project',
'TO Activity',
'TO Tag',
'TO Note'
])
results = get_activities(category_search=category_search)
for r in results:
mapping_writer.writerow([
r[2], r[1]
])
mapping_file.close()
return open(path)
@kimai.command()
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True)
@click.argument('username')
@click.argument('api_key')
@click.option('--just-errors', 'just_errors', is_flag=True, help='Only display errors')
@click.option('--ignore-activities', is_flag=True, help='Ignore missing activities')
def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
"""
Download customer / project / activity data from Kimai
"""
kimai_api_url = 'https://kimai.autonomic.zone/api'
if type(mapping_path) == tuple:
mapping_files = []
for mapping_path_item in mapping_path:
mapping_file = _get_kimai_mapping_file(mapping_path_item)
next(mapping_file)
mapping_files.append(mapping_file)
mapping_reader = csv.reader(chain(*mapping_files))
else:
if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv'
mapping_file = _get_kimai_mapping_file(mapping_path)
mapping_reader = csv.reader(mapping_file)
next(mapping_reader)
mapping_data = [
[row[2], row[3], row[4]]
for row in mapping_reader
]
mapping_file.close()
auth_headers = {
'X-AUTH-USER': username,
'X-AUTH-TOKEN': api_key
}
customers = requests.get(
f'{kimai_api_url}/customers?visible=3', headers=auth_headers).json()
projects = requests.get(
f'{kimai_api_url}/projects?visible=3', headers=auth_headers).json()
activities = requests.get(
f'{kimai_api_url}/activities?visible=3', headers=auth_headers).json()
found_customers = []
found_projects = []
found_activities = []
for row in mapping_data:
# Check if each mapping still exists in Kimai
matching_customers = list(
filter(lambda x: x['name'] == row[0], customers))
if row[0] in found_customers:
just_errors or click.secho(
"Skipping existing customer '{0}'".format(row[0]), fg='green')
else:
if len(matching_customers) > 1:
click.secho(
"More than one match for customer '{0}'".format(row[0]), fg='red')
continue
elif len(matching_customers) < 1:
click.secho("Missing customer '{0}'".format(
row[0]), fg='yellow')
continue
else:
just_errors or click.secho(
"Found customer '{0}'".format(row[0]), fg='green')
found_customers.append(row[0])
project_str = ':'.join(row[0:2])
matching_projects = list(filter(
lambda x: x['name'] == row[1] and
x['customer'] == matching_customers[0]['id'],
projects)
)
if project_str in found_projects:
just_errors or click.secho(
"Skipping existing project '{0}'".format(project_str), fg='green')
else:
if len(matching_projects) > 1:
click.secho("More than one match for project '{0}'".format(
project_str), fg='red')
continue
elif len(matching_projects) < 1:
click.secho("Missing project '{0}'".format(
project_str), fg='yellow')
continue
else:
just_errors or click.secho(
"Found project '{0}'".format(project_str), fg='green')
found_projects.append(project_str)
if ignore_activities:
continue
activity_str = ':'.join(row)
if activity_str in found_activities:
just_errors or click.secho(
"Skipping existing activity '{0}'".format(activity_str), fg='green')
else:
matching_activities = list(filter(
lambda x: x['name'] == row[2]
and x['project'] == matching_projects[0]['id'],
activities
))
if len(matching_activities) > 1:
click.secho("More than one match for activity '{0}'".format(
activity_str), fg='red')
elif len(matching_activities) < 1:
click.secho("Missing activity '{0}'".format(
activity_str), fg='yellow')
else:
just_errors or click.secho(
"Found activity '{0}'".format(activity_str), fg='green')
found_activities.append(activity_str)
@kimai.command('import')
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True)
@click.option('--output', help='Output file (default kimai.csv)')
@click.option('--category-search', help='Category search string')
@click.option('--after', help='Only show time entries after this date')
@click.option('--show-missing', help='Just report on the missing entries', is_flag=True)
@click.argument('username')
def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False):
"""
Export time tracking data in Kimai format
"""
if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv'
if output is None:
timestamp = datetime.now().strftime('%F')
output = f'kimai_{timestamp}.csv'
if type(mapping_path) == tuple:
mapping_files = []
for mapping_path_item in mapping_path:
mapping_file = _get_kimai_mapping_file(mapping_path_item, category_search)
next(mapping_file)
mapping_files.append(mapping_file)
mapping_reader = csv.reader(chain(*mapping_files))
else:
mapping_file = _get_kimai_mapping_file(mapping_path, category_search)
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
mapping = {
'{0}:{1}'.format(row[0], row[1]): [row[2], row[3], row[4], row[5]]
for row in mapping_reader
}
if type(mapping_path) == tuple:
for mapping_file in mapping_files:
mapping_file.close()
else:
mapping_file.close()
output_file = open(output, 'w')
output_writer = csv.writer(output_file)
args = []
sql = '''
SELECT
facts.id, facts.start_time, facts.end_time, facts.description,
activities.id, activities.name,
categories.name, categories.id
FROM
facts
LEFT JOIN
activities ON facts.activity_id = activities.id
LEFT JOIN
categories ON activities.category_id = categories.id '''
if category_search is not None:
sql = sql + " WHERE categories.name LIKE ?"
category_search = '%{0}%'.format(category_search)
args.append(category_search)
if after is not None:
if category_search is not None:
sql = sql + ' AND '
else:
sql = sql + ' WHERE '
sql = sql + f"DATE(facts.start_time) > DATE(?)"
args.append(after)
results = c.execute(sql, args)
results = c.fetchall()
if not show_missing:
output_writer.writerow([
"Date",
"From",
"To",
"Duration",
"Rate",
"User",
"Customer",
"Project",
"Activity",
"Description",
"Exported",
"Tags",
"HourlyRate",
"FixedRate",
"InternalRate"
])
for fact in results:
k = '{0}:{1}'.format(fact[6], fact[5])
try:
mapping_ = mapping[k]
except KeyError:
if show_missing:
output_writer.writerow([fact[6], fact[5]])
click.secho(
"Can't find mapping for '{0}', skipping".format(k), fg='yellow')
continue
if show_missing:
continue
if fact[1] is None or fact[2] is None:
click.secho("Missing duration data '{0}-{1}', skipping".format(
fact[1],
fact[2]
), fg='yellow')
continue
if len(mapping_) < 5:
mapping_.append(None)
date_start, date_end = (
datetime.strptime(fact[2].split('.')[0], '%Y-%m-%d %H:%M:%S'),
datetime.strptime(fact[1].split('.')[0], '%Y-%m-%d %H:%M:%S')
)
duration = (
date_start - date_end
).seconds / 3600
output_writer.writerow([
date_start.strftime('%Y-%m-%d'),
date_start.strftime('%H:%M'),
'', # To (time)
duration,
'', # Rate
username,
mapping_[0],
mapping_[1],
mapping_[2],
fact[3] or mapping_[4] or '',
'0', # Exported
mapping_[3],
'', # Hourly rate
'', # Fixed rate
])
output_file.close()
@kimai.command()
def dbinit():
db.create_tables([KimaiCustomer, KimaiProject, KimaiActivity,
HamsterKimaiMapping])
@kimai.command()
def dbreset():
HamsterKimaiMapping.delete().execute()
@kimai.command()
@click.option('-g', '--global', 'global_', help='Does this file contain mappings to global activties', is_flag=True)
@click.option('--mapping-path', help='Mapping file')
def csv2db(mapping_path=None, global_=False):
mapping_file = _get_kimai_mapping_file(mapping_path, None)
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
for row in mapping_reader:
hamster_category = HamsterCategory.get(name=row[0])
hamster_activity = HamsterActivity.get(name=row[1])
kimai_customer = KimaiCustomer.get(name=row[2])
kimai_project = KimaiProject.get(name=row[3],
customer_id=kimai_customer.id)
try:
kimai_activity = KimaiActivity.get(
name=row[4],
project_id=kimai_project.id
)
except KimaiActivity.DoesNotExist:
kimai_activity = KimaiActivity.get(
name=row[4],
)
HamsterKimaiMapping.create(
hamster_activity=hamster_activity,
kimai_customer=kimai_customer,
kimai_project=kimai_project,
kimai_activity=kimai_activity,
kimai_description=row[6],
kimai_tags=row[5]
)
@cli.command()
def app():
from .app import HamsterToolsApp
app = HamsterToolsApp()
app.run()
@cli.command()
def hamster():
click.echo('🐹')
if __name__ == "__main__":
cli()