black reformat

This commit is contained in:
3wc 2023-10-29 13:40:03 +00:00
parent ccbbc80116
commit 6b8b4c380e
3 changed files with 369 additions and 303 deletions

View File

@ -9,11 +9,20 @@ import click
import requests import requests
from peewee import fn, JOIN from peewee import fn, JOIN
from .db import db, HamsterCategory, HamsterActivity, HamsterFact, KimaiCustomer, KimaiProject, KimaiActivity, HamsterKimaiMapping from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiCustomer,
KimaiProject,
KimaiActivity,
HamsterKimaiMapping,
)
HAMSTER_DIR = Path.home() / '.local/share/hamster' HAMSTER_DIR = Path.home() / ".local/share/hamster"
# HAMSTER_FILE = HAMSTER_DIR / 'hamster.db' # HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'
HAMSTER_FILE = 'hamster-testing.db' HAMSTER_FILE = "hamster-testing.db"
db.init(HAMSTER_FILE) db.init(HAMSTER_FILE)
@ -28,8 +37,8 @@ def categories():
pass pass
@categories.command('list') @categories.command("list")
@click.option('--search', help='Search string') @click.option("--search", help="Search string")
def list_categories(search): def list_categories(search):
"""List / search categories""" """List / search categories"""
categories = HamsterCategory.select() categories = HamsterCategory.select()
@ -38,33 +47,37 @@ def list_categories(search):
categories = categories.where(HamsterCategory.name.contains(search)) categories = categories.where(HamsterCategory.name.contains(search))
for c in categories: for c in categories:
click.echo(f'@{c.id}: {c.name}') click.echo(f"@{c.id}: {c.name}")
@categories.command('delete') @categories.command("delete")
@click.argument('ids', nargs=-1) @click.argument("ids", nargs=-1)
def delete_categories(ids): def delete_categories(ids):
"""Delete categories specified by IDS""" """Delete categories specified by IDS"""
click.secho('Deleting:', fg='red') click.secho("Deleting:", fg="red")
categories = HamsterCategory.select( categories = (
HamsterCategory, HamsterCategory.select(
fn.Count(HamsterActivity.id).alias("activities_count") HamsterCategory, fn.Count(HamsterActivity.id).alias("activities_count")
).join(HamsterActivity, JOIN.LEFT_OUTER).group_by(HamsterCategory).where(HamsterCategory.id.in_(ids)) )
.join(HamsterActivity, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
.where(HamsterCategory.id.in_(ids))
)
for c in categories: for c in categories:
click.echo(f'@{c.id}: {c.name} ({c.activities_count} activities)') click.echo(f"@{c.id}: {c.name} ({c.activities_count} activities)")
click.confirm('Do you want to continue?', abort=True) click.confirm("Do you want to continue?", abort=True)
count = HamsterCategory.delete().where(HamsterCategory.id.in_(ids)).execute() count = HamsterCategory.delete().where(HamsterCategory.id.in_(ids)).execute()
click.secho('Deleted {0} categories'.format(count), fg='green') click.secho("Deleted {0} categories".format(count), fg="green")
@categories.command('rename') @categories.command("rename")
@click.argument('id_', metavar='ID') @click.argument("id_", metavar="ID")
@click.argument('name') @click.argument("name")
def rename_category(id_, name): def rename_category(id_, name):
"""Rename a category""" """Rename a category"""
@ -76,46 +89,46 @@ def rename_category(id_, name):
category.save() category.save()
@categories.command('activities') @categories.command("activities")
@click.argument('ids', nargs=-1) @click.argument("ids", nargs=-1)
def list_category_activities(ids): def list_category_activities(ids):
"""Show activities for categories specified by ids""" """Show activities for categories specified by ids"""
activities = HamsterActivity.select( activities = (
HamsterActivity, HamsterActivity.select(HamsterActivity, HamsterCategory.name)
HamsterCategory.name .join(HamsterCategory, JOIN.LEFT_OUTER)
).join(HamsterCategory, JOIN.LEFT_OUTER).where(HamsterCategory.id.in_(ids)) .where(HamsterCategory.id.in_(ids))
)
for a in activities: for a in activities:
click.echo(f'@{a.id}: {a.category.name} » {a.name}') click.echo(f"@{a.id}: {a.category.name} » {a.name}")
@categories.command("tidy")
@categories.command('tidy')
def tidy_categories(): def tidy_categories():
"""Remove categories with no activities""" """Remove categories with no activities"""
subquery = ( subquery = (
HamsterCategory HamsterCategory.select(
.select(HamsterCategory, fn.COUNT(HamsterActivity.id).alias('activities_count')) HamsterCategory, fn.COUNT(HamsterActivity.id).alias("activities_count")
)
.join(HamsterActivity, JOIN.LEFT_OUTER) .join(HamsterActivity, JOIN.LEFT_OUTER)
.group_by(HamsterCategory) .group_by(HamsterCategory)
.alias('subquery') .alias("subquery")
) )
categories = ( categories = (
HamsterCategory HamsterCategory.select()
.select()
.join(subquery, on=(HamsterCategory.id == subquery.c.id)) .join(subquery, on=(HamsterCategory.id == subquery.c.id))
.where(subquery.c.activities_count == 0) .where(subquery.c.activities_count == 0)
) )
click.echo('Found {0} empty categories:'.format(categories.count())) click.echo("Found {0} empty categories:".format(categories.count()))
for cat in categories: for cat in categories:
click.echo(f'@{cat.id}: {cat.name}') click.echo(f"@{cat.id}: {cat.name}")
click.confirm('Do you want to continue?', abort=True) click.confirm("Do you want to continue?", abort=True)
[cat.delete_instance() for cat in categories] [cat.delete_instance() for cat in categories]
@ -125,18 +138,16 @@ def activities():
pass pass
@activities.command('list') @activities.command("list")
@click.option('--search', help='Search string') @click.option("--search", help="Search string")
@click.option('--csv/--no-csv', 'csv_output', default=False, help='CSV output') @click.option("--csv/--no-csv", "csv_output", default=False, help="CSV output")
def list_activities(search, csv_output): def list_activities(search, csv_output):
"""List / search activities""" """List / search activities"""
activities = HamsterActivity.select( activities = (
HamsterActivity, HamsterActivity.select(HamsterActivity, HamsterCategory)
HamsterCategory .join(HamsterCategory, JOIN.LEFT_OUTER)
).join(HamsterCategory, JOIN.LEFT_OUTER).order_by( .order_by(HamsterCategory.name, HamsterActivity.name)
HamsterCategory.name,
HamsterActivity.name
) )
if search is not None: if search is not None:
@ -148,109 +159,114 @@ def list_activities(search, csv_output):
for a in activities: for a in activities:
category_name = a.category.name if a.category_id != -1 else "" category_name = a.category.name if a.category_id != -1 else ""
if csv_output: if csv_output:
csv_writer.writerow([ csv_writer.writerow([a.category_id, category_name, a.id, a.name])
a.category_id,
category_name,
a.id,
a.name
])
else: else:
click.echo(f'@{a.category_id}: {category_name} » {a.id}: {a.name}') click.echo(f"@{a.category_id}: {category_name} » {a.id}: {a.name}")
@activities.command('delete') @activities.command("delete")
@click.argument('ids', nargs=-1) @click.argument("ids", nargs=-1)
def delete_activities(ids): def delete_activities(ids):
"""Delete activities specified by IDS""" """Delete activities specified by IDS"""
activities = HamsterActivity.select( activities = (
HamsterActivity.select(
HamsterActivity, HamsterActivity,
HamsterCategory.name, HamsterCategory.name,
fn.Count(HamsterFact.id).alias("facts_count") fn.Count(HamsterFact.id).alias("facts_count"),
).join(HamsterCategory, )
JOIN.LEFT_OUTER).switch(HamsterActivity).join(HamsterFact, .join(HamsterCategory, JOIN.LEFT_OUTER)
JOIN.LEFT_OUTER).group_by(HamsterActivity).where( .switch(HamsterActivity)
HamsterActivity.id.in_(ids) .join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterActivity)
.where(HamsterActivity.id.in_(ids))
) )
click.secho('Deleting:', fg='red') click.secho("Deleting:", fg="red")
for a in activities: for a in activities:
category_name = a.category.name if a.category_id != -1 else "" category_name = a.category.name if a.category_id != -1 else ""
click.echo(f'@{a.id}: {category_name} » {a.name} ({a.facts_count} facts)') click.echo(f"@{a.id}: {category_name} » {a.name} ({a.facts_count} facts)")
click.confirm('Do you want to continue?', abort=True) click.confirm("Do you want to continue?", abort=True)
[a.delete_instance() for a in activities] [a.delete_instance() for a in activities]
click.secho('Deleted {0} activities'.format(len(ids)), fg='green') click.secho("Deleted {0} activities".format(len(ids)), fg="green")
@activities.command() @activities.command()
@click.argument('category_id') @click.argument("category_id")
@click.argument('ids', nargs=-1) @click.argument("ids", nargs=-1)
def move(category_id, ids): def move(category_id, ids):
"""Move activities to another category""" """Move activities to another category"""
category = HamsterCategory.get(id=category_id) category = HamsterCategory.get(id=category_id)
activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids)) activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids))
click.secho(f'Moving to "@{category.id}: {category.name}":', fg='green') click.secho(f'Moving to "@{category.id}: {category.name}":', fg="green")
for a in activities: for a in activities:
category_name = a.category.name if a.category_id != -1 else "" category_name = a.category.name if a.category_id != -1 else ""
click.secho(f'@{a.category_id}: {category_name} » @{a.id}: {a.name}', fg='blue') click.secho(f"@{a.category_id}: {category_name} » @{a.id}: {a.name}", fg="blue")
click.confirm('Do you want to continue?', abort=True) click.confirm("Do you want to continue?", abort=True)
for a in activities: for a in activities:
a.category = category a.category = category
a.save() a.save()
click.secho('Moved {0} activities'.format(len(ids)), fg='green') click.secho("Moved {0} activities".format(len(ids)), fg="green")
@activities.command() @activities.command()
@click.argument('ids', nargs=-1) @click.argument("ids", nargs=-1)
def list_facts(ids): def list_facts(ids):
"""Show facts for activities""" """Show facts for activities"""
activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids)) activities = HamsterActivity.select().where(HamsterActivity.id.in_(ids))
for a in activities: for a in activities:
click.secho( click.secho(f"@{a.id}: {a.name}", fg="green")
f'@{a.id}: {a.name}', fg='green'
)
for f in a.facts: for f in a.facts:
click.secho(f'@{f.id}, {f.start_time}', fg='blue') click.secho(f"@{f.id}, {f.start_time}", fg="blue")
@activities.command() @activities.command()
@click.argument('from_id') @click.argument("from_id")
@click.argument('to_id') @click.argument("to_id")
def move_facts(from_id, to_id): def move_facts(from_id, to_id):
"""Move facts from one activity to another""" """Move facts from one activity to another"""
from_activity = HamsterActivity.get(id=from_id) from_activity = HamsterActivity.get(id=from_id)
to_activity = HamsterActivity.get(id=to_id) to_activity = HamsterActivity.get(id=to_id)
from_category_name = from_activity.category.name if from_activity.category_id != -1 else "" from_category_name = (
to_category_name = to_activity.category.name if to_activity.category_id != -1 else "" from_activity.category.name if from_activity.category_id != -1 else ""
)
to_category_name = (
to_activity.category.name if to_activity.category_id != -1 else ""
)
click.secho( click.secho(
f'Moving facts from "{from_category_name} » @{from_activity.id}: {from_activity.name}" to "@{to_category_name} » @{to_activity.id}: {to_activity.name}"', fg='green' f'Moving facts from "{from_category_name} » @{from_activity.id}: {from_activity.name}" to "@{to_category_name} » @{to_activity.id}: {to_activity.name}"',
fg="green",
) )
for f in from_activity.facts: for f in from_activity.facts:
click.secho(f'@{f.id}, {f.start_time}', fg='blue') click.secho(f"@{f.id}, {f.start_time}", fg="blue")
click.confirm('Do you want to continue?', abort=True) click.confirm("Do you want to continue?", abort=True)
count = HamsterFact.update(activity_id=to_activity.id).where(HamsterFact.activity == from_activity).execute() count = (
HamsterFact.update(activity_id=to_activity.id)
.where(HamsterFact.activity == from_activity)
.execute()
)
click.secho('Moved {0} facts'.format(count), fg='green') click.secho("Moved {0} facts".format(count), fg="green")
click.confirm( click.confirm(
f'Would you like to delete "{from_category_name} » @{from_activity.id}: {from_activity.name}?', f'Would you like to delete "{from_category_name} » @{from_activity.id}: {from_activity.name}?',
abort=True abort=True,
) )
from_activity.delete_instance() from_activity.delete_instance()
@ -261,11 +277,10 @@ def find_duplicates():
"""Show activities which are not unique in their categories""" """Show activities which are not unique in their categories"""
non_unique_activities = ( non_unique_activities = (
HamsterActivity HamsterActivity.select(
.select(
HamsterActivity, HamsterActivity,
HamsterCategory.id, HamsterCategory.id,
fn.COALESCE(HamsterCategory.name, 'None').alias("category_name") fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
) )
.join(HamsterCategory, JOIN.LEFT_OUTER) .join(HamsterCategory, JOIN.LEFT_OUTER)
.group_by(HamsterActivity.category_id, HamsterActivity.name) .group_by(HamsterActivity.category_id, HamsterActivity.name)
@ -274,7 +289,9 @@ def find_duplicates():
for activity in non_unique_activities: for activity in non_unique_activities:
click.secho( click.secho(
f"@{activity.category_id}: {activity.category_name} » @{activity.id}: {activity.name}", fg='blue') f"@{activity.category_id}: {activity.category_name} » @{activity.id}: {activity.name}",
fg="blue",
)
@cli.group() @cli.group()
@ -286,50 +303,51 @@ def _get_kimai_mapping_file(path, category_search=None):
try: try:
return open(path) return open(path)
except FileNotFoundError: except FileNotFoundError:
click.confirm( click.confirm("Mapping file {} not found, create it?:".format(path), abort=True)
'Mapping file {} not found, create it?:'.format(path), mapping_file = open(path, "w")
abort=True
)
mapping_file = open(path, 'w')
mapping_writer = csv.writer(mapping_file) mapping_writer = csv.writer(mapping_file)
mapping_writer.writerow([ mapping_writer.writerow(
'FROM category', [
'FROM activity', "FROM category",
'TO Customer', "FROM activity",
'TO Project', "TO Customer",
'TO Activity', "TO Project",
'TO Tag', "TO Activity",
'TO Note' "TO Tag",
]) "TO Note",
]
)
activities = HamsterActivity.select( activities = HamsterActivity.select(HamsterActivity, HamsterCategory).join(
HamsterActivity, HamsterCategory, JOIN.LEFT_OUTER
HamsterCategory )
).join(HamsterCategory, JOIN.LEFT_OUTER)
for a in activities: for a in activities:
mapping_writer.writerow([ mapping_writer.writerow(
a.category.name if a.category_id != -1 else "", [a.category.name if a.category_id != -1 else "", a.name]
a.name )
])
mapping_file.close() mapping_file.close()
return open(path) return open(path)
@kimai.command() @kimai.command()
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True) @click.option(
@click.argument('username') "--mapping-path",
@click.argument('api_key') help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
@click.option('--just-errors', 'just_errors', is_flag=True, help='Only display errors') multiple=True,
@click.option('--ignore-activities', is_flag=True, help='Ignore missing activities') )
@click.argument("username")
@click.argument("api_key")
@click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors")
@click.option("--ignore-activities", is_flag=True, help="Ignore missing activities")
def sync(username, api_key, just_errors, ignore_activities, mapping_path=None): def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
""" """
Download customer / project / activity data from Kimai Download customer / project / activity data from Kimai
""" """
kimai_api_url = 'https://kimai.autonomic.zone/api' kimai_api_url = "https://kimai.autonomic.zone/api"
if type(mapping_path) == tuple: if type(mapping_path) == tuple:
mapping_files = [] mapping_files = []
@ -340,30 +358,27 @@ def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
mapping_reader = csv.reader(chain(*mapping_files)) mapping_reader = csv.reader(chain(*mapping_files))
else: else:
if mapping_path is None: if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv' mapping_path = HAMSTER_DIR / "mapping.kimai.csv"
mapping_file = _get_kimai_mapping_file(mapping_path) mapping_file = _get_kimai_mapping_file(mapping_path)
mapping_reader = csv.reader(mapping_file) mapping_reader = csv.reader(mapping_file)
next(mapping_reader) next(mapping_reader)
mapping_data = [ mapping_data = [[row[2], row[3], row[4]] for row in mapping_reader]
[row[2], row[3], row[4]]
for row in mapping_reader
]
mapping_file.close() mapping_file.close()
auth_headers = { auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key}
'X-AUTH-USER': username,
'X-AUTH-TOKEN': api_key
}
customers = requests.get( customers = requests.get(
f'{kimai_api_url}/customers?visible=3', headers=auth_headers).json() f"{kimai_api_url}/customers?visible=3", headers=auth_headers
).json()
projects = requests.get( projects = requests.get(
f'{kimai_api_url}/projects?visible=3', headers=auth_headers).json() f"{kimai_api_url}/projects?visible=3", headers=auth_headers
).json()
activities = requests.get( activities = requests.get(
f'{kimai_api_url}/activities?visible=3', headers=auth_headers).json() f"{kimai_api_url}/activities?visible=3", headers=auth_headers
).json()
found_customers = [] found_customers = []
found_projects = [] found_projects = []
@ -372,93 +387,115 @@ def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
for row in mapping_data: for row in mapping_data:
# Check if each mapping still exists in Kimai # Check if each mapping still exists in Kimai
matching_customers = list( matching_customers = list(filter(lambda x: x["name"] == row[0], customers))
filter(lambda x: x['name'] == row[0], customers))
if row[0] in found_customers: if row[0] in found_customers:
just_errors or click.secho( just_errors or click.secho(
"Skipping existing customer '{0}'".format(row[0]), fg='green') "Skipping existing customer '{0}'".format(row[0]), fg="green"
)
else: else:
if len(matching_customers) > 1: if len(matching_customers) > 1:
click.secho( click.secho(
"More than one match for customer '{0}'".format(row[0]), fg='red') "More than one match for customer '{0}'".format(row[0]), fg="red"
)
continue continue
elif len(matching_customers) < 1: elif len(matching_customers) < 1:
click.secho("Missing customer '{0}'".format( click.secho("Missing customer '{0}'".format(row[0]), fg="yellow")
row[0]), fg='yellow')
continue continue
else: else:
just_errors or click.secho( just_errors or click.secho(
"Found customer '{0}'".format(row[0]), fg='green') "Found customer '{0}'".format(row[0]), fg="green"
)
found_customers.append(row[0]) found_customers.append(row[0])
project_str = ':'.join(row[0:2]) project_str = ":".join(row[0:2])
matching_projects = list(filter( matching_projects = list(
lambda x: x['name'] == row[1] and filter(
x['customer'] == matching_customers[0]['id'], lambda x: x["name"] == row[1]
projects) and x["customer"] == matching_customers[0]["id"],
projects,
)
) )
if project_str in found_projects: if project_str in found_projects:
just_errors or click.secho( just_errors or click.secho(
"Skipping existing project '{0}'".format(project_str), fg='green') "Skipping existing project '{0}'".format(project_str), fg="green"
)
else: else:
if len(matching_projects) > 1: if len(matching_projects) > 1:
click.secho("More than one match for project '{0}'".format( click.secho(
project_str), fg='red') "More than one match for project '{0}'".format(project_str),
fg="red",
)
continue continue
elif len(matching_projects) < 1: elif len(matching_projects) < 1:
click.secho("Missing project '{0}'".format( click.secho("Missing project '{0}'".format(project_str), fg="yellow")
project_str), fg='yellow')
continue continue
else: else:
just_errors or click.secho( just_errors or click.secho(
"Found project '{0}'".format(project_str), fg='green') "Found project '{0}'".format(project_str), fg="green"
)
found_projects.append(project_str) found_projects.append(project_str)
if ignore_activities: if ignore_activities:
continue continue
activity_str = ':'.join(row) activity_str = ":".join(row)
if activity_str in found_activities: if activity_str in found_activities:
just_errors or click.secho( just_errors or click.secho(
"Skipping existing activity '{0}'".format(activity_str), fg='green') "Skipping existing activity '{0}'".format(activity_str), fg="green"
)
else: else:
matching_activities = list(filter( matching_activities = list(
lambda x: x['name'] == row[2] filter(
and x['project'] == matching_projects[0]['id'], lambda x: x["name"] == row[2]
activities and x["project"] == matching_projects[0]["id"],
)) activities,
)
)
if len(matching_activities) > 1: if len(matching_activities) > 1:
click.secho("More than one match for activity '{0}'".format( click.secho(
activity_str), fg='red') "More than one match for activity '{0}'".format(activity_str),
fg="red",
)
elif len(matching_activities) < 1: elif len(matching_activities) < 1:
click.secho("Missing activity '{0}'".format( click.secho("Missing activity '{0}'".format(activity_str), fg="yellow")
activity_str), fg='yellow')
else: else:
just_errors or click.secho( just_errors or click.secho(
"Found activity '{0}'".format(activity_str), fg='green') "Found activity '{0}'".format(activity_str), fg="green"
)
found_activities.append(activity_str) found_activities.append(activity_str)
@kimai.command('import') @kimai.command("import")
@click.option('--mapping-path', help='Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)', multiple=True) @click.option(
@click.option('--output', help='Output file (default kimai.csv)') "--mapping-path",
@click.option('--category-search', help='Category search string') help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
@click.option('--after', help='Only show time entries after this date') multiple=True,
@click.option('--show-missing', help='Just report on the missing entries', is_flag=True) )
@click.argument('username') @click.option("--output", help="Output file (default kimai.csv)")
def _import(username, mapping_path=None, output=None, category_search=None, after=None, show_missing=False): @click.option("--category-search", help="Category search string")
@click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username")
def _import(
username,
mapping_path=None,
output=None,
category_search=None,
after=None,
show_missing=False,
):
""" """
Export time tracking data in Kimai format Export time tracking data in Kimai format
""" """
if mapping_path is None: if mapping_path is None:
mapping_path = HAMSTER_DIR / 'mapping.kimai.csv' mapping_path = HAMSTER_DIR / "mapping.kimai.csv"
if output is None: if output is None:
timestamp = datetime.now().strftime('%F') timestamp = datetime.now().strftime("%F")
output = f'kimai_{timestamp}.csv' output = f"kimai_{timestamp}.csv"
if type(mapping_path) == tuple: if type(mapping_path) == tuple:
mapping_files = [] mapping_files = []
@ -473,7 +510,7 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte
mapping_reader = csv.reader(mapping_file) mapping_reader = csv.reader(mapping_file)
mapping = { mapping = {
'{0}:{1}'.format(row[0], row[1]): [row[2], row[3], row[4], row[5]] "{0}:{1}".format(row[0], row[1]): [row[2], row[3], row[4], row[5]]
for row in mapping_reader for row in mapping_reader
} }
@ -483,18 +520,18 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte
else: else:
mapping_file.close() mapping_file.close()
output_file = open(output, 'w') output_file = open(output, "w")
output_writer = csv.writer(output_file) output_writer = csv.writer(output_file)
args = [] args = []
facts = HamsterFact.select( facts = (
HamsterFact, HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
HamsterActivity, .join(HamsterActivity)
HamsterCategory .join(HamsterCategory, JOIN.LEFT_OUTER)
).join(HamsterActivity).join(HamsterCategory, JOIN.LEFT_OUTER) )
sql = ''' sql = """
SELECT SELECT
facts.id, facts.start_time, facts.end_time, facts.description, facts.id, facts.start_time, facts.end_time, facts.description,
activities.id, activities.name, activities.id, activities.name,
@ -504,7 +541,7 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte
LEFT JOIN LEFT JOIN
activities ON facts.activity_id = activities.id activities ON facts.activity_id = activities.id
LEFT JOIN LEFT JOIN
categories ON activities.category_id = categories.id ''' categories ON activities.category_id = categories.id """
if category_search is not None: if category_search is not None:
facts = facts.where(HamsterCategory.name.contains(category_search)) facts = facts.where(HamsterCategory.name.contains(category_search))
@ -513,7 +550,8 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte
facts = facts.where(HamsterFact.start_time >= after) facts = facts.where(HamsterFact.start_time >= after)
if not show_missing: if not show_missing:
output_writer.writerow([ output_writer.writerow(
[
"Date", "Date",
"From", "From",
"To", "To",
@ -528,67 +566,71 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte
"Tags", "Tags",
"HourlyRate", "HourlyRate",
"FixedRate", "FixedRate",
"InternalRate" "InternalRate",
]) ]
)
for fact in facts: for fact in facts:
k = f'{fact.activity.category.name}:{fact.activity.name}' k = f"{fact.activity.category.name}:{fact.activity.name}"
try: try:
mapping_ = mapping[k] mapping_ = mapping[k]
except KeyError: except KeyError:
if show_missing: if show_missing:
output_writer.writerow([fact.activity.category.name, fact.activity.name]) output_writer.writerow(
click.secho( [fact.activity.category.name, fact.activity.name]
"Can't find mapping for '{0}', skipping".format(k), fg='yellow') )
click.secho("Can't find mapping for '{0}', skipping".format(k), fg="yellow")
continue continue
if show_missing: if show_missing:
continue continue
if fact.start_time is None or fact.end_time is None: if fact.start_time is None or fact.end_time is None:
click.secho(f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping", fg='yellow') click.secho(
f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping",
fg="yellow",
)
continue continue
if len(mapping_) < 5: if len(mapping_) < 5:
mapping_.append(None) mapping_.append(None)
date_start, date_end = ( date_start, date_end = (
datetime.strptime(fact.start_time.split('.')[0], '%Y-%m-%d %H:%M:%S'), datetime.strptime(fact.start_time.split(".")[0], "%Y-%m-%d %H:%M:%S"),
datetime.strptime(fact.end_time.split('.')[0], '%Y-%m-%d %H:%M:%S') datetime.strptime(fact.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S"),
) )
duration = ( duration = (date_start - date_end).seconds / 3600
date_start - date_end
).seconds / 3600
output_writer.writerow([ output_writer.writerow(
date_start.strftime('%Y-%m-%d'), [
date_start.strftime('%H:%M'), date_start.strftime("%Y-%m-%d"),
'', # To (time) date_start.strftime("%H:%M"),
"", # To (time)
duration, duration,
'', # Rate "", # Rate
username, username,
mapping_[0], mapping_[0],
mapping_[1], mapping_[1],
mapping_[2], mapping_[2],
fact.description or mapping_[4] or '', fact.description or mapping_[4] or "",
'0', # Exported "0", # Exported
mapping_[3], mapping_[3],
'', # Hourly rate "", # Hourly rate
'', # Fixed rate "", # Fixed rate
]) ]
)
output_file.close() output_file.close()
@kimai.group('db') @kimai.group("db")
def db_(): def db_():
pass pass
@db_.command() @db_.command()
def init(): def init():
db.create_tables([KimaiCustomer, KimaiProject, KimaiActivity, db.create_tables([KimaiCustomer, KimaiProject, KimaiActivity, HamsterKimaiMapping])
HamsterKimaiMapping])
@db_.command() @db_.command()
@ -597,8 +639,14 @@ def reset():
@db_.command() @db_.command()
@click.option('-g', '--global', 'global_', help='Does this file contain mappings to global activties', is_flag=True) @click.option(
@click.option('--mapping-path', help='Mapping file') "-g",
"--global",
"global_",
help="Does this file contain mappings to global activties",
is_flag=True,
)
@click.option("--mapping-path", help="Mapping file")
def mapping2db(mapping_path=None, global_=False): def mapping2db(mapping_path=None, global_=False):
mapping_file = _get_kimai_mapping_file(mapping_path, None) mapping_file = _get_kimai_mapping_file(mapping_path, None)
next(mapping_file) next(mapping_file)
@ -607,17 +655,14 @@ def mapping2db(mapping_path=None, global_=False):
for row in mapping_reader: for row in mapping_reader:
hamster_category = HamsterCategory.get(name=row[0]) hamster_category = HamsterCategory.get(name=row[0])
hamster_activity = HamsterActivity.get(name=row[1], hamster_activity = HamsterActivity.get(
category_id=hamster_category.id) name=row[1], category_id=hamster_category.id
)
kimai_customer = KimaiCustomer.get(name=row[2]) kimai_customer = KimaiCustomer.get(name=row[2])
kimai_project = KimaiProject.get(name=row[3], kimai_project = KimaiProject.get(name=row[3], customer_id=kimai_customer.id)
customer_id=kimai_customer.id)
try: try:
kimai_activity = KimaiActivity.get( kimai_activity = KimaiActivity.get(name=row[4], project_id=kimai_project.id)
name=row[4],
project_id=kimai_project.id
)
except KimaiActivity.DoesNotExist: except KimaiActivity.DoesNotExist:
kimai_activity = KimaiActivity.get( kimai_activity = KimaiActivity.get(
name=row[4], name=row[4],
@ -629,20 +674,21 @@ def mapping2db(mapping_path=None, global_=False):
kimai_project=kimai_project, kimai_project=kimai_project,
kimai_activity=kimai_activity, kimai_activity=kimai_activity,
kimai_description=row[6], kimai_description=row[6],
kimai_tags=row[5] kimai_tags=row[5],
) )
@cli.command() @cli.command()
def app(): def app():
from .app import HamsterToolsApp from .app import HamsterToolsApp
app = HamsterToolsApp() app = HamsterToolsApp()
app.run() app.run()
@cli.command() @cli.command()
def hamster(): def hamster():
click.echo('🐹') click.echo("🐹")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -80,33 +80,45 @@ class ActivitiesScreen(ListScreen):
self.table.clear() self.table.clear()
facts_count_query = ( facts_count_query = (
HamsterFact HamsterFact.select(
.select(HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias('facts_count')) HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id) .group_by(HamsterFact.activity_id)
.alias('facts_count_query') .alias("facts_count_query")
) )
mappings_count_query = ( mappings_count_query = (
HamsterKimaiMapping HamsterKimaiMapping.select(
.select(HamsterKimaiMapping.hamster_activity_id, HamsterKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterKimaiMapping.id).alias('mappings_count')) fn.COUNT(HamsterKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterKimaiMapping.hamster_activity_id) .group_by(HamsterKimaiMapping.hamster_activity_id)
.alias('mappings_count_query') .alias("mappings_count_query")
) )
activities = ( activities = (
HamsterActivity.select( HamsterActivity.select(
HamsterActivity, HamsterActivity,
HamsterCategory.id, HamsterCategory.id,
fn.COALESCE(HamsterCategory.name, 'None').alias("category_name"), fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias('facts_count'), fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias('mappings_count') fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
) )
.join(HamsterCategory, JOIN.LEFT_OUTER) .join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity) .switch(HamsterActivity)
.join(facts_count_query, JOIN.LEFT_OUTER, on=(HamsterActivity.id == facts_count_query.c.activity_id)) .join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity) .switch(HamsterActivity)
.join(mappings_count_query, JOIN.LEFT_OUTER, on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id)) .join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity) .group_by(HamsterActivity)
) )
@ -172,9 +184,9 @@ class ActivitiesScreen(ListScreen):
move_to_activity = HamsterActivity.get( move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2)) self.table.get_cell_at(Coordinate(event.cursor_row, 2))
) )
HamsterFact.update({HamsterFact.activity: HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
move_to_activity}).where(HamsterFact.activity == HamsterFact.activity == self.move_from_activity
self.move_from_activity).execute() ).execute()
filter_input = self.query_one("#filter") filter_input = self.query_one("#filter")
self._refresh(filter_input.value) self._refresh(filter_input.value)
del self.move_from_activity del self.move_from_activity
@ -194,17 +206,14 @@ class CategoriesScreen(ListScreen):
categories = ( categories = (
HamsterCategory.select( HamsterCategory.select(
HamsterCategory, HamsterCategory, fn.Count(HamsterActivity.id).alias("activities_count")
fn.Count(HamsterActivity.id).alias("activities_count")
) )
.join(HamsterActivity, JOIN.LEFT_OUTER) .join(HamsterActivity, JOIN.LEFT_OUTER)
.group_by(HamsterCategory) .group_by(HamsterCategory)
) )
if filter_query: if filter_query:
categories = categories.where( categories = categories.where(HamsterCategory.name.contains(filter_query))
HamsterCategory.name.contains(filter_query)
)
self.table.add_rows( self.table.add_rows(
[ [
@ -256,7 +265,7 @@ class KimaiScreen(ListScreen):
KimaiProject.select( KimaiProject.select(
KimaiProject, KimaiProject,
KimaiCustomer, KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count") fn.Count(KimaiActivity.id).alias("activities_count"),
) )
.join(KimaiCustomer, JOIN.LEFT_OUTER) .join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject) .switch(KimaiProject)
@ -278,7 +287,7 @@ class KimaiScreen(ListScreen):
project.customer.name, project.customer.name,
project.id, project.id,
project.name, project.name,
project.activities_count project.activities_count,
] ]
for project in projects for project in projects
] ]
@ -295,26 +304,37 @@ class KimaiScreen(ListScreen):
customers = KimaiAPICustomer.list(api) customers = KimaiAPICustomer.list(api)
with db.atomic(): with db.atomic():
KimaiCustomer.insert_many([{ KimaiCustomer.insert_many(
'id': customer.id, [{"id": customer.id, "name": customer.name} for customer in customers]
'name': customer.name ).execute()
} for customer in customers]).execute()
projects = KimaiAPIProject.list(api) projects = KimaiAPIProject.list(api)
with db.atomic(): with db.atomic():
KimaiProject.insert_many([{ KimaiProject.insert_many(
'id': project.id, [
'name': project.name, {
'customer_id': project.customer.id "id": project.id,
} for project in projects]).execute() "name": project.name,
"customer_id": project.customer.id,
}
for project in projects
]
).execute()
activities = KimaiAPIActivity.list(api) activities = KimaiAPIActivity.list(api)
with db.atomic(): with db.atomic():
KimaiActivity.insert_many([{ KimaiActivity.insert_many(
'id': activity.id, [
'name': activity.name, {
'project_id': (activity.project and activity.project.id or None) "id": activity.id,
} for activity in activities]).execute() "name": activity.name,
"project_id": (
activity.project and activity.project.id or None
),
}
for activity in activities
]
).execute()
self._refresh() self._refresh()

View File

@ -3,7 +3,7 @@ from peewee import SqliteDatabase, Model, CharField, ForeignKeyField, DateTimeFi
from textual.logging import TextualHandler from textual.logging import TextualHandler
logger = logging.getLogger('peewee') logger = logging.getLogger("peewee")
logger.addHandler(TextualHandler()) logger.addHandler(TextualHandler())
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -15,27 +15,27 @@ class HamsterCategory(Model):
class Meta: class Meta:
database = db database = db
table_name = 'categories' table_name = "categories"
class HamsterActivity(Model): class HamsterActivity(Model):
name = CharField() name = CharField()
category = ForeignKeyField(HamsterCategory, backref='activities') category = ForeignKeyField(HamsterCategory, backref="activities")
class Meta: class Meta:
database = db database = db
table_name = 'activities' table_name = "activities"
class HamsterFact(Model): class HamsterFact(Model):
activity = ForeignKeyField(HamsterActivity, backref='facts') activity = ForeignKeyField(HamsterActivity, backref="facts")
start_time = DateTimeField() start_time = DateTimeField()
end_time = DateTimeField(null=True) end_time = DateTimeField(null=True)
description = CharField() description = CharField()
class Meta: class Meta:
database = db database = db
table_name = 'facts' table_name = "facts"
class KimaiCustomer(Model): class KimaiCustomer(Model):
@ -43,35 +43,35 @@ class KimaiCustomer(Model):
class Meta: class Meta:
database = db database = db
table_name = 'kimai_customers' table_name = "kimai_customers"
class KimaiProject(Model): class KimaiProject(Model):
name = CharField() name = CharField()
customer = ForeignKeyField(KimaiCustomer, backref='projects') customer = ForeignKeyField(KimaiCustomer, backref="projects")
class Meta: class Meta:
database = db database = db
table_name = 'kimai_projects' table_name = "kimai_projects"
class KimaiActivity(Model): class KimaiActivity(Model):
name = CharField() name = CharField()
project = ForeignKeyField(KimaiProject, backref='activities', null=True) project = ForeignKeyField(KimaiProject, backref="activities", null=True)
class Meta: class Meta:
database = db database = db
table_name = 'kimai_activities' table_name = "kimai_activities"
class HamsterKimaiMapping(Model): class HamsterKimaiMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref='mappings') hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref='mappings') kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")
kimai_project = ForeignKeyField(KimaiProject, backref='mappings') kimai_project = ForeignKeyField(KimaiProject, backref="mappings")
kimai_activity = ForeignKeyField(KimaiActivity, backref='mappings') kimai_activity = ForeignKeyField(KimaiActivity, backref="mappings")
kimai_description = CharField() kimai_description = CharField()
kimai_tags = CharField() kimai_tags = CharField()
class Meta: class Meta:
database = db database = db
table_name = 'hamster_kimai_mappings' table_name = "hamster_kimai_mappings"