719 lines
19 KiB
Python
Executable File
719 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3.7
|
|
import csv
|
|
from datetime import datetime
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
import sys
|
|
|
|
import click
|
|
import requests
|
|
import sqlite3
|
|
|
|
# Directory where Hamster keeps its data.  It has to be a real name (not just
# a comment) because the Kimai commands use it to locate the default mapping
# file.
HAMSTER_DIR = Path.home() / '.local/share/hamster'
# HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'
# NOTE(review): the tool is pointed at a local testing database rather than
# the database under HAMSTER_DIR -- confirm before running against real data.
HAMSTER_FILE = 'hamster-testing.db'

# Module-wide connection and cursor shared by every command below.
conn = sqlite3.connect(HAMSTER_FILE)
c = conn.cursor()
|
|
|
|
|
|
def get_categories(ids=None, search=None):
    """
    Return ``(id, name)`` rows from the ``categories`` table.

    ids    -- optional collection of category ids to restrict the result to.
              An empty collection matches nothing (``IN ()``), it does NOT
              mean "all categories".
    search -- optional substring matched against the category name with
              ``LIKE '%search%'``.

    When both filters are given they are combined with ``AND``.
    """
    sql = '''
    SELECT
        id, name
    FROM
        categories'''

    # Collect the filters separately so that any combination of them produces
    # exactly one WHERE clause.
    conditions = []
    args = []

    if ids is not None:
        ids = list(ids)
        conditions.append('id IN ({seq})'.format(
            seq=','.join(['?'] * len(ids))
        ))
        args.extend(ids)

    if search is not None:
        conditions.append('name LIKE ?')
        args.append('%{0}%'.format(search))

    if conditions:
        sql = sql + ' WHERE ' + ' AND '.join(conditions)

    return c.execute(sql, args).fetchall()
|
|
|
|
|
|
def get_activities(ids=None, search=None, category_search=None):
    """
    Return activities joined with their category.

    Each row is ``(activity id, activity name, category name, category id)``;
    the category columns are ``None`` for activities whose category does not
    exist (the join is a LEFT JOIN).

    ids             -- optional collection of activity ids; an empty
                       collection matches nothing.
    search          -- optional substring of the activity name.
    category_search -- optional substring of the category name.

    All supplied filters are combined with ``AND``.
    """
    sql = '''
    SELECT
        activities.id, activities.name, categories.name, categories.id
    FROM
        activities
    LEFT JOIN
        categories
    ON
        activities.category_id = categories.id'''

    # Build one WHERE clause no matter how many filters are supplied.
    conditions = []
    args = []

    if ids is not None:
        ids = list(ids)
        conditions.append('activities.id IN ({seq})'.format(
            seq=','.join(['?'] * len(ids))
        ))
        args.extend(ids)

    if search is not None:
        conditions.append('activities.name LIKE ?')
        args.append('%{0}%'.format(search))

    if category_search is not None:
        conditions.append('categories.name LIKE ?')
        args.append('%{0}%'.format(category_search))

    if conditions:
        sql = sql + ' WHERE ' + ' AND '.join(conditions)

    return c.execute(sql, args).fetchall()
|
|
|
|
|
|
@click.group()
def cli():
    """ Maintenance and export tools for a Hamster time-tracking database """
|
|
|
|
|
|
@cli.group()
def categories():
    """ List, rename, delete and tidy categories """
|
|
|
|
|
|
@categories.command('list')
@click.option('--search', help='Search string')
def list_categories(search):
    """ List / search categories """
    # Each row is (id, name); print one category per line.
    for category in get_categories(search=search):
        line = '@{0[0]}: {0[1]}'.format(category)
        click.echo(line)
