Compare commits

...

11 Commits

Author SHA1 Message Date
3wc
90d74381df Add requirements 2025-08-24 23:11:53 -04:00
3wc
613d361c00 Handle in-progress fact import 2025-08-24 23:03:45 -04:00
3wc
a3ccc42a6e Add Clockify import, 4matting 2025-08-07 15:05:07 +01:00
3wc
68998be917 Remove ancient kimai csv command, add Clockify mapping2db 2025-08-07 14:28:12 +01:00
3wc
fb49a24ae1 Preliminary clockify support 2025-08-07 12:03:02 +01:00
3wc
db6d7ac640 Merge branch 'api-conf' 2025-08-06 11:09:13 +01:00
3wc
69d471a859 Working config loading from TOML, env var, cli args 2025-08-06 11:07:45 +01:00
3wc
5ed5a73950 Merge branch 'tag-management' 2024-12-07 10:08:44 -05:00
3wc
fd651ff25a Add hamster tag management, reinstate tags during import, reorganise
code
2024-12-07 10:08:20 -05:00
3wc
4fcf972e17 WIP: config file support 2024-12-06 21:30:34 -05:00
3wc
48905953ca Merge branch 'tui' 2024-10-04 13:14:38 -04:00
15 changed files with 812 additions and 341 deletions

View File

