Compare commits

...

10 Commits

Author SHA1 Message Date
3wc
5ed5a73950 Merge branch 'tag-management' 2024-12-07 10:08:44 -05:00
3wc
fd651ff25a Add hamster tag management, reinstate tags during import, reorganise
code
2024-12-07 10:08:20 -05:00
3wc
48905953ca Merge branch 'tui' 2024-10-04 13:14:38 -04:00
3wc
6e68e85954 Drop click from requirements 2024-02-10 03:35:19 -03:00
3wc
1db3591c05 More error output if global activities forbidden 2024-02-10 03:35:00 -03:00
3wc
fea15472e3 Add a confirmation dialogue when deleting activites..
..which have >0 facts.

Re #2
2023-12-25 19:28:42 -03:00
3wc
2a1d13236b Update requirements 2023-12-25 19:12:03 -03:00
3wc
26b8b5f334 Re4matting 2023-11-18 11:23:54 +00:00
3wc
b62eb5cb22 Yeet upload script into a command 2023-11-17 23:22:21 +00:00
3wc
1145f5e806 API improvements 2023-11-17 22:54:25 +00:00
13 changed files with 598 additions and 247 deletions

View File

@ -12,6 +12,7 @@ from peewee import fn, JOIN
from textual.logging import TextualHandler from textual.logging import TextualHandler
from .db import ( from .db import (
KimaiTag,
db, db,
HamsterCategory, HamsterCategory,
HamsterActivity, HamsterActivity,
@ -22,6 +23,7 @@ from .db import (
HamsterActivityKimaiMapping, HamsterActivityKimaiMapping,
HamsterFactKimaiImport, HamsterFactKimaiImport,
) )
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync from .sync import sync
HAMSTER_DIR = Path.home() / ".local/share/hamster" HAMSTER_DIR = Path.home() / ".local/share/hamster"
@ -481,7 +483,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
found_activities.append(activity_str) found_activities.append(activity_str)
@kimai.command("import") @kimai.command("csv")
@click.option( @click.option(
"--mapping-path", "--mapping-path",
help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)", help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
@ -492,7 +494,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
@click.option("--after", help="Only show time entries after this date") @click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True) @click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username") @click.argument("username")
def _import( def _csv(
username, username,
mapping_path=None, mapping_path=None,
output=None, output=None,
@ -637,6 +639,105 @@ def _import(
output_file.close() output_file.close()
@kimai.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
api = KimaiAPI()
SEARCH = "auto"
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
& HamsterCategory.name.contains(SEARCH)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if (
mappings[0].kimai_activity.project is None
and not mappings[0].kimai_project.allow_global_activities
):
click.secho(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})", fg="red",
)
has_errors = True
continue
if f.imports.count() > 0:
click.secho(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)",
fg="yellow",
)
continue
if has_errors:
sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = Timesheet(
api,
activity=mapping.kimai_activity,
project=mapping.kimai_project,
begin=f.start_time,
end=f.end_time,
description=f.description
if f.description != ""
else mapping.kimai_description,
tags=",".join([t.tag.name for t in f.tags]) if len(f.tags) > 0 else mapping.kimai_tags
)
r = t.upload().json()
if len(f.tags) > 0 or mapping.kimai_tags:
print(",".join([t.tag.name for t in f.tags]) if len(f.tags)> 0 else mapping.kimai_tags)
print(r["tags"])
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')
@kimai.group("db") @kimai.group("db")
def db_(): def db_():
pass pass
@ -649,6 +750,7 @@ def init():
KimaiCustomer, KimaiCustomer,
KimaiProject, KimaiProject,
KimaiActivity, KimaiActivity,
KimaiTag,
HamsterActivityKimaiMapping, HamsterActivityKimaiMapping,
HamsterFactKimaiImport, HamsterFactKimaiImport,
] ]

View File

