"""Textual screens for browsing Hamster data and mapping it to Kimai.

The module defines list screens for Hamster activities, Hamster categories
and Kimai projects, two modal screens (edit an activity, create a Kimai
mapping for an activity) and the ``HamsterToolsApp`` application that
switches between the list screens.
"""

from datetime import datetime

from peewee import JOIN, fn
from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Grid, Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.events import DescendantBlur
from textual.screen import ModalScreen, Screen
from textual.widgets import Button, Checkbox, DataTable, Footer, Header, Input, Label
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem

from .db import (
    db,
    HamsterCategory,
    HamsterActivity,
    HamsterFact,
    KimaiProject,
    KimaiCustomer,
    KimaiActivity,
    HamsterActivityKimaiMapping,
)
from .kimai import (
    KimaiAPI,
    Customer as KimaiAPICustomer,
    Project as KimaiAPIProject,
    Activity as KimaiAPIActivity,
)

# Format accepted by the "date" filter input of the list screens.
_DATE_FORMAT = "%Y-%m-%d"


def _selected_item_id(event):
    """Return the id of the dropdown item selected for the event's input.

    The value is the ``left_meta`` of the selected item of the dropdown that
    belongs to the parent of ``event.control``, converted to ``str``.
    Returns ``None`` when no item is selected.
    """
    selected = event.control.parent.dropdown.selected_item
    if selected is None:
        return None
    return str(selected.left_meta)


def _parse_filter_date(text):
    """Parse *text* with ``_DATE_FORMAT``; return ``None`` if it does not parse.

    The filter is re-evaluated on every keystroke, so partially typed dates
    are expected and are simply treated as "no date filter".
    """
    if not text:
        return None
    try:
        return datetime.strptime(text, _DATE_FORMAT)
    except ValueError:
        return None


class ListScreen(Screen):
    """Base screen: a ``DataTable`` plus a filter bar with two inputs.

    The methods below use ``self.table``, ``self.sort`` and
    ``self._refresh()``, none of which are defined here; the subclasses in
    this module assign ``table`` and ``sort`` in ``on_mount`` and implement
    ``_refresh``.  ``self.sort`` is a tuple of column keys that gets unpacked
    into ``DataTable.sort``.
    """

    def compose(self) -> ComposeResult:
        """Yield header, table, the ``#filter`` bar and footer."""
        # Today's date doubles as an example of the expected format.
        date_example = datetime.now().strftime(_DATE_FORMAT)
        yield Header()
        with Vertical():
            yield DataTable()
            with Horizontal(id="filter"):
                yield Input(
                    id="search", placeholder="Category/activity name contains text"
                )
                yield Input(
                    id="date",
                    placeholder="After date, in {0} format".format(date_example),
                )
        yield Footer()

    def action_refresh(self) -> None:
        """Reload the table contents."""
        self._refresh()

    def action_sort(self) -> None:
        """Switch the cursor to column mode so a sort column can be picked."""
        self.table.cursor_type = "column"

    def on_data_table_column_selected(self, event):
        """Sort by the selected column and return to row selection."""
        self.sort = (event.column_key,)
        event.data_table.sort(*self.sort)
        event.data_table.cursor_type = "row"

    def action_filter(self) -> None:
        """Show the filter bar, refresh, and focus the search input."""
        self.query_one("#filter").display = True
        self._refresh()
        self.query_one("#filter #search").focus()

    def on_input_submitted(self, event):
        """Give focus back to the table when an input is submitted."""
        self.table.focus()

    def action_cancelfilter(self) -> None:
        """Hide the filter bar, clear both inputs and reload the table."""
        self.query_one("#filter").display = False
        self.query_one("#filter #search").clear()
        self.query_one("#filter #date").clear()
        self.table.focus()
        self._refresh()

    @on(Input.Changed, "#filter Input")
    def filter(self, event):
        """Reload the table whenever one of the filter inputs changes."""
        self._refresh()


