Switch to peewee ORM

This commit is contained in:
3wc 2023-10-28 00:42:30 +01:00
parent 65f16a252c
commit cd278b32aa
2 changed files with 142 additions and 294 deletions

View File

@ -5,15 +5,26 @@ from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import Screen
from .db import DatabaseManager, Category, Activity, KimaiProject, KimaiCustomer
from .kimai import KimaiAPI, Customer as KimaiAPICustomer, Project as KimaiAPIProject, Activity as KimaiAPIActivity
from peewee import fn, JOIN
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
from .kimai import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
)
class ListScreen(Screen):
def __init__(self, db_manager):
self.db_manager = db_manager
super().__init__()
def compose(self) -> ComposeResult:
"""create child widgets for the app."""
yield Header()
@ -67,14 +78,29 @@ class ActivitiesScreen(ListScreen):
def _refresh(self, filter_query=None):
self.table.clear()
# List activities with the count of facts
activities = Activity.list(self.db_manager, filter_query)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory,
fn.Count(HamsterFact.id).alias("facts_count")
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterActivity)
)
if filter_query:
activities = activities.where(
HamsterActivity.name.contains(filter_query)
| HamsterCategory.name.contains(filter_query)
)
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.category.id,
activity.category.name,
activity.id,
activity.name,
activity.facts_count,
@ -102,8 +128,8 @@ class ActivitiesScreen(ListScreen):
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = Activity.get_by_id(self.db_manager, activity_id)
activity.delete()
activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
@ -111,7 +137,7 @@ class ActivitiesScreen(ListScreen):
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = Activity.get_by_id(self.db_manager, row_cells[2])
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
@ -122,10 +148,12 @@ class ActivitiesScreen(ListScreen):
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = Activity.get_by_id(
self.db_manager, self.table.get_cell_at(Coordinate(event.cursor_row, 2))
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
self.move_from_activity.move_facts(move_to_activity)
HamsterFact.update({HamsterFact.activity:
move_to_activity}).where(HamsterFact.activity ==
self.move_from_activity).execute()
filter_input = self.query_one("#filter")
self._refresh(filter_input.value)
del self.move_from_activity
@ -143,8 +171,18 @@ class CategoriesScreen(ListScreen):
def _refresh(self, filter_query=None):
self.table.clear()
categories = Category.list(
self.db_manager, filter_query=filter_query
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count")
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
if filter_query:
categories = categories.where(
HamsterCategory.name.contains(filter_query)
)
self.table.add_rows(
@ -152,7 +190,7 @@ class CategoriesScreen(ListScreen):
[
category.id,
category.name,
category.activity_count,
category.activities_count,
]
for category in categories
]
@ -174,8 +212,8 @@ class CategoriesScreen(ListScreen):
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = Category.get_by_id(self.db_manager, category_id)
category.delete()
category = HamsterCategory.get(id=category_id)
category.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
@ -193,15 +231,33 @@ class KimaiScreen(ListScreen):
def _refresh(self, filter_query=None):
self.table.clear()
projects = KimaiProject.list(self.db_manager, filter_query)
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count")
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.group_by(KimaiProject)
)
if filter_query:
projects = projects.where(
KimaiProject.name.contains(filter_query)
| KimaiCustomer.name.contains(filter_query)
)
self.table.add_rows(
[
[
project.customer_id,
project.customer_name,
project.customer.id,
project.customer.name,
project.id,
project.name,
project.activities_count
]
for project in projects
]
@ -213,21 +269,28 @@ class KimaiScreen(ListScreen):
api = KimaiAPI()
customers = KimaiAPICustomer.list(api)
for customer in customers:
KimaiCustomer(self.db_manager, id=customer.id, name=customer.name).save()
with db.atomic():
KimaiCustomer.insert_many([{
'id': customer.id,
'name': customer.name
} for customer in customers]).execute()
projects = KimaiAPIProject.list(api)
for project in projects:
KimaiProject(self.db_manager, id=project.id, name=project.name,
customer_id=project.customer.id, customer_name="").save()
with db.atomic():
KimaiProject.insert_many([{
'id': project.id,
'name': project.name,
'customer_id': project.customer.id
} for project in projects]).execute()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("customer id", "customer",
"project id", "project")
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities"
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[1]
self._refresh()
@ -243,12 +306,12 @@ class HamsterToolsApp(App):
]
def __init__(self):
self.db_manager = DatabaseManager("hamster-testing.db")
db.init("hamster-testing.db")
self.MODES = {
"categories": CategoriesScreen(self.db_manager),
"activities": ActivitiesScreen(self.db_manager),
"kimai": KimaiScreen(self.db_manager)
"categories": CategoriesScreen(),
"activities": ActivitiesScreen(),
"kimai": KimaiScreen(),
}
super().__init__()
@ -258,4 +321,4 @@ class HamsterToolsApp(App):
def action_quit(self) -> None:
self.exit()
self.db_manager.close()
db.close()

View File

@ -1,276 +1,61 @@
import sqlite3
import logging
from peewee import SqliteDatabase, Model, CharField, DateField, ForeignKeyField
from textual.logging import TextualHandler
logger = logging.getLogger('peewee')
logger.addHandler(TextualHandler())
logger.setLevel(logging.DEBUG)
db = SqliteDatabase(None)
class DatabaseManager:
def __init__(self, database_name):
self.conn = sqlite3.connect(database_name)
self.cursor = self.conn.cursor()
class HamsterCategory(Model):
name = CharField()
def get_conn(self):
return self.conn
def get_cursor(self):
return self.cursor
def close(self):
self.conn.close()
class Meta:
database = db
table_name = 'categories'
class BaseORM:
def __init__(self, db_manager, table_name, id, **kwargs):
self.db_manager = db_manager
self.conn = db_manager.get_conn()
self.cursor = db_manager.get_cursor()
self.id = id
self.table_name = table_name
for key, value in kwargs.items():
setattr(self, key, value)
class HamsterActivity(Model):
name = CharField()
category = ForeignKeyField(HamsterCategory, backref='activities')
def delete(self):
self.cursor.execute(f"DELETE FROM {self.table_name} WHERE id=?", (self.id,))
self.conn.commit()
class Meta:
database = db
table_name = 'activities'
class Category(BaseORM):
def __init__(self, db_manager, id, name, activity_count):
super().__init__(
db_manager, "categories", id, name=name, activity_count=activity_count
)
class HamsterFact(Model):
activity = ForeignKeyField(HamsterActivity, backref='facts')
@staticmethod
def list(db_manager, filter_query=None):
cursor = db_manager.get_cursor()
where = ""
if filter_query is not None:
where = "WHERE categories.name LIKE ?"
sql = f"""
SELECT
categories.id,
COALESCE(categories.name, ""),
COUNT(activities.id) AS activity_count
FROM
categories
LEFT JOIN
activities
ON
categories.id = activities.category_id
{where}
GROUP BY
categories.id
"""
if filter_query is not None:
cursor.execute(sql, ("%{}%".format(filter_query),))
else:
cursor.execute(sql)
rows = cursor.fetchall()
return [Category(db_manager, row[0], row[1], row[2]) for row in rows]
@staticmethod
def get_by_id(db_manager, category_id):
cursor = db_manager.get_cursor()
cursor.execute(
"""
SELECT
categories.id,
categories.name,
COUNT(activities.id) AS activity_count
FROM
categories
LEFT JOIN
activities
ON
categories.id = activities.category_id
WHERE
categories.id = ?
""",
(category_id,),
)
row = cursor.fetchone()
if row:
return Category(db_manager, row[0], row[1], row[2])
return None
class Meta:
database = db
table_name = 'facts'
class Activity(BaseORM):
def __init__(self, db_manager, id, name, category_id, category_name, facts_count):
super().__init__(
db_manager, "activities", id, name=name, category_id=category_id
)
self.category_name = category_name
self.facts_count = facts_count
class KimaiCustomer(Model):
name = CharField()
def move_facts(self, to_activity):
cursor = self.db_manager.get_cursor()
print(f"moving from {self.id} to {to_activity.id}")
cursor.execute(
"""
UPDATE
facts
SET
activity_id = ?
WHERE
activity_id = ?
""",
(to_activity.id, self.id),
)
self.conn.commit()
@staticmethod
def list(db_manager, filter_query=None):
cursor = db_manager.get_cursor()
where = ""
if filter_query is not None:
where = "WHERE categories.name LIKE ? or activities.name like ?"
sql = f"""
SELECT
activities.id,
activities.name,
categories.id,
COALESCE(categories.name, ""),
COUNT(facts.id) AS facts_count
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
LEFT JOIN
facts
ON
activities.id = facts.activity_id
{where}
GROUP BY
activities.id
"""
if filter_query is not None:
cursor.execute(sql, ("%{}%".format(filter_query),) * 2)
else:
cursor.execute(sql)
rows = cursor.fetchall()
return [
Activity(db_manager, row[0], row[1], row[2], row[3], row[4]) for row in rows
]
@staticmethod
def get_by_id(db_manager, activity_id):
cursor = db_manager.get_cursor()
cursor.execute(
"""
SELECT
activities.id,
activities.name,
categories.id,
COALESCE(categories.name, ""),
COUNT(facts.id) AS facts_count
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
LEFT JOIN
facts
ON
activities.id = facts.activity_id
WHERE
activities.id = ?
""",
(activity_id,),
)
row = cursor.fetchone()
if row:
return Activity(db_manager, row[0], row[1], row[2], row[3], row[4])
return None
class Meta:
database = db
table_name = 'kimai_customers'
class Fact(BaseORM):
def __init__(self, db_manager, id, activity_id):
super().__init__(db_manager, "facts", id, activity_id=activity_id)
class KimaiProject(Model):
name = CharField()
customer = ForeignKeyField(KimaiCustomer, backref='projects')
@staticmethod
def list(db_manager):
cursor = db_manager.get_cursor()
cursor.execute("SELECT * FROM facts")
rows = cursor.fetchall()
return [Fact(db_manager, row[0], row[1]) for row in rows]
class Meta:
database = db
table_name = 'kimai_projects'
class KimaiCustomer(BaseORM):
def __init__(self, db_manager, id, name):
super().__init__(db_manager, "kimai_customers", id, name=name)
class KimaiActivity(Model):
name = CharField()
project = ForeignKeyField(KimaiProject, backref='activities')
def save(self):
cursor = self.db_manager.get_cursor()
cursor.execute("SELECT id FROM kimai_customers WHERE id = ?", (self.id,))
row = cursor.fetchone()
if row:
cursor.execute("""
UPDATE kimai_customers SET name = ? WHERE id = ?
""", (self.name, self.id))
else:
cursor.execute("""
INSERT INTO kimai_customers (id, name) VALUES (?, ?)
""", (self.id, self.name))
self.db_manager.get_conn().commit()
class KimaiProject(BaseORM):
def __init__(self, db_manager, id, name, customer_id, customer_name):
super().__init__(db_manager, "kimai_projects", id, name=name,
customer_id=customer_id, customer_name=customer_name)
def save(self):
cursor = self.db_manager.get_cursor()
cursor.execute("SELECT id FROM kimai_projects WHERE id = ?", (self.id,))
row = cursor.fetchone()
if row:
cursor.execute("""
UPDATE kimai_projects SET name = ?, customer_id = ? WHERE id = ?
""", (self.name, self.customer_id, self.id))
else:
cursor.execute("""
INSERT INTO kimai_projects (id, name, customer_id) VALUES (?, ?, ?)
""", (self.id, self.name, self.customer_id))
self.db_manager.get_conn().commit()
@staticmethod
def list(db_manager, filter_query=None):
cursor = db_manager.get_cursor()
where = ""
if filter_query is not None:
where = "WHERE kimai_projects.name LIKE ? or kimai_customers.name like ?"
sql = f"""
SELECT
kimai_projects.id,
COALESCE(kimai_projects.name, ""),
kimai_customers.id,
COALESCE(kimai_customers.name, "")
FROM
kimai_projects
LEFT JOIN
kimai_customers
ON
kimai_customers.id = kimai_projects.customer_id
{where}
"""
if filter_query is not None:
cursor.execute(sql, ("%{}%".format(filter_query),) * 2)
else:
cursor.execute(sql)
rows = cursor.fetchall()
return [KimaiProject(db_manager, row[0], row[1], row[2], row[3]) for row in rows]
class KimaiACtivity(BaseORM):
pass
class Meta:
database = db
table_name = 'kimai_activities'