@ -1,6 +1,7 @@
from textual.app import App from textual.app import App
from .db import db from .db import db
from .kimaiapi import KimaiAPI
from .screens.hamster import HamsterScreen from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen from .screens.kimai import KimaiScreen
@ -8,12 +9,21 @@ from .screens.kimai import KimaiScreen
class HamsterToolsApp(App): class HamsterToolsApp(App):
CSS_PATH = "app.tcss" CSS_PATH = "app.tcss"
BINDINGS = [ BINDINGS = [
("h", "switch_mode('hamster')", "Hamster"), ("h", "switch_mode('hamster')", "Hamster"),
("k", "switch_mode('kimai')", "Kimai"), ("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"), ("q", "quit", "Quit"),
] ]
api_ = None
@property
def api(self) -> KimaiAPI:
if self.api_ is None:
self.api_ = KimaiAPI()
return self.api_
def __init__(self): def __init__(self):
self.MODES = { self.MODES = {
"hamster": HamsterScreen(), "hamster": HamsterScreen(),

View File

@ -18,11 +18,12 @@ DataTable:focus .datatable--cursor {
width: 50%; width: 50%;
} }
ActivityEditScreen, ActivityMappingScreen { ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen, TagEditScreen {
align: center middle; align: center middle;
} }
ActivityEditScreen > Vertical, ActivityEditScreen > Vertical,
TagEditScreen > Vertical,
ActivityMappingScreen > Vertical { ActivityMappingScreen > Vertical {
padding: 0 1; padding: 0 1;
width: 80; width: 80;
@ -31,7 +32,8 @@ ActivityMappingScreen > Vertical {
background: $surface; background: $surface;
} }
ActivityEditScreen > Vertical { ActivityEditScreen > Vertical,
TagEditScreen > Vertical {
height: 10; height: 10;
} }
@ -55,10 +57,32 @@ ActivityMappingScreen AutoComplete {
width: 80; width: 80;
} }
#description, #tags { #description, #activity_tags {
width: 30; width: 30;
} }
ActivityEditScreen Input { ActivityEditScreen Input {
width: 60; width: 60;
} }
#dialog {
grid-size: 2;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 0 1;
width: 60;
height: 11;
border: thick $background 80%;
background: $surface;
}
#question {
column-span: 2;
height: 1fr;
width: 1fr;
content-align: center middle;
}
Button {
width: 100%;
}

View File

@ -8,6 +8,7 @@ from peewee import (
DateTimeField, DateTimeField,
SmallIntegerField, SmallIntegerField,
BooleanField, BooleanField,
CompositeKey
) )
@ -42,6 +43,25 @@ class HamsterFact(Model):
table_name = "facts" table_name = "facts"
class HamsterTag(Model):
name = CharField()
class Meta:
database = db
table_name = "tags"
class HamsterFactTag(Model):
fact = ForeignKeyField(HamsterFact, backref="tags")
tag = ForeignKeyField(HamsterTag, backref="facts")
class Meta:
database = db
table_name = "fact_tags"
primary_key = CompositeKey('fact', 'tag')
class KimaiCustomer(Model): class KimaiCustomer(Model):
visible = BooleanField(default=True) visible = BooleanField(default=True)
name = CharField() name = CharField()
@ -72,6 +92,15 @@ class KimaiActivity(Model):
table_name = "kimai_activities" table_name = "kimai_activities"
class KimaiTag(Model):
name = CharField()
visible = BooleanField(default=True)
class Meta:
database = db
table_name = "kimai_tags"
class HamsterActivityKimaiMapping(Model): class HamsterActivityKimaiMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings") hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings") kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")

View File

