495 lines
17 KiB
Python
495 lines
17 KiB
Python
|
|
from datetime import datetime
|
|
from peewee import JOIN, fn
|
|
from textual import on
|
|
from textual.app import ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import Grid, Horizontal, Vertical
|
|
from textual.coordinate import Coordinate
|
|
from textual.events import DescendantBlur
|
|
from textual.screen import ModalScreen
|
|
from textual.widgets import Button, Checkbox, DataTable, Input, Label
|
|
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
|
|
|
|
from hamstertools.db import HamsterActivity, HamsterActivityKimaiMapping, HamsterCategory, HamsterFact, KimaiActivity, KimaiCustomer, KimaiProject
|
|
from hamstertools.screens.list import ListPane
|
|
|
|
|
|
class ActivityEditScreen(ModalScreen):
    """Modal form for editing an activity's name and category.

    The screen is dismissed with ``None`` when cancelled, or with a dict
    holding ``"category"`` (the chosen category id) and ``"activity"`` (the
    text of the activity input) when saved.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
        ("ctrl+s", "save", "Save"),
    ]

    # Defaults used when the screen is opened without a category.
    category_id = None
    category_name = ""

    def _get_categories(self, input_state):
        """Return dropdown items for categories matching the typed text."""
        items = []
        for row in HamsterCategory.select():
            items.append(DropdownItem(row.name, str(row.id)))
        return ActivityMappingScreen._filter_dropdowns(items, input_state.value)

    def __init__(self, category, activity):
        """Store the initial category (may be ``None``) and activity name."""
        if category is not None:
            self.category_id, self.category_name = category.id, category.name
        self.activity_name = activity.name
        super().__init__()

    def compose(self) -> ComposeResult:
        """Lay out a category autocomplete row above an activity name row."""
        category_input = Input(
            placeholder="Type to search...",
            id="category",
            value=self.category_name,
        )
        category_row = Horizontal(
            Label("Category:"),
            AutoComplete(category_input, Dropdown(items=self._get_categories)),
        )
        activity_row = Horizontal(
            Label("Activity:"),
            Input(value=self.activity_name, id="activity"),
        )
        yield Vertical(category_row, activity_row)

    def _adopt_selected_category(self, event):
        """Copy the selected dropdown entry's id into ``category_id``.

        Returns ``True`` when the dropdown had a selected entry, ``False``
        otherwise (in which case ``category_id`` is left untouched).
        """
        chosen = event.control.parent.dropdown.selected_item
        if chosen is None:
            return False
        self.category_id = str(chosen.left_meta)
        return True

    @on(Input.Submitted, "#category")
    def category_submitted(self, event):
        """On submit, record the selection and advance to the activity input."""
        if self._adopt_selected_category(event):
            self.query_one("#activity").focus()

    @on(DescendantBlur, "#category")
    def category_blur(self, event):
        """On blur, record the selection without moving focus."""
        self._adopt_selected_category(event)

    def action_cancel(self):
        """Close the screen without a result."""
        self.dismiss(None)

    def action_save(self):
        """Close the screen, handing back the edited values."""
        result = {
            "category": self.category_id,
            "activity": self.query_one("#activity").value,
        }
        self.dismiss(result)
|
|
|
|
|
|
class ActivityMappingScreen(ModalScreen):
    """Modal form for mapping a Hamster activity to Kimai records.

    The screen is dismissed with ``None`` when cancelled, or with a dict of
    ``kimai_customer_id``, ``kimai_project_id``, ``kimai_activity_id``,
    ``kimai_description``, ``kimai_tags`` and ``global`` when saved.
    """

    BINDINGS = [
        ("ctrl+g", "global", "Toggle global"),
        ("ctrl+s", "save", "Save"),
        ("escape", "cancel", "Cancel"),
    ]

    # Defaults used when no existing mapping is passed to the constructor.
    customer_id = None
    project_id = None
    activity_id = None
    customer = ""
    project = ""
    activity = ""
    description = ""
    tags = ""

    def __init__(self, category, activity, mapping=None):
        """Store the labels to display and pre-fill from ``mapping`` if given.

        Args:
            category: Value shown after the ``@`` in the heading label.
            activity: Value shown before the ``@`` in the heading label.
            mapping: Optional existing mapping whose ``kimai_*`` attributes
                seed the form fields.
        """
        self.hamster_category = category
        self.hamster_activity = activity

        if mapping is not None:
            self.customer_id = mapping.kimai_customer_id
            self.customer = mapping.kimai_customer.name
            self.project_id = mapping.kimai_project_id
            self.project = mapping.kimai_project.name
            self.activity_id = mapping.kimai_activity_id
            self.activity = mapping.kimai_activity.name
            self.description = mapping.kimai_description
            self.tags = mapping.kimai_tags

        super().__init__()

    @staticmethod
    def _filter_dropdowns(options, value):
        """Return the options containing ``value``, prefix matches first.

        Matching is case-insensitive and is done against each option's
        ``main.plain`` text. Within the prefix and non-prefix groups the
        incoming order is preserved.
        """
        needle = value.lower()
        matches = [c for c in options if needle in c.main.plain.lower()]
        # sorted() is stable and False orders before True, so the key is
        # negated to pull prefix matches to the top of the dropdown.
        return sorted(
            matches, key=lambda c: not c.main.plain.lower().startswith(needle)
        )

    @staticmethod
    def _selected_id(event):
        """Return the selected dropdown item's id as ``str``, or ``None``.

        The id is read from the ``left_meta`` of the selected item of the
        dropdown belonging to the parent of the control that sent ``event``.
        """
        selected = event.control.parent.dropdown.selected_item
        if selected is None:
            return None
        return str(selected.left_meta)

    def _get_customers(self, input_state):
        """Return dropdown items for customers matching the typed text."""
        customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
        return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)

    def _get_projects(self, input_state):
        """Return dropdown items for the chosen customer's matching projects."""
        projects = [
            DropdownItem(p.name, str(p.id))
            for p in KimaiProject.select().where(
                KimaiProject.customer_id == self.customer_id
            )
        ]
        return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)

    def _get_activities(self, input_state):
        """Return dropdown items for matching Kimai activities.

        With the "Global" checkbox ticked only activities without a project
        are offered; otherwise only those of the chosen project.
        """
        activities = KimaiActivity.select()

        if self.query_one("#global").value:
            activities = activities.where(
                KimaiActivity.project_id.is_null(),
            )
        else:
            activities = activities.where(KimaiActivity.project_id == self.project_id)

        return ActivityMappingScreen._filter_dropdowns(
            [DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
        )

    def compose(self) -> ComposeResult:
        """Build the form: heading, three autocompletes, two inputs, checkbox."""
        yield Vertical(
            Horizontal(
                Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
            ),
            Horizontal(
                Label("Customer"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="customer",
                        value=self.customer,
                    ),
                    Dropdown(items=self._get_customers),
                ),
            ),
            Horizontal(
                Label("Project"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="project",
                        value=self.project,
                    ),
                    Dropdown(items=self._get_projects),
                ),
            ),
            Horizontal(
                Label("Activity"),
                AutoComplete(
                    Input(
                        placeholder="Type to search...",
                        id="activity",
                        value=self.activity,
                    ),
                    Dropdown(items=self._get_activities),
                ),
            ),
            Horizontal(
                Label("Description"),
                Input(id="description", value=self.description),
            ),
            Horizontal(
                Label("Tags"),
                Input(id="activity_tags", value=self.tags),
            ),
            Horizontal(Checkbox("Global", id="global")),
        )

    @on(Input.Submitted, "#customer")
    def customer_submitted(self, event):
        """Record the chosen customer and move on to the project field."""
        selected = self._selected_id(event)
        if selected is not None:
            self.customer_id = selected
            self.query_one("#project").focus()

    @on(DescendantBlur, "#customer")
    def customer_blur(self, event):
        """Record the chosen customer when the field loses focus."""
        selected = self._selected_id(event)
        if selected is not None:
            self.customer_id = selected

    @on(Input.Submitted, "#project")
    def project_submitted(self, event):
        """Record the chosen project and move on to the activity field."""
        selected = self._selected_id(event)
        if selected is not None:
            self.project_id = selected
            self.query_one("#activity").focus()

    @on(DescendantBlur, "#project")
    def project_blur(self, event):
        """Record the chosen project when the field loses focus."""
        selected = self._selected_id(event)
        if selected is not None:
            self.project_id = selected

    @on(Input.Submitted, "#activity")
    def activity_submitted(self, event):
        """Record the chosen activity and move on to the description field."""
        selected = self._selected_id(event)
        if selected is not None:
            self.activity_id = selected
            # Advance to the next form field, mirroring the customer and
            # project handlers, instead of re-focusing this same input.
            self.query_one("#description").focus()

    @on(DescendantBlur, "#activity")
    def activity_blur(self, event):
        """Record the chosen activity when the field loses focus."""
        selected = self._selected_id(event)
        if selected is not None:
            self.activity_id = selected

    def action_global(self):
        """Toggle the "Global" checkbox."""
        self.query_one("#global").value = not self.query_one("#global").value

    def action_save(self):
        """Close the screen, handing back the collected mapping values."""
        self.dismiss(
            {
                "kimai_customer_id": self.customer_id,
                "kimai_project_id": self.project_id,
                "kimai_activity_id": self.activity_id,
                "kimai_description": self.query_one("#description").value,
                "kimai_tags": self.query_one("#activity_tags").value,
                "global": self.query_one("#global").value,
            }
        )

    def action_cancel(self):
        """Close the screen without a result."""
        self.dismiss(None)