@ -1,17 +1,26 @@
#!/usr/bin/env python3.7
import os
import csv
import logging
from datetime import datetime
from itertools import chain
from pathlib import Path
import sys
import tomllib
import click
from clockify_sdk import Clockify
import requests
from peewee import fn, JOIN
from textual.logging import TextualHandler
from xdg.BaseDirectory import xdg_config_home
from hamstertools.clockify import export_fact
from .db import (
KimaiTag,
db,
HamsterCategory,
HamsterActivity,
@ -21,19 +30,31 @@ from .db import (
KimaiActivity,
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
HamsterFactTag,
ClockifyProject,
HamsterClockifyMapping,
HamsterFactClockifyImport,
)
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync
CONFIG_FILE = Path(xdg_config_home) / "hamstertools.toml"
HAMSTER_DIR = Path.home() / ".local/share/hamster"
HAMSTER_FILE = HAMSTER_DIR / "hamster.db"
db.init(HAMSTER_FILE)
@click.group()
@click.group(context_settings={"auto_envvar_prefix": "HAMSTERTOOL"})
@click.option("-d", "--debug", is_flag=True)
def cli(debug):
@click.option("--config", default=CONFIG_FILE, type=click.Path())
@click.pass_context
def cli(ctx, config, debug):
file_config = {}
if os.path.exists(config):
with open(config, "rb") as f:
file_config = tomllib.load(f)
if debug:
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
@ -46,6 +67,9 @@ def cli(debug):
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
ctx.ensure_object(dict)
ctx.obj["config"] = file_config
@cli.group()
def categories():
@ -310,7 +334,23 @@ def find_duplicates():
@cli.group()
def kimai():
@click.option("--kimai-api-url", envvar="KIMAI_API_URL")
@click.option("--kimai-username", envvar="KIMAI_USERNAME")
@click.option("--kimai-api-key", envvar="KIMAI_API_KEY")
@click.pass_context
def kimai(ctx, kimai_api_url, kimai_username, kimai_api_key):
file_config = ctx.obj["config"]
ctx.obj["kimai"] = KimaiAPI(
kimai_username
if kimai_username is not None
else file_config.get("kimai_username"),
kimai_api_key
if kimai_api_key is not None
else file_config.get("kimai_api_key"),
kimai_api_url
if kimai_api_url is not None
else file_config.get("kimai_api_url"),
)
pass
@ -354,7 +394,6 @@ def _get_kimai_mapping_file(path, category_search=None):
multiple=True,
)
@click.argument("username")
@click.argument("api_key", envvar="KIMAI_API_KEY")
@click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors")
@click.option("--ignore-activities", is_flag=True, help="Ignore missing activities")
def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
@ -482,168 +521,13 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
found_activities.append(activity_str)
@kimai.command("csv")
@click.option(
"--mapping-path",
help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
multiple=True,
)
@click.option("--output", help="Output file (default kimai.csv)")
@click.option("--category-search", help="Category search string")
@click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username")
def _csv(
username,
mapping_path=None,
output=None,
category_search=None,
after=None,
show_missing=False,
):
"""
Export time tracking data in Kimai format
"""
if mapping_path is None:
mapping_path = HAMSTER_DIR / "mapping.kimai.csv"
if output is None:
timestamp = datetime.now().strftime("%F")
output = f"kimai_{timestamp}.csv"
if type(mapping_path) == tuple:
mapping_files = []
for mapping_path_item in mapping_path:
mapping_file = _get_kimai_mapping_file(mapping_path_item, category_search)
next(mapping_file)
mapping_files.append(mapping_file)
mapping_reader = csv.reader(chain(*mapping_files))
else:
mapping_file = _get_kimai_mapping_file(mapping_path, category_search)
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
mapping = {
"{0}:{1}".format(row[0], row[1]): [row[2], row[3], row[4], row[5]]
for row in mapping_reader
}
if type(mapping_path) == tuple:
for mapping_file in mapping_files:
mapping_file.close()
else:
mapping_file.close()
output_file = open(output, "w")
output_writer = csv.writer(output_file)
args = []
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity)
.join(HamsterCategory, JOIN.LEFT_OUTER)
)
sql = """
SELECT
facts.id, facts.start_time, facts.end_time, facts.description,
activities.id, activities.name,
categories.name, categories.id
FROM
facts
LEFT JOIN
activities ON facts.activity_id = activities.id
LEFT JOIN
categories ON activities.category_id = categories.id """
if category_search is not None:
facts = facts.where(HamsterCategory.name.contains(category_search))
if after is not None:
facts = facts.where(HamsterFact.start_time >= after)
if not show_missing:
output_writer.writerow(
[
"Date",
"From",
"To",
"Duration",
"Rate",
"User",
"Customer",
"Project",
"Activity",
"Description",
"Exported",
"Tags",
"HourlyRate",
"FixedRate",
"InternalRate",
]
)
for fact in facts:
k = f"{fact.activity.category.name}:{fact.activity.name}"
try:
mapping_ = mapping[k]
except KeyError:
if show_missing:
output_writer.writerow(
[fact.activity.category.name, fact.activity.name]
)
click.secho("Can't find mapping for '{0}', skipping".format(k), fg="yellow")
continue
if show_missing:
continue
if fact.start_time is None or fact.end_time is None:
click.secho(
f"Missing duration data '{fact.start_time}-{fact.end_time}', skipping",
fg="yellow",
)
continue
if len(mapping_) < 5:
mapping_.append(None)
date_start, date_end = (
datetime.strptime(fact.start_time.split(".")[0], "%Y-%m-%d %H:%M:%S"),
datetime.strptime(fact.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S"),
)
duration = (date_start - date_end).seconds / 3600
output_writer.writerow(
[
date_start.strftime("%Y-%m-%d"),
date_start.strftime("%H:%M"),
"", # To (time)
duration,
"", # Rate
username,
mapping_[0],
mapping_[1],
mapping_[2],
fact.description or mapping_[4] or "",
"0", # Exported
mapping_[3],
"", # Hourly rate
"", # Fixed rate
]
)
output_file.close()
@kimai.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
api = KimaiAPI()
@click.pass_context
def kimai_import(ctx, search, after, before):
api = ctx.obj
SEARCH = "auto"
@ -651,6 +535,8 @@ def _import(search, after, before):
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterFact)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
@ -680,7 +566,8 @@ def _import(search, after, before):
and not mappings[0].kimai_project.allow_global_activities
):
click.secho(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})", fg="red",
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})",
fg="red",
)
has_errors = True
continue
@ -719,50 +606,58 @@ def _import(search, after, before):
description=f.description
if f.description != ""
else mapping.kimai_description,
# tags=f.tags if f.tags != '' else mapping.kimai_tags
tags=",".join([t.tag.name for t in f.tags])
if len(f.tags) > 0
else mapping.kimai_tags,
)
r = t.upload().json()
if len(f.tags) > 0 or mapping.kimai_tags:
print(
",".join([t.tag.name for t in f.tags])
if len(f.tags) > 0
else mapping.kimai_tags
)
print(r["tags"])
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')
print(f"Created Kimai timesheet {r['id']}")
@kimai.group("db")
def db_():
def kimai_db():
pass
@db_.command()
@kimai_db.command()
def init():
db.create_tables(
[
KimaiCustomer,
KimaiProject,
KimaiActivity,
KimaiTag,
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
]
)
@db_.command()
@kimai_db.command()
def reset():
HamsterActivityKimaiMapping.delete().execute()
@db_.command("sync")
def kimai_db_sync():
sync()
@kimai_db.command("sync")
@click.pass_context
def kimai_db_sync(ctx):
sync(ctx.obj["kimai"])
@db_.command()
@kimai_db.command("mapping2db")
@click.option(
"-g",
"--global",
@ -771,7 +666,7 @@ def kimai_db_sync():
is_flag=True,
)
@click.option("--mapping-path", help="Mapping file")
def mapping2db(mapping_path=None, global_=False):
def kimai_mapping2db(mapping_path=None, global_=False):
mapping_file = _get_kimai_mapping_file(mapping_path, None)
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
@ -799,6 +694,172 @@ def mapping2db(mapping_path=None, global_=False):
)
@kimai.command("app")
@click.pass_context
def kimai_app(ctx):
from .app import HamsterToolsAppKimai
app = HamsterToolsAppKimai(ctx.obj["kimai"])
app.run()
@cli.group()
@click.option("--api-key", envvar="CLOCKIFY_API_KEY", help="Clockify API key")
@click.option(
"--workspace-id", envvar="CLOCKIFY_WORKSPACE_ID", help="Clockify workspace ID"
)
@click.pass_context
def clockify(ctx, api_key, workspace_id):
file_config = ctx.obj["config"]
ctx.obj["clockify"] = Clockify(
api_key=api_key if api_key is not None else file_config.get("clockify_api_key"),
workspace_id=workspace_id
if workspace_id is not None
else file_config.get("clockify_workspace_id"),
)
@clockify.group("db")
def clockify_db():
pass
@clockify_db.command("sync")
@click.pass_context
def clockify_db_sync(ctx):
from .clockify import sync_projects
count = sync_projects(ctx.obj["clockify"], db)
click.echo(f"Synced {count} Clockify projects")
@clockify_db.command("init")
def clockify_db_init():
db.create_tables(
[
ClockifyProject,
HamsterClockifyMapping,
HamsterFactClockifyImport,
]
)
@clockify_db.command("reset")
def clockify_db_reset():
HamsterClockifyMapping.delete().execute()
@clockify_db.command("mapping2db")
@click.argument("mapping-path")
def clockify_mapping2db(mapping_path):
mapping_file = open(mapping_path, "r")
next(mapping_file)
mapping_reader = csv.reader(mapping_file)
for row in mapping_reader:
hamster_category = HamsterCategory.get(name=row[0])
hamster_activity = HamsterActivity.get(
name=row[1], category_id=hamster_category.id
)
clockify_project = ClockifyProject.get(clockify_id=row[2])
HamsterClockifyMapping.create(
hamster_activity=hamster_activity,
clockify_project=clockify_project.clockify_id,
clockify_description=row[3],
)
@clockify.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
@click.pass_context
def clockify_import(ctx, search, after, before):
api = ctx.obj['clockify']
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterFact)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
& HamsterCategory.name.contains(search)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.clockify_mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if f.clockify_imports.count() > 0:
click.secho(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.clockify_imports.count()} time(s)",
fg="yellow",
)
continue
if has_errors:
sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.clockify_mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.clockify_imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.end_time is None:
print(
f"still in progress, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = export_fact(
api,
mapping.clockify_project_id,
f.start_time,
f.end_time,
f.description if f.description is not None else mapping.clockify_description,
)
HamsterFactClockifyImport.create(hamster_fact=f, clockify_time_entry_id=t).save()
print(f"Created Clockify timesheet {t}")
@clockify.command("app")
@click.pass_context
def clockify_app(ctx):
from .app import HamsterToolsAppClockify
app = HamsterToolsAppClockify(ctx.obj["clockify"])
app.run()
@cli.command()
def app():
from .app import HamsterToolsApp

