Compare commits

...

15 Commits

Author SHA1 Message Date
3wc
5ed5a73950 Merge branch 'tag-management' 2024-12-07 10:08:44 -05:00
3wc
fd651ff25a Add hamster tag management, reinstate tags during import, reorganise
code
2024-12-07 10:08:20 -05:00
3wc
48905953ca Merge branch 'tui' 2024-10-04 13:14:38 -04:00
3wc
6e68e85954 Drop click from requirements 2024-02-10 03:35:19 -03:00
3wc
1db3591c05 More error output if global activities forbidden 2024-02-10 03:35:00 -03:00
3wc
fea15472e3 Add a confirmation dialogue when deleting activites..
..which have >0 facts.

Re #2
2023-12-25 19:28:42 -03:00
3wc
2a1d13236b Update requirements 2023-12-25 19:12:03 -03:00
3wc
26b8b5f334 Re4matting 2023-11-18 11:23:54 +00:00
3wc
b62eb5cb22 Yeet upload script into a command 2023-11-17 23:22:21 +00:00
3wc
1145f5e806 API improvements 2023-11-17 22:54:25 +00:00
3wc
a01652f301 Tidy up kimai screen a bit 2023-11-04 00:01:53 +00:00
3wc
fd28955da0 Add kimai "visible" field, moar kimai screens 2023-11-03 23:42:11 +00:00
3wc
d3c2da74e6 Switch to tabs 2023-11-03 21:52:08 +00:00
3wc
ff013525af Refucktoring 2023-11-03 00:13:54 +00:00
3wc
ce68a03ea9 Mapping editing 2023-11-02 22:43:11 +00:00
16 changed files with 1337 additions and 771 deletions

View File

@ -12,6 +12,7 @@ from peewee import fn, JOIN
from textual.logging import TextualHandler from textual.logging import TextualHandler
from .db import ( from .db import (
KimaiTag,
db, db,
HamsterCategory, HamsterCategory,
HamsterActivity, HamsterActivity,
@ -22,11 +23,11 @@ from .db import (
HamsterActivityKimaiMapping, HamsterActivityKimaiMapping,
HamsterFactKimaiImport, HamsterFactKimaiImport,
) )
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync from .sync import sync
HAMSTER_DIR = Path.home() / ".local/share/hamster" HAMSTER_DIR = Path.home() / ".local/share/hamster"
# HAMSTER_FILE = HAMSTER_DIR / 'hamster.db' HAMSTER_FILE = HAMSTER_DIR / "hamster.db"
HAMSTER_FILE = "hamster-testing.db"
db.init(HAMSTER_FILE) db.init(HAMSTER_FILE)
@ -370,7 +371,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
mapping_files = [] mapping_files = []
for mapping_path_item in mapping_path: for mapping_path_item in mapping_path:
if not Path(mapping_path_item).exists(): if not Path(mapping_path_item).exists():
raise click.UsageError(f'{mapping_path_item} does not exist') raise click.UsageError(f"{mapping_path_item} does not exist")
mapping_file = _get_kimai_mapping_file(mapping_path_item) mapping_file = _get_kimai_mapping_file(mapping_path_item)
next(mapping_file) next(mapping_file)
@ -482,7 +483,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
found_activities.append(activity_str) found_activities.append(activity_str)
@kimai.command("import") @kimai.command("csv")
@click.option( @click.option(
"--mapping-path", "--mapping-path",
help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)", help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
@ -493,7 +494,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
@click.option("--after", help="Only show time entries after this date") @click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True) @click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username") @click.argument("username")
def _import( def _csv(
username, username,
mapping_path=None, mapping_path=None,
output=None, output=None,
@ -638,6 +639,105 @@ def _import(
output_file.close() output_file.close()
@kimai.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
api = KimaiAPI()
SEARCH = "auto"
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
& HamsterCategory.name.contains(SEARCH)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if (
mappings[0].kimai_activity.project is None
and not mappings[0].kimai_project.allow_global_activities
):
click.secho(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity} ({mappings[0].hamster_activity.name})", fg="red",
)
has_errors = True
continue
if f.imports.count() > 0:
click.secho(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)",
fg="yellow",
)
continue
if has_errors:
sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = Timesheet(
api,
activity=mapping.kimai_activity,
project=mapping.kimai_project,
begin=f.start_time,
end=f.end_time,
description=f.description
if f.description != ""
else mapping.kimai_description,
tags=",".join([t.tag.name for t in f.tags]) if len(f.tags) > 0 else mapping.kimai_tags
)
r = t.upload().json()
if len(f.tags) > 0 or mapping.kimai_tags:
print(",".join([t.tag.name for t in f.tags]) if len(f.tags)> 0 else mapping.kimai_tags)
print(r["tags"])
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')
@kimai.group("db") @kimai.group("db")
def db_(): def db_():
pass pass
@ -650,6 +750,7 @@ def init():
KimaiCustomer, KimaiCustomer,
KimaiProject, KimaiProject,
KimaiActivity, KimaiActivity,
KimaiTag,
HamsterActivityKimaiMapping, HamsterActivityKimaiMapping,
HamsterFactKimaiImport, HamsterFactKimaiImport,
] ]

