696 lines
20 KiB
Python
Executable File
696 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3.7
|
|
import csv
|
|
from datetime import datetime
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
import sys
|
|
|
|
import click
|
|
import requests
|
|
from peewee import fn, JOIN
|
|
|
|
from .db import (
|
|
db,
|
|
HamsterCategory,
|
|
HamsterActivity,
|
|
HamsterFact,
|
|
KimaiCustomer,
|
|
KimaiProject,
|
|
KimaiActivity,
|
|
HamsterKimaiMapping,
|
|
)
|
|
|
|
# Per-user Hamster directory; also used below as the default location of the
# Kimai mapping file (mapping.kimai.csv).
HAMSTER_DIR = Path.home() / ".local/share/hamster"
# HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'
# NOTE(review): the database path is hard-coded to a relative
# "hamster-testing.db" while the per-user path above is commented out --
# confirm this is intentional before running against real data.
HAMSTER_FILE = "hamster-testing.db"

# Initialise the `db` object imported from .db with the chosen database file.
# This runs at import time, before any command is dispatched.
db.init(HAMSTER_FILE)
|
|
|
|
|
|
# Root click group: the sub-groups and commands defined below are registered
# on it and it is invoked from the __main__ guard at the end of the file.
# (Kept without a docstring: click would show a docstring as help text.)
@click.group()
def cli():
    pass
|
|
|
|
|
|
# Sub-group of `cli` that holds the category commands
# (list, delete, rename, activities, tidy).
@cli.group()
def categories():
    pass
|
|
|
|
|
|
@categories.command("list")
@click.option("--search", help="Search string")
def list_categories(search):
    """List / search categories"""
    # Begin with every category; narrow by name only when --search was given.
    query = HamsterCategory.select()
    if search is not None:
        query = query.where(HamsterCategory.name.contains(search))

    # One "@<id>: <name>" line per matching category.
    for category in query:
        click.echo("@{0}: {1}".format(category.id, category.name))
|
|
|
|
|
|
@categories.command("delete")
@click.argument("ids", nargs=-1)
def delete_categories(ids):
    """Delete categories specified by IDS"""
    click.secho("Deleting:", fg="red")

    # Preview query: each selected category plus how many activities point at it.
    activity_total = fn.Count(HamsterActivity.id).alias("activities_count")
    doomed = (
        HamsterCategory.select(HamsterCategory, activity_total)
        .join(HamsterActivity, JOIN.LEFT_OUTER)
        .group_by(HamsterCategory)
        .where(HamsterCategory.id.in_(ids))
    )
    for category in doomed:
        click.echo(
            "@{0}: {1} ({2} activities)".format(
                category.id, category.name, category.activities_count
            )
        )

    # abort=True makes click abort the command unless the user answers yes.
    click.confirm("Do you want to continue?", abort=True)

    # A single bulk DELETE; execute() yields the number of rows removed.
    removal = HamsterCategory.delete().where(HamsterCategory.id.in_(ids))
    count = removal.execute()

    click.secho(f"Deleted {count} categories", fg="green")
|
|
|
|
|
|
@categories.command("rename")
@click.argument("id_", metavar="ID")
@click.argument("name")
def rename_category(id_, name):
    """Rename a category"""
    # Look the category up by primary key before touching it.
    category = HamsterCategory.get(id=id_)

    # Announce the change using the old name, then persist the new one.
    old_label = "@{0}: {1}".format(category.id, category.name)
    click.echo('Renaming {0} to "{1}"'.format(old_label, name))

    category.name = name
    category.save()
|
|
|
|
|
|
@categories.command("activities")
@click.argument("ids", nargs=-1)
def list_category_activities(ids):
    """Show activities for categories specified by ids"""
    # Activities joined to their category, restricted to the requested category ids.
    joined = HamsterActivity.select(HamsterActivity, HamsterCategory.name).join(
        HamsterCategory, JOIN.LEFT_OUTER
    )
    selected = joined.where(HamsterCategory.id.in_(ids))

    # One "@<activity id>: <category> » <activity>" line per activity.
    for activity in selected:
        click.echo(
            "@{0}: {1} » {2}".format(activity.id, activity.category.name, activity.name)
        )