|
|
|
|
|
|
@categories.command('delete')
@click.argument('ids', nargs=-1)
def delete_categories(ids):
    """ Delete categories specified by IDS """
    results = get_categories(ids)

    # Nothing matched (no ids given, or unknown ids): do not ask the user to
    # confirm an empty deletion.
    if not results:
        click.secho('No matching categories found', fg='yellow')
        return

    click.secho('Deleting:', fg='red')

    # Show what is about to go, including how many activities each category
    # would take with it.
    for r in results:
        sql = 'select count(id) from activities where category_id = ?'
        count = c.execute(sql, (r[0],)).fetchone()[0]
        click.echo('@{0[0]}: {0[1]} ({1} activities)'.format(r, count))

    click.confirm('Do you want to continue?', abort=True)

    category_ids = [r[0] for r in results]
    placeholders = ','.join(['?'] * len(category_ids))

    # Remove the activities of the doomed categories first, then the
    # categories themselves.
    c.execute(
        'DELETE FROM activities WHERE category_id IN ({seq})'.format(
            seq=placeholders),
        category_ids
    )
    c.execute(
        'DELETE FROM categories WHERE id IN ({seq})'.format(seq=placeholders),
        category_ids
    )
    # rowcount of the last statement = categories actually removed, which may
    # be fewer than the number of ids typed on the command line.
    deleted = c.rowcount
    conn.commit()

    click.secho('Deleted {0} categories'.format(deleted), fg='green')
|
|
|
|
|
|
@categories.command('rename')
@click.argument('id_', metavar='ID')
@click.argument('name')
def rename_category(id_, name):
    """ Rename a category """
    matches = get_categories((id_,))

    # Report an unknown id as a normal CLI error instead of an IndexError
    # traceback.
    if not matches:
        raise click.ClickException('No category with id {0}'.format(id_))

    r = matches[0]

    click.echo('Renaming @{0[0]}: {0[1]} to "{1}"'.format(r, name))

    sql = 'UPDATE categories SET name = ? WHERE id = ?'

    c.execute(sql, (name, r[0]))
    conn.commit()
|
|
|
|
|
|
@categories.command('activities')
@click.argument('ids', nargs=-1)
def list_category_activities(ids):
    """ Show activities for categories specified by IDS """
    # One "?" placeholder per requested category id.
    placeholders = ','.join('?' for _ in ids)

    query = '''
    SELECT
        activities.id, activities.name, categories.name
    FROM
        activities
    LEFT JOIN
        categories
    ON
        activities.category_id = categories.id
    WHERE
        categories.id IN ({seq})
    '''.format(seq=placeholders)

    # Rows are (activity id, activity name, category name).
    for activity in c.execute(query, ids):
        click.echo('@{0[0]}: {0[2]} » {0[1]}'.format(activity))
|
|
|
|
|
|
@categories.command('tidy')
def tidy_categories():
    """ Remove categories with no activities """
    # A category is "empty" when the LEFT JOIN finds no activity for it.
    sql = (
        'SELECT categories.id, categories.name FROM categories '
        'LEFT JOIN activities ON categories.id = activities.category_id '
        'WHERE activities.id IS NULL'
    )
    empty_categories = c.execute(sql).fetchall()

    click.echo('Found {0} empty categories:'.format(len(empty_categories)))

    # Nothing to tidy: skip the confirmation prompt and the no-op DELETE.
    if not empty_categories:
        return

    for cat in empty_categories:
        click.echo('@{0[0]}: {0[1]}'.format(cat))

    click.confirm('Do you want to continue?', abort=True)

    sql = 'DELETE FROM categories WHERE id IN ({seq})'.format(
        seq=','.join(['?'] * len(empty_categories))
    )

    c.execute(sql, [cat[0] for cat in empty_categories])
    conn.commit()
|
|
|
|
|
|
@cli.group()
def activities():
    """ List, move and delete activities and their facts """
|
|
|
|
|
|
@activities.command('list')
@click.option('--search', help='Search string')
@click.option('--csv/--no-csv', 'csv_output', default=False, help='CSV output')
def list_activities(search, csv_output):
    """ List / search activities """
    # Rows are (activity id, activity name, category name, category id).
    # Sort by category then activity name.  Activities without a category
    # come back with None for the category name (LEFT JOIN); comparing None
    # with str raises TypeError, so treat missing names as ''.
    results = sorted(
        get_activities(search=search),
        key=lambda t: (t[2] or '', t[1] or '')
    )

    if csv_output:
        # Column order: category id, category name, activity id, activity name
        csv_writer = csv.writer(sys.stdout)
        csv_writer.writerows([r[3], r[2], r[0], r[1]] for r in results)
        return

    for r in results:
        click.echo('@{0[3]}: {0[2]} » {0[0]}: {0[1]}'.format(r))