View File

@ -1,643 +1,39 @@
from datetime import datetime from textual.app import App
from textual import on from .db import db
from textual.app import App, ComposeResult from .kimaiapi import KimaiAPI
from textual.binding import Binding
from textual.containers import Grid
from textual.events import DescendantBlur
from textual.widgets import Header, Footer, DataTable, Input, Button, Label, Checkbox
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import Screen, ModalScreen
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen
from peewee import fn, JOIN
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .kimai import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
)
from .sync import sync
class ListScreen(Screen):
def compose(self) -> ComposeResult:
yield Header()
with Vertical():
yield DataTable()
with Horizontal(id="filter"):
yield Input(
id="search", placeholder="Category/activity name contains text"
)
yield Input(
id="date",
placeholder="After date, in {0} format".format(
datetime.now().strftime("%Y-%m-%d")
),
)
yield Footer()
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
self.query_one("#filter").display = True
self._refresh()
self.query_one("#filter #search").focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
self.query_one("#filter").display = False
self.query_one("#filter #search").clear()
self.query_one("#filter #date").clear()
self.table.focus()
self._refresh()
@on(Input.Changed, "#filter Input")
def filter(self, event):
self._refresh()
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
category_id = None
category_name = ""
def _get_categories(self, input_state):
categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)
def __init__(self, category, activity):
if category is not None:
self.category_id = category.id
self.category_name = category.name
self.activity_name = activity.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Category:"),
AutoComplete(
Input(
placeholder="Type to search...",
id="category",
value=self.category_name,
),
Dropdown(items=self._get_categories),
),
),
Horizontal(
Label("Activity:"), Input(value=self.activity_name, id="activity")
),
)
@on(Input.Submitted, "#category")
def category_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#category")
def category_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"category": self.category_id,
"activity": self.query_one("#activity").value,
}
)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel"),
]
customer_id = None
project_id = None
activity_id = None
def __init__(self, category, activity):
self.category = category
self.activity = activity
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one("#global").value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(KimaiActivity.project_id == self.project_id)
return ActivityMappingScreen._filter_dropdowns(
[DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.activity}@{self.category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(placeholder="Type to search...", id="customer"),
Dropdown(items=self._get_customers),
),
),
Horizontal(
Label("Project"),
AutoComplete(
Input(placeholder="Type to search...", id="project"),
Dropdown(items=self._get_projects),
),
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(placeholder="Type to search...", id="activity"),
Dropdown(items=self._get_activities),
),
),
Horizontal(
Label("Description"),
Input(id="description"),
),
Horizontal(
Label("Tags"),
Input(id="tags"),
),
Horizontal(Checkbox("Global", id="global")),
)
@on(Input.Submitted, "#customer")
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#project").focus()
@on(DescendantBlur, "#customer")
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
@on(Input.Submitted, "#project")
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one("#activity").focus()
@on(DescendantBlur, "#project")
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, "#activity")
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#activity")
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_global(self):
self.query_one("#global").value = not self.query_one("#global").value
def action_save(self):
self.dismiss(
{
"kimai_customer_id": self.customer_id,
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"global": self.query_one("#global").value,
}
)
def action_cancel(self):
self.dismiss(None)
class ActivityListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move facts"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterActivityKimaiMapping.select(
HamsterActivityKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterActivityKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
HamsterFact.start_time,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
activities = activities.where(
HamsterActivity.name.contains(filter_search)
| HamsterCategory.name.contains(filter_search)
)
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
activities = activities.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
self._refresh()
del self.move_from_activity
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
try:
category = HamsterCategory.get(id=row_cells[0])
except HamsterCategory.DoesNotExist:
category = None
activity = HamsterActivity.get(id=row_cells[2])
def handle_edit(properties):
if properties is None:
return
activity.name = properties["activity"]
activity.category_id = properties["category"]
activity.save()
self._refresh()
self.app.push_screen(
ActivityEditScreen(category=category, activity=activity), handle_edit
)
def action_mapping(self):
selected_activity = (
HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
HamsterActivity.id
== self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
)
.get()
)
def handle_mapping(mapping):
if mapping is None:
return
m = HamsterActivityKimaiMapping.create(
hamster_activity=selected_activity, **mapping
)
m.save()
filter_search = self.query_one("#search")
self._refresh()
self.app.push_screen(
ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name,
),
handle_mapping,
)
class CategoryListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class KimaiProjectListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self, filter_query=None):
self.table.clear()
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_query:
projects = projects.where(
KimaiProject.name.contains(filter_query)
| KimaiCustomer.name.contains(filter_query)
)
self.table.add_rows(
[
[
project.customer.id,
project.customer.name,
project.id,
project.name,
project.activities_count,
]
for project in projects
]
)
self.table.sort(self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities"
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[1]
self._refresh()
class HamsterToolsApp(App): class HamsterToolsApp(App):
CSS_PATH = "app.tcss" CSS_PATH = "app.tcss"
BINDINGS = [ BINDINGS = [
("a", "switch_mode('activities')", "Activities"), ("h", "switch_mode('hamster')", "Hamster"),
("c", "switch_mode('categories')", "Categories"),
("k", "switch_mode('kimai')", "Kimai"), ("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"), ("q", "quit", "Quit"),
] ]
def __init__(self): api_ = None
db.init("hamster-testing.db")
@property
def api(self) -> KimaiAPI:
if self.api_ is None:
self.api_ = KimaiAPI()
return self.api_
def __init__(self):
self.MODES = { self.MODES = {
"categories": CategoryListScreen(), "hamster": HamsterScreen(),
"activities": ActivityListScreen(), "kimai": KimaiScreen(),
"kimai": KimaiProjectListScreen(),
} }
super().__init__() super().__init__()
def on_mount(self) -> None: def on_mount(self) -> None:
self.switch_mode("activities") self.switch_mode("hamster")
def action_quit(self) -> None: def action_quit(self) -> None:
db.close() db.close()

View File

@ -18,11 +18,12 @@ DataTable:focus .datatable--cursor {
width: 50%; width: 50%;
} }
ActivityEditScreen, ActivityMappingScreen { ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen, TagEditScreen {
align: center middle; align: center middle;
} }
ActivityEditScreen > Vertical, ActivityEditScreen > Vertical,
TagEditScreen > Vertical,
ActivityMappingScreen > Vertical { ActivityMappingScreen > Vertical {
padding: 0 1; padding: 0 1;
width: 80; width: 80;
@ -31,7 +32,8 @@ ActivityMappingScreen > Vertical {
background: $surface; background: $surface;
} }
ActivityEditScreen > Vertical { ActivityEditScreen > Vertical,
TagEditScreen > Vertical {
height: 10; height: 10;
} }
@ -55,10 +57,32 @@ ActivityMappingScreen AutoComplete {
width: 80; width: 80;
} }
#description, #tags { #description, #activity_tags {
width: 30; width: 30;
} }
ActivityEditScreen Input { ActivityEditScreen Input {
width: 60; width: 60;
} }
#dialog {
grid-size: 2;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 0 1;
width: 60;
height: 11;
border: thick $background 80%;
background: $surface;
}
#question {
column-span: 2;
height: 1fr;
width: 1fr;
content-align: center middle;
}
Button {
width: 100%;
}

