hamster-tools/hamstertools/app.py

574 lines
18 KiB
Python

from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Grid
from textual.events import DescendantBlur
from textual.widgets import Header, Footer, DataTable, Input, Button, Label, Checkbox
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.reactive import reactive
from textual.screen import Screen, ModalScreen
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from peewee import fn, JOIN
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterKimaiMapping,
)
from .kimai import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
)
class ListScreen(Screen):
def compose(self) -> ComposeResult:
"""create child widgets for the app."""
yield Header()
with Vertical():
yield DataTable()
with Horizontal():
yield Input(id="filter")
yield Footer()
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
filter_input = self.query_one("#filter")
filter_input.display = True
self._refresh(filter_input.value)
filter_input.focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
filter_input = self.query_one("#filter")
filter_input.display = False
filter_input.clear()
self.table.focus()
self._refresh()
def on_input_changed(self, event):
self._refresh(event.value)
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel")
]
def compose(self) -> ComposeResult:
yield Grid(
Label("Are you sure you want to quit?", id="question"),
Button("Quit", variant="error", id="quit"),
Button("Cancel", variant="primary", id="cancel"),
id="dialog",
)
def action_cancel(self):
self.dismiss(None)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel")
]
customer_id = None
project_id = None
activity_id = None
def __init__(self, category, activity):
self.category = category
self.activity = activity
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [
DropdownItem(c.name, str(c.id))
for c in KimaiCustomer.select()
]
return ActivityMappingScreen._filter_dropdowns(customers,
input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects,
input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one('#global').value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(
KimaiActivity.project_id == self.project_id
)
return ActivityMappingScreen._filter_dropdowns([
DropdownItem(a.name, str(a.id))
for a in activities], input_state.value)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.activity}@{self.category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(placeholder="Type to search...", id="customer"),
Dropdown(items=self._get_customers),
)
),
Horizontal(
Label("Project"),
AutoComplete(
Input(placeholder="Type to search...", id='project'),
Dropdown(items=self._get_projects),
)
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(placeholder="Type to search...", id='activity'),
Dropdown(items=self._get_activities),
)
),
Horizontal(
Label("Description"),
Input(id='description'),
),
Horizontal(
Label("Tags"),
Input(id='tags'),
),
Horizontal(Checkbox("Global", id='global')),
)
@on(Input.Submitted, '#customer')
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one('#project').focus()
@on(DescendantBlur, '#customer')
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, '#project')
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one('#activity').focus()
@on(DescendantBlur, '#project')
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, '#activity')
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one('#activity').focus()
@on(DescendantBlur, '#activity')
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(event.control.parent.dropdown.selected_item.left_meta)
def action_global(self):
self.query_one('#global').value = not self.query_one('#global').value
def action_save(self):
self.dismiss({
'kimai_customer_id': self.customer_id,
'kimai_project_id': self.project_id,
'kimai_activity_id': self.activity_id,
'kimai_description': self.query_one('#description').value,
'kimai_tags': self.query_one('#tags').value,
'global': self.query_one('#global').value,
})
def action_cancel(self):
self.dismiss(None)
class ActivityListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move facts"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self, filter_query=None):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterKimaiMapping.select(
HamsterKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
if filter_query:
activities = activities.where(
HamsterActivity.name.contains(filter_query)
| HamsterCategory.name.contains(filter_query)
)
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
filter_input = self.query_one("#filter")
self._refresh(filter_input.value)
del self.move_from_activity
def action_edit(self):
def handle_edit(properties):
print(properties)
self.app.push_screen(ActivityEditScreen(), handle_edit)
def action_mapping(self):
selected_activity = HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
).join(HamsterCategory, JOIN.LEFT_OUTER).where(
HamsterActivity.id == self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
).get()
def handle_mapping(mapping):
if mapping is None:
return
m = HamsterKimaiMapping.create(hamster_activity=selected_activity, **mapping)
m.save()
filter_input = self.query_one("#filter")
self._refresh(filter_input.value)
self.app.push_screen(ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name
), handle_mapping)
class CategoryListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self, filter_query=None):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory, fn.Count(HamsterActivity.id).alias("activities_count")
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
if filter_query:
categories = categories.where(HamsterCategory.name.contains(filter_query))
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
class KimaiProjectListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self, filter_query=None):
self.table.clear()
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_query:
projects = projects.where(
KimaiProject.name.contains(filter_query)
| KimaiCustomer.name.contains(filter_query)
)
self.table.add_rows(
[
[
project.customer.id,
project.customer.name,
project.id,
project.name,
project.activities_count,
]
for project in projects
]
)
self.table.sort(self.sort)
def action_get(self) -> None:
api = KimaiAPI()
KimaiCustomer.delete().execute()
KimaiProject.delete().execute()
KimaiActivity.delete().execute()
customers = KimaiAPICustomer.list(api)
with db.atomic():
KimaiCustomer.insert_many(
[{"id": customer.id, "name": customer.name} for customer in customers]
).execute()
projects = KimaiAPIProject.list(api)
with db.atomic():
KimaiProject.insert_many(
[
{
"id": project.id,
"name": project.name,
"customer_id": project.customer.id,
}
for project in projects
]
).execute()
activities = KimaiAPIActivity.list(api)
with db.atomic():
KimaiActivity.insert_many(
[
{
"id": activity.id,
"name": activity.name,
"project_id": (
activity.project and activity.project.id or None
),
}
for activity in activities
]
).execute()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities"
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[1]
self._refresh()
class HamsterToolsApp(App):
CSS_PATH = "app.tcss"
BINDINGS = [
("a", "switch_mode('activities')", "Activities"),
("c", "switch_mode('categories')", "Categories"),
("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
def __init__(self):
db.init("hamster-testing.db")
self.MODES = {
"categories": CategoryListScreen(),
"activities": ActivityListScreen(),
"kimai": KimaiProjectListScreen(),
}
super().__init__()
def on_mount(self) -> None:
self.switch_mode("activities")
def action_quit(self) -> None:
self.exit()
db.close()