|
|
|
|
|
|
@activities.command('delete')
@click.argument('ids', nargs=-1)
def delete_activities(ids):
    """ Delete activities specified by IDS """
    results = get_activities(ids)

    # Nothing matched (no ids given, or unknown ids): do not ask the user to
    # confirm an empty deletion.
    if not results:
        click.secho('No matching activities found', fg='yellow')
        return

    click.secho('Deleting:', fg='red')

    # Show each activity together with the number of facts recorded on it.
    for r in results:
        sql = "SELECT COUNT(id) FROM facts WHERE activity_id = ?"
        count = c.execute(sql, (r[0],)).fetchone()[0]
        click.echo('@{0[0]}: {0[2]} » {0[1]} ({1} facts)'.format(r, count))

    click.confirm('Do you want to continue?', abort=True)

    activity_ids = [r[0] for r in results]
    sql = 'DELETE FROM activities WHERE id IN ({seq})'.format(
        seq=','.join(['?'] * len(activity_ids))
    )

    c.execute(sql, activity_ids)
    # Report what was really removed rather than how many ids were typed.
    deleted = c.rowcount
    conn.commit()

    click.secho('Deleted {0} activities'.format(deleted), fg='green')
|
|
|
|
|
|
@activities.command()
@click.argument('category_id')
@click.argument('ids', nargs=-1)
def move(category_id, ids):
    """ Move activities to another category """
    matches = get_categories((category_id,))

    # Report an unknown target category as a normal CLI error instead of an
    # IndexError traceback.
    if not matches:
        raise click.ClickException(
            'No category with id {0}'.format(category_id))

    category = matches[0]
    results = get_activities(ids)

    click.secho('Moving to "@{0[0]}: {0[1]}":'.format(category), fg='green')

    # Rows are (activity id, activity name, category name, category id).
    for r in results:
        click.secho('@{0[3]}: {0[2]} » @{0[0]}: {0[1]}'.format(r), fg='blue')

    click.confirm('Do you want to continue?', abort=True)

    sql = '''
    UPDATE
        activities
    SET
        category_id = ?
    WHERE id IN ({seq})'''.format(
        seq=','.join(['?'] * len(ids))
    )

    c.execute(sql, (category[0], *ids))
    # Count the rows really updated; ids that do not exist are not "moved".
    moved = c.rowcount
    conn.commit()

    click.secho('Moved {0} activities'.format(moved), fg='green')
|
|
|
|
|
|
@activities.command()
@click.argument('ids', nargs=-1)
def list_facts(ids):
    """ Show facts for activities """
    # The query is the same for every activity, only the bound id changes.
    facts_query = '''
    SELECT
        start_time,
        activities.name
    FROM
        facts
    LEFT JOIN
        activities
    ON
        facts.activity_id = activities.id
    WHERE
        activities.id = ?
    '''

    for activity in get_activities(ids):
        # Heading line: activity id and name.
        heading = '@{0[0]}: {0[1]}'.format(activity)
        click.secho(heading, fg='green')

        # One line per fact: (start_time, activity name).
        for fact in c.execute(facts_query, (activity[0],)):
            click.secho('@{0[0]}, {0[1]}'.format(fact), fg='blue')
|
|
|
|
|
|
@activities.command()
@click.argument('from_id')
@click.argument('to_id')
def move_facts(from_id, to_id):
    """ Move facts from one activity to another """
    from_matches = get_activities((from_id,))
    to_matches = get_activities((to_id,))

    # Unknown ids become normal CLI errors instead of IndexError tracebacks.
    if not from_matches:
        raise click.ClickException('No activity with id {0}'.format(from_id))
    if not to_matches:
        raise click.ClickException('No activity with id {0}'.format(to_id))

    from_activity = from_matches[0]
    to_activity = to_matches[0]

    # Moving an activity's facts onto itself and then deleting it would
    # orphan every fact, so refuse.
    if from_activity[0] == to_activity[0]:
        raise click.ClickException(
            'Source and destination activities are the same')

    click.secho(
        'Moving facts from "@{0[2]} » @{0[0]}: {0[1]}" to "@{1[2]} » @{1[0]}: {1[1]}"'.format(
            from_activity, to_activity
        ), fg='green'
    )

    sql = '''
    SELECT
        start_time,
        activities.name
    FROM
        facts
    LEFT JOIN
        activities
    ON
        facts.activity_id = activities.id
    WHERE
        activities.id = ?
    '''

    # Preview the facts that are about to be moved.
    for r in c.execute(sql, (from_id,)).fetchall():
        click.secho('@{0[0]}, {0[1]}'.format(r), fg='blue')

    click.confirm('Do you want to continue?', abort=True)

    c.execute(
        'UPDATE facts SET activity_id = ? WHERE activity_id = ?',
        (to_id, from_id)
    )
    # Read the count straight after the UPDATE, before the shared cursor is
    # reused for anything else.
    moved = c.rowcount

    conn.commit()

    click.secho('Moved {0} facts'.format(moved), fg='green')

    click.confirm(
        'Would you like to delete @{0[2]} » @{0[0]}: {0[1]}?'.format(
            from_activity),
        abort=True
    )

    # delete_activities is a click Command; call the wrapped function
    # directly rather than re-entering click's argument parsing.
    delete_activities.callback((from_id,))