View File

@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
from peewee import ( from peewee import (
SqliteDatabase, SqliteDatabase,
Model, Model,
@ -7,6 +8,7 @@ from peewee import (
DateTimeField, DateTimeField,
SmallIntegerField, SmallIntegerField,
BooleanField, BooleanField,
CompositeKey
) )
@ -41,7 +43,27 @@ class HamsterFact(Model):
table_name = "facts" table_name = "facts"
class HamsterTag(Model):
name = CharField()
class Meta:
database = db
table_name = "tags"
class HamsterFactTag(Model):
fact = ForeignKeyField(HamsterFact, backref="tags")
tag = ForeignKeyField(HamsterTag, backref="facts")
class Meta:
database = db
table_name = "fact_tags"
primary_key = CompositeKey('fact', 'tag')
class KimaiCustomer(Model): class KimaiCustomer(Model):
visible = BooleanField(default=True)
name = CharField() name = CharField()
class Meta: class Meta:
@ -52,6 +74,7 @@ class KimaiCustomer(Model):
class KimaiProject(Model): class KimaiProject(Model):
name = CharField() name = CharField()
customer = ForeignKeyField(KimaiCustomer, backref="projects") customer = ForeignKeyField(KimaiCustomer, backref="projects")
visible = BooleanField(default=True)
allow_global_activities = BooleanField(default=True) allow_global_activities = BooleanField(default=True)
class Meta: class Meta:
@ -62,12 +85,22 @@ class KimaiProject(Model):
class KimaiActivity(Model): class KimaiActivity(Model):
name = CharField() name = CharField()
project = ForeignKeyField(KimaiProject, backref="activities", null=True) project = ForeignKeyField(KimaiProject, backref="activities", null=True)
visible = BooleanField(default=True)
class Meta: class Meta:
database = db database = db
table_name = "kimai_activities" table_name = "kimai_activities"
class KimaiTag(Model):
name = CharField()
visible = BooleanField(default=True)
class Meta:
database = db
table_name = "kimai_tags"
class HamsterActivityKimaiMapping(Model): class HamsterActivityKimaiMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings") hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings") kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")

