Compare commits

...

5 Commits

Author SHA1 Message Date
3wc
90d74381df Add requirements 2025-08-24 23:11:53 -04:00
3wc
613d361c00 Handle in-progress fact import 2025-08-24 23:03:45 -04:00
3wc
a3ccc42a6e Add Clockify import, 4matting 2025-08-07 15:05:07 +01:00
3wc
68998be917 Remove ancient kimai csv command, add Clockify mapping2db 2025-08-07 14:28:12 +01:00
3wc
fb49a24ae1 Preliminary clockify support 2025-08-07 12:03:02 +01:00
11 changed files with 405 additions and 204 deletions

View File

@ -9,12 +9,15 @@ import sys
import tomllib
import click
from clockify_sdk import Clockify
import requests
from peewee import fn, JOIN
from textual.logging import TextualHandler
from xdg.BaseDirectory import xdg_config_home
from hamstertools.clockify import export_fact
from .db import (
KimaiTag,
@ -28,6 +31,9 @@ from .db import (
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
HamsterFactTag,
ClockifyProject,
HamsterClockifyMapping,
HamsterFactClockifyImport,
)
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync
@ -43,11 +49,8 @@ db.init(HAMSTER_FILE)
@click.group(context_settings={"auto_envvar_prefix": "HAMSTERTOOL"})
@click.option("-d", "--debug", is_flag=True)
@click.option("--config", default=CONFIG_FILE, type=click.Path())
@click.option("--kimai-api-url", envvar="KIMAI_API_URL")
@click.option("--kimai-username", envvar="KIMAI_USERNAME")
@click.option("--kimai-api-key", envvar="KIMAI_API_KEY")
@click.pass_context
def cli(ctx, config, debug, kimai_api_url=None, kimai_username=None, kimai_api_key=None):
def cli(ctx, config, debug):
file_config = {}
if os.path.exists(config):
with open(config, "rb") as f:
@ -64,11 +67,8 @@ def cli(ctx, config, debug, kimai_api_url=None, kimai_username=None, kimai_api_k
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
ctx.obj = KimaiAPI(
kimai_username if kimai_username is not None else file_config.get("kimai_username"),
kimai_api_key if kimai_api_key is not None else file_config.get("kimai_api_key"),
kimai_api_url if kimai_api_url is not None else file_config.get("kimai_api_url"),
)
ctx.ensure_object(dict)
ctx.obj["config"] = file_config
@cli.group()
@ -334,7 +334,23 @@ def find_duplicates():
@cli.group()
def kimai():
@click.option("--kimai-api-url", envvar="KIMAI_API_URL")
@click.option("--kimai-username", envvar="KIMAI_USERNAME")
@click.option("--kimai-api-key", envvar="KIMAI_API_KEY")
@click.pass_context
def kimai(ctx, kimai_api_url, kimai_username, kimai_api_key):
file_config = ctx.obj["config"]
ctx.obj["kimai"] = KimaiAPI(
kimai_username
if kimai_username is not None
else file_config.get("kimai_username"),
kimai_api_key
if kimai_api_key is not None
else file_config.get("kimai_api_key"),
kimai_api_url
if kimai_api_url is not None
else file_config.get("kimai_api_url"),
)
pass
@ -505,168 +521,13 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
found_activities.append(activity_str)
@kimai.command("csv")
@click.option(
"--mapping-path",
help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
multiple=True,
)
@click.option("--output", help="Output file (default kimai.csv)")
@click.option("--category-search", help="Category search string")
@click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username")
def _csv(
username,
mapping_path=None,
output=None,
category_search=None,
after=None,
show_missing=False,
):
"""
Export time tracking data in Kimai format
"""
if mapping_path is None:
mapping_path = HAMSTER_DIR / "mapping.kimai.csv"
if output is None:
timestamp = datetime.now().strftime("%F")
output = f"kimai_{timestamp}.csv"
if type(mapping_path) == tuple:
mapping_files = []
for mapping_path_item in mapping_path:
mapping_file = _get_kimai_mapping_file(mapping_path_item, category_search)
next(mapping_file)
mapping_files.append(mapping_file)
mapping_reader = csv.reader(chain(*mapping_files))
else:
mapping_file = _get_kimai_mapping_file(mapping_path, category_search)
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
mapping = {
"{0}:{1}".format(row[0], row[1]): [row[2], row[3], row[4], row[5]]
for row in mapping_reader
}
if type(mapping_path) == tuple:
for mapping_file in mapping_files:
mapping_file.close()
else:
mapping_file.close()
output_file = open(output, "w")
output_writer = csv.writer(output_file)
args = []
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity)
.join(HamsterCategory, JOIN.LEFT_OUTER)
)
sql = """
SELECT
facts.id, facts.start_time, facts.end_time, facts.description,
activities.id, activities.name,
categories.name, categories.id
FROM
facts
LEFT JOIN
activities ON facts.activity_id = activities.id
LEFT JOIN
categories ON activities.category_id = categories.id """
if category_search is not None:
facts = facts.where(HamsterCategory.name.contains(category_search))
if after is not None:
facts = facts.where(HamsterFact.start_time >= after)
if not show_missing:
output_writer.writerow(
[
"Date",
"From",
"To",
"Duration",
"Rate",
"User",
"Customer",
"Project",
"Activity",
"Description",
"Exported",
"Tags",
"HourlyRate",
"FixedRate",
"InternalRate",
]
)
for fact in facts:
k = f"{fact.activity.category.name}:{fact.activity.name}"
try:
mapping_ = mapping[k]
except KeyError:
if show_missing:
output_writer.writerow(
[fact.activity.category.name, fact.activity.name]
)
click.secho("Can't find mapping for '{0}', skipping".format(k), fg="yellow")
continue
if show_missing:
continue
if fact.start_time is None or fact.end_time is None:
click.secho(
f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping",
fg="yellow",
)
continue
if len(mapping_) < 5:
mapping_.append(None)
date_start, date_end = (
datetime.strptime(fact.start_time.split(".")[0], "%Y-%m-%d %H:%M:%S"),
datetime.strptime(fact.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S"),
)
duration = (date_start - date_end).seconds / 3600
output_writer.writerow(
[
date_start.strftime("%Y-%m-%d"),
date_start.strftime("%H:%M"),
"", # To (time)
duration,
"", # Rate
username,
mapping_[0],
mapping_[1],
mapping_[2],
fact.description or mapping_[4] or "",
"0", # Exported
mapping_[3],
"", # Hourly rate
"", # Fixed rate
]
)
output_file.close()
@kimai.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
api = KimaiAPI(username=KIMAI_USERNAME, api_key=KIMAI_API_KEY)
@click.pass_context
def kimai_import(ctx, search, after, before):
api = ctx.obj
SEARCH = "auto"
@ -705,7 +566,8 @@ def _import(search, after, before):
and not mappings[0].kimai_project.allow_global_activities
):
click.secho(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})", fg="red",
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})",
fg="red",
)
has_errors = True
continue
@ -744,11 +606,17 @@ def _import(search, after, before):
description=f.description
if f.description != ""
else mapping.kimai_description,
tags=",".join([t.tag.name for t in f.tags]) if len(f.tags) > 0 else mapping.kimai_tags
tags=",".join([t.tag.name for t in f.tags])
if len(f.tags) > 0
else mapping.kimai_tags,
)
r = t.upload().json()
if len(f.tags) > 0 or mapping.kimai_tags:
print(",".join([t.tag.name for t in f.tags]) if len(f.tags)> 0 else mapping.kimai_tags)
print(
",".join([t.tag.name for t in f.tags])
if len(f.tags) > 0
else mapping.kimai_tags
)
print(r["tags"])
if r.get("code", 200) != 200:
print(r)
@ -756,15 +624,15 @@ def _import(search, after, before):
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')
print(f"Created Kimai timesheet {r['id']}")
@kimai.group("db")
def db_():
def kimai_db():
pass
@db_.command()
@kimai_db.command()
def init():
db.create_tables(
[
@ -778,20 +646,18 @@ def init():
)
@db_.command()
@kimai_db.command()
def reset():
HamsterActivityKimaiMapping.delete().execute()
@db_.command("sync")
@click.pass_obj
def kimai_db_sync(api):
sync(
api
)
@kimai_db.command("sync")
@click.pass_context
def kimai_db_sync(ctx):
sync(ctx.obj["kimai"])
@db_.command()
@kimai_db.command("mapping2db")
@click.option(
"-g",
"--global",
@ -800,7 +666,7 @@ def kimai_db_sync(api):
is_flag=True,
)
@click.option("--mapping-path", help="Mapping file")
def mapping2db(mapping_path=None, global_=False):
def kimai_mapping2db(mapping_path=None, global_=False):
mapping_file = _get_kimai_mapping_file(mapping_path, None)
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
@ -828,14 +694,177 @@ def mapping2db(mapping_path=None, global_=False):
)
@kimai.command("app")
@click.pass_context
def kimai_app(ctx):
from .app import HamsterToolsAppKimai
app = HamsterToolsAppKimai(ctx.obj["kimai"])
app.run()
@cli.group()
@click.option("--api-key", envvar="CLOCKIFY_API_KEY", help="Clockify API key")
@click.option(
"--workspace-id", envvar="CLOCKIFY_WORKSPACE_ID", help="Clockify workspace ID"
)
@click.pass_context
def clockify(ctx, api_key, workspace_id):
file_config = ctx.obj["config"]
ctx.obj["clockify"] = Clockify(
api_key=api_key if api_key is not None else file_config.get("clockify_api_key"),
workspace_id=workspace_id
if workspace_id is not None
else file_config.get("clockify_workspace_id"),
)
@clockify.group("db")
def clockify_db():
pass
@clockify_db.command("sync")
@click.pass_context
def clockify_db_sync(ctx):
from .clockify import sync_projects
count = sync_projects(ctx.obj["clockify"], db)
click.echo(f"Synced {count} Clockify projects")
@clockify_db.command("init")
def clockify_db_init():
db.create_tables(
[
ClockifyProject,
HamsterClockifyMapping,
HamsterFactClockifyImport,
]
)
@clockify_db.command("reset")
def clockify_db_reset():
HamsterClockifyMapping.delete().execute()
@clockify_db.command("mapping2db")
@click.argument("mapping-path")
def clockify_mapping2db(mapping_path):
mapping_file = open(mapping_path, "r")
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
for row in mapping_reader:
hamster_category = HamsterCategory.get(name=row[0])
hamster_activity = HamsterActivity.get(
name=row[1], category_id=hamster_category.id
)
clockify_project = ClockifyProject.get(clockify_id=row[2])
HamsterClockifyMapping.create(
hamster_activity=hamster_activity,
clockify_project=clockify_project.clockify_id,
clockify_description=row[3],
)
@clockify.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
@click.pass_context
def clockify_import(ctx, search, after, before):
api = ctx.obj['clockify']
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterFact)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
& HamsterCategory.name.contains(search)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.clockify_mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if f.clockify_imports.count() > 0:
click.secho(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.clockify_imports.count()} time(s)",
fg="yellow",
)
continue
if has_errors:
sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.clockify_mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.clockify_imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.end_time is None:
print(
f"still in progress, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = export_fact(
api,
mapping.clockify_project_id,
f.start_time,
f.end_time,
f.description if f.description is not None else mapping.clockify_description,
)
HamsterFactClockifyImport.create(hamster_fact=f, clockify_time_entry_id=t).save()
print(f"Created Clockify timesheet {t}")
@clockify.command("app")
@click.pass_context
def clockify_app(ctx):
from .app import HamsterToolsAppClockify
app = HamsterToolsAppClockify(ctx.obj["clockify"])
app.run()
@cli.command()
@click.pass_obj
def app(kimai_api):
def app():
from .app import HamsterToolsApp
app = HamsterToolsApp(
kimai_api
)
app = HamsterToolsApp()
app.run()

