Proper DB ORM, filtering 🎉

This commit is contained in:
3wc 2023-10-27 04:04:30 +01:00
parent dc30727c62
commit 8e5e28ea67
3 changed files with 262 additions and 85 deletions

View File

@ -704,7 +704,8 @@ def _import(username, mapping_path=None, output=None, category_search=None, afte
@cli.command()
def app():
from .app import HamsterToolsApp
app = HamsterToolsApp(db_cursor=c, db_connection=conn)
#app = HamsterToolsApp(db_cursor=c, db_connection=conn)
app = HamsterToolsApp()
app.run()

View File

@ -1,8 +1,12 @@
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, DataTable, Placeholder
from textual.binding import Binding
from textual.widgets import Header, Footer, DataTable, Input
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import Screen
from textual.reactive import reactive
from .db import DatabaseManager, Category, Activity
class ActivitiesScreen(Screen):
BINDINGS = [
@ -10,58 +14,44 @@ class ActivitiesScreen(Screen):
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("d", "delete", "Delete activity"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def __init__(self, db_cursor, db_connection):
self.db_cursor = db_cursor
self.db_connection = db_connection
def __init__(self, db_manager):
self.db_manager = db_manager
super().__init__()
def _refresh(self):
def _refresh(self, filter_query=None):
self.table.clear()
sql = '''
select
categories.id as category_id,
coalesce(categories.name, '') as category_name,
activities.id as activity_id,
activities.name as activity_name,
coalesce(facts_count, 0) as total_facts
from
activities
left join
categories
on
activities.category_id = categories.id
left join (
select
activity_id,
count(*) as facts_count
from
facts
group by
activity_id
) as facts_count_subquery
on
activities.id = facts_count_subquery.activity_id;
'''
# List activities with the count of facts
activities = Activity.list_activities(self.db_manager, filter_query)
results = self.db_cursor.execute(sql)
# results = [[cell or "" for cell in row] for row in self.db_cursor.fetchall()]
self.table.add_rows([[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
] for activity in activities])
self.table.add_rows(results)
self.table.sort(self.columns[1], self.columns[3])
self.table.sort(*self.sort)
def compose(self) -> ComposeResult:
"""create child widgets for the app."""
yield Header()
with Vertical():
yield DataTable()
with Horizontal():
yield Input(id="filter")
yield Footer()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id","category","activity id","activity","entries")
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_refresh(self) -> None:
@ -70,6 +60,17 @@ class ActivitiesScreen(Screen):
def action_sort(self) -> None:
self.table.cursor_type = "column"
def action_filter(self) -> None:
filter_input = self.query_one("#filter")
filter_input.display = True
filter_input.focus()
print(filter_input)
def action_cancelfilter(self) -> None:
filter_input = self.query_one("#filter")
filter_input.display = False
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
@ -78,72 +79,65 @@ class ActivitiesScreen(Screen):
Coordinate(self.table.cursor_coordinate.row, 2),
)
sql = 'delete from activities where id = ?'
print(Coordinate(2, self.table.cursor_coordinate.row),)
print(activity_id)
self.db_cursor.execute(sql, (activity_id,))
self.db_connection.commit()
activity = Activity.get_by_id(self.db_manager, activity_id)
activity.delete()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def on_data_table_column_selected(self, event):
event.data_table.sort(event.column_key)
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def on_input_changed(self, event):
self._refresh(event.value)
class CategoriesScreen(Screen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("d", "delete", "Delete category"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def __init__(self, db_cursor, db_connection):
self.db_cursor = db_cursor
self.db_connection = db_connection
filtering = reactive(False)
filter_query = reactive("")
def __init__(self, db_manager):
self.db_manager = db_manager
super().__init__()
def _refresh(self):
def _refresh(self, filter_query=None):
self.table.clear()
sql = '''
select
categories.id as category_id,
coalesce(categories.name, '') as category_name,
coalesce(activities_count, 0) as total_activities
from
categories
left join (
select
category_id,
count(*) as activities_count
from
activities
group by
category_id
) as activities_count_subquery
on
categories.id = activities_count_subquery.category_id;
'''
categories = Category.list_categories(self.db_manager,
filter_query=filter_query)
results = self.db_cursor.execute(sql)
# results = [[cell or "" for cell in row] for row in self.db_cursor.fetchall()]
self.table.add_rows([[
category.id,
category.name,
category.activity_count,
] for category in categories])
self.table.add_rows(results)
self.table.sort(self.columns[1])
self.table.sort(self.sort)
def compose(self) -> ComposeResult:
"""create child widgets for the app."""
yield Header()
with Vertical():
yield DataTable()
with Horizontal():
yield Input(id="filter")
yield Footer()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id","category","activities")
self.sort = self.columns[1]
self._refresh()
def action_refresh(self) -> None:
@ -152,6 +146,20 @@ class CategoriesScreen(Screen):
def action_sort(self) -> None:
self.table.cursor_type = "column"
def action_filter(self) -> None:
filter_input = self.query_one("#filter")
filter_input.display = True
filter_input.focus()
print(filter_input)
def action_cancelfilter(self) -> None:
filter_input = self.query_one("#filter")
filter_input.display = False
self._refresh()
def on_input_changed(self, event):
self._refresh(event.value)
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
@ -159,19 +167,16 @@ class CategoriesScreen(Screen):
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
sql = 'delete from categories where id = ?'
print(Coordinate(2, self.table.cursor_coordinate.row),)
print(category_id)
self.db_cursor.execute(sql, (category_id,))
self.db_connection.commit()
category = Category.get_by_id(self.db_manager, category_id)
category.delete()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def on_data_table_column_selected(self, event):
event.data_table.sort(event.column_key)
""" Handle column selection for sort """
self.sort = event.column_key
event.data_table.sort(self.sort)
event.data_table.cursor_type = "row"
# class KimaiScreen(Screen):
@ -186,19 +191,20 @@ class CategoriesScreen(Screen):
class HamsterToolsApp(App):
CSS_PATH = 'app.tcss'
BINDINGS = [
("a", "switch_mode('activities')", "Activities"),
("c", "switch_mode('categories')", "Categories"),
# ("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
def __init__(self, db_cursor, db_connection):
self.db_cursor = db_cursor
self.db_connection = db_connection
def __init__(self):
self.db_manager = DatabaseManager('hamster-testing.db')
self.MODES = {
"categories": CategoriesScreen(db_cursor, db_connection),
"activities": ActivitiesScreen(db_cursor, db_connection),
"categories": CategoriesScreen(self.db_manager),
"activities": ActivitiesScreen(self.db_manager)
# "kimai": KimaiScreen,
}
@ -209,4 +215,4 @@ class HamsterToolsApp(App):
def action_quit(self) -> None:
self.exit()
self.db_manager.close()

170
hamstertools/db.py Normal file
View File

@ -0,0 +1,170 @@
import sqlite3
class DatabaseManager:
def __init__(self, database_name):
self.conn = sqlite3.connect(database_name)
self.cursor = self.conn.cursor()
def get_conn(self):
return self.conn
def get_cursor(self):
return self.cursor
def close(self):
self.conn.close()
class BaseORM:
def __init__(self, db_manager, table_name, id, **kwargs):
self.db_manager = db_manager
self.conn = db_manager.get_conn()
self.cursor = db_manager.get_cursor()
self.id = id
self.table_name = table_name
for key, value in kwargs.items():
setattr(self, key, value)
def delete(self):
self.cursor.execute(f"DELETE FROM {self.table_name} WHERE id=?", (self.id,))
self.conn.commit()
class Category(BaseORM):
def __init__(self, db_manager, id, name, activity_count):
super().__init__(db_manager, "categories", id, name=name,
activity_count=activity_count)
@staticmethod
def list_categories(db_manager, filter_query=None):
cursor = db_manager.get_cursor()
where = ""
if filter_query is not None:
where = "WHERE categories.name LIKE ?"
sql = f"""
SELECT
categories.id,
COALESCE(categories.name, ""),
COUNT(activities.id) AS activity_count
FROM
categories
LEFT JOIN
activities
ON
categories.id = activities.category_id
{where}
GROUP BY
categories.id
"""
if filter_query is not None:
cursor.execute(sql, ("%{}%".format(filter_query),))
else:
cursor.execute(sql)
rows = cursor.fetchall()
return [Category(db_manager, row[0], row[1], row[2]) for row in rows]
@staticmethod
def get_by_id(db_manager, category_id):
cursor = db_manager.get_cursor()
cursor.execute("""
SELECT
categories.id,
categories.name,
COUNT(activities.id) AS activity_count
FROM
categories
LEFT JOIN
activities
ON
categories.id = activities.category_id
WHERE
categories.id = ?
""", (category_id,))
row = cursor.fetchone()
if row:
return Category(db_manager, row[0], row[1], row[2])
return None
class Activity(BaseORM):
def __init__(self, db_manager, id, name, category_id, category_name, facts_count):
super().__init__(db_manager, "activities", id, name=name, category_id=category_id)
self.category_name = category_name
self.facts_count = facts_count
@staticmethod
def list_activities(db_manager, filter_query=None):
cursor = db_manager.get_cursor()
where = ""
if filter_query is not None:
where = "WHERE categories.name LIKE ? or activities.name like ?"
sql = f"""
SELECT
activities.id,
activities.name,
categories.id,
COALESCE(categories.name, ""),
COUNT(facts.id) AS facts_count
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
LEFT JOIN
facts
ON
activities.id = facts.activity_id
{where}
GROUP BY
activities.id
"""
if filter_query is not None:
cursor.execute(sql, ("%{}%".format(filter_query),) * 2 )
else:
cursor.execute(sql)
rows = cursor.fetchall()
return [Activity(db_manager, row[0], row[1], row[2], row[3], row[4]) for row in rows]
@staticmethod
def get_by_id(db_manager, activity_id):
cursor = db_manager.get_cursor()
cursor.execute("""
SELECT
activities.id,
activities.name,
categories.id,
COALESCE(categories.name, ""),
COUNT(facts.id) AS facts_count
FROM
activities
LEFT JOIN
categories
ON
activities.category_id = categories.id
LEFT JOIN
facts
ON
activities.id = facts.activity_id
WHERE
activities.id = ?
""", (activity_id,))
row = cursor.fetchone()
if row:
return Activity(db_manager, row[0], row[1], row[2], row[3], row[4])
return None
class Fact(BaseORM):
def __init__(self, db_manager, id, activity_id):
super().__init__(db_manager, "facts", id, activity_id=activity_id)
@staticmethod
def list_facts(db_manager):
cursor = db_manager.get_cursor()
cursor.execute("SELECT * FROM facts")
rows = cursor.fetchall()
return [Fact(db_manager, row[0], row[1]) for row in rows]