View File

@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
import pdb
import requests import requests
import requests_cache import requests_cache
import os import os
@ -12,8 +13,8 @@ class NotFound(Exception):
class KimaiAPI(object): class KimaiAPI(object):
# temporary hardcoded config # temporary hardcoded config
# KIMAI_API_URL = "https://kimai.autonomic.zone/api" KIMAI_API_URL = "https://kimai.autonomic.zone/api"
KIMAI_API_URL = "https://kimaitest.autonomic.zone/api" # KIMAI_API_URL = "https://kimaitest.autonomic.zone/api"
KIMAI_USERNAME = "3wordchant" KIMAI_USERNAME = "3wordchant"
KIMAI_API_KEY = os.environ["KIMAI_API_KEY"] KIMAI_API_KEY = os.environ["KIMAI_API_KEY"]
@ -21,16 +22,23 @@ class KimaiAPI(object):
auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY} auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY}
def __init__(self): def __init__(self):
requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800) # requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3}) self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3}) self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3}) self.activities_json = self.get("activities", {"visible": 3})
self.tags_json = self.get("tags/find?name=", {"visible": 3})
self.user_json = self.get("users/me") self.user_json = self.get("users/me")
def get(self, endpoint, params=None): def get(self, endpoint, params=None):
return requests.get( result = requests.get(
f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers
).json() ).json()
try:
if result["code"] != 200:
raise NotFound()
except (KeyError, TypeError):
pass
return result
def post(self, endpoint, data): def post(self, endpoint, data):
return requests.post( return requests.post(
@ -44,24 +52,26 @@ class BaseAPI(object):
setattr(self, key, value) setattr(self, key, value)
@dataclass
class Customer(BaseAPI): class Customer(BaseAPI):
def __init__(self, api, id, name): api: KimaiAPI = field(repr=False)
super().__init__(api, id=id, name=name) id: int
name: str
visible: bool = field(default=True)
@staticmethod @staticmethod
def list(api): def list(api):
return [Customer(api, c["id"], c["name"]) for c in api.customers_json] return [
Customer(api, c["id"], c["name"], c["visible"]) for c in api.customers_json
]
@staticmethod @staticmethod
def get_by_id(api, id): def get_by_id(api, id):
for value in api.customers_json: for c in api.customers_json:
if value["id"] == id: if c["id"] == id:
return Customer(api, value["id"], value["name"]) return Customer(api, c["id"], c["name"], c["visible"])
raise NotFound() raise NotFound()
def __repr__(self):
return f"Customer (id={self.id}, name={self.name})"
@dataclass @dataclass
class Project(BaseAPI): class Project(BaseAPI):
@ -70,6 +80,7 @@ class Project(BaseAPI):
name: str name: str
customer: Customer customer: Customer
allow_global_activities: bool = field(default=True) allow_global_activities: bool = field(default=True)
visible: bool = field(default=True)
@staticmethod @staticmethod
def list(api): def list(api):
@ -80,20 +91,22 @@ class Project(BaseAPI):
p["name"], p["name"],
Customer.get_by_id(api, p["customer"]), Customer.get_by_id(api, p["customer"]),
p["globalActivities"], p["globalActivities"],
p["visible"],
) )
for p in api.projects_json for p in api.projects_json
] ]
@staticmethod @staticmethod
def get_by_id(api, id, none=False): def get_by_id(api, id, none=False):
for value in api.projects_json: for p in api.projects_json:
if value["id"] == id: if p["id"] == id:
return Project( return Project(
api, api,
value["id"], p["id"],
value["name"], p["name"],
Customer.get_by_id(api, value["customer"]), Customer.get_by_id(api, p["customer"]),
value["globalActivities"], p["globalActivities"],
p["visible"],
) )
if not none: if not none:
raise NotFound() raise NotFound()
@ -105,25 +118,63 @@ class Activity(BaseAPI):
id: int id: int
name: str name: str
project: Project project: Project
visible: bool = field(default=True)
@staticmethod @staticmethod
def list(api): def list(api):
return [ return [
Activity( Activity(
api, a["id"], a["name"], Project.get_by_id(api, a["project"], none=True) api,
a["id"],
a["name"],
Project.get_by_id(api, a["project"], none=True),
a["visible"],
) )
for a in api.activities_json for a in api.activities_json
] ]
@staticmethod @staticmethod
def get_by_id(api, id, none=False): def get_by_id(api, id, none=False):
for value in api.activities_json: for a in api.activities_json:
if value["id"] == id: if a["id"] == id:
return Activity( return Activity(
api, api,
value["id"], a["id"],
value["name"], a["name"],
Project.get_by_id(api, value["project"]), Project.get_by_id(api, a["project"], none),
a["visible"],
)
if not none:
raise NotFound()
@dataclass
class Tag(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Tag(
api,
t["id"],
t["name"],
t["visible"],
)
for t in api.tags_json
]
@staticmethod
def get_by_id(api, id, none=False):
for t in api.tags_json:
if t["id"] == id:
return Tag(
api,
t["id"],
t["name"],
t["visible"],
) )
if not none: if not none:
raise NotFound() raise NotFound()
@ -158,6 +209,23 @@ class Timesheet(BaseAPI):
) )
] ]
@staticmethod
def list_by(api, **kwargs):
kwargs["size"] = 10000
return [
Timesheet(
api,
Activity.get_by_id(api, t["activity"], none=True),
Project.get_by_id(api, t["project"], none=True),
t["begin"],
t["end"],
t["id"],
t["description"],
t["tags"],
)
for t in api.get("timesheets", params=kwargs)
]
@staticmethod @staticmethod
def get_by_id(api, id, none=False): def get_by_id(api, id, none=False):
t = api.get( t = api.get(

View File

View File

@ -0,0 +1,42 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import (
Header,
Footer,
TabbedContent,
TabPane,
)
from .activities import ActivityList
from .categories import CategoryList
from .tags import TagList
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
("t", "show_tab('tags')", "Tags"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
with TabPane("Tags", id="tags"):
yield TagList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,494 @@
from datetime import datetime
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Grid, Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.events import DescendantBlur
from textual.screen import ModalScreen
from textual.widgets import Button, Checkbox, DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from hamstertools.db import HamsterActivity, HamsterActivityKimaiMapping, HamsterCategory, HamsterFact, KimaiActivity, KimaiCustomer, KimaiProject
from hamstertools.screens.list import ListPane
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
category_id = None
category_name = ""
def _get_categories(self, input_state):
categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)
def __init__(self, category, activity):
if category is not None:
self.category_id = category.id
self.category_name = category.name
self.activity_name = activity.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Category:"),
AutoComplete(
Input(
placeholder="Type to search...",
id="category",
value=self.category_name,
),
Dropdown(items=self._get_categories),
),
),
Horizontal(
Label("Activity:"), Input(value=self.activity_name, id="activity")
),
)
@on(Input.Submitted, "#category")
def category_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#category")
def category_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"category": self.category_id,
"activity": self.query_one("#activity").value,
}
)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel"),
]
customer_id = None
project_id = None
activity_id = None
customer = ""
project = ""
activity = ""
description = ""
tags = ""
def __init__(self, category, activity, mapping=None):
self.hamster_category = category
self.hamster_activity = activity
if mapping is not None:
self.customer_id = mapping.kimai_customer_id
self.customer = mapping.kimai_customer.name
self.project_id = mapping.kimai_project_id
self.project = mapping.kimai_project.name
self.activity_id = mapping.kimai_activity_id
self.activity = mapping.kimai_activity.name
self.description = mapping.kimai_description
self.tags = mapping.kimai_tags
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one("#global").value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(KimaiActivity.project_id == self.project_id)
return ActivityMappingScreen._filter_dropdowns(
[DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(
placeholder="Type to search...",
id="customer",
value=self.customer,
),
Dropdown(items=self._get_customers),
),
),
Horizontal(
Label("Project"),
AutoComplete(
Input(
placeholder="Type to search...",
id="project",
value=self.project,
),
Dropdown(items=self._get_projects),
),
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(
placeholder="Type to search...",
id="activity",
value=self.activity,
),
Dropdown(items=self._get_activities),
),
),
Horizontal(
Label("Description"),
Input(id="description", value=self.description),
),
Horizontal(
Label("Tags"),
Input(id="activity_tags", value=self.tags),
),
Horizontal(Checkbox("Global", id="global")),
)
@on(Input.Submitted, "#customer")
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#project").focus()
@on(DescendantBlur, "#customer")
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
@on(Input.Submitted, "#project")
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one("#activity").focus()
@on(DescendantBlur, "#project")
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, "#activity")
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#activity")
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_global(self):
self.query_one("#global").value = not self.query_one("#global").value
def action_save(self):
self.dismiss(
{
"kimai_customer_id": self.customer_id,
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#activity_tags").value,
"global": self.query_one("#global").value,
}
)
def action_cancel(self):
self.dismiss(None)
class ActivityDeleteConfirmScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
]
def compose(self) -> ComposeResult:
yield Grid(
Label("Are you sure you want to delete this activity?", id="question"),
Button("Confirm", variant="error", id="confirm"),
Button("Cancel", variant="primary", id="cancel"),
id="dialog",
)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "quit":
self.dismiss(True)
else:
self.dismiss(False)
def action_cancel(self):
self.dismiss(False)
class ActivityList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterActivityKimaiMapping.select(
HamsterActivityKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterActivityKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
HamsterFact.start_time,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
activities = activities.where(
HamsterActivity.name.contains(filter_search)
| HamsterCategory.name.contains(filter_search)
)
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
activities = activities.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
def check_delete(delete: bool) -> None:
"""Called when QuitScreen is dismissed."""
if delete:
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
if activity.facts.count() > 0:
self.app.push_screen(ActivityDeleteConfirmScreen(), check_delete)
else:
check_delete(True)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
self._refresh()
del self.move_from_activity
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
try:
category = HamsterCategory.get(id=row_cells[0])
except HamsterCategory.DoesNotExist:
category = None
activity = HamsterActivity.get(id=row_cells[2])
def handle_edit(properties):
if properties is None:
return
activity.name = properties["activity"]
activity.category_id = properties["category"]
activity.save()
self._refresh()
self.app.push_screen(
ActivityEditScreen(category=category, activity=activity), handle_edit
)
def action_mapping(self):
selected_activity = (
HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
HamsterActivity.id
== self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
)
.get()
)
mapping = None
try:
mapping = HamsterActivityKimaiMapping.get(
hamster_activity=selected_activity
)
except HamsterActivityKimaiMapping.DoesNotExist:
pass
def handle_mapping(mapping_data):
if mapping_data is None:
return
if mapping is not None:
mapping_ = mapping
for key, value in mapping_data.items():
setattr(mapping_, key, value)
else:
mapping_ = HamsterActivityKimaiMapping.create(
hamster_activity=selected_activity, **mapping_data
)
mapping_.save()
self._refresh()
self.app.push_screen(
ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name,
mapping=mapping,
),
handle_mapping,
)