View File

@ -1,10 +1,12 @@
from textual.app import App
from hamstertools import clockify
from .db import db
from .kimaiapi import KimaiAPI
from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen
from .screens.clockify.projects import ClockifyProjectScreen
class HamsterToolsApp(App):
@ -12,18 +14,19 @@ class HamsterToolsApp(App):
BINDINGS = [
("h", "switch_mode('hamster')", "Hamster"),
("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
def __init__(self, kimai_api=None):
def __init__(self, kimai_api=None, clockify_api=None):
self.add_mode("hamster", HamsterScreen())
self.add_mode("kimai", KimaiScreen())
# self.mode MODES = {
# "hamster": HamsterScreen(),
# "kimai": KimaiScreen(),
# }
self.api = kimai_api
if kimai_api is not None:
self.kimai_api = kimai_api
self.add_mode("kimai", KimaiScreen())
if clockify_api is not None:
self.clockify_api = clockify_api
self.add_mode("clockify", ClockifyProjectScreen())
super().__init__()
@ -33,3 +36,26 @@ class HamsterToolsApp(App):
async def action_quit(self) -> None:
db.close()
self.exit()
class HamsterToolsAppKimai(HamsterToolsApp):
BINDINGS = HamsterToolsApp.BINDINGS + [
("k", "switch_mode('kimai')", "Kimai"),
]
def __init__(self, kimai_api):
self.kimai_api = kimai_api
self.add_mode("kimai", KimaiScreen())
super().__init__()
class HamsterToolsAppClockify(HamsterToolsApp):
BINDINGS = HamsterToolsApp.BINDINGS + [
("c", "switch_mode('clockify_projects')", "Clockify"),
]
def __init__(self, clockify_api):
self.clockify_api = clockify_api
self.add_mode("clockify_projects", ClockifyProjectScreen())
super().__init__()

62
hamstertools/clockify.py Normal file
View File

@ -0,0 +1,62 @@
from dataclasses import dataclass, field
from datetime import datetime
from clockify_sdk import Clockify
class NotFound(Exception):
pass
@dataclass()
class Project:
api: Clockify = field(repr=False)
id: str
name: str
workspace_id: str
@staticmethod
def list(api):
projects = api.projects.get_all()
return [
Project(api, p['id'], p['name'], p['workspaceId'])
for p in projects
]
@staticmethod
def get_by_id(api, project_id):
return api.projects.get_by_id(project_id)
def sync_projects(api, db):
"""Sync Clockify projects to local database"""
from .db import ClockifyProject
# Fetch and store all projects
projects = Project.list(api)
with db.atomic():
# Delete all existing projects
ClockifyProject.delete().execute()
query = ClockifyProject.insert_many(
[
{
"clockify_id": project.id,
"name": project.name,
"workspace_id": project.workspace_id
}
for project in projects
]
)
query.execute()
return len(projects)
def export_fact(api, project_id, start, end, description=None):
"""Export a Hamster fact to Clockify as a time entry"""
time_entry = api.time_entries.create(
project_id=project_id,
start=start,
end=end,
description=description,
billable=True
)
return time_entry['id']

View File

@ -1,7 +1,6 @@
from datetime import datetime
from peewee import (
CompositeKey,
SqliteDatabase,
Model,
CharField,
@ -102,6 +101,7 @@ class KimaiTag(Model):
class HamsterActivityKimaiMapping(Model):
# TODO: Rename these backrefs to kimai_mappings
hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")
kimai_project = ForeignKeyField(KimaiProject, backref="mappings")
@ -122,3 +122,34 @@ class HamsterFactKimaiImport(Model):
class Meta:
database = db
table_name = "hamster_fact_kimai_imports"
class ClockifyProject(Model):
clockify_id = CharField()
name = CharField()
workspace_id = CharField()
class Meta:
database = db
table_name = "clockify_projects"
class HamsterClockifyMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref="clockify_mappings")
clockify_project = ForeignKeyField(ClockifyProject, backref="mappings")
clockify_description = CharField()
created_at = DateTimeField(default=datetime.now)
class Meta:
database = db
table_name = "hamster_clockify_mappings"
class HamsterFactClockifyImport(Model):
hamster_fact = ForeignKeyField(HamsterFact, backref="clockify_imports")
clockify_time_entry_id = CharField()
exported_at = DateTimeField(default=datetime.now)
class Meta:
database = db
table_name = "hamster_fact_clockify_imports"

View File

@ -1,5 +1,4 @@
from datetime import datetime
import pdb
import requests
from dataclasses import dataclass, field

View File

@ -0,0 +1,32 @@
from textual.app import ComposeResult
from textual.containers import VerticalScroll
from textual.screen import Screen
from textual.widgets import DataTable, Header, Footer
from hamstertools.db import HamsterActivity, ClockifyProject, HamsterClockifyMapping
class ClockifyMappingScreen(Screen):
"""Screen for managing Clockify activity-project mappings"""
BINDINGS = [("q", "quit", "Quit")]
def compose(self) -> ComposeResult:
yield Header()
yield VerticalScroll(DataTable())
yield Footer()
def on_mount(self) -> None:
table = self.query_one(DataTable)
table.add_columns("Hamster Activity", "Clockify Project")
# Query all mappings
mappings = (HamsterClockifyMapping
.select()
.join(HamsterActivity)
.join(ClockifyProject))
for mapping in mappings:
activity = f"{mapping.hamster_activity.category.name} » {mapping.hamster_activity.name}"
project = mapping.clockify_project.name
table.add_row(activity, project)

View File

@ -0,0 +1,24 @@
from textual.app import ComposeResult
from textual.containers import VerticalScroll
from textual.screen import Screen
from textual.widgets import DataTable, Header, Footer
from hamstertools.db import ClockifyProject
class ClockifyProjectScreen(Screen):
"""Screen for listing Clockify projects"""
BINDINGS = [("q", "quit", "Quit")]
def compose(self) -> ComposeResult:
yield Header()
yield VerticalScroll(DataTable())
yield Footer()
def on_mount(self) -> None:
table = self.query_one(DataTable)
table.add_columns("ID", "Name", "Workspace ID")
for project in ClockifyProject.select():
table.add_row(project.clockify_id, project.name, project.workspace_id)

View File

@ -14,7 +14,7 @@ from .tags import TagList
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("g", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
("t", "show_tab('tags')", "Tags"),
]

View File

@ -1,14 +1,11 @@
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.events import DescendantBlur
from textual.screen import ModalScreen
from textual.widgets import DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown
from hamstertools.db import HamsterFactTag, HamsterTag
from hamstertools.screens.list import ListPane

View File

@ -1,5 +1,4 @@
from .kimaiapi import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,

View File

@ -1,5 +1,7 @@
clockify-sdk==0.2.2
requests==2.26.0
peewee==3.17.0
pyxdg==0.28
requests-cache==1.1.1
textual==0.44.1
textual-autocomplete==2.1.0b0