@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
import pdb
import requests import requests
import requests_cache import requests_cache
import os import os
@ -21,16 +22,23 @@ class KimaiAPI(object):
auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY} auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY}
def __init__(self): def __init__(self):
requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800) # requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3}) self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1}) self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3}) self.activities_json = self.get("activities", {"visible": 3})
self.tags_json = self.get("tags/find?name=", {"visible": 3})
self.user_json = self.get("users/me") self.user_json = self.get("users/me")
def get(self, endpoint, params=None): def get(self, endpoint, params=None):
return requests.get( result = requests.get(
f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers
).json() ).json()
try:
if result["code"] != 200:
raise NotFound()
except (KeyError, TypeError):
pass
return result
def post(self, endpoint, data): def post(self, endpoint, data):
return requests.post( return requests.post(
@ -133,12 +141,44 @@ class Activity(BaseAPI):
api, api,
a["id"], a["id"],
a["name"], a["name"],
Project.get_by_id(api, a["project"]), Project.get_by_id(api, a["project"], none),
a["visible"], a["visible"],
) )
if not none: if not none:
raise NotFound() raise NotFound()
@dataclass
class Tag(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Tag(
api,
t["id"],
t["name"],
t["visible"],
)
for t in api.tags_json
]
@staticmethod
def get_by_id(api, id, none=False):
for t in api.tags_json:
if t["id"] == id:
return Tag(
api,
t["id"],
t["name"],
t["visible"],
)
if not none:
raise NotFound()
@dataclass @dataclass
class Timesheet(BaseAPI): class Timesheet(BaseAPI):
@ -169,6 +209,23 @@ class Timesheet(BaseAPI):
) )
] ]
@staticmethod
def list_by(api, **kwargs):
kwargs["size"] = 10000
return [
Timesheet(
api,
Activity.get_by_id(api, t["activity"], none=True),
Project.get_by_id(api, t["project"], none=True),
t["begin"],
t["end"],
t["id"],
t["description"],
t["tags"],
)
for t in api.get("timesheets", params=kwargs)
]
@staticmethod @staticmethod
def get_by_id(api, id, none=False): def get_by_id(api, id, none=False):
t = api.get( t = api.get(

View File

@ -0,0 +1,42 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import (
Header,
Footer,
TabbedContent,
TabPane,
)
from .activities import ActivityList
from .categories import CategoryList
from .tags import TagList
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
("t", "show_tab('tags')", "Tags"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
with TabPane("Tags", id="tags"):
yield TagList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -1,38 +1,18 @@
from datetime import datetime
from datetime import datetime
from peewee import JOIN, fn
from textual import on from textual import on
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.binding import Binding from textual.binding import Binding
from textual.containers import Grid, Horizontal, Vertical
from textual.coordinate import Coordinate from textual.coordinate import Coordinate
from textual.containers import Horizontal, Vertical
from textual.events import DescendantBlur from textual.events import DescendantBlur
from textual.screen import Screen, ModalScreen from textual.screen import ModalScreen
from textual.widgets import ( from textual.widgets import Button, Checkbox, DataTable, Input, Label
Header,
Footer,
DataTable,
Input,
Label,
Checkbox,
TabbedContent,
TabPane,
)
from peewee import fn, JOIN
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from ..db import ( from hamstertools.db import HamsterActivity, HamsterActivityKimaiMapping, HamsterCategory, HamsterFact, KimaiActivity, KimaiCustomer, KimaiProject
HamsterCategory, from hamstertools.screens.list import ListPane
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .list import ListPane
class ActivityEditScreen(ModalScreen): class ActivityEditScreen(ModalScreen):
@ -208,7 +188,7 @@ class ActivityMappingScreen(ModalScreen):
), ),
Horizontal( Horizontal(
Label("Tags"), Label("Tags"),
Input(id="tags", value=self.tags), Input(id="activity_tags", value=self.tags),
), ),
Horizontal(Checkbox("Global", id="global")), Horizontal(Checkbox("Global", id="global")),
) )
@ -264,7 +244,7 @@ class ActivityMappingScreen(ModalScreen):
"kimai_project_id": self.project_id, "kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id, "kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value, "kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value, "kimai_tags": self.query_one("#activity_tags").value,
"global": self.query_one("#global").value, "global": self.query_one("#global").value,
} }
) )
@ -273,6 +253,29 @@ class ActivityMappingScreen(ModalScreen):
self.dismiss(None) self.dismiss(None)
class ActivityDeleteConfirmScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
]
def compose(self) -> ComposeResult:
yield Grid(
Label("Are you sure you want to delete this activity?", id="question"),
Button("Confirm", variant="error", id="confirm"),
Button("Cancel", variant="primary", id="cancel"),
id="dialog",
)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "quit":
self.dismiss(True)
else:
self.dismiss(False)
def action_cancel(self):
self.dismiss(False)
class ActivityList(ListPane): class ActivityList(ListPane):
BINDINGS = [ BINDINGS = [
("s", "sort", "Sort"), ("s", "sort", "Sort"),
@ -384,10 +387,19 @@ class ActivityList(ListPane):
) )
activity = HamsterActivity.get(id=activity_id) activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row. def check_delete(delete: bool) -> None:
self.table.remove_row(row_key) """Called when QuitScreen is dismissed."""
if delete:
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
if activity.facts.count() > 0:
self.app.push_screen(ActivityDeleteConfirmScreen(), check_delete)
else:
check_delete(True)
def action_move_facts(self) -> None: def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row row_idx: int = self.table.cursor_row
@ -480,98 +492,3 @@ class ActivityList(ListPane):
), ),
handle_mapping, handle_mapping,
) )
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,78 @@
from datetime import datetime
from peewee import JOIN, fn
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.widgets import DataTable
from hamstertools.db import HamsterActivity, HamsterCategory, HamsterFact
from hamstertools.screens.list import ListPane
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)