|
|
|
|
|
|
@activities.command()
def find_duplicates():
    """ Show activities which are not unique in their categories """
    # Group activities by (name, category) and keep the groups that contain
    # more than one activity.
    duplicates_query = '''
    SELECT
        categories.id,
        categories.name,
        activities.id,
        activities.name,
        COUNT(activities.id) c
    FROM
        activities
    LEFT JOIN
        categories
    ON
        activities.category_id = categories.id
    GROUP BY
        activities.name,
        activities.category_id
    HAVING c > 1
    '''

    line_template = '@{0[0]}: {0[1]} » @{0[2]}: {0[3]} ({0[4]})'

    # Row layout: category id, category name, activity id, activity name,
    # number of activities sharing that name within the category.
    for duplicate in c.execute(duplicates_query):
        click.secho(line_template.format(duplicate), fg='blue')
|
|
|
|
|
|
@cli.group()
def kimai():
    """ Check mappings against Kimai and export facts in Kimai CSV format """
|
|
|
|
|
|
def _get_kimai_mapping_file(path, category_search=None):
    """
    Return the mapping CSV at ``path`` opened for reading.

    If the file does not exist the user is asked whether to create it
    (declining aborts the command).  A new file gets the header row plus one
    ``category, activity`` row per activity returned by ``get_activities``,
    with the "TO ..." columns left for the user to fill in.

    path            -- location of the mapping file (str or Path).
    category_search -- optional category-name filter used only when the file
                       has to be created.

    The caller owns the returned file object and must close it.
    """
    try:
        # newline='' is what the csv module expects for files it reads/writes.
        return open(path, newline='')
    except FileNotFoundError:
        click.confirm(
            'Mapping file {} not found, create it?:'.format(path),
            abort=True
        )

    # Query first so a failing query does not leave a header-only file behind.
    results = get_activities(category_search=category_search)

    # The default location lives in a directory that may not exist yet.
    Path(path).parent.mkdir(parents=True, exist_ok=True)

    with open(path, 'w', newline='') as mapping_file:
        mapping_writer = csv.writer(mapping_file)

        mapping_writer.writerow([
            'FROM category',
            'FROM activity',
            'TO Customer',
            'TO Project',
            'TO Activity',
            'TO Tag',
            'TO Note'
        ])

        # get_activities rows are (id, name, category name, category id).
        mapping_writer.writerows([r[2], r[1]] for r in results)

    return open(path, newline='')
|
|
|
|
|
|
def _fetch_kimai_collection(api_url, resource, headers):
    """ Fetch one Kimai collection (e.g. ``customers``) and return its decoded JSON """
    response = requests.get(
        f'{api_url}/{resource}?visible=3', headers=headers, timeout=30)
    # Fail with a clear HTTP error (bad credentials, server down, ...) rather
    # than with a confusing error while walking an error payload.
    response.raise_for_status()
    return response.json()


