Merge branch 'tui'

This commit is contained in:
3wc 2024-10-04 13:14:38 -04:00
commit 48905953ca
13 changed files with 1954 additions and 454 deletions

File diff suppressed because it is too large Load Diff

40
hamstertools/app.py Normal file
View File

@ -0,0 +1,40 @@
from textual.app import App
from .db import db
from .kimaiapi import KimaiAPI
from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen
class HamsterToolsApp(App):
CSS_PATH = "app.tcss"
BINDINGS = [
("h", "switch_mode('hamster')", "Hamster"),
("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
api_ = None
@property
def api(self) -> KimaiAPI:
if self.api_ is None:
self.api_ = KimaiAPI()
return self.api_
def __init__(self):
self.MODES = {
"hamster": HamsterScreen(),
"kimai": KimaiScreen(),
}
super().__init__()
def on_mount(self) -> None:
self.switch_mode("hamster")
def action_quit(self) -> None:
db.close()
self.exit()

86
hamstertools/app.tcss Normal file
View File

@ -0,0 +1,86 @@
DataTable {
height: 90%;
}
DataTable .datatable--cursor {
background: grey;
}
DataTable:focus .datatable--cursor {
background: orange;
}
#filter {
display: none;
}
#filter Input {
width: 50%;
}
ActivityEditScreen, ActivityMappingScreen, ActivityDeleteConfirmScreen {
align: center middle;
}
ActivityEditScreen > Vertical,
ActivityMappingScreen > Vertical {
padding: 0 1;
width: 80;
height: 30;
border: thick $background 80%;
background: $surface;
}
ActivityEditScreen > Vertical {
height: 10;
}
ActivityMappingScreen Horizontal {
align: left middle;
width: auto;
}
ActivityEditScreen Horizontal {
width: 80;
}
ActivityEditScreen Label,
ActivityMappingScreen Label {
padding: 0 1;
width: auto;
border: blank;
}
ActivityMappingScreen AutoComplete {
width: 80;
}
#description, #tags {
width: 30;
}
ActivityEditScreen Input {
width: 60;
}
#dialog {
grid-size: 2;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 0 1;
width: 60;
height: 11;
border: thick $background 80%;
background: $surface;
}
#question {
column-span: 2;
height: 1fr;
width: 1fr;
content-align: center middle;
}
Button {
width: 100%;
}

95
hamstertools/db.py Normal file
View File

@ -0,0 +1,95 @@
from datetime import datetime
from peewee import (
SqliteDatabase,
Model,
CharField,
ForeignKeyField,
DateTimeField,
SmallIntegerField,
BooleanField,
)
db = SqliteDatabase(None)
class HamsterCategory(Model):
name = CharField()
class Meta:
database = db
table_name = "categories"
class HamsterActivity(Model):
name = CharField()
category = ForeignKeyField(HamsterCategory, backref="activities")
class Meta:
database = db
table_name = "activities"
class HamsterFact(Model):
activity = ForeignKeyField(HamsterActivity, backref="facts")
start_time = DateTimeField()
end_time = DateTimeField(null=True)
description = CharField()
class Meta:
database = db
table_name = "facts"
class KimaiCustomer(Model):
visible = BooleanField(default=True)
name = CharField()
class Meta:
database = db
table_name = "kimai_customers"
class KimaiProject(Model):
name = CharField()
customer = ForeignKeyField(KimaiCustomer, backref="projects")
visible = BooleanField(default=True)
allow_global_activities = BooleanField(default=True)
class Meta:
database = db
table_name = "kimai_projects"
class KimaiActivity(Model):
name = CharField()
project = ForeignKeyField(KimaiProject, backref="activities", null=True)
visible = BooleanField(default=True)
class Meta:
database = db
table_name = "kimai_activities"
class HamsterActivityKimaiMapping(Model):
hamster_activity = ForeignKeyField(HamsterActivity, backref="mappings")
kimai_customer = ForeignKeyField(KimaiCustomer, backref="mappings")
kimai_project = ForeignKeyField(KimaiProject, backref="mappings")
kimai_activity = ForeignKeyField(KimaiActivity, backref="mappings")
kimai_description = CharField()
kimai_tags = CharField()
class Meta:
database = db
table_name = "hamster_kimai_mappings"
class HamsterFactKimaiImport(Model):
hamster_fact = ForeignKeyField(HamsterFact, backref="imports")
kimai_id = SmallIntegerField()
imported = DateTimeField(default=datetime.now)
class Meta:
database = db
table_name = "hamster_fact_kimai_imports"

224
hamstertools/kimaiapi.py Normal file
View File

