hamster-tools/hamstertools/__init__.py
2023-11-18 11:23:54 +00:00

818 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3.7
import csv
import logging
from datetime import datetime
from itertools import chain
from pathlib import Path
import sys
import click
import requests
from peewee import fn, JOIN
from textual.logging import TextualHandler
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiCustomer,
KimaiProject,
KimaiActivity,
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
)
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync
# Directory under the user's home where the Hamster data files are looked up.
HAMSTER_DIR = Path.home() / ".local/share/hamster"
# Path of the database file, "hamster.db", inside HAMSTER_DIR.
HAMSTER_FILE = HAMSTER_DIR / "hamster.db"
# Point the `db` object imported from .db at that file; this runs at import time.
db.init(HAMSTER_FILE)
@click.group()
@click.option("-d", "--debug", is_flag=True)
def cli(debug):
    # Root command group. Without -d/--debug there is nothing to set up.
    if not debug:
        return

    # Root logger: default handler, DEBUG level.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

    # Send peewee's log records to a TextualHandler as well.
    orm_log = logging.getLogger("peewee")
    orm_log.addHandler(TextualHandler())
    orm_log.setLevel(logging.DEBUG)

    # Let urllib3 (as vendored under requests) records propagate at DEBUG.
    http_log = logging.getLogger("requests.packages.urllib3")
    http_log.setLevel(logging.DEBUG)
    http_log.propagate = True
@cli.group()
def categories():
    # Empty callback: this function only declares the "categories" command
    # group that the category commands below register on.
    pass
@categories.command("list")
@click.option("--search", help="Search string")
def list_categories(search):
    """List / search categories"""
    # Start from every category; narrow by name only when --search was given.
    query = HamsterCategory.select()
    if search is not None:
        query = query.where(HamsterCategory.name.contains(search))

    for category in query:
        click.echo("@{0}: {1}".format(category.id, category.name))
@categories.command("delete")
@click.argument("ids", nargs=-1)
def delete_categories(ids):
    """Delete categories specified by IDS"""
    click.secho("Deleting:", fg="red")

    # Each selected category is annotated with how many activities it holds,
    # so the user sees what is attached before confirming.
    activity_total = fn.Count(HamsterActivity.id).alias("activities_count")
    doomed = (
        HamsterCategory.select(HamsterCategory, activity_total)
        .join(HamsterActivity, JOIN.LEFT_OUTER)
        .group_by(HamsterCategory)
        .where(HamsterCategory.id.in_(ids))
    )
    for category in doomed:
        click.echo(
            "@{0}: {1} ({2} activities)".format(
                category.id, category.name, category.activities_count
            )
        )

    click.confirm("Do you want to continue?", abort=True)

    # One bulk DELETE; execute() yields the number of rows removed.
    delete_query = HamsterCategory.delete().where(HamsterCategory.id.in_(ids))
    removed = delete_query.execute()
    click.secho(f"Deleted {removed} categories", fg="green")
@categories.command("rename")
@click.argument("id_", metavar="ID")
@click.argument("name")
def rename_category(id_, name):
    """Rename a category"""
    target = HamsterCategory.get(id=id_)
    # Announce the change using the old name before overwriting it.
    click.echo('Renaming @{0}: {1} to "{2}"'.format(target.id, target.name, name))
    target.name = name
    target.save()
@categories.command("activities")
@click.argument("ids", nargs=-1)
def list_category_activities(ids):
    """Show activities for categories specified by ids"""
    # Activities joined to their category, restricted to the requested ids.
    in_requested_categories = HamsterCategory.id.in_(ids)
    query = (
        HamsterActivity.select(HamsterActivity, HamsterCategory.name)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .where(in_requested_categories)
    )

    for activity in query:
        click.echo(
            "@{0}: {1} » {2}".format(
                activity.id, activity.category.name, activity.name
            )
        )