View File

@ -0,0 +1,165 @@
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.events import DescendantBlur
from textual.screen import ModalScreen
from textual.widgets import DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown
from hamstertools.db import HamsterFactTag, HamsterTag
from hamstertools.screens.list import ListPane
class TagEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
def __init__(self, tag):
self.tag_name = tag.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Tag:"), Input(value=self.tag_name, id="tag")
),
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"tag": self.query_one("#tag").value,
}
)
class TagList(ListPane):
BINDINGS = [
# ("s", "sort", "Sort"),
# ("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
Binding(key="escape", action="cancelfilter", show=False),
]
move_from_tag = None
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFactTag.select(
HamsterFactTag.tag_id, fn.COUNT(HamsterFactTag.tag_id).alias("facts_count")
)
.group_by(HamsterFactTag.tag_id)
.alias("facts_count_query")
)
tags = (
HamsterTag.select(
HamsterTag,
HamsterTag.name,
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.switch(HamsterTag)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterTag.id == facts_count_query.c.tag_id),
)
.group_by(HamsterTag)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
tags = tags.where(
HamsterTag.name.contains(filter_search)
)
self.table.add_rows(
[
[
tag.id,
tag.name,
tag.facts_count,
]
for tag in tags
]
)
self.table.sort(*self.sort)
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
tag_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
tag = HamsterTag.get(id=tag_id)
tag.delete_instance()
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_tag = HamsterTag.get(id=row_cells[0])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_tag", None) is not None:
move_to_tag = HamsterTag.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 0))
)
HamsterFactTag.update({HamsterFactTag.tag: move_to_tag}).where(
HamsterFactTag.tag == self.move_from_tag
).execute()
self._refresh()
del self.move_from_tag
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
tag = HamsterTag.get(id=row_cells[0])
def handle_edit(properties):
if properties is None:
return
tag.name = properties["tag"]
tag.save()
self._refresh()
self.app.push_screen(
TagEditScreen(tag=tag), handle_edit
)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"tag id", "tag", "facts"
)
self.sort = (self.columns[1],)
self._refresh()

View File