|
|
|
|
|
|
@categories.command("tidy")
def tidy_categories():
    """Remove categories with no activities"""
    # Per-category activity counts, usable as a joinable subquery.
    per_category = HamsterCategory.select(
        HamsterCategory, fn.COUNT(HamsterActivity.id).alias("activities_count")
    )
    counted = (
        per_category.join(HamsterActivity, JOIN.LEFT_OUTER)
        .group_by(HamsterCategory)
        .alias("subquery")
    )

    # Categories whose count in the subquery is zero.
    empty = (
        HamsterCategory.select()
        .join(counted, on=(HamsterCategory.id == counted.c.id))
        .where(counted.c.activities_count == 0)
    )

    click.echo("Found {0} empty categories:".format(empty.count()))

    for category in empty:
        click.echo("@{0}: {1}".format(category.id, category.name))

    click.confirm("Do you want to continue?", abort=True)

    # Delete row by row, exactly the categories that were listed above.
    for category in empty:
        category.delete_instance()
|
|
|
|
|
|
# Sub-group of `cli` that holds the activity commands
# (list, delete, move, list_facts, move_facts, find_duplicates).
@cli.group()
def activities():
    pass
|
|
|
|
|
|
@activities.command("list")
@click.option("--search", help="Search string")
@click.option("--csv/--no-csv", "csv_output", default=False, help="CSV output")
def list_activities(search, csv_output):
    """List / search activities"""
    # Activities with their category, sorted by category name then activity name.
    query = (
        HamsterActivity.select(HamsterActivity, HamsterCategory)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .order_by(HamsterCategory.name, HamsterActivity.name)
    )
    if search is not None:
        query = query.where(HamsterActivity.name.contains(search))

    # A CSV writer on stdout exists only in --csv mode.
    writer = csv.writer(sys.stdout) if csv_output else None

    for activity in query:
        # category_id == -1 is treated as "no category" and shown as an empty name.
        if activity.category_id == -1:
            category_name = ""
        else:
            category_name = activity.category.name

        if writer is not None:
            writer.writerow(
                [activity.category_id, category_name, activity.id, activity.name]
            )
        else:
            click.echo(
                "@{0}: {1} » {2}: {3}".format(
                    activity.category_id, category_name, activity.id, activity.name
                )
            )
|
|
|
|
|
|
@activities.command("delete")
@click.argument("ids", nargs=-1)
def delete_activities(ids):
    """Delete activities specified by IDS"""
    # Run the query once and keep the rows, so the preview, the deletion and
    # the final count all refer to the same activities.  Each row carries its
    # category name and the number of facts recorded against it.
    activities = list(
        HamsterActivity.select(
            HamsterActivity,
            HamsterCategory.name,
            fn.Count(HamsterFact.id).alias("facts_count"),
        )
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .switch(HamsterActivity)
        .join(HamsterFact, JOIN.LEFT_OUTER)
        .group_by(HamsterActivity)
        .where(HamsterActivity.id.in_(ids))
    )

    click.secho("Deleting:", fg="red")

    for a in activities:
        # category_id == -1 is treated as "no category".
        category_name = a.category.name if a.category_id != -1 else ""
        click.echo(f"@{a.id}: {category_name} » {a.name} ({a.facts_count} facts)")

    # abort=True makes click abort the command unless the user answers yes.
    click.confirm("Do you want to continue?", abort=True)

    for a in activities:
        a.delete_instance()

    # Report what was actually deleted, not how many IDs were typed: IDs that
    # match no activity must not be counted.
    click.secho("Deleted {0} activities".format(len(activities)), fg="green")
|
|
|
|
|
|
@activities.command()
@click.argument("category_id")
@click.argument("ids", nargs=-1)
def move(category_id, ids):
    """Move activities to another category"""
    # Target category; .get() raises if no category has this id.
    category = HamsterCategory.get(id=category_id)
    # Materialise the selection once so the preview, the update and the final
    # count all refer to the same rows.
    activities = list(HamsterActivity.select().where(HamsterActivity.id.in_(ids)))

    click.secho(f'Moving to "@{category.id}: {category.name}":', fg="green")

    for a in activities:
        # category_id == -1 is treated as "no category".
        category_name = a.category.name if a.category_id != -1 else ""
        click.secho(f"@{a.category_id}: {category_name} » @{a.id}: {a.name}", fg="blue")

    # abort=True makes click abort the command unless the user answers yes.
    click.confirm("Do you want to continue?", abort=True)

    for a in activities:
        a.category = category
        a.save()

    # Count the activities that were really moved; IDs that match nothing are
    # not included.
    click.secho("Moved {0} activities".format(len(activities)), fg="green")