@categories.command("tidy")
def tidy_categories():
    """Remove categories with no activities"""
    # Per-category activity count; the LEFT OUTER join keeps categories that
    # have no activities at all (their count is 0).
    subquery = (
        HamsterCategory.select(
            HamsterCategory, fn.COUNT(HamsterActivity.id).alias("activities_count")
        )
        .join(HamsterActivity, JOIN.LEFT_OUTER)
        .group_by(HamsterCategory)
        .alias("subquery")
    )

    # Materialise once so the count, the listing and the deletion all work on
    # the same set of rows.
    empty_categories = list(
        HamsterCategory.select()
        .join(subquery, on=(HamsterCategory.id == subquery.c.id))
        .where(subquery.c.activities_count == 0)
    )

    click.echo("Found {0} empty categories:".format(len(empty_categories)))

    # Nothing to delete: do not ask the user to confirm a no-op.
    if not empty_categories:
        return

    for cat in empty_categories:
        click.echo(f"@{cat.id}: {cat.name}")

    click.confirm("Do you want to continue?", abort=True)

    # Plain loop: the deletions are side effects, not values to collect.
    for cat in empty_categories:
        cat.delete_instance()
@cli.group()
def activities():
    # Empty callback: this function only declares the "activities" command
    # group that the activity commands below register on.
    pass
@activities.command("list")
@click.option("--search", help="Search string")
@click.option("--csv/--no-csv", "csv_output", default=False, help="CSV output")
def list_activities(search, csv_output):
    """List / search activities"""
    query = (
        HamsterActivity.select(HamsterActivity, HamsterCategory)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .order_by(HamsterCategory.name, HamsterActivity.name)
    )
    if search is not None:
        query = query.where(HamsterActivity.name.contains(search))

    # A writer exists only in CSV mode; None selects the human-readable format.
    writer = csv.writer(sys.stdout) if csv_output else None

    for activity in query:
        # category_id -1 is treated as "no category" and shown as blank.
        if activity.category_id != -1:
            category_name = activity.category.name
        else:
            category_name = ""

        if writer is None:
            click.echo(
                f"@{activity.category_id}: {category_name} » {activity.id}: {activity.name}"
            )
            continue

        writer.writerow(
            [activity.category_id, category_name, activity.id, activity.name]
        )
@activities.command("delete")
@click.argument("ids", nargs=-1)
def delete_activities(ids):
    """Delete activities specified by IDS"""
    # Selected activities with their category name and number of facts, so the
    # user can see what is attached before confirming. Materialised once so
    # the listing and the deletion operate on exactly the same rows.
    activities = list(
        HamsterActivity.select(
            HamsterActivity,
            HamsterCategory.name,
            fn.Count(HamsterFact.id).alias("facts_count"),
        )
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .switch(HamsterActivity)
        .join(HamsterFact, JOIN.LEFT_OUTER)
        .group_by(HamsterActivity)
        .where(HamsterActivity.id.in_(ids))
    )

    click.secho("Deleting:", fg="red")

    for a in activities:
        # category_id -1 is treated as "no category" and shown as blank.
        category_name = a.category.name if a.category_id != -1 else ""
        click.echo(f"@{a.id}: {category_name} » {a.name} ({a.facts_count} facts)")

    click.confirm("Do you want to continue?", abort=True)

    # Count what was really deleted: ids that matched no activity must not be
    # reported as deleted.
    deleted = 0
    for a in activities:
        a.delete_instance()
        deleted += 1

    click.secho("Deleted {0} activities".format(deleted), fg="green")
@activities.command()
@click.argument("category_id")
@click.argument("ids", nargs=-1)
def move(category_id, ids):
    """Move activities to another category"""
    category = HamsterCategory.get(id=category_id)
    # Materialised once so the preview and the update cover the same rows.
    activities = list(HamsterActivity.select().where(HamsterActivity.id.in_(ids)))

    click.secho(f'Moving to "@{category.id}: {category.name}":', fg="green")

    for a in activities:
        # category_id -1 is treated as "no category" and shown as blank.
        category_name = a.category.name if a.category_id != -1 else ""
        click.secho(f"@{a.category_id}: {category_name} » @{a.id}: {a.name}", fg="blue")

    click.confirm("Do you want to continue?", abort=True)

    # Count what was really moved: ids that matched no activity must not be
    # reported as moved.
    moved = 0
    for a in activities:
        a.category = category
        a.save()
        moved += 1

    click.secho("Moved {0} activities".format(moved), fg="green")