View File

@ -1,10 +1,12 @@
from textual.app import App
from hamstertools import clockify
from .db import db
from .kimaiapi import KimaiAPI
from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen
from .screens.clockify.projects import ClockifyProjectScreen
class HamsterToolsApp(App):
@ -12,29 +14,48 @@ class HamsterToolsApp(App):
BINDINGS = [
("h", "switch_mode('hamster')", "Hamster"),
("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
api_ = None
def __init__(self, kimai_api=None, clockify_api=None):
self.add_mode("hamster", HamsterScreen())
@property
def api(self) -> KimaiAPI:
if self.api_ is None:
self.api_ = KimaiAPI()
return self.api_
if kimai_api is not None:
self.kimai_api = kimai_api
self.add_mode("kimai", KimaiScreen())
def __init__(self):
self.MODES = {
"hamster": HamsterScreen(),
"kimai": KimaiScreen(),
}
if clockify_api is not None:
self.clockify_api = clockify_api
self.add_mode("clockify", ClockifyProjectScreen())
super().__init__()
def on_mount(self) -> None:
self.switch_mode("hamster")
def action_quit(self) -> None:
async def action_quit(self) -> None:
db.close()
self.exit()
class HamsterToolsAppKimai(HamsterToolsApp):
BINDINGS = HamsterToolsApp.BINDINGS + [
("k", "switch_mode('kimai')", "Kimai"),
]
def __init__(self, kimai_api):
self.kimai_api = kimai_api
self.add_mode("kimai", KimaiScreen())
super().__init__()
class HamsterToolsAppClockify(HamsterToolsApp):
BINDINGS = HamsterToolsApp.BINDINGS + [
("c", "switch_mode('clockify_projects')", "Clockify"),
]
def __init__(self, clockify_api):
self.clockify_api = clockify_api
self.add_mode("clockify_projects", ClockifyProjectScreen())
super().__init__()

View File

@ -18,11 +18,12 @@ DataTable:focus .datatable--cursor {
width: 50%;
}
ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen {
ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen, TagEditScreen {
align: center middle;
}
ActivityEditScreen > Vertical,
TagEditScreen > Vertical,
ActivityMappingScreen > Vertical {
padding: 0 1;
width: 80;
@ -31,7 +32,8 @@ ActivityMappingScreen > Vertical {
background: $surface;
}
ActivityEditScreen > Vertical {
ActivityEditScreen > Vertical,
TagEditScreen > Vertical {
height: 10;
}
@ -55,7 +57,7 @@ ActivityMappingScreen AutoComplete {
width: 80;
}
#description, #tags {
#description, #activity_tags {
width: 30;
}

62
hamstertools/clockify.py Normal file
View File

@ -0,0 +1,62 @@
from dataclasses import dataclass, field
from datetime import datetime
from clockify_sdk import Clockify
class NotFound(Exception):
pass
@dataclass()
class Project:
api: Clockify = field(repr=False)
id: str
name: str
workspace_id: str
@staticmethod
def list(api):
projects = api.projects.get_all()
return [
Project(api, p['id'], p['name'], p['workspaceId'])
for p in projects
]
@staticmethod
def get_by_id(api, project_id):
return api.projects.get_by_id(project_id)
def sync_projects(api, db):
"""Sync Clockify projects to local database"""
from .db import ClockifyProject
# Fetch and store all projects
projects = Project.list(api)
with db.atomic():
# Delete all existing projects
ClockifyProject.delete().execute()
query = ClockifyProject.insert_many(
[
{
"clockify_id": project.id,
"name": project.name,
"workspace_id": project.workspace_id
}
for project in projects
]
)
query.execute()
return len(projects)
def export_fact(api, project_id, start, end, description=None):
"""Export a Hamster fact to Clockify as a time entry"""
time_entry = api.time_entries.create(
project_id=project_id,
start=start,
end=end,
description=description,
billable=True
)
return time_entry['id']

View File

@ -8,6 +8,7 @@ from peewee import (
DateTimeField,
SmallIntegerField,
BooleanField,
CompositeKey
)
@ -31,6 +32,14 @@ class HamsterActivity(Model):
table_name = "activities"
class HamsterTag(Model):
name = CharField()
class Meta:
database = db
table_name = "tags"
class HamsterFact(Model):
activity = ForeignKeyField(HamsterActivity, backref="facts")
start_time = DateTimeField()
@ -42,6 +51,16 @@ class HamsterFact(Model):
table_name = "facts"
class HamsterFactTag(Model):
fact = ForeignKeyField(HamsterFact, backref="tags")
tag = ForeignKeyField(HamsterTag, backref="facts")
class Meta:
database = db
table_name = "fact_tags"
primary_key = CompositeKey('fact', 'tag')
class KimaiCustomer(Model):
visible = BooleanField(default=True)
name = CharField()
@ -72,7 +91,17 @@ class KimaiActivity(Model):
table_name = "kimai_activities"
class KimaiTag(Model):
name = CharField()
visible = BooleanField(default=True)
class Meta:
database = db
table_name = "kimai_tags"
class HamsterActivityKimaiMapping(Model):
# TODO: Rename these backrefs to kimai_mappings
hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")
kimai_project = ForeignKeyField(KimaiProject, backref="mappings")
@ -93,3 +122,34 @@ class HamsterFactKimaiImport(Model):
class Meta:
database = db
table_name = "hamster_fact_kimai_imports"
class ClockifyProject(Model):
clockify_id = CharField()
name = CharField()
workspace_id = CharField()
class Meta:
database = db
table_name = "clockify_projects"
class HamsterClockifyMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref="clockify_mappings")
clockify_project = ForeignKeyField(ClockifyProject, backref="mappings")
clockify_description = CharField()
created_at = DateTimeField(default=datetime.now)
class Meta:
database = db
table_name = "hamster_clockify_mappings"
class HamsterFactClockifyImport(Model):
hamster_fact = ForeignKeyField(HamsterFact, backref="clockify_imports")
clockify_time_entry_id = CharField()
exported_at = DateTimeField(default=datetime.now)
class Meta:
database = db
table_name = "hamster_fact_clockify_imports"

View File

@ -1,7 +1,5 @@
from datetime import datetime
import requests
import requests_cache
import os
from dataclasses import dataclass, field
@ -11,25 +9,22 @@ class NotFound(Exception):
class KimaiAPI(object):
# temporary hardcoded config
KIMAI_API_URL = "https://kimai.autonomic.zone/api"
# KIMAI_API_URL = "https://kimaitest.autonomic.zone/api"
KIMAI_USERNAME = "3wordchant"
KIMAI_API_KEY = os.environ["KIMAI_API_KEY"]
auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY}
def __init__(self):
def __init__(self, username=None, api_key=None, api_url=None):
self.auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key}
self.kimai_api_url = api_url
# NOTE: Uncomment the following line to enable requests_cache, which can make development a *lot* faster
# TODO: Add a config option or something for this
# import requests_cache
# requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3})
self.tags_json = self.get("tags/find?name=", {"visible": 3})
self.user_json = self.get("users/me")
def get(self, endpoint, params=None):
result = requests.get(
f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers
f"{self.kimai_api_url}/{endpoint}", params=params, headers=self.auth_headers
).json()
try:
if result["code"] != 200:
@ -40,7 +35,7 @@ class KimaiAPI(object):
def post(self, endpoint, data):
return requests.post(
f"{self.KIMAI_API_URL}/{endpoint}", json=data, headers=self.auth_headers
f"{self.kimai_api_url}/{endpoint}", json=data, headers=self.auth_headers
)
@ -145,6 +140,38 @@ class Activity(BaseAPI):
if not none:
raise NotFound()
@dataclass
class Tag(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Tag(
api,
t["id"],
t["name"],
t["visible"],
)
for t in api.tags_json
]
@staticmethod
def get_by_id(api, id, none=False):
for t in api.tags_json:
if t["id"] == id:
return Tag(
api,
t["id"],
t["name"],
t["visible"],
)
if not none:
raise NotFound()
@dataclass
class Timesheet(BaseAPI):

View File

@ -0,0 +1,32 @@
from textual.app import ComposeResult
from textual.containers import VerticalScroll
from textual.screen import Screen
from textual.widgets import DataTable, Header, Footer
from hamstertools.db import HamsterActivity, ClockifyProject, HamsterClockifyMapping
class ClockifyMappingScreen(Screen):
"""Screen for managing Clockify activity-project mappings"""
BINDINGS = [("q", "quit", "Quit")]
def compose(self) -> ComposeResult:
yield Header()
yield VerticalScroll(DataTable())
yield Footer()
def on_mount(self) -> None:
table = self.query_one(DataTable)
table.add_columns("Hamster Activity", "Clockify Project")
# Query all mappings
mappings = (HamsterClockifyMapping
.select()
.join(HamsterActivity)
.join(ClockifyProject))
for mapping in mappings:
activity = f"{mapping.hamster_activity.category.name} » {mapping.hamster_activity.name}"
project = mapping.clockify_project.name
table.add_row(activity, project)

View File

@ -0,0 +1,24 @@
from textual.app import ComposeResult
from textual.containers import VerticalScroll
from textual.screen import Screen
from textual.widgets import DataTable, Header, Footer
from hamstertools.db import ClockifyProject
class ClockifyProjectScreen(Screen):
"""Screen for listing Clockify projects"""
BINDINGS = [("q", "quit", "Quit")]
def compose(self) -> ComposeResult:
yield Header()
yield VerticalScroll(DataTable())
yield Footer()
def on_mount(self) -> None:
table = self.query_one(DataTable)
table.add_columns("ID", "Name", "Workspace ID")
for project in ClockifyProject.select():
table.add_row(project.clockify_id, project.name, project.workspace_id)

View File

@ -0,0 +1,42 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import (
Header,
Footer,
TabbedContent,
TabPane,
)
from .activities import ActivityList
from .categories import CategoryList
from .tags import TagList
class HamsterScreen(Screen):
BINDINGS = [
("g", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
("t", "show_tab('tags')", "Tags"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
with TabPane("Tags", id="tags"):
yield TagList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -1,39 +1,18 @@
from datetime import datetime
from datetime import datetime
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Grid, Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.containers import Horizontal, Vertical, Grid
from textual.events import DescendantBlur
from textual.screen import Screen, ModalScreen
from textual.widgets import (
Header,
Footer,
DataTable,
Input,
Label,
Checkbox,
TabbedContent,
TabPane,
Button
)
from peewee import fn, JOIN
from textual.screen import ModalScreen
from textual.widgets import Button, Checkbox, DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from ..db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .list import ListPane
from hamstertools.db import HamsterActivity, HamsterActivityKimaiMapping, HamsterCategory, HamsterFact, KimaiActivity, KimaiCustomer, KimaiProject
from hamstertools.screens.list import ListPane
class ActivityEditScreen(ModalScreen):
@ -209,7 +188,7 @@ class ActivityMappingScreen(ModalScreen):
),
Horizontal(
Label("Tags"),
Input(id="tags", value=self.tags),
Input(id="activity_tags", value=self.tags),
),
Horizontal(Checkbox("Global", id="global")),
)
@ -265,7 +244,7 @@ class ActivityMappingScreen(ModalScreen):
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"kimai_tags": self.query_one("#activity_tags").value,
"global": self.query_one("#global").value,
}
)
@ -513,98 +492,3 @@ class ActivityList(ListPane):
),
handle_mapping,
)
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,78 @@
from datetime import datetime
from peewee import JOIN, fn
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.widgets import DataTable
from hamstertools.db import HamsterActivity, HamsterCategory, HamsterFact
from hamstertools.screens.list import ListPane
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)

