from datetime import datetime

from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.containers import Horizontal, Vertical, Grid
from textual.events import DescendantBlur
from textual.screen import Screen, ModalScreen
from textual.widgets import (
    Header,
    Footer,
    DataTable,
    Input,
    Label,
    Checkbox,
    TabbedContent,
    TabPane,
    Button,
)

from peewee import fn, JOIN
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem

from ..db import (
    HamsterCategory,
    HamsterActivity,
    HamsterFact,
    KimaiProject,
    KimaiCustomer,
    KimaiActivity,
    HamsterActivityKimaiMapping,
)
from .list import ListPane


def _selected_dropdown_id(event):
    """Return the id carried by the dropdown item selected for *event*.

    The id is read from the ``left_meta`` of the selected item of the
    ``dropdown`` attached to the parent of the event's control (the
    ``AutoComplete`` wrapping the input). Returns ``None`` when no item is
    selected.
    """
    selected = event.control.parent.dropdown.selected_item
    if selected is None:
        return None
    return str(selected.left_meta)


class ActivityEditScreen(ModalScreen):
    """Modal dialog for editing an activity's name and category.

    Dismisses with ``None`` on cancel, or with a dict holding the keys
    ``"category"`` (category id) and ``"activity"`` (activity name) on save.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
        ("ctrl+s", "save", "Save"),
    ]

    # Defaults used when the screen is opened without a category.
    category_id = None
    category_name = ""

    def _get_categories(self, input_state):
        """Return the category dropdown items matching the typed value."""
        categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
        return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)

    def __init__(self, category, activity):
        """Store the initial field values.

        Args:
            category: Object exposing ``id`` and ``name``, or ``None``.
            activity: Object exposing ``name``.
        """
        if category is not None:
            self.category_id = category.id
            self.category_name = category.name
        self.activity_name = activity.name
        super().__init__()

    def compose(self) -> ComposeResult:
        """Build the form: a category autocomplete and an activity input."""
        yield Vertical(
            Horizontal(
                Label("Category:"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="category",
                        value=self.category_name,
                    ),
                    Dropdown(items=self._get_categories),
                ),
            ),
            Horizontal(
                Label("Activity:"), Input(value=self.activity_name, id="activity")
            ),
        )

    @on(Input.Submitted, "#category")
    def category_submitted(self, event):
        """Remember the selected category and move focus to the activity."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.category_id = selected_id
            self.query_one("#activity").focus()

    @on(DescendantBlur, "#category")
    def category_blur(self, event):
        """Remember the selected category when the input loses focus."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.category_id = selected_id

    def action_cancel(self):
        """Close the dialog without a result."""
        self.dismiss(None)

    def action_save(self):
        """Close the dialog, returning the chosen category id and name."""
        self.dismiss(
            {
                "category": self.category_id,
                "activity": self.query_one("#activity").value,
            }
        )


class ActivityMappingScreen(ModalScreen):
    """Modal dialog for mapping a Hamster activity onto Kimai entities.

    Dismisses with ``None`` on cancel, or with a dict of ``kimai_*`` values
    plus the ``"global"`` checkbox state on save.
    """

    BINDINGS = [
        ("ctrl+g", "global", "Toggle global"),
        ("ctrl+s", "save", "Save"),
        ("escape", "cancel", "Cancel"),
    ]

    # Defaults used when no existing mapping is supplied.
    customer_id = None
    project_id = None
    activity_id = None
    customer = ""
    project = ""
    activity = ""
    description = ""
    tags = ""

    def __init__(self, category, activity, mapping=None):
        """Store the labels to display and pre-fill from *mapping* if given.

        Args:
            category: Value shown after ``@`` in the dialog title.
            activity: Value shown before ``@`` in the dialog title.
            mapping: Existing mapping whose ``kimai_*`` attributes seed the
                form, or ``None`` for an empty form.
        """
        self.hamster_category = category
        self.hamster_activity = activity
        if mapping is not None:
            self.customer_id = mapping.kimai_customer_id
            self.customer = mapping.kimai_customer.name
            self.project_id = mapping.kimai_project_id
            self.project = mapping.kimai_project.name
            self.activity_id = mapping.kimai_activity_id
            self.activity = mapping.kimai_activity.name
            self.description = mapping.kimai_description
            self.tags = mapping.kimai_tags
        super().__init__()

    @staticmethod
    def _filter_dropdowns(options, value):
        """Return the options containing *value*, prefix matches first.

        Matching is case-insensitive. ``sorted`` is stable, so options keep
        their incoming relative order within the "prefix" and "non-prefix"
        groups.
        """
        needle = value.lower()
        matches = [item for item in options if needle in item.main.plain.lower()]
        # False sorts before True, so negate to pull prefix matches to the top.
        return sorted(
            matches,
            key=lambda item: not item.main.plain.lower().startswith(needle),
        )

    def _get_customers(self, input_state):
        """Return the customer dropdown items matching the typed value."""
        customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
        return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)

    def _get_projects(self, input_state):
        """Return matching projects of the currently chosen customer."""
        projects = [
            DropdownItem(p.name, str(p.id))
            for p in KimaiProject.select().where(
                KimaiProject.customer_id == self.customer_id
            )
        ]
        return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)

    def _get_activities(self, input_state):
        """Return matching Kimai activities.

        With the "Global" checkbox ticked only activities without a project
        are offered; otherwise only those of the currently chosen project.
        """
        activities = KimaiActivity.select()
        if self.query_one("#global").value:
            activities = activities.where(
                KimaiActivity.project_id.is_null(),
            )
        else:
            activities = activities.where(KimaiActivity.project_id == self.project_id)
        return ActivityMappingScreen._filter_dropdowns(
            [DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
        )

    def compose(self) -> ComposeResult:
        """Build the mapping form."""
        yield Vertical(
            Horizontal(
                Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
            ),
            Horizontal(
                Label("Customer"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="customer",
                        value=self.customer,
                    ),
                    Dropdown(items=self._get_customers),
                ),
            ),
            Horizontal(
                Label("Project"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="project",
                        value=self.project,
                    ),
                    Dropdown(items=self._get_projects),
                ),
            ),
            Horizontal(
                Label("Activity"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="activity",
                        value=self.activity,
                    ),
                    Dropdown(items=self._get_activities),
                ),
            ),
            Horizontal(
                Label("Description"),
                Input(id="description", value=self.description),
            ),
            Horizontal(
                Label("Tags"),
                Input(id="tags", value=self.tags),
            ),
            Horizontal(Checkbox("Global", id="global")),
        )

    @on(Input.Submitted, "#customer")
    def customer_submitted(self, event):
        """Remember the selected customer and move focus to the project."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.customer_id = selected_id
            self.query_one("#project").focus()

    @on(DescendantBlur, "#customer")
    def customer_blur(self, event):
        """Remember the selected customer when the input loses focus."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.customer_id = selected_id

    @on(Input.Submitted, "#project")
    def project_submitted(self, event):
        """Remember the selected project and move focus to the activity."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.project_id = selected_id
            self.query_one("#activity").focus()

    @on(DescendantBlur, "#project")
    def project_blur(self, event):
        """Remember the selected project when the input loses focus."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.project_id = selected_id

    @on(Input.Submitted, "#activity")
    def activity_submitted(self, event):
        """Remember the selected activity and move focus to the description."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.activity_id = selected_id
            # Advance to the next field, as the customer and project
            # handlers do, instead of re-focusing this same input.
            self.query_one("#description").focus()

    @on(DescendantBlur, "#activity")
    def activity_blur(self, event):
        """Remember the selected activity when the input loses focus."""
        selected_id = _selected_dropdown_id(event)
        if selected_id is not None:
            self.activity_id = selected_id

    def action_global(self):
        """Toggle the "Global" checkbox."""
        checkbox = self.query_one("#global")
        checkbox.value = not checkbox.value

    def action_save(self):
        """Close the dialog, returning the collected mapping values."""
        self.dismiss(
            {
                "kimai_customer_id": self.customer_id,
                "kimai_project_id": self.project_id,
                "kimai_activity_id": self.activity_id,
                "kimai_description": self.query_one("#description").value,
                "kimai_tags": self.query_one("#tags").value,
                "global": self.query_one("#global").value,
            }
        )

    def action_cancel(self):
        """Close the dialog without a result."""
        self.dismiss(None)