class ActivityEditScreen(ModalScreen):
    """Modal for editing an activity's name and category.

    Dismisses with ``None`` on cancel, or with a dict holding the keys
    ``"category"`` (category id) and ``"activity"`` (name) on save.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
        ("ctrl+s", "save", "Save"),
    ]

    # Defaults used when the screen is opened for an activity without category.
    category_id = None
    category_name = ""

    def _get_categories(self, input_state):
        """Return the category dropdown items matching the typed text."""
        categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
        return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)

    def __init__(self, category, activity):
        """Remember the initial category (may be ``None``) and activity name."""
        if category is not None:
            self.category_id = category.id
            self.category_name = category.name
        self.activity_name = activity.name
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the category autocomplete and the activity name input."""
        yield Vertical(
            Horizontal(
                Label("Category:"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="category",
                        value=self.category_name,
                    ),
                    Dropdown(items=self._get_categories),
                ),
            ),
            Horizontal(
                Label("Activity:"), Input(value=self.activity_name, id="activity")
            ),
        )

    @on(Input.Submitted, "#category")
    def category_submitted(self, event):
        """Store the selected category, then move on to the activity input."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.category_id = selected_id
        self.query_one("#activity").focus()

    @on(DescendantBlur, "#category")
    def category_blur(self, event):
        """Store the selected category when the input loses focus."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.category_id = selected_id

    def action_cancel(self):
        """Close the modal without a result."""
        self.dismiss(None)

    def action_save(self):
        """Close the modal, returning the chosen category id and name."""
        self.dismiss(
            {
                "category": self.category_id,
                "activity": self.query_one("#activity").value,
            }
        )