View File

@ -0,0 +1,78 @@
from datetime import datetime
from peewee import JOIN, fn
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.widgets import DataTable
from hamstertools.db import HamsterActivity, HamsterCategory, HamsterFact
from hamstertools.screens.list import ListPane
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)

View File

@ -0,0 +1,165 @@
from peewee import JOIN, fn
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.events import DescendantBlur
from textual.screen import ModalScreen
from textual.widgets import DataTable, Input, Label
from textual_autocomplete import AutoComplete, Dropdown
from hamstertools.db import HamsterFactTag, HamsterTag
from hamstertools.screens.list import ListPane
class TagEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
def __init__(self, tag):
self.tag_name = tag.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Tag:"), Input(value=self.tag_name, id="tag")
),
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"tag": self.query_one("#tag").value,
}
)
class TagList(ListPane):
BINDINGS = [
# ("s", "sort", "Sort"),
# ("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
Binding(key="escape", action="cancelfilter", show=False),
]
move_from_tag = None
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFactTag.select(
HamsterFactTag.tag_id, fn.COUNT(HamsterFactTag.tag_id).alias("facts_count")
)
.group_by(HamsterFactTag.tag_id)
.alias("facts_count_query")
)
tags = (
HamsterTag.select(
HamsterTag,
HamsterTag.name,
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
)
.join(HamsterFactTag, JOIN.LEFT_OUTER)
.switch(HamsterTag)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterTag.id == facts_count_query.c.tag_id),
)
.group_by(HamsterTag)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
tags = tags.where(
HamsterTag.name.contains(filter_search)
)
self.table.add_rows(
[
[
tag.id,
tag.name,
tag.facts_count,
]
for tag in tags
]
)
self.table.sort(*self.sort)
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
tag_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
tag = HamsterTag.get(id=tag_id)
tag.delete_instance()
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_tag = HamsterTag.get(id=row_cells[0])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_tag", None) is not None:
move_to_tag = HamsterTag.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 0))
)
HamsterFactTag.update({HamsterFactTag.tag: move_to_tag}).where(
HamsterFactTag.tag == self.move_from_tag
).execute()
self._refresh()
del self.move_from_tag
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
tag = HamsterTag.get(id=row_cells[0])
def handle_edit(properties):
if properties is None:
return
tag.name = properties["tag"]
tag.save()
self._refresh()
self.app.push_screen(
TagEditScreen(tag=tag), handle_edit
)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"tag id", "tag", "facts"
)
self.sort = (self.columns[1],)
self._refresh()