class ActivityDeleteConfirmScreen(ModalScreen):
    """Confirmation dialog; dismisses with ``True`` only on "Confirm"."""

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
    ]

    def compose(self) -> ComposeResult:
        """Build the question label and the confirm/cancel buttons."""
        yield Grid(
            Label("Are you sure you want to delete this activity?", id="question"),
            Button("Confirm", variant="error", id="confirm"),
            Button("Cancel", variant="primary", id="cancel"),
            id="dialog",
        )

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with whether the "confirm" button was the one pressed."""
        # The buttons composed above are "confirm" and "cancel"; comparing
        # against any other id would make confirmation impossible.
        self.dismiss(event.button.id == "confirm")

    def action_cancel(self):
        """Dismiss the dialog as not confirmed."""
        self.dismiss(False)


class ActivityList(ListPane):
    """Table of Hamster activities with fact and mapping counts."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete"),
        ("f", "move_facts", "Move"),
        ("e", "edit", "Edit"),
        ("m", "mapping", "Mapping"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Reload the table from the database, applying the current filters."""
        self.table.clear()

        # Per-activity counts are computed in subqueries and joined in below.
        facts_count_query = (
            HamsterFact.select(
                HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
            )
            .group_by(HamsterFact.activity_id)
            .alias("facts_count_query")
        )

        mappings_count_query = (
            HamsterActivityKimaiMapping.select(
                HamsterActivityKimaiMapping.hamster_activity_id,
                fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
            )
            .group_by(HamsterActivityKimaiMapping.hamster_activity_id)
            .alias("mappings_count_query")
        )

        activities = (
            HamsterActivity.select(
                HamsterActivity,
                HamsterCategory.id,
                HamsterFact.start_time,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
                fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
                fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
                    "mappings_count"
                ),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(
                facts_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == facts_count_query.c.activity_id),
            )
            .switch(HamsterActivity)
            .join(
                mappings_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
            )
            .group_by(HamsterActivity)
        )

        filter_search = self.query_one("#filter #search").value
        if filter_search is not None:
            activities = activities.where(
                HamsterActivity.name.contains(filter_search)
                | HamsterCategory.name.contains(filter_search)
            )

        filter_date = self.query_one("#filter #date").value
        if filter_date is not None:
            try:
                activities = activities.where(
                    HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
                )
            except ValueError:
                # A value that does not parse as YYYY-MM-DD means "no date filter".
                pass

        self.table.add_rows(
            [
                [
                    activity.category_id,
                    activity.category_name,
                    activity.id,
                    activity.name,
                    activity.facts_count,
                    activity.mappings_count,
                ]
                for activity in activities
            ]
        )

        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the data."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "category id", "category", "activity id", "activity", "entries", "mappings"
        )
        # Sort by category name, then activity name.
        self.sort = (self.columns[1], self.columns[3])
        self._refresh()

    def action_delete(self) -> None:
        """Delete the activity under the cursor.

        Asks for confirmation first when the activity still has facts.
        """
        # get the keys for the row and column under the cursor.
        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        # Column 2 holds the activity id.
        activity_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 2),
        )

        activity = HamsterActivity.get(id=activity_id)

        def check_delete(delete: bool) -> None:
            """Called when ActivityDeleteConfirmScreen is dismissed."""
            if delete:
                activity.delete_instance()

                # supply the row key to `remove_row` to delete the row.
                self.table.remove_row(row_key)

        if activity.facts.count() > 0:
            self.app.push_screen(ActivityDeleteConfirmScreen(), check_delete)
        else:
            check_delete(True)

    def action_move_facts(self) -> None:
        """Mark the activity under the cursor as the source of a fact move.

        The row is rendered in red and the cursor advances one row; the move
        itself happens when a row is subsequently selected.
        """
        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)
        self.move_from_activity = HamsterActivity.get(id=row_cells[2])
        for col_idx, cell_value in enumerate(row_cells):
            cell_coordinate = Coordinate(row_idx, col_idx)
            self.table.update_cell_at(
                cell_coordinate,
                f"[red]{cell_value}[/red]",
            )
        self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)

    def on_data_table_row_selected(self, event):
        """Complete a pending fact move onto the selected row's activity."""
        if getattr(self, "move_from_activity", None) is not None:
            # Look up by id keyword, like every other lookup in this file.
            move_to_activity = HamsterActivity.get(
                id=self.table.get_cell_at(Coordinate(event.cursor_row, 2))
            )
            HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
                HamsterFact.activity == self.move_from_activity
            ).execute()
            self._refresh()
            del self.move_from_activity

    def action_edit(self):
        """Open the edit dialog for the activity under the cursor."""
        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)

        try:
            category = HamsterCategory.get(id=row_cells[0])
        except HamsterCategory.DoesNotExist:
            category = None
        activity = HamsterActivity.get(id=row_cells[2])

        def handle_edit(properties):
            """Apply the dialog result; ``None`` means the edit was cancelled."""
            if properties is None:
                return
            activity.name = properties["activity"]
            activity.category_id = properties["category"]
            activity.save()
            self._refresh()

        self.app.push_screen(
            ActivityEditScreen(category=category, activity=activity), handle_edit
        )

    def action_mapping(self):
        """Open the mapping dialog for the activity under the cursor."""
        selected_activity = (
            HamsterActivity.select(
                HamsterActivity,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .where(
                HamsterActivity.id
                == self.table.get_cell_at(
                    Coordinate(self.table.cursor_coordinate.row, 2),
                )
            )
            .get()
        )

        mapping = None
        try:
            mapping = HamsterActivityKimaiMapping.get(
                hamster_activity=selected_activity
            )
        except HamsterActivityKimaiMapping.DoesNotExist:
            # No mapping yet; one is created when the dialog is saved.
            pass

        def handle_mapping(mapping_data):
            """Update the existing mapping, or create one, from the result."""
            if mapping_data is None:
                return
            if mapping is not None:
                mapping_ = mapping
                for key, value in mapping_data.items():
                    setattr(mapping_, key, value)
            else:
                mapping_ = HamsterActivityKimaiMapping.create(
                    hamster_activity=selected_activity, **mapping_data
                )
            mapping_.save()
            self._refresh()

        self.app.push_screen(
            ActivityMappingScreen(
                category=selected_activity.category_name,
                activity=selected_activity.name,
                mapping=mapping,
            ),
            handle_mapping,
        )


class CategoryList(ListPane):
    """Table of Hamster categories with their activity counts."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete category"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Reload the table from the database, applying the current filters."""
        self.table.clear()

        categories = (
            HamsterCategory.select(
                HamsterCategory,
                # The fact join below yields one row per fact, so count
                # distinct activity ids rather than joined rows.
                fn.Count(HamsterActivity.id.distinct()).alias("activities_count"),
                HamsterFact.start_time,
            )
            .join(HamsterActivity, JOIN.LEFT_OUTER)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .group_by(HamsterCategory)
        )

        filter_search = self.query_one("#filter #search").value
        if filter_search is not None:
            categories = categories.where(HamsterCategory.name.contains(filter_search))

        filter_date = self.query_one("#filter #date").value
        if filter_date is not None:
            try:
                categories = categories.where(
                    HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
                )
            except ValueError:
                # A value that does not parse as YYYY-MM-DD means "no date filter".
                pass

        self.table.add_rows(
            [
                [
                    category.id,
                    category.name,
                    category.activities_count,
                ]
                for category in categories
            ]
        )

        self.table.sort(self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the data."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns("category id", "category", "activities")
        # Sort by category name.
        self.sort = self.columns[1]
        self._refresh()

    def action_delete(self) -> None:
        """Delete the category under the cursor and remove its table row."""
        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        # Column 0 holds the category id.
        category_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 0),
        )
        category = HamsterCategory.get(id=category_id)
        category.delete_instance()

        self.table.remove_row(row_key)


class HamsterScreen(Screen):
    """Screen with "Categories" and "Activities" tabs."""

    BINDINGS = [
        ("c", "show_tab('categories')", "Categories"),
        ("a", "show_tab('activities')", "Activities"),
    ]

    SUB_TITLE = "Hamster"

    def compose(self) -> ComposeResult:
        """Build the header, the two tab panes and the footer."""
        yield Header()
        with TabbedContent(initial="activities"):
            with TabPane("Categories", id="categories"):
                yield CategoryList()
            with TabPane("Activities", id="activities"):
                yield ActivityList()
        yield Footer()

    def on_mount(self) -> None:
        """Make the tab bar unfocusable and focus the activities table."""
        self.query_one("TabbedContent Tabs").can_focus = False
        self.query_one("#activities DataTable").focus()

    def action_show_tab(self, tab: str) -> None:
        """Switch to a new tab."""
        self.get_child_by_type(TabbedContent).active = tab
        self.query_one(f"#{tab} DataTable").focus()