@activities.command()
@click.argument("ids", nargs=-1)
def list_facts(ids):
    """Show facts for activities"""
    wanted = HamsterActivity.select().where(HamsterActivity.id.in_(ids))

    # One green heading per activity, followed by its facts in blue.
    for activity in wanted:
        click.secho("@{0}: {1}".format(activity.id, activity.name), fg="green")
        for fact in activity.facts:
            click.secho("@{0}, {1}".format(fact.id, fact.start_time), fg="blue")
@activities.command()
@click.argument("from_id")
@click.argument("to_id")
def move_facts(from_id, to_id):
    """Move facts from one activity to another"""
    # FROM_ID: id of the activity whose facts are moved (and which may then be
    #          deleted).
    # TO_ID:   id of the activity that receives the facts.
    from_activity = HamsterActivity.get(id=from_id)
    to_activity = HamsterActivity.get(id=to_id)

    # Moving an activity's facts onto itself and then deleting it would leave
    # those facts pointing at a deleted activity, so refuse up front.
    if from_activity.id == to_activity.id:
        raise click.UsageError("FROM_ID and TO_ID refer to the same activity")

    # category_id -1 is treated as "no category" and shown as blank.
    from_category_name = (
        from_activity.category.name if from_activity.category_id != -1 else ""
    )
    to_category_name = (
        to_activity.category.name if to_activity.category_id != -1 else ""
    )

    click.secho(
        f'Moving facts from "{from_category_name} » @{from_activity.id}: {from_activity.name}" to "{to_category_name} » @{to_activity.id}: {to_activity.name}"',
        fg="green",
    )

    for f in from_activity.facts:
        click.secho(f"@{f.id}, {f.start_time}", fg="blue")

    click.confirm("Do you want to continue?", abort=True)

    # Single bulk UPDATE; execute() yields the number of facts re-pointed.
    count = (
        HamsterFact.update(activity_id=to_activity.id)
        .where(HamsterFact.activity == from_activity)
        .execute()
    )

    click.secho("Moved {0} facts".format(count), fg="green")

    click.confirm(
        f'Would you like to delete "{from_category_name} » @{from_activity.id}: {from_activity.name}"?',
        abort=True,
    )

    from_activity.delete_instance()
@activities.command()
def find_duplicates():
    """Show activities which are not unique in their categories"""
    # A missing category name is reported as the literal text "None".
    category_label = fn.COALESCE(HamsterCategory.name, "None").alias("category_name")
    # Keep only (category, name) groups that contain more than one activity.
    occurs_more_than_once = fn.COUNT(HamsterActivity.id) > 1

    duplicates = (
        HamsterActivity.select(HamsterActivity, HamsterCategory.id, category_label)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .group_by(HamsterActivity.category_id, HamsterActivity.name)
        .having(occurs_more_than_once)
    )

    for dup in duplicates:
        line = "@{0}: {1} » @{2}: {3}".format(
            dup.category_id, dup.category_name, dup.id, dup.name
        )
        click.secho(line, fg="blue")
@cli.group()
def kimai():
    # Empty callback: this function only declares the "kimai" command group
    # that the Kimai commands below register on.
    pass