View File

@ -0,0 +1,193 @@
from textual.app import ComposeResult
from textual.coordinate import Coordinate
from textual.binding import Binding
from textual.screen import Screen
from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
from peewee import fn, JOIN
from ...utils import truncate
from ...sync import sync
from ...db import (
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
from ...kimaiapi import Timesheet as KimaiAPITimesheet
from ..list import ListPane
class KimaiCustomerList(ListPane):
pass
class KimaiProjectList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
filter_search = self.query_one("#filter #search").value
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer.id,
fn.COALESCE(KimaiCustomer.name, "").alias("customer_name"),
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_search:
projects = projects.where(
KimaiProject.name.contains(filter_search)
| KimaiCustomer.name.contains(filter_search)
)
self.table.add_rows(
[
[
str(project.customer_id) if project.customer_id is not None else "",
project.customer_name,
project.id,
project.name,
project.activities_count,
project.visible,
]
for project in projects
]
)
self.table.sort(*self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities", "visible"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
class KimaiActivityList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("#", "count", "Count"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
filter_search = self.query_one("#filter #search").value
activities = (
KimaiActivity.select(
KimaiActivity,
fn.COALESCE(KimaiProject.name, "").alias("project_name"),
fn.COALESCE(KimaiCustomer.name, "").alias("customer_name"),
)
.join(KimaiProject, JOIN.LEFT_OUTER)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.group_by(KimaiActivity)
)
if filter_search:
activities = activities.where(KimaiActivity.name.contains(filter_search))
self.table.add_rows(
[
[
# activity.project.customer_id if activity.project is not None else '',
# activity.customer_name,
str(activity.project_id) if activity.project_id is not None else "",
truncate(activity.project_name, 40),
activity.id,
truncate(activity.name, 40),
activity.visible,
"?",
]
for activity in activities
]
)
self.table.sort(*self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
# "customer id",
# "customer",
"project id",
"project",
"id",
"name",
"visible",
"times",
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_count(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
activity_id = row_cells[2]
count = len(KimaiAPITimesheet.list_by(self.app.api, activity=activity_id))
self.table.update_cell_at(Coordinate(row_idx, 5), count)
class KimaiScreen(Screen):
BINDINGS = [
("c", "show_tab('customers')", "Customers"),
("p", "show_tab('projects')", "Projects"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Kimai"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="projects"):
with TabPane("Customers", id="customers"):
yield KimaiCustomerList()
with TabPane("Projects", id="projects"):
yield KimaiProjectList()
with TabPane("Activities", id="activities"):
yield KimaiActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#projects DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,52 @@
from datetime import datetime
from textual import on
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical, Container
from textual.widgets import DataTable, Input
class ListPane(Container):
def compose(self) -> ComposeResult:
with Vertical():
yield DataTable()
with Horizontal(id="filter"):
yield Input(
id="search", placeholder="Category/activity name contains text"
)
yield Input(
id="date",
placeholder="After date, in {0} format".format(
datetime.now().strftime("%Y-%m-%d")
),
)
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
self.query_one("#filter").display = True
self._refresh()
self.query_one("#filter #search").focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
self.query_one("#filter").display = False
self.query_one("#filter #search").clear()
self.query_one("#filter #date").clear()
self.table.focus()
self._refresh()
@on(Input.Changed, "#filter Input")
def filter(self, event):
self._refresh()

View File

@ -1,18 +1,16 @@
from .kimai import ( from .kimaiapi import (
KimaiAPI, KimaiAPI,
Customer as KimaiAPICustomer, Customer as KimaiAPICustomer,
Project as KimaiAPIProject, Project as KimaiAPIProject,
Activity as KimaiAPIActivity, Activity as KimaiAPIActivity,
Tag as KimaiAPITag,
) )
from .db import ( from .db import (
KimaiTag,
db, db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject, KimaiProject,
KimaiCustomer, KimaiCustomer,
KimaiActivity, KimaiActivity,
HamsterActivityKimaiMapping,
) )
@ -22,11 +20,19 @@ def sync() -> None:
KimaiCustomer.delete().execute() KimaiCustomer.delete().execute()
KimaiProject.delete().execute() KimaiProject.delete().execute()
KimaiActivity.delete().execute() KimaiActivity.delete().execute()
KimaiTag.delete().execute()
customers = KimaiAPICustomer.list(api) customers = KimaiAPICustomer.list(api)
with db.atomic(): with db.atomic():
KimaiCustomer.insert_many( KimaiCustomer.insert_many(
[{"id": customer.id, "name": customer.name} for customer in customers] [
{
"id": customer.id,
"name": customer.name,
"visible": customer.visible,
}
for customer in customers
]
).execute() ).execute()
projects = KimaiAPIProject.list(api) projects = KimaiAPIProject.list(api)
@ -38,6 +44,7 @@ def sync() -> None:
"name": project.name, "name": project.name,
"customer_id": project.customer.id, "customer_id": project.customer.id,
"allow_global_activities": project.allow_global_activities, "allow_global_activities": project.allow_global_activities,
"visible": project.visible,
} }
for project in projects for project in projects
] ]
@ -50,8 +57,22 @@ def sync() -> None:
{ {
"id": activity.id, "id": activity.id,
"name": activity.name, "name": activity.name,
"project_id": (activity.project and activity.project.id or None), "project_id": (activity.project.id if activity.project else None),
"visible": activity.visible,
} }
for activity in activities for activity in activities
] ]
).execute() ).execute()
tags = KimaiAPITag.list(api)
with db.atomic():
KimaiTag.insert_many(
[
{
"id": tag.id,
"name": tag.name,
"visible": tag.visible,
}
for tag in tags
]
).execute()