@ -1,19 +1,21 @@
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.coordinate import Coordinate
from textual.binding import Binding from textual.binding import Binding
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
from peewee import fn, JOIN from peewee import fn, JOIN
from ..utils import truncate from ...utils import truncate
from ..sync import sync from ...sync import sync
from ..db import ( from ...db import (
KimaiProject, KimaiProject,
KimaiCustomer, KimaiCustomer,
KimaiActivity, KimaiActivity,
) )
from ...kimaiapi import Timesheet as KimaiAPITimesheet
from .list import ListPane from ..list import ListPane
class KimaiCustomerList(ListPane): class KimaiCustomerList(ListPane):
@ -89,6 +91,7 @@ class KimaiActivityList(ListPane):
("s", "sort", "Sort"), ("s", "sort", "Sort"),
("r", "refresh", "Refresh"), ("r", "refresh", "Refresh"),
("g", "get", "Get data"), ("g", "get", "Get data"),
("#", "count", "Count"),
("/", "filter", "Search"), ("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False), Binding(key="escape", action="cancelfilter", show=False),
] ]
@ -122,6 +125,7 @@ class KimaiActivityList(ListPane):
activity.id, activity.id,
truncate(activity.name, 40), truncate(activity.name, 40),
activity.visible, activity.visible,
"?",
] ]
for activity in activities for activity in activities
] ]
@ -144,10 +148,20 @@ class KimaiActivityList(ListPane):
"id", "id",
"name", "name",
"visible", "visible",
"times",
) )
self.sort = (self.columns[1], self.columns[3]) self.sort = (self.columns[1], self.columns[3])
self._refresh() self._refresh()
def action_count(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
activity_id = row_cells[2]
count = len(KimaiAPITimesheet.list_by(self.app.api, activity=activity_id))
self.table.update_cell_at(Coordinate(row_idx, 5), count)
class KimaiScreen(Screen): class KimaiScreen(Screen):
BINDINGS = [ BINDINGS = [

View File

@ -3,8 +3,10 @@ from .kimaiapi import (
Customer as KimaiAPICustomer, Customer as KimaiAPICustomer,
Project as KimaiAPIProject, Project as KimaiAPIProject,
Activity as KimaiAPIActivity, Activity as KimaiAPIActivity,
Tag as KimaiAPITag,
) )
from .db import ( from .db import (
KimaiTag,
db, db,
KimaiProject, KimaiProject,
KimaiCustomer, KimaiCustomer,
@ -18,6 +20,7 @@ def sync() -> None:
KimaiCustomer.delete().execute() KimaiCustomer.delete().execute()
KimaiProject.delete().execute() KimaiProject.delete().execute()
KimaiActivity.delete().execute() KimaiActivity.delete().execute()
KimaiTag.delete().execute()
customers = KimaiAPICustomer.list(api) customers = KimaiAPICustomer.list(api)
with db.atomic(): with db.atomic():
@ -60,3 +63,16 @@ def sync() -> None:
for activity in activities for activity in activities
] ]
).execute() ).execute()
tags = KimaiAPITag.list(api)
with db.atomic():
KimaiTag.insert_many(
[
{
"id": tag.id,
"name": tag.name,
"visible": tag.visible,
}
for tag in tags
]
).execute()

View File

@ -1,2 +1,6 @@
click==8.0.3
requests==2.26.0 requests==2.26.0
peewee==3.17.0
requests-cache==1.1.1
textual==0.44.1
textual-autocomplete==2.1.0b0
textual-dev==1.2.1

View File

@ -1,107 +0,0 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from datetime import datetime
from peewee import JOIN
from hamstertools.db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
HamsterFactKimaiImport,
)
from hamstertools.kimaiapi import KimaiAPI, Timesheet, Project, Activity
api = KimaiAPI()
DATE_FROM = "2023-10-01"
DATE_TO = "2023-11-01"
SEARCH = "auto"
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(DATE_FROM, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(DATE_TO, "%Y-%m-%d"))
& HamsterCategory.name.contains(SEARCH)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if (
mappings[0].kimai_activity.project is None
and not mappings[0].kimai_project.allow_global_activities
):
print(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity}"
)
has_errors = True
continue
if f.imports.count() > 0:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)"
)
has_errors = True
continue
# if has_errors:
# sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = Timesheet(
api,
activity=mapping.kimai_activity,
project=mapping.kimai_project,
begin=f.start_time,
end=f.end_time,
description=f.description if f.description != "" else mapping.kimai_description,
# tags=f.tags if f.tags != '' else mapping.kimai_tags
)
r = t.upload().json()
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')