View File

@ -0,0 +1,162 @@
from peewee import JOIN, fn
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import ModalScreen
from textual.widgets import DataTable, Input, Label
from hamstertools.db import HamsterFactTag, HamsterTag
from hamstertools.screens.list import ListPane
class TagEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
def __init__(self, tag):
self.tag_name = tag.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Tag:"), Input(value=self.tag_name, id="tag")
),
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"tag": self.query_one("#tag").value,
}
)
class TagList(ListPane):
BINDINGS = [
# ("s", "sort", "Sort"),
# ("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
Binding(key="escape", action="cancelfilter", show=False),
]
move_from_tag = None
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFactTag.select(
HamsterFactTag.tag_id, fn.COUNT(HamsterFactTag.tag_id).alias("facts_count")
)
.group_by(HamsterFactTag.tag_id)
.alias("facts_count_query")
)
tags = (
HamsterTag.select(
HamsterTag,
HamsterTag.name,
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.switch(HamsterTag)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterTag.id == facts_count_query.c.tag_id),
)
.group_by(HamsterTag)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
tags = tags.where(
HamsterTag.name.contains(filter_search)
)
self.table.add_rows(
[
[
tag.id,
tag.name,
tag.facts_count,
]
for tag in tags
]
)
self.table.sort(*self.sort)
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
tag_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
tag = HamsterTag.get(id=tag_id)
tag.delete_instance()
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_tag = HamsterTag.get(id=row_cells[0])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_tag", None) is not None:
move_to_tag = HamsterTag.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 0))
)
HamsterFactTag.update({HamsterFactTag.tag: move_to_tag}).where(
HamsterFactTag.tag == self.move_from_tag
).execute()
self._refresh()
del self.move_from_tag
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
tag = HamsterTag.get(id=row_cells[0])
def handle_edit(properties):
if properties is None:
return
tag.name = properties["tag"]
tag.save()
self._refresh()
self.app.push_screen(
TagEditScreen(tag=tag), handle_edit
)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"tag id", "tag", "facts"
)
self.sort = (self.columns[1],)
self._refresh()