def _get_kimai_mapping_file(path, category_search=None):
    """Open the mapping CSV at *path* for reading, offering to create it.

    If the file does not exist the user is asked whether to create it
    (declining aborts the command). A new file gets a header row followed by
    one row per Hamster activity with only the two "FROM" columns filled in.

    Args:
        path: Location of the mapping CSV.
        category_search: Accepted for call compatibility; not used here.

    Returns:
        An open, readable text file positioned at the start (header
        included). The caller is responsible for closing it.
    """
    try:
        return open(path)
    except FileNotFoundError:
        click.confirm("Mapping file {} not found, create it?:".format(path), abort=True)

    activities = HamsterActivity.select(HamsterActivity, HamsterCategory).join(
        HamsterCategory, JOIN.LEFT_OUTER
    )

    # `with` guarantees the handle is closed even if a query or write fails;
    # newline="" is what the csv module requires of files it writes to.
    with open(path, "w", newline="") as mapping_file:
        mapping_writer = csv.writer(mapping_file)
        mapping_writer.writerow(
            [
                "FROM category",
                "FROM activity",
                "TO Customer",
                "TO Project",
                "TO Activity",
                "TO Tag",
                "TO Note",
            ]
        )
        for a in activities:
            # category_id -1 is treated as "no category" and written as blank.
            mapping_writer.writerow(
                [a.category.name if a.category_id != -1 else "", a.name]
            )

    return open(path)
@kimai.command()
@click.option(
    "--mapping-path",
    help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
    multiple=True,
)
@click.argument("username")
@click.argument("api_key", envvar="KIMAI_API_KEY")
@click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors")
@click.option("--ignore-activities", is_flag=True, help="Ignore missing activities")
def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
    """
    Check customer / project / activity data from Kimai
    """
    # username / api_key:  sent as the X-AUTH-USER / X-AUTH-TOKEN headers.
    # just_errors:         suppress the green "Found"/"Skipping" lines.
    # ignore_activities:   stop after the customer and project checks.
    # mapping_path:        zero or more mapping CSVs; empty means the default
    #                      file in HAMSTER_DIR.
    kimai_api_url = "https://kimai.autonomic.zone/api"

    # `not` also covers the declared default of None, not just an empty tuple.
    if not mapping_path:
        mapping_path = (HAMSTER_DIR / "mapping.kimai.csv",)

    mapping_files = []
    try:
        for mapping_path_item in mapping_path:
            if not Path(mapping_path_item).exists():
                raise click.UsageError(f"{mapping_path_item} does not exist")

            mapping_file = _get_kimai_mapping_file(mapping_path_item)
            mapping_files.append(mapping_file)
            # Skip this file's header row (tolerating a completely empty file).
            next(mapping_file, None)

        # Headers are already consumed per file above, so every row the reader
        # yields is data; skipping one more here would drop a real mapping.
        mapping_reader = csv.reader(chain(*mapping_files))
        # Columns 2-4 are the Kimai customer, project and activity names.
        # Blank lines (empty rows) carry no mapping and are ignored.
        mapping_data = [[row[2], row[3], row[4]] for row in mapping_reader if row]
    finally:
        # Close every opened mapping file, not only the last one.
        for mapping_file in mapping_files:
            mapping_file.close()

    auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key}

    customers = requests.get(
        f"{kimai_api_url}/customers?visible=3", headers=auth_headers
    ).json()
    projects = requests.get(
        f"{kimai_api_url}/projects?visible=3", headers=auth_headers
    ).json()
    activities = requests.get(
        f"{kimai_api_url}/activities?visible=3", headers=auth_headers
    ).json()

    def info(message):
        # Green, informational output; silenced by --just-errors.
        if not just_errors:
            click.secho(message, fg="green")

    # Names already verified, so repeated mapping rows are reported only once.
    found_customers = set()
    found_projects = set()
    found_activities = set()

    for row in mapping_data:
        customer_name, project_name, activity_name = row

        # Check if each mapping still exists in Kimai
        matching_customers = [c for c in customers if c["name"] == customer_name]
        if customer_name in found_customers:
            info("Skipping existing customer '{0}'".format(customer_name))
        elif len(matching_customers) > 1:
            click.secho(
                "More than one match for customer '{0}'".format(customer_name),
                fg="red",
            )
            continue
        elif len(matching_customers) < 1:
            click.secho("Missing customer '{0}'".format(customer_name), fg="yellow")
            continue
        else:
            info("Found customer '{0}'".format(customer_name))
            found_customers.add(customer_name)

        # From here on exactly one customer matched, so index 0 is safe.
        project_str = ":".join(row[0:2])
        matching_projects = [
            p
            for p in projects
            if p["name"] == project_name
            and p["customer"] == matching_customers[0]["id"]
        ]
        if project_str in found_projects:
            info("Skipping existing project '{0}'".format(project_str))
        elif len(matching_projects) > 1:
            click.secho(
                "More than one match for project '{0}'".format(project_str),
                fg="red",
            )
            continue
        elif len(matching_projects) < 1:
            click.secho("Missing project '{0}'".format(project_str), fg="yellow")
            continue
        else:
            info("Found project '{0}'".format(project_str))
            found_projects.add(project_str)

        if ignore_activities:
            continue

        activity_str = ":".join(row)
        if activity_str in found_activities:
            info("Skipping existing activity '{0}'".format(activity_str))
            continue

        # Exactly one project matched above, so index 0 is safe.
        matching_activities = [
            a
            for a in activities
            if a["name"] == activity_name
            and a["project"] == matching_projects[0]["id"]
        ]
        if len(matching_activities) > 1:
            click.secho(
                "More than one match for activity '{0}'".format(activity_str),
                fg="red",
            )
        elif len(matching_activities) < 1:
            click.secho("Missing activity '{0}'".format(activity_str), fg="yellow")
        else:
            info("Found activity '{0}'".format(activity_str))
            found_activities.add(activity_str)