|
|
|
|
|
|
@activities.command()
@click.argument("ids", nargs=-1)
def list_facts(ids):
    """Show facts for activities"""
    selected = HamsterActivity.select().where(HamsterActivity.id.in_(ids))

    for activity in selected:
        # Activity heading in green, followed by its facts in blue.
        click.secho("@{0}: {1}".format(activity.id, activity.name), fg="green")

        for fact in activity.facts:
            click.secho("@{0}, {1}".format(fact.id, fact.start_time), fg="blue")
|
|
|
|
|
|
@activities.command()
@click.argument("from_id")
@click.argument("to_id")
def move_facts(from_id, to_id):
    """Move facts from one activity to another"""
    # Both lookups raise if the given id does not exist.
    from_activity = HamsterActivity.get(id=from_id)
    to_activity = HamsterActivity.get(id=to_id)

    # category_id == -1 is treated as "no category" and shown as an empty name.
    from_category_name = (
        from_activity.category.name if from_activity.category_id != -1 else ""
    )
    to_category_name = (
        to_activity.category.name if to_activity.category_id != -1 else ""
    )

    # Both sides use the same '<category> » @<id>: <name>' layout (the target
    # side used to carry a stray "@" in front of the category name).
    from_label = f"{from_category_name} » @{from_activity.id}: {from_activity.name}"
    to_label = f"{to_category_name} » @{to_activity.id}: {to_activity.name}"

    click.secho(f'Moving facts from "{from_label}" to "{to_label}"', fg="green")

    for f in from_activity.facts:
        click.secho(f"@{f.id}, {f.start_time}", fg="blue")

    # abort=True makes click abort the command unless the user answers yes.
    click.confirm("Do you want to continue?", abort=True)

    # Re-point every fact of the source activity in one UPDATE; execute()
    # returns the number of rows changed.
    count = (
        HamsterFact.update(activity_id=to_activity.id)
        .where(HamsterFact.activity == from_activity)
        .execute()
    )

    click.secho("Moved {0} facts".format(count), fg="green")

    # The facts have already been moved at this point; declining here only
    # keeps the source activity.
    click.confirm(
        f'Would you like to delete "{from_label}"?',
        abort=True,
    )

    from_activity.delete_instance()
|
|
|
|
|
|
@activities.command()
def find_duplicates():
    """Show activities which are not unique in their categories"""
    # Category name with the literal "None" substituted when it is NULL.
    category_label = fn.COALESCE(HamsterCategory.name, "None").alias("category_name")

    # Group by (category, activity name) and keep only groups with more than one row.
    duplicates = (
        HamsterActivity.select(HamsterActivity, HamsterCategory.id, category_label)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .group_by(HamsterActivity.category_id, HamsterActivity.name)
        .having(fn.COUNT(HamsterActivity.id) > 1)
    )

    for dup in duplicates:
        line = "@{0}: {1} » @{2}: {3}".format(
            dup.category_id, dup.category_name, dup.id, dup.name
        )
        click.secho(line, fg="blue")
|
|
|
|
|
|
# Sub-group of `cli` that holds the Kimai commands (sync, import) and the
# nested `db` group.
@cli.group()
def kimai():
    pass
