Refucktoring

This commit is contained in:
3wc 2023-11-03 00:13:54 +00:00
parent ce68a03ea9
commit ff013525af
3 changed files with 5 additions and 656 deletions

View File

@ -1,660 +1,9 @@
from datetime import datetime from textual.app import App
from textual import on from .db import db
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Grid
from textual.events import DescendantBlur
from textual.widgets import Header, Footer, DataTable, Input, Button, Label, Checkbox
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import Screen, ModalScreen
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem from .screens.hamster import CategoryListScreen, ActivityListScreen
from .screens.kimai import KimaiProjectListScreen
from peewee import fn, JOIN
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .kimai import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
)
from .sync import sync
class ListScreen(Screen):
def compose(self) -> ComposeResult:
yield Header()
with Vertical():
yield DataTable()
with Horizontal(id="filter"):
yield Input(
id="search", placeholder="Category/activity name contains text"
)
yield Input(
id="date",
placeholder="After date, in {0} format".format(
datetime.now().strftime("%Y-%m-%d")
),
)
yield Footer()
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
self.query_one("#filter").display = True
self._refresh()
self.query_one("#filter #search").focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
self.query_one("#filter").display = False
self.query_one("#filter #search").clear()
self.query_one("#filter #date").clear()
self.table.focus()
self._refresh()
@on(Input.Changed, "#filter Input")
def filter(self, event):
self._refresh()
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
category_id = None
category_name = ""
def _get_categories(self, input_state):
categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)
def __init__(self, category, activity):
if category is not None:
self.category_id = category.id
self.category_name = category.name
self.activity_name = activity.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Category:"),
AutoComplete(
Input(
placeholder="Type to search...",
id="category",
value=self.category_name,
),
Dropdown(items=self._get_categories),
),
),
Horizontal(
Label("Activity:"), Input(value=self.activity_name, id="activity")
),
)
@on(Input.Submitted, "#category")
def category_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#category")
def category_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"category": self.category_id,
"activity": self.query_one("#activity").value,
}
)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel"),
]
customer_id = None
project_id = None
activity_id = None
customer = ""
project = ""
activity = ""
description = ""
tags = ""
def __init__(self, category, activity, mapping=None):
self.hamster_category = category
self.hamster_activity = activity
if mapping is not None:
self.customer_id = mapping.kimai_customer_id
self.customer = mapping.kimai_customer.name
self.project_id = mapping.kimai_project_id
self.project = mapping.kimai_project.name
self.activity_id = mapping.kimai_activity_id
self.activity = mapping.kimai_activity.name
self.description = mapping.kimai_description
self.tags = mapping.kimai_tags
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one("#global").value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(KimaiActivity.project_id == self.project_id)
return ActivityMappingScreen._filter_dropdowns(
[DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(
placeholder="Type to search...",
id="customer",
value=self.customer,
),
Dropdown(items=self._get_customers),
),
),
Horizontal(
Label("Project"),
AutoComplete(
Input(
placeholder="Type to search...",
id="project",
value=self.project,
),
Dropdown(items=self._get_projects),
),
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(
placeholder="Type to search...",
id="activity",
value=self.activity,
),
Dropdown(items=self._get_activities),
),
),
Horizontal(
Label("Description"),
Input(id="description", value=self.description),
),
Horizontal(
Label("Tags"),
Input(id="tags", value=self.tags),
),
Horizontal(Checkbox("Global", id="global")),
)
@on(Input.Submitted, "#customer")
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#project").focus()
@on(DescendantBlur, "#customer")
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
@on(Input.Submitted, "#project")
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one("#activity").focus()
@on(DescendantBlur, "#project")
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, "#activity")
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#activity")
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_global(self):
self.query_one("#global").value = not self.query_one("#global").value
def action_save(self):
self.dismiss(
{
"kimai_customer_id": self.customer_id,
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"global": self.query_one("#global").value,
}
)
def action_cancel(self):
self.dismiss(None)
class ActivityListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move facts"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterActivityKimaiMapping.select(
HamsterActivityKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterActivityKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
HamsterFact.start_time,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
activities = activities.where(
HamsterActivity.name.contains(filter_search)
| HamsterCategory.name.contains(filter_search)
)
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
activities = activities.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
self._refresh()
del self.move_from_activity
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
try:
category = HamsterCategory.get(id=row_cells[0])
except HamsterCategory.DoesNotExist:
category = None
activity = HamsterActivity.get(id=row_cells[2])
def handle_edit(properties):
if properties is None:
return
activity.name = properties["activity"]
activity.category_id = properties["category"]
activity.save()
self._refresh()
self.app.push_screen(
ActivityEditScreen(category=category, activity=activity), handle_edit
)
def action_mapping(self):
selected_activity = (
HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
HamsterActivity.id
== self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
)
.get()
)
mapping = None
try:
mapping = HamsterActivityKimaiMapping.get(
hamster_activity=selected_activity
)
except HamsterActivityKimaiMapping.DoesNotExist:
pass
def handle_mapping(mapping_data):
if mapping_data is None:
return
if mapping is not None:
mapping_ = mapping
for key, value in mapping_data.items():
setattr(mapping_, key, value)
else:
mapping_ = HamsterActivityKimaiMapping.create(
hamster_activity=selected_activity, **mapping_data
)
mapping_.save()
self._refresh()
self.app.push_screen(
ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name,
mapping=mapping,
),
handle_mapping,
)
class CategoryListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class KimaiProjectListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self, filter_query=None):
self.table.clear()
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_query:
projects = projects.where(
KimaiProject.name.contains(filter_query)
| KimaiCustomer.name.contains(filter_query)
)
self.table.add_rows(
[
[
project.customer.id,
project.customer.name,
project.id,
project.name,
project.activities_count,
]
for project in projects
]
)
self.table.sort(self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities"
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[1]
self._refresh()
class HamsterToolsApp(App): class HamsterToolsApp(App):

View File

@ -1,4 +1,4 @@
from .kimai import ( from .kimaiapi import (
KimaiAPI, KimaiAPI,
Customer as KimaiAPICustomer, Customer as KimaiAPICustomer,
Project as KimaiAPIProject, Project as KimaiAPIProject,