From 8e5e28ea6741077ee0f023e4e4a64d77b755bbf8 Mon Sep 17 00:00:00 2001 From: 3wc <3wc@doesthisthing.work> Date: Fri, 27 Oct 2023 04:04:30 +0100 Subject: [PATCH] =?UTF-8?q?Proper=20DB=20ORM,=20filtering=20=F0=9F=8E=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hamstertools/__init__.py | 3 +- hamstertools/app.py | 174 ++++++++++++++++++++------------------- hamstertools/db.py | 170 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 85 deletions(-) create mode 100644 hamstertools/db.py diff --git a/hamstertools/__init__.py b/hamstertools/__init__.py index be12c92..bea9cf5 100755 --- a/hamstertools/__init__.py +++ b/hamstertools/__init__.py @@ -704,7 +704,8 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte @cli.command() def app(): from .app import HamsterToolsApp - app = HamsterToolsApp(db_cursor=c, db_connection=conn) + #app = HamsterToolsApp(db_cursor=c, db_connection=conn) + app = HamsterToolsApp() app.run() diff --git a/hamstertools/app.py b/hamstertools/app.py index d595795..6bd04c4 100644 --- a/hamstertools/app.py +++ b/hamstertools/app.py @@ -1,8 +1,12 @@ from textual.app import App, ComposeResult -from textual.widgets import Header, Footer, DataTable, Placeholder +from textual.binding import Binding +from textual.widgets import Header, Footer, DataTable, Input +from textual.containers import Horizontal, Vertical from textual.coordinate import Coordinate from textual.screen import Screen +from textual.reactive import reactive +from .db import DatabaseManager, Category, Activity class ActivitiesScreen(Screen): BINDINGS = [ @@ -10,58 +14,44 @@ class ActivitiesScreen(Screen): ("s", "sort", "Sort"), ("r", "refresh", "Refresh"), ("d", "delete", "Delete activity"), + ("/", "filter", "Search"), + Binding(key="escape", action="cancelfilter", show=False), ] - def __init__(self, db_cursor, db_connection): - self.db_cursor = db_cursor - self.db_connection = db_connection + def __init__(self, db_manager): + self.db_manager = db_manager super().__init__() - def _refresh(self): + def _refresh(self, filter_query=None): self.table.clear() - sql = ''' - select - categories.id as category_id, - coalesce(categories.name, '') as category_name, - activities.id as activity_id, - activities.name as activity_name, - coalesce(facts_count, 0) as total_facts - from - activities - left join - categories - on - activities.category_id = categories.id - left join ( - select - activity_id, - count(*) as facts_count - from - facts - group by - activity_id - ) as facts_count_subquery - on - activities.id = facts_count_subquery.activity_id; - ''' + # List activities with the count of facts + activities = Activity.list_activities(self.db_manager, filter_query) - results = self.db_cursor.execute(sql) - # results = [[cell or "" for cell in row] for row in self.db_cursor.fetchall()] + self.table.add_rows([[ + activity.category_id, + activity.category_name, + activity.id, + activity.name, + activity.facts_count, + ] for activity in activities]) - self.table.add_rows(results) - self.table.sort(self.columns[1], self.columns[3]) + self.table.sort(*self.sort) def compose(self) -> ComposeResult: """create child widgets for the app.""" yield Header() - yield DataTable() + with Vertical(): + yield DataTable() + with Horizontal(): + yield Input(id="filter") yield Footer() def on_mount(self) -> None: self.table = self.query_one(DataTable) self.table.cursor_type = "row" self.columns = self.table.add_columns("category id","category","activity id","activity","entries") + self.sort = (self.columns[1], self.columns[3]) self._refresh() def action_refresh(self) -> None: @@ -70,6 +60,17 @@ class ActivitiesScreen(Screen): def action_sort(self) -> None: self.table.cursor_type = "column" + def action_filter(self) -> None: + filter_input = self.query_one("#filter") + filter_input.display = True + filter_input.focus() + print(filter_input) + + def action_cancelfilter(self) -> None: + filter_input = self.query_one("#filter") + filter_input.display = False + self._refresh() + def action_delete(self) -> None: # get the keys for the row and column under the cursor. row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate) @@ -78,72 +79,65 @@ class ActivitiesScreen(Screen): Coordinate(self.table.cursor_coordinate.row, 2), ) - sql = 'delete from activities where id = ?' - print(Coordinate(2, self.table.cursor_coordinate.row),) - print(activity_id) - - self.db_cursor.execute(sql, (activity_id,)) - self.db_connection.commit() + activity = Activity.get_by_id(self.db_manager, activity_id) + activity.delete() # supply the row key to `remove_row` to delete the row. self.table.remove_row(row_key) def on_data_table_column_selected(self, event): - event.data_table.sort(event.column_key) + self.sort = (event.column_key,) + event.data_table.sort(*self.sort) event.data_table.cursor_type = "row" + def on_input_changed(self, event): + self._refresh(event.value) + class CategoriesScreen(Screen): BINDINGS = [ ("s", "sort", "Sort"), ("r", "refresh", "Refresh"), ("d", "delete", "Delete category"), + ("/", "filter", "Search"), + Binding(key="escape", action="cancelfilter", show=False), ] - def __init__(self, db_cursor, db_connection): - self.db_cursor = db_cursor - self.db_connection = db_connection + filtering = reactive(False) + filter_query = reactive("") + + def __init__(self, db_manager): + self.db_manager = db_manager super().__init__() - def _refresh(self): + def _refresh(self, filter_query=None): self.table.clear() - sql = ''' - select - categories.id as category_id, - coalesce(categories.name, '') as category_name, - coalesce(activities_count, 0) as total_activities - from - categories - left join ( - select - category_id, - count(*) as activities_count - from - activities - group by - category_id - ) as activities_count_subquery - on - categories.id = activities_count_subquery.category_id; - ''' + categories = Category.list_categories(self.db_manager, + filter_query=filter_query) - results = self.db_cursor.execute(sql) - # results = [[cell or "" for cell in row] for row in self.db_cursor.fetchall()] + self.table.add_rows([[ + category.id, + category.name, + category.activity_count, + ] for category in categories]) - self.table.add_rows(results) - self.table.sort(self.columns[1]) + self.table.sort(self.sort) def compose(self) -> ComposeResult: """create child widgets for the app.""" yield Header() - yield DataTable() + with Vertical(): + yield DataTable() + with Horizontal(): + yield Input(id="filter") yield Footer() def on_mount(self) -> None: self.table = self.query_one(DataTable) self.table.cursor_type = "row" self.columns = self.table.add_columns("category id","category","activities") + self.sort = self.columns[1] self._refresh() def action_refresh(self) -> None: @@ -152,6 +146,20 @@ class CategoriesScreen(Screen): def action_sort(self) -> None: self.table.cursor_type = "column" + def action_filter(self) -> None: + filter_input = self.query_one("#filter") + filter_input.display = True + filter_input.focus() + print(filter_input) + + def action_cancelfilter(self) -> None: + filter_input = self.query_one("#filter") + filter_input.display = False + self._refresh() + + def on_input_changed(self, event): + self._refresh(event.value) + def action_delete(self) -> None: # get the keys for the row and column under the cursor. row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate) @@ -159,19 +167,16 @@ class CategoriesScreen(Screen): category_id = self.table.get_cell_at( Coordinate(self.table.cursor_coordinate.row, 0), ) - - sql = 'delete from categories where id = ?' - print(Coordinate(2, self.table.cursor_coordinate.row),) - print(category_id) - - self.db_cursor.execute(sql, (category_id,)) - self.db_connection.commit() + category = Category.get_by_id(self.db_manager, category_id) + category.delete() # supply the row key to `remove_row` to delete the row. self.table.remove_row(row_key) def on_data_table_column_selected(self, event): - event.data_table.sort(event.column_key) + """ Handle column selection for sort """ + self.sort = event.column_key + event.data_table.sort(self.sort) event.data_table.cursor_type = "row" # class KimaiScreen(Screen): @@ -186,19 +191,20 @@ class CategoriesScreen(Screen): class HamsterToolsApp(App): + CSS_PATH = 'app.tcss' BINDINGS = [ ("a", "switch_mode('activities')", "Activities"), ("c", "switch_mode('categories')", "Categories"), # ("k", "switch_mode('kimai')", "Kimai"), ("q", "quit", "Quit"), ] - def __init__(self, db_cursor, db_connection): - self.db_cursor = db_cursor - self.db_connection = db_connection + + def __init__(self): + self.db_manager = DatabaseManager('hamster-testing.db') self.MODES = { - "categories": CategoriesScreen(db_cursor, db_connection), - "activities": ActivitiesScreen(db_cursor, db_connection), + "categories": CategoriesScreen(self.db_manager), + "activities": ActivitiesScreen(self.db_manager) # "kimai": KimaiScreen, } @@ -209,4 +215,4 @@ class HamsterToolsApp(App): def action_quit(self) -> None: self.exit() - + self.db_manager.close() diff --git a/hamstertools/db.py b/hamstertools/db.py new file mode 100644 index 0000000..6193361 --- /dev/null +++ b/hamstertools/db.py @@ -0,0 +1,170 @@ +import sqlite3 + +class DatabaseManager: + def __init__(self, database_name): + self.conn = sqlite3.connect(database_name) + self.cursor = self.conn.cursor() + + def get_conn(self): + return self.conn + + def get_cursor(self): + return self.cursor + + def close(self): + self.conn.close() + +class BaseORM: + def __init__(self, db_manager, table_name, id, **kwargs): + self.db_manager = db_manager + self.conn = db_manager.get_conn() + self.cursor = db_manager.get_cursor() + self.id = id + self.table_name = table_name + for key, value in kwargs.items(): + setattr(self, key, value) + + def delete(self): + self.cursor.execute(f"DELETE FROM {self.table_name} WHERE id=?", (self.id,)) + self.conn.commit() + + +class Category(BaseORM): + def __init__(self, db_manager, id, name, activity_count): + super().__init__(db_manager, "categories", id, name=name, + activity_count=activity_count) + + @staticmethod + def list_categories(db_manager, filter_query=None): + cursor = db_manager.get_cursor() + where = "" + if filter_query is not None: + where = "WHERE categories.name LIKE ?" + sql = f""" + SELECT + categories.id, + COALESCE(categories.name, ""), + COUNT(activities.id) AS activity_count + FROM + categories + LEFT JOIN + activities + ON + categories.id = activities.category_id + {where} + GROUP BY + categories.id + """ + if filter_query is not None: + cursor.execute(sql, ("%{}%".format(filter_query),)) + else: + cursor.execute(sql) + rows = cursor.fetchall() + return [Category(db_manager, row[0], row[1], row[2]) for row in rows] + + @staticmethod + def get_by_id(db_manager, category_id): + cursor = db_manager.get_cursor() + cursor.execute(""" + SELECT + categories.id, + categories.name, + COUNT(activities.id) AS activity_count + FROM + categories + LEFT JOIN + activities + ON + categories.id = activities.category_id + WHERE + categories.id = ? + """, (category_id,)) + + row = cursor.fetchone() + if row: + return Category(db_manager, row[0], row[1], row[2]) + return None + + +class Activity(BaseORM): + def __init__(self, db_manager, id, name, category_id, category_name, facts_count): + super().__init__(db_manager, "activities", id, name=name, category_id=category_id) + self.category_name = category_name + self.facts_count = facts_count + + @staticmethod + def list_activities(db_manager, filter_query=None): + cursor = db_manager.get_cursor() + where = "" + if filter_query is not None: + where = "WHERE categories.name LIKE ? or activities.name like ?" + sql = f""" + SELECT + activities.id, + activities.name, + categories.id, + COALESCE(categories.name, ""), + COUNT(facts.id) AS facts_count + FROM + activities + LEFT JOIN + categories + ON + activities.category_id = categories.id + LEFT JOIN + facts + ON + activities.id = facts.activity_id + {where} + GROUP BY + activities.id + """ + + if filter_query is not None: + cursor.execute(sql, ("%{}%".format(filter_query),) * 2 ) + else: + cursor.execute(sql) + + rows = cursor.fetchall() + return [Activity(db_manager, row[0], row[1], row[2], row[3], row[4]) for row in rows] + + @staticmethod + def get_by_id(db_manager, activity_id): + cursor = db_manager.get_cursor() + cursor.execute(""" + SELECT + activities.id, + activities.name, + categories.id, + COALESCE(categories.name, ""), + COUNT(facts.id) AS facts_count + FROM + activities + LEFT JOIN + categories + ON + activities.category_id = categories.id + LEFT JOIN + facts + ON + activities.id = facts.activity_id + WHERE + activities.id = ? + """, (activity_id,)) + + row = cursor.fetchone() + if row: + return Activity(db_manager, row[0], row[1], row[2], row[3], row[4]) + return None + + +class Fact(BaseORM): + def __init__(self, db_manager, id, activity_id): + super().__init__(db_manager, "facts", id, activity_id=activity_id) + + @staticmethod + def list_facts(db_manager): + cursor = db_manager.get_cursor() + cursor.execute("SELECT * FROM facts") + rows = cursor.fetchall() + return [Fact(db_manager, row[0], row[1]) for row in rows]