diff --git a/hamstertools/app.py b/hamstertools/app.py index dcd4964..7d30ea5 100644 --- a/hamstertools/app.py +++ b/hamstertools/app.py @@ -1,660 +1,9 @@ -from datetime import datetime +from textual.app import App -from textual import on -from textual.app import App, ComposeResult -from textual.binding import Binding -from textual.containers import Grid -from textual.events import DescendantBlur -from textual.widgets import Header, Footer, DataTable, Input, Button, Label, Checkbox -from textual.containers import Horizontal, Vertical -from textual.coordinate import Coordinate -from textual.screen import Screen, ModalScreen +from .db import db -from textual_autocomplete import AutoComplete, Dropdown, DropdownItem - -from peewee import fn, JOIN - -from .db import ( - db, - HamsterCategory, - HamsterActivity, - HamsterFact, - KimaiProject, - KimaiCustomer, - KimaiActivity, - HamsterActivityKimaiMapping, -) -from .kimai import ( - KimaiAPI, - Customer as KimaiAPICustomer, - Project as KimaiAPIProject, - Activity as KimaiAPIActivity, -) -from .sync import sync - - -class ListScreen(Screen): - def compose(self) -> ComposeResult: - yield Header() - with Vertical(): - yield DataTable() - with Horizontal(id="filter"): - yield Input( - id="search", placeholder="Category/activity name contains text" - ) - yield Input( - id="date", - placeholder="After date, in {0} format".format( - datetime.now().strftime("%Y-%m-%d") - ), - ) - yield Footer() - - def action_refresh(self) -> None: - self._refresh() - - def action_sort(self) -> None: - self.table.cursor_type = "column" - - def on_data_table_column_selected(self, event): - self.sort = (event.column_key,) - event.data_table.sort(*self.sort) - event.data_table.cursor_type = "row" - - def action_filter(self) -> None: - self.query_one("#filter").display = True - self._refresh() - self.query_one("#filter #search").focus() - - def on_input_submitted(self, event): - self.table.focus() - - def action_cancelfilter(self) -> None: - self.query_one("#filter").display = False - self.query_one("#filter #search").clear() - self.query_one("#filter #date").clear() - self.table.focus() - self._refresh() - - @on(Input.Changed, "#filter Input") - def filter(self, event): - self._refresh() - - -class ActivityEditScreen(ModalScreen): - BINDINGS = [ - ("escape", "cancel", "Cancel"), - ("ctrl+s", "save", "Save"), - ] - - category_id = None - category_name = "" - - def _get_categories(self, input_state): - categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()] - return ActivityMappingScreen._filter_dropdowns(categories, input_state.value) - - def __init__(self, category, activity): - if category is not None: - self.category_id = category.id - self.category_name = category.name - self.activity_name = activity.name - super().__init__() - - def compose(self) -> ComposeResult: - yield Vertical( - Horizontal( - Label("Category:"), - AutoComplete( - Input( - placeholder="Type to search...", - id="category", - value=self.category_name, - ), - Dropdown(items=self._get_categories), - ), - ), - Horizontal( - Label("Activity:"), Input(value=self.activity_name, id="activity") - ), - ) - - @on(Input.Submitted, "#category") - def category_submitted(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.category_id = str( - event.control.parent.dropdown.selected_item.left_meta - ) - self.query_one("#activity").focus() - - @on(DescendantBlur, "#category") - def category_blur(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.category_id = str( - event.control.parent.dropdown.selected_item.left_meta - ) - - def action_cancel(self): - self.dismiss(None) - - def action_save(self): - self.dismiss( - { - "category": self.category_id, - "activity": self.query_one("#activity").value, - } - ) - - -class ActivityMappingScreen(ModalScreen): - BINDINGS = [ - ("ctrl+g", "global", "Toggle global"), - ("ctrl+s", "save", "Save"), - ("escape", "cancel", "Cancel"), - ] - - customer_id = None - project_id = None - activity_id = None - customer = "" - project = "" - activity = "" - description = "" - tags = "" - - def __init__(self, category, activity, mapping=None): - self.hamster_category = category - self.hamster_activity = activity - - if mapping is not None: - self.customer_id = mapping.kimai_customer_id - self.customer = mapping.kimai_customer.name - self.project_id = mapping.kimai_project_id - self.project = mapping.kimai_project.name - self.activity_id = mapping.kimai_activity_id - self.activity = mapping.kimai_activity.name - self.description = mapping.kimai_description - self.tags = mapping.kimai_tags - - super().__init__() - - @staticmethod - def _filter_dropdowns(options, value): - matches = [c for c in options if value.lower() in c.main.plain.lower()] - return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower())) - - def _get_customers(self, input_state): - customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()] - return ActivityMappingScreen._filter_dropdowns(customers, input_state.value) - - def _get_projects(self, input_state): - projects = [ - DropdownItem(p.name, str(p.id)) - for p in KimaiProject.select().where( - KimaiProject.customer_id == self.customer_id - ) - ] - return ActivityMappingScreen._filter_dropdowns(projects, input_state.value) - - def _get_activities(self, input_state): - activities = KimaiActivity.select() - - if self.query_one("#global").value: - activities = activities.where( - KimaiActivity.project_id.is_null(), - ) - else: - activities = activities.where(KimaiActivity.project_id == self.project_id) - - return ActivityMappingScreen._filter_dropdowns( - [DropdownItem(a.name, str(a.id)) for a in activities], input_state.value - ) - - def compose(self) -> ComposeResult: - yield Vertical( - Horizontal( - Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"), - ), - Horizontal( - Label("Customer"), - AutoComplete( - Input( - placeholder="Type to search...", - id="customer", - value=self.customer, - ), - Dropdown(items=self._get_customers), - ), - ), - Horizontal( - Label("Project"), - AutoComplete( - Input( - placeholder="Type to search...", - id="project", - value=self.project, - ), - Dropdown(items=self._get_projects), - ), - ), - Horizontal( - Label("Activity"), - AutoComplete( - Input( - placeholder="Type to search...", - id="activity", - value=self.activity, - ), - Dropdown(items=self._get_activities), - ), - ), - Horizontal( - Label("Description"), - Input(id="description", value=self.description), - ), - Horizontal( - Label("Tags"), - Input(id="tags", value=self.tags), - ), - Horizontal(Checkbox("Global", id="global")), - ) - - @on(Input.Submitted, "#customer") - def customer_submitted(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.customer_id = str( - event.control.parent.dropdown.selected_item.left_meta - ) - self.query_one("#project").focus() - - @on(DescendantBlur, "#customer") - def customer_blur(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.customer_id = str( - event.control.parent.dropdown.selected_item.left_meta - ) - - @on(Input.Submitted, "#project") - def project_submitted(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.project_id = str(event.control.parent.dropdown.selected_item.left_meta) - self.query_one("#activity").focus() - - @on(DescendantBlur, "#project") - def project_blur(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.project_id = str(event.control.parent.dropdown.selected_item.left_meta) - - @on(Input.Submitted, "#activity") - def activity_submitted(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.activity_id = str( - event.control.parent.dropdown.selected_item.left_meta - ) - self.query_one("#activity").focus() - - @on(DescendantBlur, "#activity") - def activity_blur(self, event): - if event.control.parent.dropdown.selected_item is not None: - self.activity_id = str( - event.control.parent.dropdown.selected_item.left_meta - ) - - def action_global(self): - self.query_one("#global").value = not self.query_one("#global").value - - def action_save(self): - self.dismiss( - { - "kimai_customer_id": self.customer_id, - "kimai_project_id": self.project_id, - "kimai_activity_id": self.activity_id, - "kimai_description": self.query_one("#description").value, - "kimai_tags": self.query_one("#tags").value, - "global": self.query_one("#global").value, - } - ) - - def action_cancel(self): - self.dismiss(None) - - -class ActivityListScreen(ListScreen): - BINDINGS = [ - ("s", "sort", "Sort"), - ("r", "refresh", "Refresh"), - ("/", "filter", "Search"), - ("d", "delete", "Delete"), - ("f", "move_facts", "Move facts"), - ("e", "edit", "Edit"), - ("m", "mapping", "Mapping"), - Binding(key="escape", action="cancelfilter", show=False), - ] - - def _refresh(self): - self.table.clear() - - facts_count_query = ( - HamsterFact.select( - HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count") - ) - .group_by(HamsterFact.activity_id) - .alias("facts_count_query") - ) - - mappings_count_query = ( - HamsterActivityKimaiMapping.select( - HamsterActivityKimaiMapping.hamster_activity_id, - fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"), - ) - .group_by(HamsterActivityKimaiMapping.hamster_activity_id) - .alias("mappings_count_query") - ) - - activities = ( - HamsterActivity.select( - HamsterActivity, - HamsterCategory.id, - HamsterFact.start_time, - fn.COALESCE(HamsterCategory.name, "None").alias("category_name"), - fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"), - fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias( - "mappings_count" - ), - ) - .join(HamsterCategory, JOIN.LEFT_OUTER) - .switch(HamsterActivity) - .join(HamsterFact, JOIN.LEFT_OUTER) - .switch(HamsterActivity) - .join( - facts_count_query, - JOIN.LEFT_OUTER, - on=(HamsterActivity.id == facts_count_query.c.activity_id), - ) - .switch(HamsterActivity) - .join( - mappings_count_query, - JOIN.LEFT_OUTER, - on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id), - ) - .group_by(HamsterActivity) - ) - - filter_search = self.query_one("#filter #search").value - if filter_search is not None: - activities = activities.where( - HamsterActivity.name.contains(filter_search) - | HamsterCategory.name.contains(filter_search) - ) - - filter_date = self.query_one("#filter #date").value - if filter_date is not None: - try: - activities = activities.where( - HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d") - ) - except ValueError: - pass - - self.table.add_rows( - [ - [ - activity.category_id, - activity.category_name, - activity.id, - activity.name, - activity.facts_count, - activity.mappings_count, - ] - for activity in activities - ] - ) - - self.table.sort(*self.sort) - - def on_mount(self) -> None: - self.table = self.query_one(DataTable) - self.table.cursor_type = "row" - self.columns = self.table.add_columns( - "category id", "category", "activity id", "activity", "entries", "mappings" - ) - self.sort = (self.columns[1], self.columns[3]) - self._refresh() - - def action_delete(self) -> None: - # get the keys for the row and column under the cursor. - row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate) - - activity_id = self.table.get_cell_at( - Coordinate(self.table.cursor_coordinate.row, 2), - ) - - activity = HamsterActivity.get(id=activity_id) - activity.delete_instance() - - # supply the row key to `remove_row` to delete the row. - self.table.remove_row(row_key) - - def action_move_facts(self) -> None: - row_idx: int = self.table.cursor_row - row_cells = self.table.get_row_at(row_idx) - self.move_from_activity = HamsterActivity.get(id=row_cells[2]) - for col_idx, cell_value in enumerate(row_cells): - cell_coordinate = Coordinate(row_idx, col_idx) - self.table.update_cell_at( - cell_coordinate, - f"[red]{cell_value}[/red]", - ) - self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1) - - def on_data_table_row_selected(self, event): - if getattr(self, "move_from_activity", None) is not None: - move_to_activity = HamsterActivity.get( - self.table.get_cell_at(Coordinate(event.cursor_row, 2)) - ) - HamsterFact.update({HamsterFact.activity: move_to_activity}).where( - HamsterFact.activity == self.move_from_activity - ).execute() - self._refresh() - del self.move_from_activity - - def action_edit(self): - row_idx: int = self.table.cursor_row - row_cells = self.table.get_row_at(row_idx) - - try: - category = HamsterCategory.get(id=row_cells[0]) - except HamsterCategory.DoesNotExist: - category = None - activity = HamsterActivity.get(id=row_cells[2]) - - def handle_edit(properties): - if properties is None: - return - activity.name = properties["activity"] - activity.category_id = properties["category"] - activity.save() - self._refresh() - - self.app.push_screen( - ActivityEditScreen(category=category, activity=activity), handle_edit - ) - - def action_mapping(self): - selected_activity = ( - HamsterActivity.select( - HamsterActivity, - fn.COALESCE(HamsterCategory.name, "None").alias("category_name"), - ) - .join(HamsterCategory, JOIN.LEFT_OUTER) - .where( - HamsterActivity.id - == self.table.get_cell_at( - Coordinate(self.table.cursor_coordinate.row, 2), - ) - ) - .get() - ) - - mapping = None - try: - mapping = HamsterActivityKimaiMapping.get( - hamster_activity=selected_activity - ) - except HamsterActivityKimaiMapping.DoesNotExist: - pass - - def handle_mapping(mapping_data): - if mapping_data is None: - return - if mapping is not None: - mapping_ = mapping - for key, value in mapping_data.items(): - setattr(mapping_, key, value) - else: - mapping_ = HamsterActivityKimaiMapping.create( - hamster_activity=selected_activity, **mapping_data - ) - mapping_.save() - self._refresh() - - self.app.push_screen( - ActivityMappingScreen( - category=selected_activity.category_name, - activity=selected_activity.name, - mapping=mapping, - ), - handle_mapping, - ) - - -class CategoryListScreen(ListScreen): - BINDINGS = [ - ("s", "sort", "Sort"), - ("r", "refresh", "Refresh"), - ("/", "filter", "Search"), - ("d", "delete", "Delete category"), - Binding(key="escape", action="cancelfilter", show=False), - ] - - def _refresh(self): - self.table.clear() - - categories = ( - HamsterCategory.select( - HamsterCategory, - fn.Count(HamsterActivity.id).alias("activities_count"), - HamsterFact.start_time, - ) - .join(HamsterActivity, JOIN.LEFT_OUTER) - .join(HamsterFact, JOIN.LEFT_OUTER) - .group_by(HamsterCategory) - ) - - filter_search = self.query_one("#filter #search").value - if filter_search is not None: - categories = categories.where(HamsterCategory.name.contains(filter_search)) - - filter_date = self.query_one("#filter #date").value - if filter_date is not None: - try: - categories = categories.where( - HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d") - ) - except ValueError: - pass - - self.table.add_rows( - [ - [ - category.id, - category.name, - category.activities_count, - ] - for category in categories - ] - ) - - self.table.sort(self.sort) - - def on_mount(self) -> None: - self.table = self.query_one(DataTable) - self.table.cursor_type = "row" - self.columns = self.table.add_columns("category id", "category", "activities") - self.sort = self.columns[1] - self._refresh() - - def action_delete(self) -> None: - row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate) - - category_id = self.table.get_cell_at( - Coordinate(self.table.cursor_coordinate.row, 0), - ) - category = HamsterCategory.get(id=category_id) - category.delete_instance() - - self.table.remove_row(row_key) - - -class KimaiProjectListScreen(ListScreen): - BINDINGS = [ - ("s", "sort", "Sort"), - ("r", "refresh", "Refresh"), - ("g", "get", "Get data"), - ("/", "filter", "Search"), - Binding(key="escape", action="cancelfilter", show=False), - ] - - def _refresh(self, filter_query=None): - self.table.clear() - - projects = ( - KimaiProject.select( - KimaiProject, - KimaiCustomer, - fn.Count(KimaiActivity.id).alias("activities_count"), - ) - .join(KimaiCustomer, JOIN.LEFT_OUTER) - .switch(KimaiProject) - .join(KimaiActivity, JOIN.LEFT_OUTER) - .where(KimaiActivity.project.is_null(False)) - .group_by(KimaiProject) - ) - - if filter_query: - projects = projects.where( - KimaiProject.name.contains(filter_query) - | KimaiCustomer.name.contains(filter_query) - ) - - self.table.add_rows( - [ - [ - project.customer.id, - project.customer.name, - project.id, - project.name, - project.activities_count, - ] - for project in projects - ] - ) - - self.table.sort(self.sort) - - def action_get(self) -> None: - sync() - self._refresh() - - def on_mount(self) -> None: - self.table = self.query_one(DataTable) - self.table.cursor_type = "row" - self.columns = self.table.add_columns( - "customer id", "customer", "project id", "project", "activities" - ) - # self.sort = (self.columns[1], self.columns[3]) - self.sort = self.columns[1] - self._refresh() +from .screens.hamster import CategoryListScreen, ActivityListScreen +from .screens.kimai import KimaiProjectListScreen class HamsterToolsApp(App): diff --git a/hamstertools/kimai.py b/hamstertools/kimaiapi.py similarity index 100% rename from hamstertools/kimai.py rename to hamstertools/kimaiapi.py diff --git a/hamstertools/sync.py b/hamstertools/sync.py index a03fe50..3ced975 100644 --- a/hamstertools/sync.py +++ b/hamstertools/sync.py @@ -1,4 +1,4 @@ -from .kimai import ( +from .kimaiapi import ( KimaiAPI, Customer as KimaiAPICustomer, Project as KimaiAPIProject,