2
hamstertools/utils.py Normal file
View File

@ -0,0 +1,2 @@
def truncate(string: str, length: int) -> str:
return string[: length - 2] + ".." if len(string) > 52 else string

View File

@ -1,2 +1,6 @@
click==8.0.3
requests==2.26.0 requests==2.26.0
peewee==3.17.0
requests-cache==1.1.1
textual==0.44.1
textual-autocomplete==2.1.0b0
textual-dev==1.2.1

View File

@ -1,107 +0,0 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from datetime import datetime
from peewee import JOIN
from hamstertools.db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
HamsterFactKimaiImport,
)
from hamstertools.kimai import KimaiAPI, Timesheet, Project, Activity
api = KimaiAPI()
DATE_FROM = "2023-10-01"
DATE_TO = "2023-11-01"
SEARCH = "auto"
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(DATE_FROM, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(DATE_TO, "%Y-%m-%d"))
& HamsterCategory.name.contains(SEARCH)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if (
mappings[0].kimai_activity.project is None
and not mappings[0].kimai_project.allow_global_activities
):
print(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity}"
)
has_errors = True
continue
if f.imports.count() > 0:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)"
)
has_errors = True
continue
# if has_errors:
# sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = Timesheet(
api,
activity=mapping.kimai_activity,
project=mapping.kimai_project,
begin=f.start_time,
end=f.end_time,
description=f.description if f.description != "" else mapping.kimai_description,
# tags=f.tags if f.tags != '' else mapping.kimai_tags
)
r = t.upload().json()
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')