|
|
|
|
|
|
def _get_kimai_mapping_file(path, category_search=None):
|
|
try:
|
|
return open(path)
|
|
except FileNotFoundError:
|
|
click.confirm("Mapping file {} not found, create it?:".format(path), abort=True)
|
|
mapping_file = open(path, "w")
|
|
mapping_writer = csv.writer(mapping_file)
|
|
|
|
mapping_writer.writerow(
|
|
[
|
|
"FROM category",
|
|
"FROM activity",
|
|
"TO Customer",
|
|
"TO Project",
|
|
"TO Activity",
|
|
"TO Tag",
|
|
"TO Note",
|
|
]
|
|
)
|
|
|
|
activities = HamsterActivity.select(HamsterActivity, HamsterCategory).join(
|
|
HamsterCategory, JOIN.LEFT_OUTER
|
|
)
|
|
|
|
for a in activities:
|
|
mapping_writer.writerow(
|
|
[a.category.name if a.category_id != -1 else "", a.name]
|
|
)
|
|
|
|
mapping_file.close()
|
|
return open(path)
|
|
|
|
|
|
@kimai.command()
@click.option(
    "--mapping-path",
    help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
    multiple=True,
)
@click.argument("username")
@click.argument("api_key")
@click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors")
@click.option("--ignore-activities", is_flag=True, help="Ignore missing activities")
def sync(username, api_key, just_errors, ignore_activities, mapping_path=None):
    """
    Download customer / project / activity data from Kimai
    """
    # What the code below does: read the "TO Customer / Project / Activity"
    # columns of the mapping file(s), fetch customers, projects and activities
    # from the Kimai API and report, per mapping row, whether the target was
    # found, is missing, or is ambiguous.  Nothing is written to the database.
    #
    # username / api_key are sent as the X-AUTH-USER / X-AUTH-TOKEN headers.
    # just_errors suppresses the green "found"/"skipping" messages.
    # ignore_activities stops each row after the project check.

    kimai_api_url = "https://kimai.autonomic.zone/api"

    # --mapping-path is declared with multiple=True, so click always passes a
    # tuple (empty when the option is omitted); a direct caller may still pass
    # None or a single path.  Normalise all of these to a list of paths and
    # fall back to the documented default when none was given.
    if mapping_path is None:
        mapping_paths = []
    elif isinstance(mapping_path, (tuple, list)):
        mapping_paths = list(mapping_path)
    else:
        mapping_paths = [mapping_path]
    if not mapping_paths:
        mapping_paths = [HAMSTER_DIR / "mapping.kimai.csv"]

    mapping_files = []
    try:
        for path in mapping_paths:
            mapping_file = _get_kimai_mapping_file(path)
            mapping_files.append(mapping_file)
            # Skip exactly one header line per file.
            next(mapping_file, None)
        mapping_reader = csv.reader(chain(*mapping_files))
        # Columns 2-4 are the Kimai customer, project and activity names.
        mapping_data = [[row[2], row[3], row[4]] for row in mapping_reader]
    finally:
        # Close every file that was opened, not only the last one.
        for mapping_file in mapping_files:
            mapping_file.close()

    auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key}

    def fetch(resource):
        # One GET per resource collection, decoded from JSON.
        return requests.get(
            f"{kimai_api_url}/{resource}?visible=3", headers=auth_headers
        ).json()

    def report_ok(message):
        # Informational (green) output is silenced by --just-errors.
        if not just_errors:
            click.secho(message, fg="green")

    customers = fetch("customers")
    projects = fetch("projects")
    activities = fetch("activities")

    # Names already confirmed in an earlier row, so each is reported only once.
    found_customers = set()
    found_projects = set()
    found_activities = set()

    for customer_name, project_name, activity_name in mapping_data:
        # Check if each mapping still exists in Kimai

        matching_customers = [c for c in customers if c["name"] == customer_name]
        if customer_name in found_customers:
            report_ok("Skipping existing customer '{0}'".format(customer_name))
        elif len(matching_customers) > 1:
            click.secho(
                "More than one match for customer '{0}'".format(customer_name),
                fg="red",
            )
            continue
        elif not matching_customers:
            click.secho("Missing customer '{0}'".format(customer_name), fg="yellow")
            continue
        else:
            report_ok("Found customer '{0}'".format(customer_name))
            found_customers.add(customer_name)

        # Exactly one customer matched at this point.
        customer_id = matching_customers[0]["id"]
        project_str = ":".join((customer_name, project_name))
        matching_projects = [
            p
            for p in projects
            if p["name"] == project_name and p["customer"] == customer_id
        ]

        if project_str in found_projects:
            report_ok("Skipping existing project '{0}'".format(project_str))
        elif len(matching_projects) > 1:
            click.secho(
                "More than one match for project '{0}'".format(project_str),
                fg="red",
            )
            continue
        elif not matching_projects:
            click.secho("Missing project '{0}'".format(project_str), fg="yellow")
            continue
        else:
            report_ok("Found project '{0}'".format(project_str))
            found_projects.add(project_str)

        if ignore_activities:
            continue

        activity_str = ":".join((customer_name, project_name, activity_name))
        if activity_str in found_activities:
            report_ok("Skipping existing activity '{0}'".format(activity_str))
            continue

        # Exactly one project matched at this point.
        project_id = matching_projects[0]["id"]
        matching_activities = [
            a
            for a in activities
            if a["name"] == activity_name and a["project"] == project_id
        ]

        if len(matching_activities) > 1:
            click.secho(
                "More than one match for activity '{0}'".format(activity_str),
                fg="red",
            )
        elif not matching_activities:
            click.secho("Missing activity '{0}'".format(activity_str), fg="yellow")
        else:
            report_ok("Found activity '{0}'".format(activity_str))
            found_activities.add(activity_str)
