import sqlite3


class DatabaseManager:
    """Hold one SQLite connection and one cursor that every ORM object shares."""

    def __init__(self, database_name):
        """Connect to ``database_name`` and create the shared cursor."""
        self.conn = sqlite3.connect(database_name)
        self.cursor = self.conn.cursor()

    def get_conn(self):
        """Return the underlying ``sqlite3`` connection."""
        return self.conn

    def get_cursor(self):
        """Return the shared cursor (the same object on every call)."""
        return self.cursor

    def close(self):
        """Close the underlying connection."""
        self.conn.close()


class BaseORM:
    """Common base for row objects: stores the manager, table name and id.

    Extra keyword arguments are copied onto the instance as attributes.
    """

    def __init__(self, db_manager, table_name, id, **kwargs):
        self.db_manager = db_manager
        self.conn = db_manager.get_conn()
        self.cursor = db_manager.get_cursor()
        self.id = id
        self.table_name = table_name
        for key, value in kwargs.items():
            setattr(self, key, value)

    def delete(self):
        """Delete the row whose ``id`` equals ``self.id`` and commit."""
        # Identifiers cannot be bound as SQL parameters, so the table name is
        # interpolated; it is set by the subclasses in this module and must
        # never come from untrusted input.
        self.cursor.execute(
            f"DELETE FROM {self.table_name} WHERE id=?", (self.id,)
        )
        self.conn.commit()