@kimai.command()
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True)
@click.argument('username')
@click.argument('api_key')
@click.option('--just-errors', 'just_errors', is_flag=True, help='Only display errors')
@click.option('--ignore-activities', is_flag=True, help='Ignore missing activities')
def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
    """
    Download customer / project / activity data from Kimai
    """
    # For every mapped customer / project / activity in the mapping file(s),
    # report whether exactly one entry with that name exists in Kimai.

    kimai_api_url = 'https://kimai.autonomic.zone/api'

    # click hands over a tuple for a "multiple" option -- an EMPTY tuple when
    # the option is not given -- so test for emptiness, not for None.
    if not mapping_path:
        mapping_paths = [
            Path.home() / '.local/share/hamster' / 'mapping.kimai.csv'
        ]
    elif isinstance(mapping_path, (tuple, list)):
        mapping_paths = list(mapping_path)
    else:
        mapping_paths = [mapping_path]

    # Keep the (customer, project, activity) target columns of every row.
    mapping_data = []
    for path in mapping_paths:
        mapping_file = _get_kimai_mapping_file(path)
        try:
            mapping_reader = csv.reader(mapping_file)
            next(mapping_reader, None)  # skip the header row
            for row in mapping_reader:
                # Rows without the "TO ..." columns (e.g. in a freshly
                # generated mapping file) have nothing to check.
                if len(row) < 5:
                    continue
                mapping_data.append([row[2], row[3], row[4]])
        finally:
            mapping_file.close()

    auth_headers = {
        'X-AUTH-USER': username,
        'X-AUTH-TOKEN': api_key
    }

    customers = _fetch_kimai_collection(
        kimai_api_url, 'customers', auth_headers)
    projects = _fetch_kimai_collection(
        kimai_api_url, 'projects', auth_headers)
    activities = _fetch_kimai_collection(
        kimai_api_url, 'activities', auth_headers)

    def report_ok(message):
        # Success messages are suppressed by --just-errors.
        if not just_errors:
            click.secho(message, fg='green')

    # Names already verified, so repeated mapping rows are reported as
    # "Skipping existing ..." instead of being announced again.
    found_customers = set()
    found_projects = set()
    found_activities = set()

    for row in mapping_data:
        customer_name, project_name, activity_name = row

        # Check if each mapping still exists in Kimai
        matching_customers = [
            x for x in customers if x['name'] == customer_name
        ]
        if customer_name in found_customers:
            report_ok(
                "Skipping existing customer '{0}'".format(customer_name))
        elif len(matching_customers) > 1:
            click.secho(
                "More than one match for customer '{0}'".format(
                    customer_name), fg='red')
            continue
        elif not matching_customers:
            click.secho(
                "Missing customer '{0}'".format(customer_name), fg='yellow')
            continue
        else:
            report_ok("Found customer '{0}'".format(customer_name))
            found_customers.add(customer_name)

        # Projects are only unique within a customer, hence the combined key.
        project_str = ':'.join(row[0:2])
        matching_projects = [
            x for x in projects
            if x['name'] == project_name
            and x['customer'] == matching_customers[0]['id']
        ]

        if project_str in found_projects:
            report_ok(
                "Skipping existing project '{0}'".format(project_str))
        elif len(matching_projects) > 1:
            click.secho(
                "More than one match for project '{0}'".format(project_str),
                fg='red')
            continue
        elif not matching_projects:
            click.secho(
                "Missing project '{0}'".format(project_str), fg='yellow')
            continue
        else:
            report_ok("Found project '{0}'".format(project_str))
            found_projects.add(project_str)

        if ignore_activities:
            continue

        activity_str = ':'.join(row)
        if activity_str in found_activities:
            report_ok(
                "Skipping existing activity '{0}'".format(activity_str))
            continue

        matching_activities = [
            x for x in activities
            if x['name'] == activity_name
            and x['project'] == matching_projects[0]['id']
        ]

        if len(matching_activities) > 1:
            click.secho(
                "More than one match for activity '{0}'".format(activity_str),
                fg='red')
        elif not matching_activities:
            click.secho(
                "Missing activity '{0}'".format(activity_str), fg='yellow')
        else:
            report_ok("Found activity '{0}'".format(activity_str))
            found_activities.add(activity_str)
