hamster-tools/hamstertools/db.py

203 lines
5.6 KiB
Python
Raw Normal View History

2023-10-27 03:04:30 +00:00
import sqlite3
2023-10-27 18:32:16 +00:00
2023-10-27 03:04:30 +00:00
class DatabaseManager:
    """Own a single SQLite connection and one cursor created from it.

    The same connection and cursor objects are handed out on every call to
    ``get_conn`` / ``get_cursor``.  The manager can also be used as a context
    manager so the connection is closed even when the body raises::

        with DatabaseManager("hamster.db") as db:
            ...
    """

    def __init__(self, database_name: str) -> None:
        """Open *database_name* with ``sqlite3.connect`` and create a cursor."""
        self.conn = sqlite3.connect(database_name)
        self.cursor = self.conn.cursor()

    def __enter__(self) -> "DatabaseManager":
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()

    def get_conn(self) -> sqlite3.Connection:
        """Return the connection opened in ``__init__``."""
        return self.conn

    def get_cursor(self) -> sqlite3.Cursor:
        """Return the cursor created in ``__init__``."""
        return self.cursor

    def close(self) -> None:
        """Close the underlying connection."""
        self.conn.close()
2023-10-27 18:32:16 +00:00
2023-10-27 03:04:30 +00:00
class BaseORM:
    """Common base for row objects bound to one table.

    Stores the manager, its connection and cursor, the row ``id`` and the
    table name, then copies every extra keyword argument onto the instance
    as an attribute of the same name.
    """

    def __init__(self, db_manager, table_name, id, **kwargs):
        self.table_name = table_name
        self.id = id
        self.db_manager = db_manager
        self.conn = db_manager.get_conn()
        self.cursor = db_manager.get_cursor()
        # Extra fields are applied last, after the fixed attributes above.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    def delete(self):
        """Delete the row whose ``id`` equals ``self.id`` and commit."""
        statement = f"DELETE FROM {self.table_name} WHERE id=?"
        self.cursor.execute(statement, (self.id,))
        self.conn.commit()