@kimai.command("csv")
@click.option(
    "--mapping-path",
    help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
    multiple=True,
)
@click.option("--output", help="Output file (default kimai_YYYY-MM-DD.csv)")
@click.option("--category-search", help="Category search string")
@click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username")
def _csv(
    username,
    mapping_path=None,
    output=None,
    category_search=None,
    after=None,
    show_missing=False,
):
    """
    Export time tracking data in Kimai format
    """
    # username:        written into the "User" column of every exported row.
    # mapping_path:    zero or more mapping CSVs (a single path is accepted
    #                  too); empty means the default file in HAMSTER_DIR.
    # output:          CSV file to write; defaults to kimai_<today>.csv.
    # category_search: only export facts whose category name contains this.
    # after:           only export facts starting at or after this value.
    # show_missing:    write only the (category, activity) pairs that have no
    #                  mapping instead of the export itself.

    # With multiple=True click passes a tuple (empty when the option is not
    # given), so test for emptiness rather than None to apply the default.
    if not mapping_path:
        mapping_paths = (HAMSTER_DIR / "mapping.kimai.csv",)
    elif isinstance(mapping_path, (tuple, list)):
        mapping_paths = tuple(mapping_path)
    else:
        mapping_paths = (mapping_path,)

    if output is None:
        # Spelled out instead of "%F", which not every platform supports.
        timestamp = datetime.now().strftime("%Y-%m-%d")
        output = f"kimai_{timestamp}.csv"

    mapping_files = []
    try:
        for mapping_path_item in mapping_paths:
            mapping_file = _get_kimai_mapping_file(mapping_path_item, category_search)
            mapping_files.append(mapping_file)
            # Skip this file's header row (tolerating a completely empty file).
            next(mapping_file, None)

        # "category:activity" -> [customer, project, activity, tag, note].
        # The note column is optional; later rows override earlier duplicates.
        mapping = {}
        for row in csv.reader(chain(*mapping_files)):
            if not row:
                continue  # blank line in the CSV
            note = row[6] if len(row) > 6 and row[6] else None
            mapping["{0}:{1}".format(row[0], row[1])] = [
                row[2],
                row[3],
                row[4],
                row[5],
                note,
            ]
    finally:
        for mapping_file in mapping_files:
            mapping_file.close()

    facts = (
        HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
        .join(HamsterActivity)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
    )

    if category_search is not None:
        facts = facts.where(HamsterCategory.name.contains(category_search))

    if after is not None:
        facts = facts.where(HamsterFact.start_time >= after)

    def parse_time(value):
        # Drop any fractional seconds before parsing the stored timestamp.
        return datetime.strptime(value.split(".")[0], "%Y-%m-%d %H:%M:%S")

    # `with` closes the output even if a row fails; newline="" is what the
    # csv module requires of files it writes to.
    with open(output, "w", newline="") as output_file:
        output_writer = csv.writer(output_file)

        if not show_missing:
            output_writer.writerow(
                [
                    "Date",
                    "From",
                    "To",
                    "Duration",
                    "Rate",
                    "User",
                    "Customer",
                    "Project",
                    "Activity",
                    "Description",
                    "Exported",
                    "Tags",
                    "HourlyRate",
                    "FixedRate",
                    "InternalRate",
                ]
            )

        for fact in facts:
            activity = fact.activity
            # category_id -1 is treated as "no category", matching the blank
            # category written for such activities in a generated mapping file.
            category_name = (
                activity.category.name if activity.category_id != -1 else ""
            )
            k = f"{category_name}:{activity.name}"

            mapping_ = mapping.get(k)
            if mapping_ is None:
                if show_missing:
                    output_writer.writerow([category_name, activity.name])
                click.secho(
                    "Can't find mapping for '{0}', skipping".format(k), fg="yellow"
                )
                continue

            if show_missing:
                continue

            if fact.start_time is None or fact.end_time is None:
                click.secho(
                    f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping",
                    fg="yellow",
                )
                continue

            date_start = parse_time(fact.start_time)
            date_end = parse_time(fact.end_time)
            # end - start, in hours. (start - end).seconds would wrap around
            # to "24h minus the duration" because the difference is negative.
            duration = (date_end - date_start).total_seconds() / 3600

            output_writer.writerow(
                [
                    date_start.strftime("%Y-%m-%d"),
                    date_start.strftime("%H:%M"),
                    "",  # To (time)
                    duration,
                    "",  # Rate
                    username,
                    mapping_[0],
                    mapping_[1],
                    mapping_[2],
                    fact.description or mapping_[4] or "",
                    "0",  # Exported
                    mapping_[3],
                    "",  # Hourly rate
                    "",  # Fixed rate
                    "",  # Internal rate (keeps rows as wide as the header)
                ]
            )
