Many changes to Kimai stuff:

- Move `import kimai` to `kimai import`
 - Add `kimai sync` to check which Customers / Projects / Activities are
   missing, using the API
 - Switch to duration in hours instead of seconds
 - Add `--after` option to `kimai import` to limit time records by date
This commit is contained in:
3wc 2021-04-06 11:43:28 +02:00
parent b345f27583
commit 78c243fc8b
1 changed files with 179 additions and 63 deletions

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3.7 #!/usr/bin/env python3.7
import sys
import click
import csv import csv
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
import sys
import click
import requests
import sqlite3 import sqlite3
HAMSTER_DIR = Path.home() / '.local/share/hamster-applet' HAMSTER_DIR = Path.home() / '.local/share/hamster-applet'
@ -16,9 +17,9 @@ c = conn.cursor()
def get_categories(ids=None, search=None): def get_categories(ids=None, search=None):
sql = ''' sql = '''
SELECT SELECT
id, name id, name
FROM FROM
categories ''' categories '''
args = [] args = []
@ -43,9 +44,9 @@ def get_categories(ids=None, search=None):
def get_activities(ids=None, search=None, category_search=None): def get_activities(ids=None, search=None, category_search=None):
sql = ''' sql = '''
SELECT SELECT
activities.id, activities.name, categories.name, categories.id activities.id, activities.name, categories.name, categories.id
FROM FROM
activities activities
LEFT JOIN LEFT JOIN
categories categories
@ -84,7 +85,7 @@ def cli():
@cli.group() @cli.group()
def categories(): def categories():
pass pass
@categories.command('list') @categories.command('list')
@click.option('--search', help='Search string') @click.option('--search', help='Search string')
@ -128,15 +129,15 @@ def delete_categories(ids):
def list_category_activities(ids): def list_category_activities(ids):
""" Show activities for categories specified by IDS """ """ Show activities for categories specified by IDS """
sql = ''' sql = '''
SELECT SELECT
activities.id, activities.name, categories.name activities.id, activities.name, categories.name
FROM FROM
activities activities
LEFT JOIN LEFT JOIN
categories categories
ON ON
activities.category_id = categories.id activities.category_id = categories.id
WHERE WHERE
categories.id IN ({seq}) categories.id IN ({seq})
'''.format( '''.format(
seq=','.join(['?'] * len(ids)) seq=','.join(['?'] * len(ids))
@ -324,9 +325,9 @@ def find_duplicates():
categories categories
ON ON
activities.category_id = categories.id activities.category_id = categories.id
GROUP BY GROUP BY
activities.name, activities.name,
activities.category_id activities.category_id
HAVING c > 1 HAVING c > 1
''' '''
@ -337,34 +338,19 @@ def find_duplicates():
@cli.group() @cli.group()
def export(): def kimai():
pass pass
@export.command() def _get_kimai_mapping_file(path):
@click.option('--mapping', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)')
@click.option('--output', help='Output file (default kimai.csv)')
@click.option('--category-search', help='Category search string')
@click.argument('username')
def kimai(username, mapping=None, output=None, category_search=None):
"""
Export time tracking data in Kimai format
"""
if mapping is None:
mapping = HAMSTER_DIR / 'mapping.kimai.csv'
if output is None:
output = 'kimai.csv'
try: try:
mapping_file = open(mapping) return open(path)
mapping_reader = csv.reader(mapping_file)
except FileNotFoundError: except FileNotFoundError:
click.confirm( click.confirm(
'Mapping file {} not found, create it?:'.format(mapping), 'Mapping file {} not found, create it?:'.format(path),
abort=True abort=True
) )
mapping_file = open(mapping, 'w') mapping_file = open(path, 'w')
mapping_writer = csv.writer(mapping_file) mapping_writer = csv.writer(mapping_file)
mapping_writer.writerow([ mapping_writer.writerow([
@ -383,12 +369,128 @@ def kimai(username, mapping=None, output=None, category_search=None):
]) ])
mapping_file.close() mapping_file.close()
return return open(path)
@kimai.command()
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)')
@click.argument('username')
@click.argument('api_key')
def sync(username, api_key, mapping_path=None):
"""
Download customer / project / activity data from Kimai
"""
kimai_api_url = 'https://kimai.autonomic.zone/api'
if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv'
mapping_file = _get_kimai_mapping_file(mapping_path)
mapping_reader = csv.reader(mapping_file)
next(mapping_reader)
mapping_data = [
[row[2], row[3], row[4]]
for row in mapping_reader
]
mapping_file.close()
auth_headers = {
'X-AUTH-USER': username,
'X-AUTH-TOKEN': api_key
}
customers = requests.get(f'{kimai_api_url}/customers', headers=auth_headers).json()
projects = requests.get(f'{kimai_api_url}/projects', headers=auth_headers).json()
activities = requests.get(f'{kimai_api_url}/activities', headers=auth_headers).json()
found_customers = []
found_projects = []
found_activities = []
for row in mapping_data:
# Check if each mapping still exists in Kimai
matching_customers = list(filter(lambda x: x['name'] == row[0], customers))
if row[0] in found_customers:
click.secho("Skipping existing customer '{0}'".format(row[0]), fg='green')
else:
if len(matching_customers) > 1:
click.secho("More than one match for customer '{0}'".format(row[0]), fg='red')
continue
elif len(matching_customers) < 1:
click.secho("Missing customer '{0}'".format(row[0]), fg='yellow')
continue
else:
click.secho("Found customer '{0}'".format(row[0]), fg='green')
found_customers.append(row[0])
project_str = ':'.join(row[0:2])
matching_projects = list(filter(
lambda x: x['name'] == row[1] and
x['customer'] == matching_customers[0]['id'],
projects)
)
if project_str in found_projects:
click.secho("Skipping existing project '{0}'".format(project_str), fg='green')
else:
if len(matching_projects) > 1:
click.secho("More than one match for project '{0}'".format(project_str), fg='red')
continue
elif len(matching_projects) < 1:
click.secho("Missing project '{0}'".format(project_str), fg='yellow')
continue
else:
click.secho("Found project '{0}'".format(project_str), fg='green')
found_projects.append(project_str)
activity_str = ':'.join(row)
if activity_str in found_activities:
click.secho("Skipping existing activity '{0}'".format(activity_str), fg='green')
else:
matching_activities = list(filter(
lambda x: x['name'] == row[2]
and x['project'] == matching_projects[0]['id'],
activities
))
if len(matching_activities) > 1:
click.secho("More than one match for activity '{0}'".format(activity_str), fg='red')
elif len(matching_activities) < 1:
click.secho("Missing activity '{0}'".format(activity_str), fg='yellow')
else:
click.secho("Found activity '{0}'".format(activity_str), fg='green')
found_activities.append(activity_str)
@kimai.command('import')
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)')
@click.option('--output', help='Output file (default kimai.csv)')
@click.option('--category-search', help='Category search string')
@click.option('--after', help='Only show time entries after this date')
@click.option('--show-missing', help='Just report on the missing entries', is_flag=True)
@click.argument('username')
def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False):
"""
Export time tracking data in Kimai format
"""
if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv'
if output is None:
timestamp = datetime.now().strftime('%F')
output = f'kimai_{timestamp}.csv'
mapping_file = _get_kimai_mapping_file(mapping_path)
mapping_reader = csv.reader(mapping_file)
next(mapping_reader) next(mapping_reader)
mapping = { mapping = {
'{0}:{1}'.format(row[0], row[1]): [row[2], row[3], row[4]] '{0}:{1}'.format(row[0], row[1]): [row[2], row[3], row[4]]
for row in mapping_reader for row in mapping_reader
} }
@ -400,15 +502,15 @@ def kimai(username, mapping=None, output=None, category_search=None):
args = [] args = []
sql = ''' sql = '''
SELECT SELECT
facts.id, facts.start_time, facts.end_time, facts.description, facts.id, facts.start_time, facts.end_time, facts.description,
activities.id, activities.name, activities.id, activities.name,
categories.name, categories.id categories.name, categories.id
FROM FROM
facts facts
LEFT JOIN LEFT JOIN
activities ON facts.activity_id = activities.id activities ON facts.activity_id = activities.id
LEFT JOIN LEFT JOIN
categories ON activities.category_id = categories.id ''' categories ON activities.category_id = categories.id '''
if category_search is not None: if category_search is not None:
@ -416,34 +518,48 @@ def kimai(username, mapping=None, output=None, category_search=None):
category_search = '%{0}%'.format(category_search) category_search = '%{0}%'.format(category_search)
args.append(category_search) args.append(category_search)
if after is not None:
if category_search is not None:
sql = sql + ' AND '
else:
sql = sql + ' WHERE '
sql = sql + f"DATE(facts.start_time) > DATE(?)"
args.append(after)
results = c.execute(sql, args) results = c.execute(sql, args)
results = c.fetchall() results = c.fetchall()
output_writer.writerow([ if not show_missing:
"Date", output_writer.writerow([
"From", "Date",
"To", "From",
"Duration", "To",
"Rate", "Duration",
"User", "Rate",
"Customer", "User",
"Project", "Customer",
"Activity", "Project",
"Description", "Activity",
"Exported", "Description",
"Tags", "Exported",
"Hourly rate", "Tags",
"Fixed rate" "Hourly rate",
]) "Fixed rate"
])
for fact in results: for fact in results:
k = '{0}:{1}'.format(fact[6], fact[5]) k = '{0}:{1}'.format(fact[6], fact[5])
try: try:
mapping_ = mapping[k] mapping_ = mapping[k]
except KeyError: except KeyError:
if show_missing:
output_writer.writerow([fact[6], fact[5]])
click.secho("Can't find mapping for '{0}', skipping".format(k), fg='yellow') click.secho("Can't find mapping for '{0}', skipping".format(k), fg='yellow')
continue continue
if show_missing:
continue
if fact[1] is None or fact[2] is None: if fact[1] is None or fact[2] is None:
click.secho("Missing duration data '{0}-{1}', skipping".format( click.secho("Missing duration data '{0}-{1}', skipping".format(
fact[1], fact[1],
@ -457,7 +573,7 @@ def kimai(username, mapping=None, output=None, category_search=None):
) )
duration = ( duration = (
date_start - date_end date_start - date_end
).seconds ).seconds / 3600
output_writer.writerow([ output_writer.writerow([
date_start.strftime('%Y-%m-%d'), date_start.strftime('%Y-%m-%d'),