Compare commits

...

6 Commits

Author SHA1 Message Date
3wc
db6d7ac640 Merge branch 'api-conf' 2025-08-06 11:09:13 +01:00
3wc
69d471a859 Working config loading from TOML, env var, cli args 2025-08-06 11:07:45 +01:00
3wc
5ed5a73950 Merge branch 'tag-management' 2024-12-07 10:08:44 -05:00
3wc
fd651ff25a Add hamster tag management, reinstate tags during import, reorganise
code
2024-12-07 10:08:20 -05:00
3wc
4fcf972e17 WIP: config file support 2024-12-06 21:30:34 -05:00
3wc
48905953ca Merge branch 'tui' 2024-10-04 13:14:38 -04:00
12 changed files with 446 additions and 176 deletions

View File

@ -1,17 +1,23 @@
#!/usr/bin/env python3.7
import os
import csv
import logging
from datetime import datetime
from itertools import chain
from pathlib import Path
import sys
import tomllib
import click
import requests
from peewee import fn, JOIN
from textual.logging import TextualHandler
from xdg.BaseDirectory import xdg_config_home
from .db import (
KimaiTag,
db,
HamsterCategory,
HamsterActivity,
@ -21,19 +27,31 @@ from .db import (
KimaiActivity,
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
HamsterFactTag,
)
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync
CONFIG_FILE = Path(xdg_config_home) / "hamstertools.toml"
HAMSTER_DIR = Path.home() / ".local/share/hamster"
HAMSTER_FILE = HAMSTER_DIR / "hamster.db"
db.init(HAMSTER_FILE)
@click.group()
@click.group(context_settings={"auto_envvar_prefix": "HAMSTERTOOL"})
@click.option("-d", "--debug", is_flag=True)
def cli(debug):
@click.option("--config", default=CONFIG_FILE, type=click.Path())
@click.option("--kimai-api-url", envvar="KIMAI_API_URL")
@click.option("--kimai-username", envvar="KIMAI_USERNAME")
@click.option("--kimai-api-key", envvar="KIMAI_API_KEY")
@click.pass_context
def cli(ctx, config, debug, kimai_api_url=None, kimai_username=None, kimai_api_key=None):
file_config = {}
if os.path.exists(config):
with open(config, "rb") as f:
file_config = tomllib.load(f)
if debug:
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
@ -46,6 +64,12 @@ def cli(debug):
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
ctx.obj = KimaiAPI(
kimai_username if kimai_username is not None else file_config.get("kimai_username"),
kimai_api_key if kimai_api_key is not None else file_config.get("kimai_api_key"),
kimai_api_url if kimai_api_url is not None else file_config.get("kimai_api_url"),
)
@cli.group()
def categories():
@ -354,7 +378,6 @@ def _get_kimai_mapping_file(path, category_search=None):
multiple=True,
)
@click.argument("username")
@click.argument("api_key", envvar="KIMAI_API_KEY")
@click.option("--just-errors", "just_errors", is_flag=True, help="Only display errors")
@click.option("--ignore-activities", is_flag=True, help="Ignore missing activities")
def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
@ -643,7 +666,7 @@ def _csv(
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
api = KimaiAPI()
api = KimaiAPI(username=KIMAI_USERNAME, api_key=KIMAI_API_KEY)
SEARCH = "auto"
@ -651,6 +674,8 @@ def _import(search, after, before):
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterFact)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
@ -719,15 +744,15 @@ def _import(search, after, before):
description=f.description
if f.description != ""
else mapping.kimai_description,
# tags=f.tags if f.tags != '' else mapping.kimai_tags
tags=",".join([t.tag.name for t in f.tags]) if len(f.tags) > 0 else mapping.kimai_tags
)
r = t.upload().json()
if len(f.tags) > 0 or mapping.kimai_tags:
print(",".join([t.tag.name for t in f.tags]) if len(f.tags)> 0 else mapping.kimai_tags)
print(r["tags"])
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
@ -746,6 +771,7 @@ def init():
KimaiCustomer,
KimaiProject,
KimaiActivity,
KimaiTag,
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
]
@ -758,8 +784,11 @@ def reset():
@db_.command("sync")
def kimai_db_sync():
sync()
@click.pass_obj
def kimai_db_sync(api):
sync(
api
)
@db_.command()
@ -800,10 +829,13 @@ def mapping2db(mapping_path=None, global_=False):
@cli.command()
def app():
@click.pass_obj
def app(kimai_api):
from .app import HamsterToolsApp
app = HamsterToolsApp()
app = HamsterToolsApp(
kimai_api
)
app.run()

View File

@ -16,25 +16,20 @@ class HamsterToolsApp(App):
("q", "quit", "Quit"),
]
api_ = None
@property
def api(self) -> KimaiAPI:
if self.api_ is None:
self.api_ = KimaiAPI()
return self.api_
def __init__(self):
self.MODES = {
"hamster": HamsterScreen(),
"kimai": KimaiScreen(),
}
def __init__(self, kimai_api=None):
self.add_mode("hamster", HamsterScreen())
self.add_mode("kimai", KimaiScreen())
# self.mode MODES = {
# "hamster": HamsterScreen(),
# "kimai": KimaiScreen(),
# }
self.api = kimai_api
super().__init__()
def on_mount(self) -> None:
self.switch_mode("hamster")
def action_quit(self) -> None:
async def action_quit(self) -> None:
db.close()
self.exit()