@kimai.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
    """Upload Hamster facts to Kimai as timesheets"""
    # search: substring the Hamster category name must contain.
    # after / before: exclusive bounds on the fact start time, as YYYY-MM-DD.
    #
    # Exits with status 1 if validation finds a problem (nothing is uploaded
    # in that case) or if any upload is rejected.
    api = KimaiAPI()

    facts = (
        HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
        .join(HamsterActivity, JOIN.LEFT_OUTER)
        .join(HamsterCategory, JOIN.LEFT_OUTER)
        .where(
            (HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
            & (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
            # Honour the SEARCH argument instead of a hard-coded value.
            & HamsterCategory.name.contains(search)
        )
    )

    has_errors = False

    # check data
    for f in facts:
        mappings = f.activity.mappings
        if len(mappings) == 0:
            print(
                f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
            )
            has_errors = True
            continue
        if len(mappings) > 1:
            print(
                f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
            )
            has_errors = True
            continue
        # An activity without a project is a global one; the mapped project
        # must allow those.
        if (
            mappings[0].kimai_activity.project is None
            and not mappings[0].kimai_project.allow_global_activities
        ):
            click.secho(
                f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity}",
                fg="red",
            )
            has_errors = True
            continue
        # Already-imported facts are only a warning; the upload pass skips them.
        if f.imports.count() > 0:
            click.secho(
                f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)",
                fg="yellow",
            )

    if has_errors:
        sys.exit(1)

    upload_failed = False

    # upload data
    for f in facts:
        try:
            mapping = f.activity.mappings[0]
        except IndexError:
            print(
                f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
            )
            continue

        if f.imports.count() > 0:
            print(
                f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
            )
            continue

        t = Timesheet(
            api,
            activity=mapping.kimai_activity,
            project=mapping.kimai_project,
            begin=f.start_time,
            end=f.end_time,
            # Fall back to the mapping's description when the fact has none
            # (empty string or None).
            description=f.description or mapping.kimai_description,
            # tags=f.tags if f.tags != '' else mapping.kimai_tags
        )
        r = t.upload().json()
        if r.get("code", 200) != 200:
            # Report the rejected upload and carry on with the remaining
            # facts rather than dropping into an interactive debugger.
            click.secho(str(r), fg="red")
            click.secho(
                f"{f.id} ({f.activity.category.name} » {f.activity.name})", fg="red"
            )
            upload_failed = True
            continue

        # create() already inserts the row; no extra save() is needed.
        HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"])
        print(f'Created Kimai timesheet {r["id"]}')

    if upload_failed:
        sys.exit(1)