@ -0,0 +1,224 @@
from datetime import datetime
import requests
import requests_cache
import os
from dataclasses import dataclass, field
class NotFound(Exception):
pass
class KimaiAPI(object):
# temporary hardcoded config
KIMAI_API_URL = "https://kimai.autonomic.zone/api"
# KIMAI_API_URL = "https://kimaitest.autonomic.zone/api"
KIMAI_USERNAME = "3wordchant"
KIMAI_API_KEY = os.environ["KIMAI_API_KEY"]
auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY}
def __init__(self):
# requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3})
self.user_json = self.get("users/me")
def get(self, endpoint, params=None):
result = requests.get(
f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers
).json()
try:
if result["code"] != 200:
raise NotFound()
except (KeyError, TypeError):
pass
return result
def post(self, endpoint, data):
return requests.post(
f"{self.KIMAI_API_URL}/{endpoint}", json=data, headers=self.auth_headers
)
class BaseAPI(object):
def __init__(self, api, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
@dataclass
class Customer(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Customer(api, c["id"], c["name"], c["visible"]) for c in api.customers_json
]
@staticmethod
def get_by_id(api, id):
for c in api.customers_json:
if c["id"] == id:
return Customer(api, c["id"], c["name"], c["visible"])
raise NotFound()
@dataclass
class Project(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
customer: Customer
allow_global_activities: bool = field(default=True)
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Project(
api,
p["id"],
p["name"],
Customer.get_by_id(api, p["customer"]),
p["globalActivities"],
p["visible"],
)
for p in api.projects_json
]
@staticmethod
def get_by_id(api, id, none=False):
for p in api.projects_json:
if p["id"] == id:
return Project(
api,
p["id"],
p["name"],
Customer.get_by_id(api, p["customer"]),
p["globalActivities"],
p["visible"],
)
if not none:
raise NotFound()
@dataclass
class Activity(BaseAPI):
api: KimaiAPI = field(repr=False)
id: int
name: str
project: Project
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Activity(
api,
a["id"],
a["name"],
Project.get_by_id(api, a["project"], none=True),
a["visible"],
)
for a in api.activities_json
]
@staticmethod
def get_by_id(api, id, none=False):
for a in api.activities_json:
if a["id"] == id:
return Activity(
api,
a["id"],
a["name"],
Project.get_by_id(api, a["project"], none),
a["visible"],
)
if not none:
raise NotFound()
@dataclass
class Timesheet(BaseAPI):
api: KimaiAPI = field(repr=False)
activity: Activity
project: Project
begin: datetime
end: datetime
id: int = field(default=None)
description: str = field(default="")
tags: str = field(default="")
@staticmethod
def list(api):
return [
Timesheet(
api,
Activity.get_by_id(api, t["activity"], none=True),
Project.get_by_id(api, t["project"], none=True),
t["begin"],
t["end"],
t["id"],
t["description"],
t["tags"],
)
for t in api.get(
"timesheets",
)
]
@staticmethod
def list_by(api, **kwargs):
kwargs["size"] = 10000
return [
Timesheet(
api,
Activity.get_by_id(api, t["activity"], none=True),
Project.get_by_id(api, t["project"], none=True),
t["begin"],
t["end"],
t["id"],
t["description"],
t["tags"],
)
for t in api.get("timesheets", params=kwargs)
]
@staticmethod
def get_by_id(api, id, none=False):
t = api.get(
f"timesheets/{id}",
)
return Timesheet(
api,
Activity.get_by_id(api, t["activity"], none=True),
Project.get_by_id(api, t["project"], none=True),
t["begin"],
t["end"],
t["id"],
t["description"],
t["tags"],
)
def upload(self):
return self.api.post(
"timesheets",
{
"begin": self.begin.isoformat(),
"end": self.end.isoformat(),
"project": self.project.id,
"activity": self.activity.id,
"description": self.description,
# FIXME: support multiple users
# "user": self.,
"tags": self.tags,
},
)

View File

View File