View File

@ -18,11 +18,12 @@ DataTable:focus .datatable--cursor {
width: 50%;
}
ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen {
ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen, TagEditScreen {
align: center middle;
}
ActivityEditScreen > Vertical,
TagEditScreen > Vertical,
ActivityMappingScreen > Vertical {
padding: 0 1;
width: 80;
@ -31,7 +32,8 @@ ActivityMappingScreen > Vertical {
background: $surface;
}
ActivityEditScreen > Vertical {
ActivityEditScreen > Vertical,
TagEditScreen > Vertical {
height: 10;
}
@ -55,7 +57,7 @@ ActivityMappingScreen AutoComplete {
width: 80;
}
#description, #tags {
#description, #activity_tags {
width: 30;
}

View File

@ -1,6 +1,7 @@
from datetime import datetime
from peewee import (
CompositeKey,
SqliteDatabase,
Model,
CharField,
@ -8,6 +9,7 @@ from peewee import (
DateTimeField,
SmallIntegerField,
BooleanField,
CompositeKey
)
@ -31,6 +33,14 @@ class HamsterActivity(Model):
table_name = "activities"
class HamsterTag(Model):
name = CharField()
class Meta:
database = db
table_name = "tags"
class HamsterFact(Model):
activity = ForeignKeyField(HamsterActivity, backref="facts")
start_time = DateTimeField()
@ -42,6 +52,16 @@ class HamsterFact(Model):
table_name = "facts"
class HamsterFactTag(Model):
fact = ForeignKeyField(HamsterFact, backref="tags")
tag = ForeignKeyField(HamsterTag, backref="facts")
class Meta:
database = db
table_name = "fact_tags"
primary_key = CompositeKey('fact', 'tag')
class KimaiCustomer(Model):
visible = BooleanField(default=True)
name = CharField()
@ -72,6 +92,15 @@ class KimaiActivity(Model):
table_name = "kimai_activities"
class KimaiTag(Model):
name = CharField()
visible = BooleanField(default=True)
class Meta:
database = db
table_name = "kimai_tags"
class HamsterActivityKimaiMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")

View File

@ -1,7 +1,6 @@
from datetime import datetime
import pdb
import requests
import requests_cache
import os
from dataclasses import dataclass, field
@ -11,25 +10,22 @@ class NotFound(Exception):
class KimaiAPI(object):
# temporary hardcoded config
KIMAI_API_URL = "https://kimai.autonomic.zone/api"
# KIMAI_API_URL = "https://kimaitest.autonomic.zone/api"
KIMAI_USERNAME = "3wordchant"
KIMAI_API_KEY = os.environ["KIMAI_API_KEY"]
auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY}
def __init__(self):
def __init__(self, username=None, api_key=None, api_url=None):
self.auth_headers = {"X-AUTH-USER": username, "X-AUTH-TOKEN": api_key}
self.kimai_api_url = api_url
# NOTE: Uncomment the following line to enable requests_cache, which can make development a *lot* faster
# TODO: Add a config option or something for this
# import requests_cache
# requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3})
self.tags_json = self.get("tags/find?name=", {"visible": 3})
self.user_json = self.get("users/me")
def get(self, endpoint, params=None):
result = requests.get(
f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers
f"{self.kimai_api_url}/{endpoint}", params=params, headers=self.auth_headers
).json()
try:
if result["code"] != 200:
@ -40,7 +36,7 @@ class KimaiAPI(object):
def post(self, endpoint, data):
return requests.post(
f"{self.KIMAI_API_URL}/{endpoint}", json=data, headers=self.auth_headers
f"{self.kimai_api_url}/{endpoint}", json=data, headers=self.auth_headers
)
@ -145,6 +141,38 @@ class Activity(BaseAPI):
if not none:
raise NotFound()
@dataclass
class Tag(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Tag(
api,
t["id"],
t["name"],
t["visible"],
)
for t in api.tags_json
]
@staticmethod
def get_by_id(api, id, none=False):
for t in api.tags_json:
if t["id"] == id:
return Tag(
api,
t["id"],
t["name"],
t["visible"],
)
if not none:
raise NotFound()
@dataclass
class Timesheet(BaseAPI):

View File

@ -0,0 +1,42 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import (
Header,
Footer,
TabbedContent,
TabPane,
)
from .activities import ActivityList
from .categories import CategoryList
from .tags import TagList
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
("t", "show_tab('tags')", "Tags"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
with TabPane("Tags", id="tags"):
yield TagList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -1,39 +1,18 @@
from datetime import datetime
from datetime import datetime
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Grid, Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.containers import Horizontal, Vertical, Grid
from textual.events import DescendantBlur
from textual.screen import Screen, ModalScreen
from textual.widgets import (
Header,
Footer,
DataTable,
Input,
Label,
Checkbox,
TabbedContent,
TabPane,
Button
)
from peewee import fn, JOIN
from textual.screen import ModalScreen
from textual.widgets import Button, Checkbox, DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from ..db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .list import ListPane
from hamstertools.db import HamsterActivity, HamsterActivityKimaiMapping, HamsterCategory, HamsterFact, KimaiActivity, KimaiCustomer, KimaiProject
from hamstertools.screens.list import ListPane
class ActivityEditScreen(ModalScreen):
@ -209,7 +188,7 @@ class ActivityMappingScreen(ModalScreen):
),
Horizontal(
Label("Tags"),
Input(id="tags", value=self.tags),
Input(id="activity_tags", value=self.tags),
),
Horizontal(Checkbox("Global", id="global")),
)
@ -265,7 +244,7 @@ class ActivityMappingScreen(ModalScreen):
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"kimai_tags": self.query_one("#activity_tags").value,
"global": self.query_one("#global").value,
}
)
@ -513,98 +492,3 @@ class ActivityList(ListPane):
),
handle_mapping,
)
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,78 @@
from datetime import datetime
from peewee import JOIN, fn
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.widgets import DataTable
from hamstertools.db import HamsterActivity, HamsterCategory, HamsterFact
from hamstertools.screens.list import ListPane
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)