@kimai.group("db")
def db_():
    # Empty callback: declares the "db" sub-group under "kimai". The trailing
    # underscore keeps the function from shadowing the imported `db` object.
    pass
@db_.command()
def init():
    # Create the database tables for the models listed here.
    models_to_create = [
        KimaiCustomer,
        KimaiProject,
        KimaiActivity,
        HamsterActivityKimaiMapping,
        HamsterFactKimaiImport,
    ]
    db.create_tables(models_to_create)
@db_.command()
def reset():
    # Unconditional DELETE: removes every HamsterActivityKimaiMapping row.
    clear_mappings = HamsterActivityKimaiMapping.delete()
    clear_mappings.execute()
@db_.command("sync")
def kimai_db_sync():
    # Exposed as "sync"; delegates entirely to sync() imported from .sync.
    sync()
@db_.command()
@click.option(
    "-g",
    "--global",
    "global_",
    help="Does this file contain mappings to global activties",
    is_flag=True,
)
@click.option("--mapping-path", help="Mapping file")
def mapping2db(mapping_path=None, global_=False):
    """Load a mapping CSV into the Hamster-to-Kimai mapping table"""
    # mapping_path: CSV to load; defaults to mapping.kimai.csv in HAMSTER_DIR,
    #               like the other kimai commands, instead of failing on
    #               open(None).
    # global_:      accepted from the command line but not consulted here.
    #
    # Every row must name an existing Hamster category/activity and Kimai
    # customer/project/activity; a failed lookup raises the model's
    # DoesNotExist.
    if mapping_path is None:
        mapping_path = HAMSTER_DIR / "mapping.kimai.csv"

    mapping_file = _get_kimai_mapping_file(mapping_path, None)
    try:
        # Skip the header row (tolerating a completely empty file).
        next(mapping_file, None)
        mapping_reader = csv.reader(mapping_file)

        for row in mapping_reader:
            if not row:
                continue  # blank line in the CSV

            hamster_category = HamsterCategory.get(name=row[0])
            hamster_activity = HamsterActivity.get(
                name=row[1], category_id=hamster_category.id
            )
            kimai_customer = KimaiCustomer.get(name=row[2])
            kimai_project = KimaiProject.get(name=row[3], customer_id=kimai_customer.id)

            # Prefer an activity belonging to the project; otherwise fall back
            # to one with no project.
            try:
                kimai_activity = KimaiActivity.get(
                    name=row[4], project_id=kimai_project.id
                )
            except KimaiActivity.DoesNotExist:
                kimai_activity = KimaiActivity.get(name=row[4], project_id=None)

            # The tag and note columns are optional trailing columns.
            kimai_tags = row[5] if len(row) > 5 else ""
            kimai_description = row[6] if len(row) > 6 else ""

            HamsterActivityKimaiMapping.create(
                hamster_activity=hamster_activity,
                kimai_customer=kimai_customer,
                kimai_project=kimai_project,
                kimai_activity=kimai_activity,
                kimai_description=kimai_description,
                kimai_tags=kimai_tags,
            )
    finally:
        mapping_file.close()
@cli.command()
def app():
    # Imported inside the command rather than at module import time.
    from .app import HamsterToolsApp

    HamsterToolsApp().run()
@cli.command()
def hamster():
    # Prints a single hamster emoji.
    click.echo("🐹")
# Run the click command group when this file is executed directly.
if __name__ == "__main__":
    cli()