"""Textual TUI for browsing and tidying Hamster data and a local Kimai cache."""

from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Header, Footer, DataTable, Input
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import Screen

from peewee import fn, JOIN

from .db import (
    db,
    HamsterCategory,
    HamsterActivity,
    HamsterFact,
    KimaiProject,
    KimaiCustomer,
    KimaiActivity,
)
from .kimai import (
    KimaiAPI,
    Customer as KimaiAPICustomer,
    Project as KimaiAPIProject,
    Activity as KimaiAPIActivity,
)


class ListScreen(Screen):
    """Base screen: a ``DataTable`` with column sorting and a text filter.

    Subclasses must, in their ``on_mount``, set ``self.table`` (the
    ``DataTable``) and ``self.sort`` (a tuple of column keys), and must
    implement ``_refresh``.
    """

    def compose(self) -> ComposeResult:
        """Create the child widgets: header, table, filter input and footer."""
        yield Header()
        with Vertical():
            yield DataTable()
            with Horizontal():
                yield Input(id="filter")
        yield Footer()

    def _refresh(self, filter_query=None) -> None:
        """Reload the table rows, optionally restricted by ``filter_query``.

        Raises:
            NotImplementedError: always; subclasses provide the real query.
        """
        raise NotImplementedError

    def _delete_row_under_cursor(self, model, id_column: int) -> None:
        """Delete the ``model`` record shown in the cursor row, then the row.

        Args:
            model: peewee model class whose record should be deleted.
            id_column: index of the table column that holds the record id.
        """
        cursor = self.table.cursor_coordinate
        # Resolve the row key before touching the database; `remove_row`
        # needs the key, not the row index.
        row_key, _ = self.table.coordinate_to_cell_key(cursor)
        record_id = self.table.get_cell_at(Coordinate(cursor.row, id_column))
        model.get(id=record_id).delete_instance()
        self.table.remove_row(row_key)

    def action_refresh(self) -> None:
        """Reload the table without a filter."""
        self._refresh()

    def action_sort(self) -> None:
        """Switch the cursor to column mode so a sort column can be picked."""
        self.table.cursor_type = "column"

    def on_data_table_column_selected(
        self, event: DataTable.ColumnSelected
    ) -> None:
        """Sort by the selected column and return to row-cursor mode."""
        # Stored as a tuple so every `_refresh` can unpack it uniformly.
        self.sort = (event.column_key,)
        event.data_table.sort(*self.sort)
        event.data_table.cursor_type = "row"

    def action_filter(self) -> None:
        """Show and focus the filter input, applying its current value."""
        filter_input = self.query_one("#filter")
        filter_input.display = True
        self._refresh(filter_input.value)
        filter_input.focus()

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Return focus to the table once the filter text is submitted."""
        self.table.focus()

    def action_cancelfilter(self) -> None:
        """Hide and clear the filter input, then show the unfiltered table."""
        filter_input = self.query_one("#filter")
        filter_input.display = False
        filter_input.clear()
        self.table.focus()
        self._refresh()

    def on_input_changed(self, event: Input.Changed) -> None:
        """Re-filter the table as the filter text changes."""
        self._refresh(event.value)


class ActivitiesScreen(ListScreen):
    """Lists Hamster activities with their category and fact count."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete activity"),
        ("f", "move_facts", "Move facts"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self, filter_query=None) -> None:
        """Rebuild the rows from the database and re-apply the current sort.

        Args:
            filter_query: optional substring matched against the activity
                name or the category name.
        """
        self.table.clear()

        activities = (
            HamsterActivity.select(
                HamsterActivity,
                HamsterCategory,
                fn.Count(HamsterFact.id).alias("facts_count"),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .group_by(HamsterActivity)
        )
        if filter_query:
            activities = activities.where(
                HamsterActivity.name.contains(filter_query)
                | HamsterCategory.name.contains(filter_query)
            )

        self.table.add_rows(
            [
                [
                    activity.category_id,
                    # A category_id of -1 is rendered as an empty category.
                    (activity.category.name if activity.category_id != -1 else ""),
                    activity.id,
                    activity.name,
                    activity.facts_count,
                ]
                for activity in activities
            ]
        )
        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "category id", "category", "activity id", "activity", "entries"
        )
        # Default order: by category name, then activity name.
        self.sort = (self.columns[1], self.columns[3])
        self._refresh()

    def action_delete(self) -> None:
        """Delete the activity under the cursor (id is in column 2)."""
        self._delete_row_under_cursor(HamsterActivity, 2)

    def action_move_facts(self) -> None:
        """Mark the cursor row's activity as the source of a fact move.

        The row is highlighted in red and the cursor advances one row; the
        move itself happens when a target row is selected.
        """
        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)
        self.move_from_activity = HamsterActivity.get(id=row_cells[2])
        for col_idx, cell_value in enumerate(row_cells):
            self.table.update_cell_at(
                Coordinate(row_idx, col_idx),
                f"[red]{cell_value}[/red]",
            )
        self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)

    def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """Complete a pending fact move onto the selected activity.

        Does nothing unless ``action_move_facts`` has set a source activity.
        """
        source = getattr(self, "move_from_activity", None)
        if source is None:
            return

        target_id = self.table.get_cell_at(Coordinate(event.cursor_row, 2))
        target = HamsterActivity.get(id=target_id)
        (
            HamsterFact.update({HamsterFact.activity: target})
            .where(HamsterFact.activity == source)
            .execute()
        )
        # Reload with the active filter; this also drops the red markup.
        self._refresh(self.query_one("#filter").value)
        del self.move_from_activity