|
|
|
|
|
|
@kimai.command("import")
@click.option(
    "--mapping-path",
    help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
    multiple=True,
)
@click.option("--output", help="Output file (default kimai.csv)")
@click.option("--category-search", help="Category search string")
@click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username")
def _import(
    username,
    mapping_path=None,
    output=None,
    category_search=None,
    after=None,
    show_missing=False,
):
    """
    Export time tracking data in Kimai format
    """
    # Reads the mapping file(s), walks the Hamster facts (optionally filtered
    # by category name and start time) and writes one CSV row per mapped fact.
    # With show_missing the output instead lists the category / activity pairs
    # that have no mapping.  username is written into the "User" column.

    # --mapping-path is declared with multiple=True, so click always passes a
    # tuple (empty when the option is omitted); a direct caller may still pass
    # None or a single path.  Normalise and fall back to the default file.
    if mapping_path is None:
        mapping_paths = []
    elif isinstance(mapping_path, (tuple, list)):
        mapping_paths = list(mapping_path)
    else:
        mapping_paths = [mapping_path]
    if not mapping_paths:
        mapping_paths = [HAMSTER_DIR / "mapping.kimai.csv"]

    if output is None:
        # Spelled out instead of "%F", which not every platform's strftime supports.
        timestamp = datetime.now().strftime("%Y-%m-%d")
        output = f"kimai_{timestamp}.csv"

    mapping_files = []
    try:
        for path in mapping_paths:
            mapping_file = _get_kimai_mapping_file(path, category_search)
            mapping_files.append(mapping_file)
            # Skip exactly one header line per file.
            next(mapping_file, None)
        mapping_reader = csv.reader(chain(*mapping_files))

        # "<category>:<activity>" -> [customer, project, activity, tags, note].
        # The note column (index 6) is optional in the file.
        mapping = {
            "{0}:{1}".format(row[0], row[1]): [
                row[2],
                row[3],
                row[4],
                row[5],
                row[6] if len(row) > 6 else None,
            ]
            for row in mapping_reader
        }
    finally:
        for mapping_file in mapping_files:
            mapping_file.close()

    facts = (
        HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
        .join(HamsterActivity)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
    )

    if category_search is not None:
        facts = facts.where(HamsterCategory.name.contains(category_search))

    if after is not None:
        facts = facts.where(HamsterFact.start_time >= after)

    # newline="" as the csv module expects; the with-block closes the file
    # even if a row fails to convert.
    with open(output, "w", newline="") as output_file:
        output_writer = csv.writer(output_file)

        if not show_missing:
            output_writer.writerow(
                [
                    "Date",
                    "From",
                    "To",
                    "Duration",
                    "Rate",
                    "User",
                    "Customer",
                    "Project",
                    "Activity",
                    "Description",
                    "Exported",
                    "Tags",
                    "HourlyRate",
                    "FixedRate",
                    "InternalRate",
                ]
            )

        for fact in facts:
            activity = fact.activity
            # category_id == -1 is treated as "no category", matching the empty
            # category name written by _get_kimai_mapping_file.
            category_name = (
                activity.category.name if activity.category_id != -1 else ""
            )
            k = f"{category_name}:{activity.name}"
            try:
                customer, project, kimai_activity, tags, note = mapping[k]
            except KeyError:
                if show_missing:
                    output_writer.writerow([category_name, activity.name])
                click.secho(
                    "Can't find mapping for '{0}', skipping".format(k), fg="yellow"
                )
                continue

            if show_missing:
                continue

            if fact.start_time is None or fact.end_time is None:
                click.secho(
                    f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping",
                    fg="yellow",
                )
                continue

            # Timestamps are parsed from text; anything after a "." (fractional
            # seconds) is dropped first.
            date_start = datetime.strptime(
                fact.start_time.split(".")[0], "%Y-%m-%d %H:%M:%S"
            )
            date_end = datetime.strptime(
                fact.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S"
            )
            # end - start, via total_seconds(): a negative timedelta's .seconds
            # wraps around (a one-hour fact came out as 23.0) and .seconds also
            # ignores whole days.
            duration = (date_end - date_start).total_seconds() / 3600

            output_writer.writerow(
                [
                    date_start.strftime("%Y-%m-%d"),
                    date_start.strftime("%H:%M"),
                    "",  # To (time)
                    duration,
                    "",  # Rate
                    username,
                    customer,
                    project,
                    kimai_activity,
                    fact.description or note or "",
                    "0",  # Exported
                    tags,
                    "",  # Hourly rate
                    "",  # Fixed rate
                    "",  # Internal rate (keeps rows aligned with the header)
                ]
            )