|
|
|
|
|
|
class ActivityDeleteConfirmScreen(ModalScreen):
    """Modal yes/no dialog asking whether an activity should be deleted.

    The screen is dismissed with ``True`` when the "Confirm" button is
    pressed and with ``False`` for "Cancel" or the escape key.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
    ]

    def compose(self) -> ComposeResult:
        """Build the dialog: a question label plus Confirm and Cancel buttons."""
        yield Grid(
            Label("Are you sure you want to delete this activity?", id="question"),
            Button("Confirm", variant="error", id="confirm"),
            Button("Cancel", variant="primary", id="cancel"),
            id="dialog",
        )

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with ``True`` only when the Confirm button was pressed."""
        # The id must match the one given to the Confirm button in compose().
        self.dismiss(event.button.id == "confirm")

    def action_cancel(self):
        """Dismiss the dialog as "not confirmed"."""
        self.dismiss(False)
|
|
|
|
|
|
class ActivityList(ListPane):
    """List pane showing Hamster activities with fact and mapping counts.

    Table columns, in order: category id, category, activity id, activity,
    entries, mappings. The row actions read the activity id from column 2.
    """

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete"),
        ("f", "move_facts", "Move"),
        ("e", "edit", "Edit"),
        ("m", "mapping", "Mapping"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Reload the table from the database, applying the active filters."""
        self.table.clear()

        # Per-activity number of facts.
        facts_count_query = (
            HamsterFact.select(
                HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
            )
            .group_by(HamsterFact.activity_id)
            .alias("facts_count_query")
        )

        # Per-activity number of Kimai mappings.
        mappings_count_query = (
            HamsterActivityKimaiMapping.select(
                HamsterActivityKimaiMapping.hamster_activity_id,
                fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
            )
            .group_by(HamsterActivityKimaiMapping.hamster_activity_id)
            .alias("mappings_count_query")
        )

        # Left joins keep activities that have no category, facts or mappings;
        # COALESCE supplies "None" / 0 for those missing values.
        activities = (
            HamsterActivity.select(
                HamsterActivity,
                HamsterCategory.id,
                HamsterFact.start_time,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
                fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
                fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
                    "mappings_count"
                ),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(
                facts_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == facts_count_query.c.activity_id),
            )
            .switch(HamsterActivity)
            .join(
                mappings_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
            )
            .group_by(HamsterActivity)
        )

        filter_search = self.query_one("#filter #search").value
        if filter_search is not None:
            activities = activities.where(
                HamsterActivity.name.contains(filter_search)
                | HamsterCategory.name.contains(filter_search)
            )

        filter_date = self.query_one("#filter #date").value
        if filter_date is not None:
            try:
                since = datetime.strptime(filter_date, "%Y-%m-%d")
            except ValueError:
                # Empty or malformed date text: leave the list unfiltered.
                pass
            else:
                activities = activities.where(HamsterFact.start_time > since)

        self.table.add_rows(
            [
                [
                    activity.category_id,
                    activity.category_name,
                    activity.id,
                    activity.name,
                    activity.facts_count,
                    activity.mappings_count,
                ]
                for activity in activities
            ]
        )

        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "category id", "category", "activity id", "activity", "entries", "mappings"
        )
        # Default ordering: by category, then by activity.
        self.sort = (self.columns[1], self.columns[3])
        self._refresh()

    def action_delete(self) -> None:
        """Delete the activity under the cursor.

        Activities that still have facts are only deleted after the user
        confirms in ``ActivityDeleteConfirmScreen``.
        """
        if not self.table.row_count:
            # Nothing under the cursor to delete.
            return

        # get the keys for the row and column under the cursor.
        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        activity_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 2),
        )

        activity = HamsterActivity.get(id=activity_id)

        def check_delete(delete: bool) -> None:
            """Delete the activity and its table row when ``delete`` is true.

            Used as the dismiss callback of ``ActivityDeleteConfirmScreen``
            and called directly when no confirmation is needed.
            """
            if delete:
                activity.delete_instance()

                # supply the row key to `remove_row` to delete the row.
                self.table.remove_row(row_key)

        if activity.facts.count() > 0:
            self.app.push_screen(ActivityDeleteConfirmScreen(), check_delete)
        else:
            check_delete(True)

    def action_move_facts(self) -> None:
        """Mark the activity under the cursor as the source of a fact move.

        The row is highlighted in red and the cursor moves one row down; the
        move itself happens when a target row is selected.
        """
        if not self.table.row_count:
            # No row to mark as the source.
            return

        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)
        self.move_from_activity = HamsterActivity.get(id=row_cells[2])
        for col_idx, cell_value in enumerate(row_cells):
            cell_coordinate = Coordinate(row_idx, col_idx)
            self.table.update_cell_at(
                cell_coordinate,
                f"[red]{cell_value}[/red]",
            )
        self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)

    def on_data_table_row_selected(self, event):
        """Finish a pending fact move onto the selected row's activity.

        Does nothing unless ``action_move_facts`` has set a source activity.
        """
        if getattr(self, "move_from_activity", None) is not None:
            # Look up by explicit id, like every other lookup in this class.
            move_to_activity = HamsterActivity.get(
                id=self.table.get_cell_at(Coordinate(event.cursor_row, 2))
            )
            HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
                HamsterFact.activity == self.move_from_activity
            ).execute()
            self._refresh()
            del self.move_from_activity

    def action_edit(self):
        """Open ``ActivityEditScreen`` for the activity under the cursor."""
        if not self.table.row_count:
            # No row to edit.
            return

        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)

        try:
            category = HamsterCategory.get(id=row_cells[0])
        except HamsterCategory.DoesNotExist:
            # Activity without a (known) category.
            category = None
        activity = HamsterActivity.get(id=row_cells[2])

        def handle_edit(properties):
            """Apply the edited name and category; ``None`` means cancelled."""
            if properties is None:
                return
            activity.name = properties["activity"]
            activity.category_id = properties["category"]
            activity.save()
            self._refresh()

        self.app.push_screen(
            ActivityEditScreen(category=category, activity=activity), handle_edit
        )

    def action_mapping(self):
        """Open ``ActivityMappingScreen`` for the activity under the cursor.

        An existing mapping for the activity is edited in place; otherwise a
        new one is created when the screen is saved.
        """
        if not self.table.row_count:
            # No row to map.
            return

        selected_activity = (
            HamsterActivity.select(
                HamsterActivity,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .where(
                HamsterActivity.id
                == self.table.get_cell_at(
                    Coordinate(self.table.cursor_coordinate.row, 2),
                )
            )
            .get()
        )

        try:
            mapping = HamsterActivityKimaiMapping.get(
                hamster_activity=selected_activity
            )
        except HamsterActivityKimaiMapping.DoesNotExist:
            mapping = None

        def handle_mapping(mapping_data):
            """Store the submitted mapping values; ``None`` means cancelled."""
            if mapping_data is None:
                return
            if mapping is not None:
                for key, value in mapping_data.items():
                    setattr(mapping, key, value)
                mapping.save()
            else:
                # create() already persists the new row, so no extra save().
                HamsterActivityKimaiMapping.create(
                    hamster_activity=selected_activity, **mapping_data
                )
            self._refresh()

        self.app.push_screen(
            ActivityMappingScreen(
                category=selected_activity.category_name,
                activity=selected_activity.name,
                mapping=mapping,
            ),
            handle_mapping,
        )
|