class CategoriesScreen(ListScreen):
    """Lists Hamster categories with their activity count."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete category"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self, filter_query=None) -> None:
        """Rebuild the rows from the database and re-apply the current sort.

        Args:
            filter_query: optional substring matched against the category
                name.
        """
        self.table.clear()

        categories = (
            HamsterCategory.select(
                HamsterCategory,
                fn.Count(HamsterActivity.id).alias("activities_count"),
            )
            .join(HamsterActivity, JOIN.LEFT_OUTER)
            .group_by(HamsterCategory)
        )
        if filter_query:
            categories = categories.where(
                HamsterCategory.name.contains(filter_query)
            )

        self.table.add_rows(
            [
                [
                    category.id,
                    category.name,
                    category.activities_count,
                ]
                for category in categories
            ]
        )
        # `self.sort` is a tuple (also after a column click), so unpack it.
        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "category id", "category", "activities"
        )
        # Default order: by category name.
        self.sort = (self.columns[1],)
        self._refresh()

    def action_delete(self) -> None:
        """Delete the category under the cursor (id is in column 0)."""
        self._delete_row_under_cursor(HamsterCategory, 0)


class KimaiScreen(ListScreen):
    """Lists locally cached Kimai projects with customer and activity count."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("g", "get", "Get data"),
        ("/", "filter", "Search"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self, filter_query=None) -> None:
        """Rebuild the rows from the database and re-apply the current sort.

        Args:
            filter_query: optional substring matched against the project
                name or the customer name.
        """
        self.table.clear()

        projects = (
            KimaiProject.select(
                KimaiProject,
                KimaiCustomer,
                fn.Count(KimaiActivity.id).alias("activities_count"),
            )
            .join(KimaiCustomer, JOIN.LEFT_OUTER)
            .switch(KimaiProject)
            .join(KimaiActivity, JOIN.LEFT_OUTER)
            .where(KimaiActivity.project.is_null(False))
            .group_by(KimaiProject)
        )
        if filter_query:
            projects = projects.where(
                KimaiProject.name.contains(filter_query)
                | KimaiCustomer.name.contains(filter_query)
            )

        self.table.add_rows(
            [
                [
                    project.customer.id,
                    project.customer.name,
                    project.id,
                    project.name,
                    project.activities_count,
                ]
                for project in projects
            ]
        )
        # `self.sort` is a tuple (also after a column click), so unpack it.
        self.table.sort(*self.sort)

    def action_get(self) -> None:
        """Replace the cached Kimai tables with fresh data from the API.

        All data is fetched before the database is touched, and the
        delete-and-insert runs in one transaction, so a failed fetch or
        insert leaves the previous cache intact.
        """
        api = KimaiAPI()

        customer_rows = [
            {"id": customer.id, "name": customer.name}
            for customer in KimaiAPICustomer.list(api)
        ]
        project_rows = [
            {
                "id": project.id,
                "name": project.name,
                "customer_id": project.customer.id,
            }
            for project in KimaiAPIProject.list(api)
        ]
        activity_rows = [
            {
                "id": activity.id,
                "name": activity.name,
                # Activities without a project are stored with a NULL project.
                "project_id": activity.project.id if activity.project else None,
            }
            for activity in KimaiAPIActivity.list(api)
        ]

        with db.atomic():
            # Delete dependants first, then insert parents first.
            for model in (KimaiActivity, KimaiProject, KimaiCustomer):
                model.delete().execute()
            for model, rows in (
                (KimaiCustomer, customer_rows),
                (KimaiProject, project_rows),
                (KimaiActivity, activity_rows),
            ):
                # Skip empty batches rather than issuing an empty insert.
                if rows:
                    model.insert_many(rows).execute()

        self._refresh()

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "customer id", "customer", "project id", "project", "activities"
        )
        # Default order: by customer name.
        self.sort = (self.columns[1],)
        self._refresh()


class HamsterToolsApp(App):
    """Application shell that switches between the three list screens."""

    CSS_PATH = "app.tcss"
    BINDINGS = [
        ("a", "switch_mode('activities')", "Activities"),
        ("c", "switch_mode('categories')", "Categories"),
        ("k", "switch_mode('kimai')", "Kimai"),
        ("q", "quit", "Quit"),
    ]

    def __init__(self):
        """Initialise the database, then register one screen per mode."""
        db.init("hamster-testing.db")

        # MODES is assigned before `super().__init__()` runs.
        self.MODES = {
            "categories": CategoriesScreen(),
            "activities": ActivitiesScreen(),
            "kimai": KimaiScreen(),
        }

        super().__init__()

    def on_mount(self) -> None:
        """Start in the activities mode."""
        self.switch_mode("activities")

    def action_quit(self) -> None:
        """Exit the app and close the database connection."""
        self.exit()
        db.close()