View File

@ -0,0 +1,165 @@
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.events import DescendantBlur
from textual.screen import ModalScreen
from textual.widgets import DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown
from hamstertools.db import HamsterFactTag, HamsterTag
from hamstertools.screens.list import ListPane
class TagEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
def __init__(self, tag):
self.tag_name = tag.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Tag:"), Input(value=self.tag_name, id="tag")
),
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"tag": self.query_one("#tag").value,
}
)
class TagList(ListPane):
BINDINGS = [
# ("s", "sort", "Sort"),
# ("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
Binding(key="escape", action="cancelfilter", show=False),
]
move_from_tag = None
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFactTag.select(
HamsterFactTag.tag_id, fn.COUNT(HamsterFactTag.tag_id).alias("facts_count")
)
.group_by(HamsterFactTag.tag_id)
.alias("facts_count_query")
)
tags = (
HamsterTag.select(
HamsterTag,
HamsterTag.name,
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.switch(HamsterTag)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterTag.id == facts_count_query.c.tag_id),
)
.group_by(HamsterTag)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
tags = tags.where(
HamsterTag.name.contains(filter_search)
)
self.table.add_rows(
[
[
tag.id,
tag.name,
tag.facts_count,
]
for tag in tags
]
)
self.table.sort(*self.sort)
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
tag_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
tag = HamsterTag.get(id=tag_id)
tag.delete_instance()
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_tag = HamsterTag.get(id=row_cells[0])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_tag", None) is not None:
move_to_tag = HamsterTag.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 0))
)
HamsterFactTag.update({HamsterFactTag.tag: move_to_tag}).where(
HamsterFactTag.tag == self.move_from_tag
).execute()
self._refresh()
del self.move_from_tag
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
tag = HamsterTag.get(id=row_cells[0])
def handle_edit(properties):
if properties is None:
return
tag.name = properties["tag"]
tag.save()
self._refresh()
self.app.push_screen(
TagEditScreen(tag=tag), handle_edit
)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"tag id", "tag", "facts"
)
self.sort = (self.columns[1],)
self._refresh()

View File

@ -6,16 +6,16 @@ from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
from peewee import fn, JOIN
from ..utils import truncate
from ..sync import sync
from ..db import (
from ...utils import truncate
from ...sync import sync
from ...db import (
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
from ..kimaiapi import Timesheet as KimaiAPITimesheet
from ...kimaiapi import Timesheet as KimaiAPITimesheet
from .list import ListPane
from ..list import ListPane
class KimaiCustomerList(ListPane):

View File

@ -3,8 +3,10 @@ from .kimaiapi import (
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
Tag as KimaiAPITag,
)
from .db import (
KimaiTag,
db,
KimaiProject,
KimaiCustomer,
@ -12,12 +14,11 @@ from .db import (
)
def sync() -> None:
api = KimaiAPI()
def sync(api) -> None:
KimaiCustomer.delete().execute()
KimaiProject.delete().execute()
KimaiActivity.delete().execute()
KimaiTag.delete().execute()
customers = KimaiAPICustomer.list(api)
with db.atomic():
@ -60,3 +61,16 @@ def sync() -> None:
for activity in activities
]
).execute()
tags = KimaiAPITag.list(api)
with db.atomic():
KimaiTag.insert_many(
[
{
"id": tag.id,
"name": tag.name,
"visible": tag.visible,
}
for tag in tags
]
).execute()

View File

@ -4,3 +4,4 @@ requests-cache==1.1.1
textual==0.44.1
textual-autocomplete==2.1.0b0
textual-dev==1.2.1
xdg==6.0.0