|
|
|
|
|
|
@kimai.command('import')
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True)
@click.option('--output', help='Output file (default kimai_<date>.csv)')
@click.option('--category-search', help='Category search string')
@click.option('--after', help='Only show time entries after this date')
@click.option('--show-missing', help='Just report on the missing entries', is_flag=True)
@click.argument('username')
def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False):
    """
    Export time tracking data in Kimai format
    """
    # Reads the mapping file(s) (Hamster "category:activity" -> Kimai
    # customer / project / activity / tag / note), selects the facts matching
    # the filters and writes one CSV row per mapped fact.  With
    # --show-missing only the unmapped "category, activity" pairs are written.

    # click hands over a tuple for a "multiple" option -- an EMPTY tuple when
    # the option is not given -- so test for emptiness, not for None.
    if not mapping_path:
        mapping_paths = [
            Path.home() / '.local/share/hamster' / 'mapping.kimai.csv'
        ]
    elif isinstance(mapping_path, (tuple, list)):
        mapping_paths = list(mapping_path)
    else:
        mapping_paths = [mapping_path]

    if output is None:
        # '%Y-%m-%d' is the portable spelling of the glibc-only '%F'.
        timestamp = datetime.now().strftime('%Y-%m-%d')
        output = f'kimai_{timestamp}.csv'

    # "category:activity" -> [customer, project, activity, tag, note]
    mapping = {}
    for path in mapping_paths:
        mapping_file = _get_kimai_mapping_file(path, category_search)
        try:
            mapping_reader = csv.reader(mapping_file)
            next(mapping_reader, None)  # skip the header row
            for row in mapping_reader:
                # Rows without customer/project/activity targets (e.g. in a
                # freshly generated mapping file) count as "not mapped".
                if len(row) < 5:
                    continue
                # The tag and note columns are optional.
                row = list(row) + [''] * (7 - len(row))
                mapping['{0}:{1}'.format(row[0], row[1])] = row[2:7]
        finally:
            mapping_file.close()

    sql = '''
    SELECT
        facts.id, facts.start_time, facts.end_time, facts.description,
        activities.id, activities.name,
        categories.name, categories.id
    FROM
        facts
    LEFT JOIN
        activities ON facts.activity_id = activities.id
    LEFT JOIN
        categories ON activities.category_id = categories.id'''

    conditions = []
    args = []

    if category_search is not None:
        conditions.append('categories.name LIKE ?')
        args.append('%{0}%'.format(category_search))

    if after is not None:
        conditions.append('DATE(facts.start_time) > DATE(?)')
        args.append(after)

    if conditions:
        sql = sql + ' WHERE ' + ' AND '.join(conditions)

    results = c.execute(sql, args).fetchall()

    time_format = '%Y-%m-%d %H:%M:%S'

    # The context manager closes the output file even if a row fails.
    with open(output, 'w', newline='') as output_file:
        output_writer = csv.writer(output_file)

        if not show_missing:
            output_writer.writerow([
                "Date",
                "From",
                "To",
                "Duration",
                "Rate",
                "User",
                "Customer",
                "Project",
                "Activity",
                "Description",
                "Exported",
                "Tags",
                "HourlyRate",
                "FixedRate",
                "InternalRate"
            ])

        for fact in results:
            # fact[6] is the category name, fact[5] the activity name.
            k = '{0}:{1}'.format(fact[6], fact[5])
            mapping_ = mapping.get(k)

            if mapping_ is None:
                if show_missing:
                    output_writer.writerow([fact[6], fact[5]])
                click.secho(
                    "Can't find mapping for '{0}', skipping".format(k),
                    fg='yellow')
                continue

            if show_missing:
                continue

            if fact[1] is None or fact[2] is None:
                click.secho("Missing duration data '{0}-{1}', skipping".format(
                    fact[1],
                    fact[2]
                ), fg='yellow')
                continue

            # fact[1] is start_time and fact[2] is end_time; drop any
            # fractional seconds before parsing.
            date_start = datetime.strptime(fact[1].split('.')[0], time_format)
            date_end = datetime.strptime(fact[2].split('.')[0], time_format)

            # total_seconds() keeps whole days, which .seconds would drop.
            duration = (date_end - date_start).total_seconds() / 3600

            customer, project, activity, tag, note = mapping_

            output_writer.writerow([
                date_start.strftime('%Y-%m-%d'),
                date_start.strftime('%H:%M'),
                '',  # To (time)
                duration,
                '',  # Rate
                username,
                customer,
                project,
                activity,
                # Prefer the fact's own description, then the mapping note.
                fact[3] or note or '',
                '0',  # Exported
                tag,
                '',  # Hourly rate
                '',  # Fixed rate
                '',  # Internal rate
            ])
|
|
|
|
|
|
@cli.command()
def app():
    # Imported lazily so the other commands do not need the app module.
    from .app import HamsterToolsApp

    # NOTE(review): an earlier variant passed the shared cursor/connection in:
    # HamsterToolsApp(db_cursor=c, db_connection=conn)
    HamsterToolsApp().run()
|
|
|
|
|
|
@cli.command()
def hamster():
    """ Print a hamster """
    click.echo('🐹')
|
|
|
|
|
|
# Run the click command-line interface when the file is executed as a script.
if __name__ == "__main__":
    cli()
|