View File

@ -6,16 +6,16 @@ from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
from peewee import fn, JOIN
from ..utils import truncate
from ..sync import sync
from ..db import (
from ...utils import truncate
from ...sync import sync
from ...db import (
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
from ..kimaiapi import Timesheet as KimaiAPITimesheet
from ...kimaiapi import Timesheet as KimaiAPITimesheet
from .list import ListPane
from ..list import ListPane
class KimaiCustomerList(ListPane):

View File

@ -1,10 +1,11 @@
from .kimaiapi import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
Tag as KimaiAPITag,
)
from .db import (
KimaiTag,
db,
KimaiProject,
KimaiCustomer,
@ -12,12 +13,11 @@ from .db import (
)
def sync() -> None:
api = KimaiAPI()
def sync(api) -> None:
KimaiCustomer.delete().execute()
KimaiProject.delete().execute()
KimaiActivity.delete().execute()
KimaiTag.delete().execute()
customers = KimaiAPICustomer.list(api)
with db.atomic():
@ -60,3 +60,16 @@ def sync() -> None:
for activity in activities
]
).execute()
tags = KimaiAPITag.list(api)
with db.atomic():
KimaiTag.insert_many(
[
{
"id": tag.id,
"name": tag.name,
"visible": tag.visible,
}
for tag in tags
]
).execute()

View File

@ -1,6 +1,9 @@
clockify-sdk==0.2.2
requests==2.26.0
peewee==3.17.0
pyxdg==0.28
requests-cache==1.1.1
textual==0.44.1
textual-autocomplete==2.1.0b0
textual-dev==1.2.1
xdg==6.0.0