@ -0,0 +1,610 @@
from datetime import datetime
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.containers import Horizontal, Vertical, Grid
from textual.events import DescendantBlur
from textual.screen import Screen, ModalScreen
from textual.widgets import (
Header,
Footer,
DataTable,
Input,
Label,
Checkbox,
TabbedContent,
TabPane,
Button
)
from peewee import fn, JOIN
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from ..db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .list import ListPane
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
category_id = None
category_name = ""
def _get_categories(self, input_state):
categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)
def __init__(self, category, activity):
if category is not None:
self.category_id = category.id
self.category_name = category.name
self.activity_name = activity.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Category:"),
AutoComplete(
Input(
placeholder="Type to search...",
id="category",
value=self.category_name,
),
Dropdown(items=self._get_categories),
),
),
Horizontal(
Label("Activity:"), Input(value=self.activity_name, id="activity")
),
)
@on(Input.Submitted, "#category")
def category_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#category")
def category_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"category": self.category_id,
"activity": self.query_one("#activity").value,
}
)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel"),
]
customer_id = None
project_id = None
activity_id = None
customer = ""
project = ""
activity = ""
description = ""
tags = ""
def __init__(self, category, activity, mapping=None):
self.hamster_category = category
self.hamster_activity = activity
if mapping is not None:
self.customer_id = mapping.kimai_customer_id
self.customer = mapping.kimai_customer.name
self.project_id = mapping.kimai_project_id
self.project = mapping.kimai_project.name
self.activity_id = mapping.kimai_activity_id
self.activity = mapping.kimai_activity.name
self.description = mapping.kimai_description
self.tags = mapping.kimai_tags
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one("#global").value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(KimaiActivity.project_id == self.project_id)
return ActivityMappingScreen._filter_dropdowns(
[DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(
placeholder="Type to search...",
id="customer",
value=self.customer,
),
Dropdown(items=self._get_customers),
),
),
Horizontal(
Label("Project"),
AutoComplete(
Input(
placeholder="Type to search...",
id="project",
value=self.project,
),
Dropdown(items=self._get_projects),
),
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(
placeholder="Type to search...",
id="activity",
value=self.activity,
),
Dropdown(items=self._get_activities),
),
),
Horizontal(
Label("Description"),
Input(id="description", value=self.description),
),
Horizontal(
Label("Tags"),
Input(id="tags", value=self.tags),
),
Horizontal(Checkbox("Global", id="global")),
)
@on(Input.Submitted, "#customer")
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#project").focus()
@on(DescendantBlur, "#customer")
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
@on(Input.Submitted, "#project")
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one("#activity").focus()
@on(DescendantBlur, "#project")
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, "#activity")
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#activity")
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_global(self):
self.query_one("#global").value = not self.query_one("#global").value
def action_save(self):
self.dismiss(
{
"kimai_customer_id": self.customer_id,
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"global": self.query_one("#global").value,
}
)
def action_cancel(self):
self.dismiss(None)
class ActivityDeleteConfirmScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
]
def compose(self) -> ComposeResult:
yield Grid(
Label("Are you sure you want to delete this activity?", id="question"),
Button("Confirm", variant="error", id="confirm"),
Button("Cancel", variant="primary", id="cancel"),
id="dialog",
)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "quit":
self.dismiss(True)
else:
self.dismiss(False)
def action_cancel(self):
self.dismiss(False)
class ActivityList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterActivityKimaiMapping.select(
HamsterActivityKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterActivityKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
HamsterFact.start_time,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
activities = activities.where(
HamsterActivity.name.contains(filter_search)
| HamsterCategory.name.contains(filter_search)
)
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
activities = activities.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
def check_delete(delete: bool) -> None:
"""Called when QuitScreen is dismissed."""
if delete:
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
if activity.facts.count() > 0:
self.app.push_screen(ActivityDeleteConfirmScreen(), check_delete)
else:
check_delete(True)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
self._refresh()
del self.move_from_activity
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
try:
category = HamsterCategory.get(id=row_cells[0])
except HamsterCategory.DoesNotExist:
category = None
activity = HamsterActivity.get(id=row_cells[2])
def handle_edit(properties):
if properties is None:
return
activity.name = properties["activity"]
activity.category_id = properties["category"]
activity.save()
self._refresh()
self.app.push_screen(
ActivityEditScreen(category=category, activity=activity), handle_edit
)
def action_mapping(self):
selected_activity = (
HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
HamsterActivity.id
== self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
)
.get()
)
mapping = None
try:
mapping = HamsterActivityKimaiMapping.get(
hamster_activity=selected_activity
)
except HamsterActivityKimaiMapping.DoesNotExist:
pass
def handle_mapping(mapping_data):
if mapping_data is None:
return
if mapping is not None:
mapping_ = mapping
for key, value in mapping_data.items():
setattr(mapping_, key, value)
else:
mapping_ = HamsterActivityKimaiMapping.create(
hamster_activity=selected_activity, **mapping_data
)
mapping_.save()
self._refresh()
self.app.push_screen(
ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name,
mapping=mapping,
),
handle_mapping,
)
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,193 @@
from textual.app import ComposeResult
from textual.coordinate import Coordinate
from textual.binding import Binding
from textual.screen import Screen
from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
from peewee import fn, JOIN
from ..utils import truncate
from ..sync import sync
from ..db import (
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
from ..kimaiapi import Timesheet as KimaiAPITimesheet
from .list import ListPane
class KimaiCustomerList(ListPane):
pass
class KimaiProjectList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
filter_search = self.query_one("#filter #search").value
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer.id,
fn.COALESCE(KimaiCustomer.name, "").alias("customer_name"),
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_search:
projects = projects.where(
KimaiProject.name.contains(filter_search)
| KimaiCustomer.name.contains(filter_search)
)
self.table.add_rows(
[
[
str(project.customer_id) if project.customer_id is not None else "",
project.customer_name,
project.id,
project.name,
project.activities_count,
project.visible,
]
for project in projects
]
)
self.table.sort(*self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities", "visible"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
class KimaiActivityList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("#", "count", "Count"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
filter_search = self.query_one("#filter #search").value
activities = (
KimaiActivity.select(
KimaiActivity,
fn.COALESCE(KimaiProject.name, "").alias("project_name"),
fn.COALESCE(KimaiCustomer.name, "").alias("customer_name"),
)
.join(KimaiProject, JOIN.LEFT_OUTER)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.group_by(KimaiActivity)
)
if filter_search:
activities = activities.where(KimaiActivity.name.contains(filter_search))
self.table.add_rows(
[
[
# activity.project.customer_id if activity.project is not None else '',
# activity.customer_name,
str(activity.project_id) if activity.project_id is not None else "",
truncate(activity.project_name, 40),
activity.id,
truncate(activity.name, 40),
activity.visible,
"?",
]
for activity in activities
]
)
self.table.sort(*self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
# "customer id",
# "customer",
"project id",
"project",
"id",
"name",
"visible",
"times",
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_count(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
activity_id = row_cells[2]
count = len(KimaiAPITimesheet.list_by(self.app.api, activity=activity_id))
self.table.update_cell_at(Coordinate(row_idx, 5), count)
class KimaiScreen(Screen):
BINDINGS = [
("c", "show_tab('customers')", "Customers"),
("p", "show_tab('projects')", "Projects"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Kimai"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="projects"):
with TabPane("Customers", id="customers"):
yield KimaiCustomerList()
with TabPane("Projects", id="projects"):
yield KimaiProjectList()
with TabPane("Activities", id="activities"):
yield KimaiActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#projects DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,52 @@
from datetime import datetime
from textual import on
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical, Container
from textual.widgets import DataTable, Input
class ListPane(Container):
def compose(self) -> ComposeResult:
with Vertical():
yield DataTable()
with Horizontal(id="filter"):
yield Input(
id="search", placeholder="Category/activity name contains text"
)
yield Input(
id="date",
placeholder="After date, in {0} format".format(
datetime.now().strftime("%Y-%m-%d")
),
)
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
self.query_one("#filter").display = True
self._refresh()
self.query_one("#filter #search").focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
self.query_one("#filter").display = False
self.query_one("#filter #search").clear()
self.query_one("#filter #date").clear()
self.table.focus()
self._refresh()
@on(Input.Changed, "#filter Input")
def filter(self, event):
self._refresh()

62
hamstertools/sync.py Normal file
View File

@ -0,0 +1,62 @@
from .kimaiapi import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
)
from .db import (
db,
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
def sync() -> None:
api = KimaiAPI()
KimaiCustomer.delete().execute()
KimaiProject.delete().execute()
KimaiActivity.delete().execute()
customers = KimaiAPICustomer.list(api)
with db.atomic():
KimaiCustomer.insert_many(
[
{
"id": customer.id,
"name": customer.name,
"visible": customer.visible,
}
for customer in customers
]
).execute()
projects = KimaiAPIProject.list(api)
with db.atomic():
KimaiProject.insert_many(
[
{
"id": project.id,
"name": project.name,
"customer_id": project.customer.id,
"allow_global_activities": project.allow_global_activities,
"visible": project.visible,
}
for project in projects
]
).execute()
activities = KimaiAPIActivity.list(api)
with db.atomic():
KimaiActivity.insert_many(
[
{
"id": activity.id,
"name": activity.name,
"project_id": (activity.project.id if activity.project else None),
"visible": activity.visible,
}
for activity in activities
]
).execute()

2
hamstertools/utils.py Normal file
View File

@ -0,0 +1,2 @@
def truncate(string: str, length: int) -> str:
return string[: length - 2] + ".." if len(string) > 52 else string

View File

@ -1,2 +1,6 @@
click==8.0.3
requests==2.26.0
peewee==3.17.0
requests-cache==1.1.1
textual==0.44.1
textual-autocomplete==2.1.0b0
textual-dev==1.2.1

25
scripts/apitest.py Normal file
View File

@ -0,0 +1,25 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from datetime import datetime, timedelta
from hamstertools.kimai import KimaiAPI, Timesheet, Project, Activity
api = KimaiAPI()
# print(Timesheet.list(api))
t = Timesheet(
api,
activity=Activity.get_by_id(api, 613),
project=Project.get_by_id(api, 233),
begin=datetime.now() - timedelta(minutes=10),
end=datetime.now(),
)
# r = t.upload()
# from pdb import set_trace; set_trace()
print(Timesheet.get_by_id(api, 30683))