class ActivityMappingScreen(ModalScreen):
    """Modal for choosing the Kimai customer/project/activity of a mapping.

    Dismisses with ``None`` on cancel, or with a dict of the selected ids,
    description, tags and the "global" checkbox value on save.
    """

    BINDINGS = [
        ("ctrl+g", "global", "Toggle global"),
        ("ctrl+s", "save", "Save"),
        ("escape", "cancel", "Cancel"),
    ]

    # Ids of the items picked in the three autocomplete inputs.
    customer_id = None
    project_id = None
    activity_id = None

    def __init__(self, category, activity):
        """Store the category and activity shown in the heading label."""
        self.category = category
        self.activity = activity
        super().__init__()

    @staticmethod
    def _filter_dropdowns(options, value):
        """Return the items whose text contains *value*, prefix matches first.

        Matching is case-insensitive.  Items that merely contain *value*
        keep their relative order after the items that start with it.
        """
        needle = value.lower()
        matches = [item for item in options if needle in item.main.plain.lower()]
        # sorted() is stable and False sorts before True, so the prefix test
        # is negated to bring prefix matches to the front.
        return sorted(
            matches, key=lambda item: not item.main.plain.lower().startswith(needle)
        )

    def _get_customers(self, input_state):
        """Return the customer dropdown items matching the typed text."""
        customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
        return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)

    def _get_projects(self, input_state):
        """Return matching projects of the currently selected customer."""
        projects = [
            DropdownItem(p.name, str(p.id))
            for p in KimaiProject.select().where(
                KimaiProject.customer_id == self.customer_id
            )
        ]
        return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)

    def _get_activities(self, input_state):
        """Return matching activities.

        With the "global" checkbox ticked only activities without a project
        are offered, otherwise only those of the selected project.
        """
        activities = KimaiActivity.select()
        if self.query_one("#global").value:
            activities = activities.where(
                KimaiActivity.project_id.is_null(),
            )
        else:
            activities = activities.where(KimaiActivity.project_id == self.project_id)
        return ActivityMappingScreen._filter_dropdowns(
            [DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
        )

    def compose(self) -> ComposeResult:
        """Yield the heading and one row per mapping field."""
        yield Vertical(
            Horizontal(
                Label(f"Mapping for {self.activity}@{self.category}"),
            ),
            Horizontal(
                Label("Customer"),
                AutoComplete(
                    Input(placeholder="Type to search...", id="customer"),
                    Dropdown(items=self._get_customers),
                ),
            ),
            Horizontal(
                Label("Project"),
                AutoComplete(
                    Input(placeholder="Type to search...", id="project"),
                    Dropdown(items=self._get_projects),
                ),
            ),
            Horizontal(
                Label("Activity"),
                AutoComplete(
                    Input(placeholder="Type to search...", id="activity"),
                    Dropdown(items=self._get_activities),
                ),
            ),
            Horizontal(
                Label("Description"),
                Input(id="description"),
            ),
            Horizontal(
                Label("Tags"),
                Input(id="tags"),
            ),
            Horizontal(Checkbox("Global", id="global")),
        )

    @on(Input.Submitted, "#customer")
    def customer_submitted(self, event):
        """Store the selected customer, then move on to the project input."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.customer_id = selected_id
        self.query_one("#project").focus()

    @on(DescendantBlur, "#customer")
    def customer_blur(self, event):
        """Store the selected customer when the input loses focus."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.customer_id = selected_id

    @on(Input.Submitted, "#project")
    def project_submitted(self, event):
        """Store the selected project, then move on to the activity input."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.project_id = selected_id
        self.query_one("#activity").focus()

    @on(DescendantBlur, "#project")
    def project_blur(self, event):
        """Store the selected project when the input loses focus."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.project_id = selected_id

    @on(Input.Submitted, "#activity")
    def activity_submitted(self, event):
        """Store the selected activity, then move on to the description."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.activity_id = selected_id
        # Continue the customer -> project -> activity chain with the next
        # field instead of re-focusing the input that was just submitted.
        self.query_one("#description").focus()

    @on(DescendantBlur, "#activity")
    def activity_blur(self, event):
        """Store the selected activity when the input loses focus."""
        selected_id = _selected_item_id(event)
        if selected_id is not None:
            self.activity_id = selected_id

    def action_global(self):
        """Toggle the "global" checkbox."""
        checkbox = self.query_one("#global")
        checkbox.value = not checkbox.value

    def action_save(self):
        """Close the modal, returning the collected mapping values."""
        self.dismiss(
            {
                "kimai_customer_id": self.customer_id,
                "kimai_project_id": self.project_id,
                "kimai_activity_id": self.activity_id,
                "kimai_description": self.query_one("#description").value,
                "kimai_tags": self.query_one("#tags").value,
                "global": self.query_one("#global").value,
            }
        )

    def action_cancel(self):
        """Close the modal without a result."""
        self.dismiss(None)


class ActivityListScreen(ListScreen):
    """List of Hamster activities with fact and mapping counts."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete"),
        ("f", "move_facts", "Move facts"),
        ("e", "edit", "Edit"),
        ("m", "mapping", "Mapping"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Rebuild the table from the database, applying the active filters."""
        self.table.clear()

        # Per-activity counts are computed in sub-queries and joined in below.
        facts_count_query = (
            HamsterFact.select(
                HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
            )
            .group_by(HamsterFact.activity_id)
            .alias("facts_count_query")
        )
        mappings_count_query = (
            HamsterActivityKimaiMapping.select(
                HamsterActivityKimaiMapping.hamster_activity_id,
                fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
            )
            .group_by(HamsterActivityKimaiMapping.hamster_activity_id)
            .alias("mappings_count_query")
        )

        activities = (
            HamsterActivity.select(
                HamsterActivity,
                HamsterCategory.id,
                HamsterFact.start_time,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
                fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
                fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
                    "mappings_count"
                ),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(
                facts_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == facts_count_query.c.activity_id),
            )
            .switch(HamsterActivity)
            .join(
                mappings_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
            )
            .group_by(HamsterActivity)
        )

        # Input.value is always a string, so test for "non-empty" rather
        # than "not None".
        filter_search = self.query_one("#filter #search").value
        if filter_search:
            activities = activities.where(
                HamsterActivity.name.contains(filter_search)
                | HamsterCategory.name.contains(filter_search)
            )

        after = _parse_filter_date(self.query_one("#filter #date").value)
        if after is not None:
            activities = activities.where(HamsterFact.start_time > after)

        self.table.add_rows(
            [
                [
                    activity.category_id,
                    activity.category_name,
                    activity.id,
                    activity.name,
                    activity.facts_count,
                    activity.mappings_count,
                ]
                for activity in activities
            ]
        )
        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the data."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "category id", "category", "activity id", "activity", "entries", "mappings"
        )
        # Default order: by category name, then activity name.
        self.sort = (self.columns[1], self.columns[3])
        self._refresh()

    def action_delete(self) -> None:
        """Delete the activity under the cursor from database and table."""
        # get the keys for the row and column under the cursor.
        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        # Column 2 holds the activity id.
        activity_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 2),
        )
        activity = HamsterActivity.get(id=activity_id)
        activity.delete_instance()

        # supply the row key to `remove_row` to delete the row.
        self.table.remove_row(row_key)

    def action_move_facts(self) -> None:
        """Mark the current activity as the source of a "move facts" action.

        The row is rendered in red and the cursor moves to the next row; the
        move itself happens in ``on_data_table_row_selected``.
        """
        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)
        self.move_from_activity = HamsterActivity.get(id=row_cells[2])
        for col_idx, cell_value in enumerate(row_cells):
            self.table.update_cell_at(
                Coordinate(row_idx, col_idx),
                f"[red]{cell_value}[/red]",
            )
        self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)

    def on_data_table_row_selected(self, event):
        """Finish a pending "move facts" by re-pointing facts to this row."""
        if getattr(self, "move_from_activity", None) is not None:
            move_to_activity = HamsterActivity.get(
                self.table.get_cell_at(Coordinate(event.cursor_row, 2))
            )
            HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
                HamsterFact.activity == self.move_from_activity
            ).execute()
            self._refresh()
            # The attribute only exists while a move is pending.
            del self.move_from_activity

    def action_edit(self):
        """Open the edit modal for the activity under the cursor."""
        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)

        try:
            category = HamsterCategory.get(id=row_cells[0])
        except HamsterCategory.DoesNotExist:
            category = None
        activity = HamsterActivity.get(id=row_cells[2])

        def handle_edit(properties):
            """Apply the modal's result; ``None`` means it was cancelled."""
            if properties is None:
                return
            activity.name = properties["activity"]
            activity.category_id = properties["category"]
            activity.save()
            self._refresh()

        self.app.push_screen(
            ActivityEditScreen(category=category, activity=activity), handle_edit
        )

    def action_mapping(self):
        """Open the mapping modal for the activity under the cursor."""
        selected_activity = (
            HamsterActivity.select(
                HamsterActivity,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .where(
                HamsterActivity.id
                == self.table.get_cell_at(
                    Coordinate(self.table.cursor_coordinate.row, 2),
                )
            )
            .get()
        )

        def handle_mapping(mapping):
            """Create the mapping row; ``None`` means the modal was cancelled."""
            if mapping is None:
                return
            # create() already persists the row, so no extra save() is needed.
            HamsterActivityKimaiMapping.create(
                hamster_activity=selected_activity, **mapping
            )
            self._refresh()

        self.app.push_screen(
            ActivityMappingScreen(
                category=selected_activity.category_name,
                activity=selected_activity.name,
            ),
            handle_mapping,
        )


class CategoryListScreen(ListScreen):
    """List of Hamster categories with their activity counts."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete category"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Rebuild the table from the database, applying the active filters."""
        self.table.clear()

        categories = (
            HamsterCategory.select(
                HamsterCategory,
                fn.Count(HamsterActivity.id).alias("activities_count"),
                HamsterFact.start_time,
            )
            .join(HamsterActivity, JOIN.LEFT_OUTER)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .group_by(HamsterCategory)
        )

        # Input.value is always a string, so test for "non-empty" rather
        # than "not None".
        filter_search = self.query_one("#filter #search").value
        if filter_search:
            categories = categories.where(HamsterCategory.name.contains(filter_search))

        after = _parse_filter_date(self.query_one("#filter #date").value)
        if after is not None:
            categories = categories.where(HamsterFact.start_time > after)

        self.table.add_rows(
            [
                [
                    category.id,
                    category.name,
                    category.activities_count,
                ]
                for category in categories
            ]
        )
        # self.sort is a tuple (see on_mount and the column-selected handler
        # of the base class), so it must be unpacked.
        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the data."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns("category id", "category", "activities")
        # Kept as a tuple so it has the same shape the base class assigns
        # after the user picks a sort column.
        self.sort = (self.columns[1],)
        self._refresh()

    def action_delete(self) -> None:
        """Delete the category under the cursor from database and table."""
        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        # Column 0 holds the category id.
        category_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 0),
        )
        category = HamsterCategory.get(id=category_id)
        category.delete_instance()

        self.table.remove_row(row_key)


