From cd278b32aa49bf8daeaaa9b9fcce12d630927e48 Mon Sep 17 00:00:00 2001 From: 3wc <3wc@doesthisthing.work> Date: Sat, 28 Oct 2023 00:42:30 +0100 Subject: [PATCH] Switch to peewee ORM --- hamstertools/app.py | 135 ++++++++++++++------ hamstertools/db.py | 301 +++++++------------------------------------- 2 files changed, 142 insertions(+), 294 deletions(-) diff --git a/hamstertools/app.py b/hamstertools/app.py index 2165764..b87d7a7 100644 --- a/hamstertools/app.py +++ b/hamstertools/app.py @@ -5,15 +5,26 @@ from textual.containers import Horizontal, Vertical from textual.coordinate import Coordinate from textual.screen import Screen -from .db import DatabaseManager, Category, Activity, KimaiProject, KimaiCustomer -from .kimai import KimaiAPI, Customer as KimaiAPICustomer, Project as KimaiAPIProject, Activity as KimaiAPIActivity +from peewee import fn, JOIN + +from .db import ( + db, + HamsterCategory, + HamsterActivity, + HamsterFact, + KimaiProject, + KimaiCustomer, + KimaiActivity, +) +from .kimai import ( + KimaiAPI, + Customer as KimaiAPICustomer, + Project as KimaiAPIProject, + Activity as KimaiAPIActivity, +) class ListScreen(Screen): - def __init__(self, db_manager): - self.db_manager = db_manager - super().__init__() - def compose(self) -> ComposeResult: """create child widgets for the app.""" yield Header() @@ -67,14 +78,29 @@ class ActivitiesScreen(ListScreen): def _refresh(self, filter_query=None): self.table.clear() - # List activities with the count of facts - activities = Activity.list(self.db_manager, filter_query) + activities = ( + HamsterActivity.select( + HamsterActivity, + HamsterCategory, + fn.Count(HamsterFact.id).alias("facts_count") + ) + .join(HamsterCategory, JOIN.LEFT_OUTER) + .switch(HamsterActivity) + .join(HamsterFact, JOIN.LEFT_OUTER) + .group_by(HamsterActivity) + ) + + if filter_query: + activities = activities.where( + HamsterActivity.name.contains(filter_query) + | HamsterCategory.name.contains(filter_query) + ) self.table.add_rows( [ [ - activity.category_id, - activity.category_name, + activity.category.id, + activity.category.name, activity.id, activity.name, activity.facts_count, @@ -102,8 +128,8 @@ class ActivitiesScreen(ListScreen): Coordinate(self.table.cursor_coordinate.row, 2), ) - activity = Activity.get_by_id(self.db_manager, activity_id) - activity.delete() + activity = HamsterActivity.get(id=activity_id) + activity.delete_instance() # supply the row key to `remove_row` to delete the row. self.table.remove_row(row_key) @@ -111,7 +137,7 @@ class ActivitiesScreen(ListScreen): def action_move_facts(self) -> None: row_idx: int = self.table.cursor_row row_cells = self.table.get_row_at(row_idx) - self.move_from_activity = Activity.get_by_id(self.db_manager, row_cells[2]) + self.move_from_activity = HamsterActivity.get(id=row_cells[2]) for col_idx, cell_value in enumerate(row_cells): cell_coordinate = Coordinate(row_idx, col_idx) self.table.update_cell_at( @@ -122,10 +148,12 @@ class ActivitiesScreen(ListScreen): def on_data_table_row_selected(self, event): if getattr(self, "move_from_activity", None) is not None: - move_to_activity = Activity.get_by_id( - self.db_manager, self.table.get_cell_at(Coordinate(event.cursor_row, 2)) + move_to_activity = HamsterActivity.get( + self.table.get_cell_at(Coordinate(event.cursor_row, 2)) ) - self.move_from_activity.move_facts(move_to_activity) + HamsterFact.update({HamsterFact.activity: + move_to_activity}).where(HamsterFact.activity == + self.move_from_activity).execute() filter_input = self.query_one("#filter") self._refresh(filter_input.value) del self.move_from_activity @@ -143,16 +171,26 @@ class CategoriesScreen(ListScreen): def _refresh(self, filter_query=None): self.table.clear() - categories = Category.list( - self.db_manager, filter_query=filter_query + categories = ( + HamsterCategory.select( + HamsterCategory, + fn.Count(HamsterActivity.id).alias("activities_count") + ) + .join(HamsterActivity, JOIN.LEFT_OUTER) + .group_by(HamsterCategory) ) + if filter_query: + categories = categories.where( + HamsterCategory.name.contains(filter_query) + ) + self.table.add_rows( [ [ category.id, category.name, - category.activity_count, + category.activities_count, ] for category in categories ] @@ -174,8 +212,8 @@ class CategoriesScreen(ListScreen): category_id = self.table.get_cell_at( Coordinate(self.table.cursor_coordinate.row, 0), ) - category = Category.get_by_id(self.db_manager, category_id) - category.delete() + category = HamsterCategory.get(id=category_id) + category.delete_instance() # supply the row key to `remove_row` to delete the row. self.table.remove_row(row_key) @@ -193,15 +231,33 @@ class KimaiScreen(ListScreen): def _refresh(self, filter_query=None): self.table.clear() - projects = KimaiProject.list(self.db_manager, filter_query) + projects = ( + KimaiProject.select( + KimaiProject, + KimaiCustomer, + fn.Count(KimaiActivity.id).alias("activities_count") + ) + .join(KimaiCustomer, JOIN.LEFT_OUTER) + .switch(KimaiProject) + .join(KimaiActivity, JOIN.LEFT_OUTER) + .group_by(KimaiProject) + ) + + if filter_query: + projects = projects.where( + KimaiProject.name.contains(filter_query) + | KimaiCustomer.name.contains(filter_query) + ) + self.table.add_rows( [ [ - project.customer_id, - project.customer_name, + project.customer.id, + project.customer.name, project.id, project.name, + project.activities_count ] for project in projects ] @@ -213,21 +269,28 @@ class KimaiScreen(ListScreen): api = KimaiAPI() customers = KimaiAPICustomer.list(api) - for customer in customers: - KimaiCustomer(self.db_manager, id=customer.id, name=customer.name).save() + with db.atomic(): + KimaiCustomer.insert_many([{ + 'id': customer.id, + 'name': customer.name + } for customer in customers]).execute() projects = KimaiAPIProject.list(api) - for project in projects: - KimaiProject(self.db_manager, id=project.id, name=project.name, - customer_id=project.customer.id, customer_name="").save() + with db.atomic(): + KimaiProject.insert_many([{ + 'id': project.id, + 'name': project.name, + 'customer_id': project.customer.id + } for project in projects]).execute() self._refresh() def on_mount(self) -> None: self.table = self.query_one(DataTable) self.table.cursor_type = "row" - self.columns = self.table.add_columns("customer id", "customer", - "project id", "project") + self.columns = self.table.add_columns( + "customer id", "customer", "project id", "project", "activities" + ) # self.sort = (self.columns[1], self.columns[3]) self.sort = self.columns[1] self._refresh() @@ -243,12 +306,12 @@ class HamsterToolsApp(App): ] def __init__(self): - self.db_manager = DatabaseManager("hamster-testing.db") + db.init("hamster-testing.db") self.MODES = { - "categories": CategoriesScreen(self.db_manager), - "activities": ActivitiesScreen(self.db_manager), - "kimai": KimaiScreen(self.db_manager) + "categories": CategoriesScreen(), + "activities": ActivitiesScreen(), + "kimai": KimaiScreen(), } super().__init__() @@ -258,4 +321,4 @@ class HamsterToolsApp(App): def action_quit(self) -> None: self.exit() - self.db_manager.close() + db.close() diff --git a/hamstertools/db.py b/hamstertools/db.py index 6712900..336e47b 100644 --- a/hamstertools/db.py +++ b/hamstertools/db.py @@ -1,276 +1,61 @@ -import sqlite3 +import logging +from peewee import SqliteDatabase, Model, CharField, DateField, ForeignKeyField + +from textual.logging import TextualHandler + +logger = logging.getLogger('peewee') +logger.addHandler(TextualHandler()) +logger.setLevel(logging.DEBUG) + +db = SqliteDatabase(None) -class DatabaseManager: - def __init__(self, database_name): - self.conn = sqlite3.connect(database_name) - self.cursor = self.conn.cursor() +class HamsterCategory(Model): + name = CharField() - def get_conn(self): - return self.conn - - def get_cursor(self): - return self.cursor - - def close(self): - self.conn.close() + class Meta: + database = db + table_name = 'categories' -class BaseORM: - def __init__(self, db_manager, table_name, id, **kwargs): - self.db_manager = db_manager - self.conn = db_manager.get_conn() - self.cursor = db_manager.get_cursor() - self.id = id - self.table_name = table_name - for key, value in kwargs.items(): - setattr(self, key, value) +class HamsterActivity(Model): + name = CharField() + category = ForeignKeyField(HamsterCategory, backref='activities') - def delete(self): - self.cursor.execute(f"DELETE FROM {self.table_name} WHERE id=?", (self.id,)) - self.conn.commit() + class Meta: + database = db + table_name = 'activities' -class Category(BaseORM): - def __init__(self, db_manager, id, name, activity_count): - super().__init__( - db_manager, "categories", id, name=name, activity_count=activity_count - ) +class HamsterFact(Model): + activity = ForeignKeyField(HamsterActivity, backref='facts') - @staticmethod - def list(db_manager, filter_query=None): - cursor = db_manager.get_cursor() - where = "" - if filter_query is not None: - where = "WHERE categories.name LIKE ?" - sql = f""" - SELECT - categories.id, - COALESCE(categories.name, ""), - COUNT(activities.id) AS activity_count - FROM - categories - LEFT JOIN - activities - ON - categories.id = activities.category_id - {where} - GROUP BY - categories.id - """ - if filter_query is not None: - cursor.execute(sql, ("%{}%".format(filter_query),)) - else: - cursor.execute(sql) - rows = cursor.fetchall() - return [Category(db_manager, row[0], row[1], row[2]) for row in rows] - - @staticmethod - def get_by_id(db_manager, category_id): - cursor = db_manager.get_cursor() - cursor.execute( - """ - SELECT - categories.id, - categories.name, - COUNT(activities.id) AS activity_count - FROM - categories - LEFT JOIN - activities - ON - categories.id = activities.category_id - WHERE - categories.id = ? - """, - (category_id,), - ) - - row = cursor.fetchone() - if row: - return Category(db_manager, row[0], row[1], row[2]) - return None + class Meta: + database = db + table_name = 'facts' -class Activity(BaseORM): - def __init__(self, db_manager, id, name, category_id, category_name, facts_count): - super().__init__( - db_manager, "activities", id, name=name, category_id=category_id - ) - self.category_name = category_name - self.facts_count = facts_count +class KimaiCustomer(Model): + name = CharField() - def move_facts(self, to_activity): - cursor = self.db_manager.get_cursor() - - print(f"moving from {self.id} to {to_activity.id}") - - cursor.execute( - """ - UPDATE - facts - SET - activity_id = ? - WHERE - activity_id = ? - """, - (to_activity.id, self.id), - ) - - self.conn.commit() - - @staticmethod - def list(db_manager, filter_query=None): - cursor = db_manager.get_cursor() - where = "" - if filter_query is not None: - where = "WHERE categories.name LIKE ? or activities.name like ?" - sql = f""" - SELECT - activities.id, - activities.name, - categories.id, - COALESCE(categories.name, ""), - COUNT(facts.id) AS facts_count - FROM - activities - LEFT JOIN - categories - ON - activities.category_id = categories.id - LEFT JOIN - facts - ON - activities.id = facts.activity_id - {where} - GROUP BY - activities.id - """ - - if filter_query is not None: - cursor.execute(sql, ("%{}%".format(filter_query),) * 2) - else: - cursor.execute(sql) - - rows = cursor.fetchall() - return [ - Activity(db_manager, row[0], row[1], row[2], row[3], row[4]) for row in rows - ] - - @staticmethod - def get_by_id(db_manager, activity_id): - cursor = db_manager.get_cursor() - cursor.execute( - """ - SELECT - activities.id, - activities.name, - categories.id, - COALESCE(categories.name, ""), - COUNT(facts.id) AS facts_count - FROM - activities - LEFT JOIN - categories - ON - activities.category_id = categories.id - LEFT JOIN - facts - ON - activities.id = facts.activity_id - WHERE - activities.id = ? - """, - (activity_id,), - ) - - row = cursor.fetchone() - if row: - return Activity(db_manager, row[0], row[1], row[2], row[3], row[4]) - return None + class Meta: + database = db + table_name = 'kimai_customers' -class Fact(BaseORM): - def __init__(self, db_manager, id, activity_id): - super().__init__(db_manager, "facts", id, activity_id=activity_id) +class KimaiProject(Model): + name = CharField() + customer = ForeignKeyField(KimaiCustomer, backref='projects') - @staticmethod - def list(db_manager): - cursor = db_manager.get_cursor() - cursor.execute("SELECT * FROM facts") - rows = cursor.fetchall() - return [Fact(db_manager, row[0], row[1]) for row in rows] + class Meta: + database = db + table_name = 'kimai_projects' -class KimaiCustomer(BaseORM): - def __init__(self, db_manager, id, name): - super().__init__(db_manager, "kimai_customers", id, name=name) +class KimaiActivity(Model): + name = CharField() + project = ForeignKeyField(KimaiProject, backref='activities') - def save(self): - cursor = self.db_manager.get_cursor() - cursor.execute("SELECT id FROM kimai_customers WHERE id = ?", (self.id,)) - row = cursor.fetchone() - if row: - cursor.execute(""" - UPDATE kimai_customers SET name = ? WHERE id = ? - """, (self.name, self.id)) - else: - cursor.execute(""" - INSERT INTO kimai_customers (id, name) VALUES (?, ?) - """, (self.id, self.name)) - self.db_manager.get_conn().commit() - - - -class KimaiProject(BaseORM): - def __init__(self, db_manager, id, name, customer_id, customer_name): - super().__init__(db_manager, "kimai_projects", id, name=name, - customer_id=customer_id, customer_name=customer_name) - - def save(self): - cursor = self.db_manager.get_cursor() - cursor.execute("SELECT id FROM kimai_projects WHERE id = ?", (self.id,)) - row = cursor.fetchone() - if row: - cursor.execute(""" - UPDATE kimai_projects SET name = ?, customer_id = ? WHERE id = ? - """, (self.name, self.customer_id, self.id)) - else: - cursor.execute(""" - INSERT INTO kimai_projects (id, name, customer_id) VALUES (?, ?, ?) - """, (self.id, self.name, self.customer_id)) - self.db_manager.get_conn().commit() - - @staticmethod - def list(db_manager, filter_query=None): - cursor = db_manager.get_cursor() - where = "" - if filter_query is not None: - where = "WHERE kimai_projects.name LIKE ? or kimai_customers.name like ?" - - sql = f""" - SELECT - kimai_projects.id, - COALESCE(kimai_projects.name, ""), - kimai_customers.id, - COALESCE(kimai_customers.name, "") - FROM - kimai_projects - LEFT JOIN - kimai_customers - ON - kimai_customers.id = kimai_projects.customer_id - {where} - """ - - if filter_query is not None: - cursor.execute(sql, ("%{}%".format(filter_query),) * 2) - else: - cursor.execute(sql) - - rows = cursor.fetchall() - return [KimaiProject(db_manager, row[0], row[1], row[2], row[3]) for row in rows] - - -class KimaiACtivity(BaseORM): - pass + class Meta: + database = db + table_name = 'kimai_activities'