|
|
|
|
|
|
# Nested group exposed as `kimai db`; holds init, reset and mapping2db.
# The trailing underscore keeps the function from shadowing the imported `db`.
@kimai.group("db")
def db_():
    pass
|
|
|
|
|
|
@db_.command()
def init():
    # Create the three Kimai tables plus the Hamster -> Kimai mapping table.
    kimai_models = [KimaiCustomer, KimaiProject, KimaiActivity, HamsterKimaiMapping]
    db.create_tables(kimai_models)
|
|
|
|
|
|
@db_.command()
def reset():
    # Unfiltered DELETE: removes every stored Hamster -> Kimai mapping row.
    wipe_mappings = HamsterKimaiMapping.delete()
    wipe_mappings.execute()
|
|
|
|
|
|
@db_.command()
@click.option(
    "-g",
    "--global",
    "global_",
    help="Does this file contain mappings to global activties",
    is_flag=True,
)
@click.option("--mapping-path", help="Mapping file")
def mapping2db(mapping_path=None, global_=False):
    """Load the rows of a mapping CSV file into the mapping table"""
    # mapping_path: CSV in the layout written by _get_kimai_mapping_file
    #   (category, activity, customer, project, activity, tag, note).
    # global_: accepted for the -g/--global flag but not used by the code below.
    #
    # Every lookup uses .get(), so a row naming an unknown category, activity,
    # customer or project raises and stops the import.

    if mapping_path is None:
        # Same default as `kimai sync` / `kimai import`; previously a missing
        # option ended in open(None) raising TypeError.
        mapping_path = HAMSTER_DIR / "mapping.kimai.csv"

    mapping_file = _get_kimai_mapping_file(mapping_path, None)
    try:
        # Skip the header line.
        next(mapping_file, None)
        mapping_reader = csv.reader(mapping_file)

        for row in mapping_reader:
            hamster_category = HamsterCategory.get(name=row[0])
            hamster_activity = HamsterActivity.get(
                name=row[1], category_id=hamster_category.id
            )
            kimai_customer = KimaiCustomer.get(name=row[2])
            kimai_project = KimaiProject.get(name=row[3], customer_id=kimai_customer.id)

            try:
                kimai_activity = KimaiActivity.get(
                    name=row[4], project_id=kimai_project.id
                )
            except KimaiActivity.DoesNotExist:
                # No activity of that name under the project: fall back to a
                # lookup by name alone (presumably a global activity -- TODO confirm).
                kimai_activity = KimaiActivity.get(
                    name=row[4],
                )

            HamsterKimaiMapping.create(
                hamster_activity=hamster_activity,
                kimai_customer=kimai_customer,
                kimai_project=kimai_project,
                kimai_activity=kimai_activity,
                kimai_description=row[6],
                kimai_tags=row[5],
            )
    finally:
        # The handle used to be left open for the life of the process.
        mapping_file.close()
|
|
|
|
|
|
@cli.command()
def app():
    # Imported inside the command so the .app module is loaded only when this
    # command is actually invoked.
    from .app import HamsterToolsApp

    # Build the application object and hand control to its run() method.
    HamsterToolsApp().run()
|
|
|
|
|
|
# Prints a single hamster emoji.
@cli.command()
def hamster():
    click.echo("🐹")
|
|
|
|
|
|
# Run the root click group when the file is executed as a script.
if __name__ == "__main__":
    cli()
|