class Category(BaseORM):
    """A row of the ``categories`` table together with its activity count.

    ``name`` and ``activity_count`` are forwarded to ``BaseORM`` as keyword
    arguments and therefore end up as instance attributes.
    """

    def __init__(self, db_manager, id, name, activity_count):
        super().__init__(
            db_manager, "categories", id, name=name, activity_count=activity_count
        )

    @staticmethod
    def list_categories(db_manager, filter_query=None):
        """Return all categories, optionally filtered by name.

        Args:
            db_manager: object providing ``get_cursor()``.
            filter_query: when not ``None``, only categories whose name
                matches ``LIKE '%<filter_query>%'`` are returned.

        Returns:
            A list of ``Category`` objects, one per ``categories`` row, each
            carrying the number of joined ``activities`` rows.
        """
        cursor = db_manager.get_cursor()

        where = ""
        params = ()
        if filter_query is not None:
            where = "WHERE categories.name LIKE ?"
            params = (f"%{filter_query}%",)

        # '' (single quotes) is the SQL string literal; "" is an identifier
        # that SQLite only tolerates as a string for legacy reasons.
        sql = f"""
            SELECT
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(activities.id) AS activity_count
            FROM
                categories
            LEFT JOIN
                activities
            ON
                categories.id = activities.category_id
            {where}
            GROUP BY
                categories.id
        """
        cursor.execute(sql, params)
        return [Category(db_manager, *row) for row in cursor.fetchall()]

    @staticmethod
    def get_by_id(db_manager, category_id):
        """Return the category with id *category_id*, or ``None`` if absent.

        Args:
            db_manager: object providing ``get_cursor()``.
            category_id: value compared against ``categories.id``.

        Returns:
            A ``Category`` with its activity count, or ``None`` when no
            category has that id.
        """
        cursor = db_manager.get_cursor()
        # GROUP BY is required here: an aggregate query without it always
        # yields exactly one row (NULL, NULL, 0) even when nothing matches,
        # which would make the "not found" branch unreachable.
        cursor.execute(
            """
            SELECT
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(activities.id) AS activity_count
            FROM
                categories
            LEFT JOIN
                activities
            ON
                categories.id = activities.category_id
            WHERE
                categories.id = ?
            GROUP BY
                categories.id
            """,
            (category_id,),
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return Category(db_manager, *row)
class Activity(BaseORM):
    """A row of the ``activities`` table with category name and fact count.

    ``name`` and ``category_id`` are forwarded to ``BaseORM`` as keyword
    arguments; ``category_name`` and ``facts_count`` are stored directly.
    """

    def __init__(self, db_manager, id, name, category_id, category_name, facts_count):
        super().__init__(
            db_manager, "activities", id, name=name, category_id=category_id
        )
        self.category_name = category_name
        self.facts_count = facts_count

    def move_facts(self, to_activity):
        """Reassign every fact of this activity to *to_activity* and commit.

        Args:
            to_activity: object whose ``id`` becomes the new
                ``facts.activity_id`` for all facts currently pointing at
                this activity.
        """
        cursor = self.db_manager.get_cursor()

        print(f"moving from {self.id} to {to_activity.id}")

        cursor.execute(
            """
            UPDATE
                facts
            SET
                activity_id = ?
            WHERE
                activity_id = ?
            """,
            (to_activity.id, self.id),
        )
        self.conn.commit()

    @staticmethod
    def list_activities(db_manager, filter_query=None):
        """Return all activities, optionally filtered by name or category.

        Args:
            db_manager: object providing ``get_cursor()``.
            filter_query: when not ``None``, only activities whose own name
                or whose category name matches ``LIKE '%<filter_query>%'``
                are returned.

        Returns:
            A list of ``Activity`` objects, one per ``activities`` row, each
            carrying its category id/name and the number of joined facts.
        """
        cursor = db_manager.get_cursor()

        where = ""
        params = ()
        if filter_query is not None:
            where = "WHERE categories.name LIKE ? or activities.name like ?"
            pattern = f"%{filter_query}%"
            params = (pattern, pattern)

        # '' (single quotes) is the SQL string literal; "" is an identifier
        # that SQLite only tolerates as a string for legacy reasons.
        sql = f"""
            SELECT
                activities.id,
                activities.name,
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(facts.id) AS facts_count
            FROM
                activities
            LEFT JOIN
                categories
            ON
                activities.category_id = categories.id
            LEFT JOIN
                facts
            ON
                activities.id = facts.activity_id
            {where}
            GROUP BY
                activities.id
        """
        cursor.execute(sql, params)
        return [Activity(db_manager, *row) for row in cursor.fetchall()]

    @staticmethod
    def get_by_id(db_manager, activity_id):
        """Return the activity with id *activity_id*, or ``None`` if absent.

        Args:
            db_manager: object providing ``get_cursor()``.
            activity_id: value compared against ``activities.id``.

        Returns:
            An ``Activity`` with category info and fact count, or ``None``
            when no activity has that id.
        """
        cursor = db_manager.get_cursor()
        # GROUP BY is required here: an aggregate query without it always
        # yields exactly one row of NULLs and a zero count even when nothing
        # matches, which would make the "not found" branch unreachable.
        cursor.execute(
            """
            SELECT
                activities.id,
                activities.name,
                categories.id,
                COALESCE(categories.name, ''),
                COUNT(facts.id) AS facts_count
            FROM
                activities
            LEFT JOIN
                categories
            ON
                activities.category_id = categories.id
            LEFT JOIN
                facts
            ON
                activities.id = facts.activity_id
            WHERE
                activities.id = ?
            GROUP BY
                activities.id
            """,
            (activity_id,),
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return Activity(db_manager, *row)
class Fact(BaseORM):
    """A row of the ``facts`` table, reduced to its id and activity id.

    ``activity_id`` is forwarded to ``BaseORM`` as a keyword argument and
    therefore ends up as an instance attribute.
    """

    def __init__(self, db_manager, id, activity_id):
        super().__init__(db_manager, "facts", id, activity_id=activity_id)

    @staticmethod
    def list_facts(db_manager):
        """Return one ``Fact`` per row of the ``facts`` table.

        Args:
            db_manager: object providing ``get_cursor()``.
        """
        cursor = db_manager.get_cursor()
        # Name the columns explicitly so the positional unpacking below does
        # not depend on the physical column order of the table.
        cursor.execute("SELECT id, activity_id FROM facts")
        return [
            Fact(db_manager, fact_id, activity_id)
            for fact_id, activity_id in cursor.fetchall()
        ]