class KimaiProjectListScreen(ListScreen):
    """List of locally cached Kimai projects with their activity counts."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("g", "get", "Get data"),
        ("/", "filter", "Search"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self, filter_query=None):
        """Rebuild the table from the database.

        Args:
            filter_query: Text that the project or customer name must
                contain.  When omitted, the text of the ``#search`` filter
                input is used, so the shared filter bar also works here.
        """
        self.table.clear()

        if filter_query is None:
            filter_query = self.query_one("#filter #search").value

        projects = (
            KimaiProject.select(
                KimaiProject,
                KimaiCustomer,
                fn.Count(KimaiActivity.id).alias("activities_count"),
            )
            .join(KimaiCustomer, JOIN.LEFT_OUTER)
            .switch(KimaiProject)
            .join(KimaiActivity, JOIN.LEFT_OUTER)
            .where(KimaiActivity.project.is_null(False))
            .group_by(KimaiProject)
        )

        if filter_query:
            projects = projects.where(
                KimaiProject.name.contains(filter_query)
                | KimaiCustomer.name.contains(filter_query)
            )

        self.table.add_rows(
            [
                [
                    project.customer.id,
                    project.customer.name,
                    project.id,
                    project.name,
                    project.activities_count,
                ]
                for project in projects
            ]
        )
        # self.sort is a tuple (see on_mount and the column-selected handler
        # of the base class), so it must be unpacked.
        self.table.sort(*self.sort)

    def action_get(self) -> None:
        """Replace the cached Kimai tables with fresh data from the API."""
        api = KimaiAPI()

        # Fetch everything before touching the database, so that a failing
        # API call does not leave the cache emptied.
        customer_rows = [
            {"id": customer.id, "name": customer.name}
            for customer in KimaiAPICustomer.list(api)
        ]
        project_rows = [
            {
                "id": project.id,
                "name": project.name,
                "customer_id": project.customer.id,
                "allow_global_activities": project.allow_global_activities,
            }
            for project in KimaiAPIProject.list(api)
        ]
        activity_rows = [
            {
                "id": activity.id,
                "name": activity.name,
                "project_id": activity.project.id if activity.project else None,
            }
            for activity in KimaiAPIActivity.list(api)
        ]

        # Delete and re-insert in one transaction so the swap is all-or-nothing.
        with db.atomic():
            KimaiCustomer.delete().execute()
            KimaiProject.delete().execute()
            KimaiActivity.delete().execute()

            batches = (
                (KimaiCustomer, customer_rows),
                (KimaiProject, project_rows),
                (KimaiActivity, activity_rows),
            )
            for model, rows in batches:
                # Skip empty batches: there is nothing to insert for them.
                if rows:
                    model.insert_many(rows).execute()

        self._refresh()

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the data."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "customer id", "customer", "project id", "project", "activities"
        )
        # Kept as a tuple so it has the same shape the base class assigns
        # after the user picks a sort column.
        self.sort = (self.columns[1],)
        self._refresh()


class HamsterToolsApp(App):
    """Application with one mode per list screen."""

    CSS_PATH = "app.tcss"

    BINDINGS = [
        ("a", "switch_mode('activities')", "Activities"),
        ("c", "switch_mode('categories')", "Categories"),
        ("k", "switch_mode('kimai')", "Kimai"),
        ("q", "quit", "Quit"),
    ]

    def __init__(self):
        """Initialise the database and register the three list screens."""
        db.init("hamster-testing.db")

        self.MODES = {
            "categories": CategoryListScreen(),
            "activities": ActivityListScreen(),
            "kimai": KimaiProjectListScreen(),
        }

        super().__init__()

    def on_mount(self) -> None:
        """Start in the activities mode."""
        self.switch_mode("activities")

    def action_quit(self) -> None:
        """Close the database connection and exit the application."""
        db.close()
        self.exit()