class Category(BaseORM):
    """A row of ``categories`` plus the number of activities that reference it."""

    def __init__(self, db_manager, id, name, activity_count):
        super().__init__(
            db_manager, "categories", id, name=name, activity_count=activity_count
        )

    @staticmethod
    def list(db_manager, filter_query=None):
        """Return all categories, optionally filtered by a name substring.

        When ``filter_query`` is not ``None`` only categories whose name
        matches ``LIKE %filter_query%`` are returned. NULL names are reported
        as the empty string.
        """
        cursor = db_manager.get_cursor()
        where, params = "", ()
        if filter_query is not None:
            where = "WHERE categories.name LIKE ?"
            params = (f"%{filter_query}%",)
        # '' (single quotes) is the standard SQL string literal; "" is an
        # identifier and only works through SQLite's legacy fallback.
        sql = f"""
            SELECT
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(activities.id) AS activity_count
            FROM categories
            LEFT JOIN activities ON categories.id = activities.category_id
            {where}
            GROUP BY categories.id
        """
        cursor.execute(sql, params)
        return [Category(db_manager, *row) for row in cursor.fetchall()]

    @staticmethod
    def get_by_id(db_manager, category_id):
        """Return the category with ``category_id`` or ``None`` if it is absent."""
        cursor = db_manager.get_cursor()
        # GROUP BY is required: an aggregate query without it always yields
        # one row (all NULLs when nothing matches), which would be mistaken
        # for a real category.
        cursor.execute(
            """
            SELECT
                categories.id,
                categories.name,
                COUNT(activities.id) AS activity_count
            FROM categories
            LEFT JOIN activities ON categories.id = activities.category_id
            WHERE categories.id = ?
            GROUP BY categories.id
            """,
            (category_id,),
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return Category(db_manager, *row)


class Activity(BaseORM):
    """A row of ``activities`` with its category name and number of facts."""

    def __init__(self, db_manager, id, name, category_id, category_name, facts_count):
        super().__init__(
            db_manager, "activities", id, name=name, category_id=category_id
        )
        self.category_name = category_name
        self.facts_count = facts_count

    def move_facts(self, to_activity):
        """Re-point every fact of this activity to ``to_activity`` and commit."""
        cursor = self.db_manager.get_cursor()
        print(f"moving from {self.id} to {to_activity.id}")
        cursor.execute(
            """
            UPDATE facts
            SET activity_id = ?
            WHERE activity_id = ?
            """,
            (to_activity.id, self.id),
        )
        self.conn.commit()

    @staticmethod
    def list(db_manager, filter_query=None):
        """Return all activities, optionally filtered by a substring.

        When ``filter_query`` is not ``None`` an activity is kept if either
        its category name or its own name matches ``LIKE %filter_query%``.
        A missing category name is reported as the empty string.
        """
        cursor = db_manager.get_cursor()
        where, params = "", ()
        if filter_query is not None:
            where = "WHERE categories.name LIKE ? or activities.name like ?"
            pattern = f"%{filter_query}%"
            params = (pattern, pattern)
        sql = f"""
            SELECT
                activities.id,
                activities.name,
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(facts.id) AS facts_count
            FROM activities
            LEFT JOIN categories ON activities.category_id = categories.id
            LEFT JOIN facts ON activities.id = facts.activity_id
            {where}
            GROUP BY activities.id
        """
        cursor.execute(sql, params)
        return [Activity(db_manager, *row) for row in cursor.fetchall()]

    @staticmethod
    def get_by_id(db_manager, activity_id):
        """Return the activity with ``activity_id`` or ``None`` if it is absent."""
        cursor = db_manager.get_cursor()
        # GROUP BY keeps the aggregate from producing an all-NULL row when no
        # activity matches, so a missing id really yields None.
        cursor.execute(
            """
            SELECT
                activities.id,
                activities.name,
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(facts.id) AS facts_count
            FROM activities
            LEFT JOIN categories ON activities.category_id = categories.id
            LEFT JOIN facts ON activities.id = facts.activity_id
            WHERE activities.id = ?
            GROUP BY activities.id
            """,
            (activity_id,),
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return Activity(db_manager, *row)


class Fact(BaseORM):
    """A row of ``facts`` reduced to its id and the activity it belongs to."""

    def __init__(self, db_manager, id, activity_id):
        super().__init__(db_manager, "facts", id, activity_id=activity_id)

    @staticmethod
    def list(db_manager):
        """Return every fact in the table."""
        cursor = db_manager.get_cursor()
        # Name the columns explicitly so the result does not depend on the
        # physical column order of the table.
        cursor.execute("SELECT id, activity_id FROM facts")
        return [Fact(db_manager, row[0], row[1]) for row in cursor.fetchall()]


class KimaiCustomer(BaseORM):
    """A row of ``kimai_customers``."""

    def __init__(self, db_manager, id, name):
        super().__init__(db_manager, "kimai_customers", id, name=name)

    def save(self):
        """Update the row with this id if it exists, otherwise insert it; commit."""
        cursor = self.db_manager.get_cursor()
        cursor.execute("SELECT id FROM kimai_customers WHERE id = ?", (self.id,))
        if cursor.fetchone():
            cursor.execute(
                "UPDATE kimai_customers SET name = ? WHERE id = ?",
                (self.name, self.id),
            )
        else:
            cursor.execute(
                "INSERT INTO kimai_customers (id, name) VALUES (?, ?)",
                (self.id, self.name),
            )
        self.db_manager.get_conn().commit()


class KimaiProject(BaseORM):
    """A row of ``kimai_projects`` together with its customer's name."""

    def __init__(self, db_manager, id, name, customer_id, customer_name):
        super().__init__(
            db_manager,
            "kimai_projects",
            id,
            name=name,
            customer_id=customer_id,
            customer_name=customer_name,
        )

    def save(self):
        """Update the row with this id if it exists, otherwise insert it; commit.

        Only ``name`` and ``customer_id`` are persisted; ``customer_name`` is
        not written to ``kimai_projects``.
        """
        cursor = self.db_manager.get_cursor()
        cursor.execute("SELECT id FROM kimai_projects WHERE id = ?", (self.id,))
        if cursor.fetchone():
            cursor.execute(
                "UPDATE kimai_projects SET name = ?, customer_id = ? WHERE id = ?",
                (self.name, self.customer_id, self.id),
            )
        else:
            cursor.execute(
                "INSERT INTO kimai_projects (id, name, customer_id) VALUES (?, ?, ?)",
                (self.id, self.name, self.customer_id),
            )
        self.db_manager.get_conn().commit()

    @staticmethod
    def list(db_manager, filter_query=None):
        """Return all projects, optionally filtered by a substring.

        When ``filter_query`` is not ``None`` a project is kept if either its
        own name or its customer's name matches ``LIKE %filter_query%``.
        NULL names are reported as the empty string.
        """
        cursor = db_manager.get_cursor()
        where, params = "", ()
        if filter_query is not None:
            where = "WHERE kimai_projects.name LIKE ? or kimai_customers.name like ?"
            pattern = f"%{filter_query}%"
            params = (pattern, pattern)
        sql = f"""
            SELECT
                kimai_projects.id,
                COALESCE(kimai_projects.name, ''),
                kimai_customers.id,
                COALESCE(kimai_customers.name, '')
            FROM kimai_projects
            LEFT JOIN kimai_customers ON kimai_customers.id = kimai_projects.customer_id
            {where}
        """
        cursor.execute(sql, params)
        return [KimaiProject(db_manager, *row) for row in cursor.fetchall()]


class KimaiACtivity(BaseORM):
    """Placeholder subclass; adds nothing to ``BaseORM``."""