686 lines
22 KiB
Python
686 lines
22 KiB
Python
from datetime import datetime
|
|
|
|
from textual import on
|
|
from textual.app import App, ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import Grid
|
|
from textual.events import DescendantBlur
|
|
from textual.widgets import Header, Footer, DataTable, Input, Button, Label, Checkbox
|
|
from textual.containers import Horizontal, Vertical
|
|
from textual.coordinate import Coordinate
|
|
from textual.screen import Screen, ModalScreen
|
|
|
|
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
|
|
|
|
from peewee import fn, JOIN
|
|
|
|
from .db import (
|
|
db,
|
|
HamsterCategory,
|
|
HamsterActivity,
|
|
HamsterFact,
|
|
KimaiProject,
|
|
KimaiCustomer,
|
|
KimaiActivity,
|
|
HamsterActivityKimaiMapping,
|
|
)
|
|
from .kimai import (
|
|
KimaiAPI,
|
|
Customer as KimaiAPICustomer,
|
|
Project as KimaiAPIProject,
|
|
Activity as KimaiAPIActivity,
|
|
)
|
|
from .sync import sync
|
|
|
|
|
|
class ListScreen(Screen):
    """Screen showing a ``DataTable`` above a pair of filter inputs.

    The handlers here call ``self._refresh()`` and use ``self.table`` and
    ``self.sort``; none of those are defined in this class, so concrete
    screens have to provide them.
    """

    def compose(self) -> ComposeResult:
        """Yield header, table, the ``#filter`` input row, and footer."""
        # NOTE(review): nesting reconstructed from de-indented source; the
        # footer is assumed to sit outside the Vertical container -- confirm.
        today = datetime.now().strftime("%Y-%m-%d")
        date_hint = "After date, in {0} format".format(today)

        yield Header()
        with Vertical():
            yield DataTable()
            with Horizontal(id="filter"):
                yield Input(
                    id="search",
                    placeholder="Category/activity name contains text",
                )
                yield Input(id="date", placeholder=date_hint)
        yield Footer()

    def action_refresh(self) -> None:
        """Reload the table contents."""
        self._refresh()

    def action_sort(self) -> None:
        """Switch the table cursor to column mode so a column can be picked."""
        self.table.cursor_type = "column"

    def on_data_table_column_selected(self, event):
        """Sort by the picked column, remember it, and restore the row cursor."""
        table = event.data_table
        self.sort = (event.column_key,)
        table.sort(*self.sort)
        table.cursor_type = "row"

    def action_filter(self) -> None:
        """Show the filter row, refresh, and focus the search input."""
        self.query_one("#filter").display = True
        self._refresh()
        self.query_one("#filter #search").focus()

    def on_input_submitted(self, event):
        """Hand focus back to the table when an input is submitted."""
        self.table.focus()

    def action_cancelfilter(self) -> None:
        """Hide the filter row, clear both inputs, and refresh the table."""
        self.query_one("#filter").display = False
        for selector in ("#filter #search", "#filter #date"):
            self.query_one(selector).clear()
        self.table.focus()
        self._refresh()

    @on(Input.Changed, "#filter Input")
    def filter(self, event):
        """Refresh the table whenever one of the filter inputs changes."""
        self._refresh()
|
|
|
|
|
|
class ActivityEditScreen(ModalScreen):
    """Modal form for editing an activity's category and name.

    Dismisses with ``{"category": ..., "activity": ...}`` on save and with
    ``None`` on cancel.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
        ("ctrl+s", "save", "Save"),
    ]

    # Used when the screen is opened with ``category=None``.
    category_id = None
    category_name = ""

    def _get_categories(self, input_state):
        """Return dropdown items for every category matching the typed text."""
        items = []
        for category in HamsterCategory.select():
            items.append(DropdownItem(category.name, str(category.id)))
        return ActivityMappingScreen._filter_dropdowns(items, input_state.value)

    def __init__(self, category, activity):
        """Store the initial field values; ``category`` may be ``None``."""
        if category is not None:
            self.category_id, self.category_name = category.id, category.name
        self.activity_name = activity.name
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield one row for the category autocomplete and one for the name."""
        category_row = Horizontal(
            Label("Category:"),
            AutoComplete(
                Input(
                    placeholder="Type to search...",
                    id="category",
                    value=self.category_name,
                ),
                Dropdown(items=self._get_categories),
            ),
        )
        activity_row = Horizontal(
            Label("Activity:"),
            Input(value=self.activity_name, id="activity"),
        )
        yield Vertical(category_row, activity_row)

    @on(Input.Submitted, "#category")
    def category_submitted(self, event):
        """Store the selected category id, then focus the activity input."""
        selected = event.control.parent.dropdown.selected_item
        if selected is None:
            return
        self.category_id = str(selected.left_meta)
        # NOTE(review): source indentation was lost; the focus move is assumed
        # to happen only when an item is selected -- confirm.
        self.query_one("#activity").focus()

    @on(DescendantBlur, "#category")
    def category_blur(self, event):
        """Store the selected category id when the category input loses focus."""
        selected = event.control.parent.dropdown.selected_item
        if selected is None:
            return
        self.category_id = str(selected.left_meta)

    def action_cancel(self):
        """Close the modal without a result."""
        self.dismiss(None)

    def action_save(self):
        """Close the modal, returning the category id and the activity name."""
        result = {
            "category": self.category_id,
            "activity": self.query_one("#activity").value,
        }
        self.dismiss(result)
|
|
|
|
|
|
class ActivityMappingScreen(ModalScreen):
    """Modal form for picking the Kimai customer/project/activity of a mapping.

    Dismisses with a dict of the chosen values on save and with ``None`` on
    cancel.
    """

    BINDINGS = [
        ("ctrl+g", "global", "Toggle global"),
        ("ctrl+s", "save", "Save"),
        ("escape", "cancel", "Cancel"),
    ]

    # Defaults used when no existing mapping is passed to ``__init__``.
    customer_id = None
    project_id = None
    activity_id = None
    customer = ""
    project = ""
    activity = ""
    description = ""
    tags = ""

    def __init__(self, category, activity, mapping=None):
        """Keep the heading values and pre-fill the form from ``mapping``.

        Args:
            category: Value shown after the ``@`` in the heading label.
            activity: Value shown before the ``@`` in the heading label.
            mapping: Existing mapping whose ``kimai_*`` attributes seed the
                form, or ``None`` for an empty form.
        """
        self.hamster_category = category
        self.hamster_activity = activity

        if mapping is not None:
            self.customer_id = mapping.kimai_customer_id
            self.customer = mapping.kimai_customer.name
            self.project_id = mapping.kimai_project_id
            self.project = mapping.kimai_project.name
            self.activity_id = mapping.kimai_activity_id
            self.activity = mapping.kimai_activity.name
            self.description = mapping.kimai_description
            self.tags = mapping.kimai_tags

        super().__init__()

    @staticmethod
    def _filter_dropdowns(options, value):
        """Return the options containing ``value``, prefix matches first.

        Matching is case-insensitive. Within the prefix and non-prefix groups
        the incoming order is kept, because ``sorted`` is stable.
        """
        needle = value.lower()
        matches = [item for item in options if needle in item.main.plain.lower()]
        # False sorts before True, so the key is negated to put items that
        # start with the typed text at the top of the dropdown.
        return sorted(
            matches,
            key=lambda item: not item.main.plain.lower().startswith(needle),
        )

    @staticmethod
    def _selected_id(event):
        """Return the selected dropdown item's ``left_meta`` as ``str``.

        Returns ``None`` when nothing is selected. The item providers in this
        class pass ``str(record.id)`` as the second ``DropdownItem`` argument;
        presumably that is the value read back here -- verify against
        textual_autocomplete.
        """
        selected = event.control.parent.dropdown.selected_item
        return None if selected is None else str(selected.left_meta)

    @staticmethod
    def _autocomplete_row(label, input_id, value, items):
        """Build one labelled row holding an autocomplete input."""
        return Horizontal(
            Label(label),
            AutoComplete(
                Input(placeholder="Type to search...", id=input_id, value=value),
                Dropdown(items=items),
            ),
        )

    def _get_customers(self, input_state):
        """Return dropdown items for customers matching the typed text."""
        customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
        return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)

    def _get_projects(self, input_state):
        """Return dropdown items for the chosen customer's matching projects."""
        query = KimaiProject.select().where(
            KimaiProject.customer_id == self.customer_id
        )
        projects = [DropdownItem(p.name, str(p.id)) for p in query]
        return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)

    def _get_activities(self, input_state):
        """Return dropdown items for matching Kimai activities.

        With the "Global" checkbox ticked only activities without a project
        are offered; otherwise only those of the chosen project.
        """
        activities = KimaiActivity.select()

        if self.query_one("#global").value:
            activities = activities.where(
                KimaiActivity.project_id.is_null(),
            )
        else:
            activities = activities.where(KimaiActivity.project_id == self.project_id)

        return ActivityMappingScreen._filter_dropdowns(
            [DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
        )

    def compose(self) -> ComposeResult:
        """Yield the heading, three autocomplete rows, and the plain inputs."""
        yield Vertical(
            Horizontal(
                Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
            ),
            self._autocomplete_row(
                "Customer", "customer", self.customer, self._get_customers
            ),
            self._autocomplete_row(
                "Project", "project", self.project, self._get_projects
            ),
            self._autocomplete_row(
                "Activity", "activity", self.activity, self._get_activities
            ),
            Horizontal(
                Label("Description"),
                Input(id="description", value=self.description),
            ),
            Horizontal(
                Label("Tags"),
                Input(id="tags", value=self.tags),
            ),
            Horizontal(Checkbox("Global", id="global")),
        )

    # NOTE(review): source indentation was lost; in the three *_submitted
    # handlers the focus move is assumed to happen only when a dropdown item
    # is selected -- confirm.

    @on(Input.Submitted, "#customer")
    def customer_submitted(self, event):
        """Store the selected customer id and move on to the project input."""
        selected_id = self._selected_id(event)
        if selected_id is not None:
            self.customer_id = selected_id
            self.query_one("#project").focus()

    @on(DescendantBlur, "#customer")
    def customer_blur(self, event):
        """Store the selected customer id when the input loses focus."""
        selected_id = self._selected_id(event)
        if selected_id is not None:
            self.customer_id = selected_id

    @on(Input.Submitted, "#project")
    def project_submitted(self, event):
        """Store the selected project id and move on to the activity input."""
        selected_id = self._selected_id(event)
        if selected_id is not None:
            self.project_id = selected_id
            self.query_one("#activity").focus()

    @on(DescendantBlur, "#project")
    def project_blur(self, event):
        """Store the selected project id when the input loses focus."""
        selected_id = self._selected_id(event)
        if selected_id is not None:
            self.project_id = selected_id

    @on(Input.Submitted, "#activity")
    def activity_submitted(self, event):
        """Store the selected activity id and move on to the description."""
        selected_id = self._selected_id(event)
        if selected_id is not None:
            self.activity_id = selected_id
            # Advance to the next field, mirroring the customer and project
            # handlers; re-focusing "#activity" itself would do nothing.
            self.query_one("#description").focus()

    @on(DescendantBlur, "#activity")
    def activity_blur(self, event):
        """Store the selected activity id when the input loses focus."""
        selected_id = self._selected_id(event)
        if selected_id is not None:
            self.activity_id = selected_id

    def action_global(self):
        """Toggle the "Global" checkbox."""
        checkbox = self.query_one("#global")
        checkbox.value = not checkbox.value

    def action_save(self):
        """Close the modal, returning the chosen ids and the typed values."""
        self.dismiss(
            {
                "kimai_customer_id": self.customer_id,
                "kimai_project_id": self.project_id,
                "kimai_activity_id": self.activity_id,
                "kimai_description": self.query_one("#description").value,
                "kimai_tags": self.query_one("#tags").value,
                "global": self.query_one("#global").value,
            }
        )

    def action_cancel(self):
        """Close the modal without a result."""
        self.dismiss(None)
|
|
|
|
|
|
class ActivityListScreen(ListScreen):
    """Table of Hamster activities with per-activity fact and mapping counts.

    Row actions: delete, move facts to another activity, edit, and edit the
    Kimai mapping. Each row action is a no-op while the table is empty.
    """

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete"),
        ("f", "move_facts", "Move facts"),
        ("e", "edit", "Edit"),
        ("m", "mapping", "Mapping"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Rebuild the table rows from the database and re-apply the sort.

        The text in ``#search`` restricts rows to activities whose name or
        category name contains it; a valid ``YYYY-MM-DD`` value in ``#date``
        restricts rows to activities with a fact starting after that date.
        """
        self.table.clear()

        facts_count_query = (
            HamsterFact.select(
                HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
            )
            .group_by(HamsterFact.activity_id)
            .alias("facts_count_query")
        )

        mappings_count_query = (
            HamsterActivityKimaiMapping.select(
                HamsterActivityKimaiMapping.hamster_activity_id,
                fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
            )
            .group_by(HamsterActivityKimaiMapping.hamster_activity_id)
            .alias("mappings_count_query")
        )

        activities = (
            HamsterActivity.select(
                HamsterActivity,
                HamsterCategory.id,
                HamsterFact.start_time,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
                fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
                fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
                    "mappings_count"
                ),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .switch(HamsterActivity)
            .join(
                facts_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == facts_count_query.c.activity_id),
            )
            .switch(HamsterActivity)
            .join(
                mappings_count_query,
                JOIN.LEFT_OUTER,
                on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
            )
            .group_by(HamsterActivity)
        )

        # Input.value is always a string, so test for non-empty text rather
        # than for None.
        filter_search = self.query_one("#filter #search").value
        if filter_search:
            activities = activities.where(
                HamsterActivity.name.contains(filter_search)
                | HamsterCategory.name.contains(filter_search)
            )

        filter_date = self.query_one("#filter #date").value
        if filter_date:
            try:
                after = datetime.strptime(filter_date, "%Y-%m-%d")
            except ValueError:
                # This runs on every keystroke; a partially typed date simply
                # leaves the date filter off.
                pass
            else:
                activities = activities.where(HamsterFact.start_time > after)

        self.table.add_rows(
            [
                [
                    activity.category_id,
                    activity.category_name,
                    activity.id,
                    activity.name,
                    activity.facts_count,
                    activity.mappings_count,
                ]
                for activity in activities
            ]
        )

        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "category id", "category", "activity id", "activity", "entries", "mappings"
        )
        # Default order: category name, then activity name.
        self.sort = (self.columns[1], self.columns[3])
        self._refresh()

    def action_delete(self) -> None:
        """Delete the activity under the cursor and remove its table row."""
        if not self.table.row_count:
            return  # empty table: there is no row under the cursor

        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        # Column 2 holds the activity id (see on_mount).
        activity_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 2),
        )

        activity = HamsterActivity.get(id=activity_id)
        activity.delete_instance()

        self.table.remove_row(row_key)

    def action_move_facts(self) -> None:
        """Mark the current row as the source of a fact move.

        The row is shown in red and the cursor moves down one row; selecting
        a row afterwards completes the move in
        ``on_data_table_row_selected``.
        """
        if not self.table.row_count:
            return

        row_idx: int = self.table.cursor_row
        row_cells = self.table.get_row_at(row_idx)
        self.move_from_activity = HamsterActivity.get(id=row_cells[2])
        for col_idx, cell_value in enumerate(row_cells):
            self.table.update_cell_at(
                Coordinate(row_idx, col_idx),
                f"[red]{cell_value}[/red]",
            )
        self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)

    def on_data_table_row_selected(self, event):
        """Finish a pending fact move onto the selected row's activity."""
        if getattr(self, "move_from_activity", None) is None:
            return

        target_id = self.table.get_cell_at(Coordinate(event.cursor_row, 2))
        move_to_activity = HamsterActivity.get(id=target_id)
        HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
            HamsterFact.activity == self.move_from_activity
        ).execute()
        self._refresh()
        del self.move_from_activity

    def action_edit(self):
        """Open the edit modal for the current row and save its result."""
        if not self.table.row_count:
            return

        row_cells = self.table.get_row_at(self.table.cursor_row)

        try:
            category = HamsterCategory.get(id=row_cells[0])
        except HamsterCategory.DoesNotExist:
            category = None
        activity = HamsterActivity.get(id=row_cells[2])

        def handle_edit(properties):
            # The modal dismisses with None when cancelled.
            if properties is None:
                return
            activity.name = properties["activity"]
            activity.category_id = properties["category"]
            activity.save()
            self._refresh()

        self.app.push_screen(
            ActivityEditScreen(category=category, activity=activity), handle_edit
        )

    def action_mapping(self):
        """Open the mapping modal for the current row and store its result.

        An existing mapping for the activity is updated; otherwise a new one
        is created.
        """
        if not self.table.row_count:
            return

        activity_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 2),
        )
        selected_activity = (
            HamsterActivity.select(
                HamsterActivity,
                fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
            )
            .join(HamsterCategory, JOIN.LEFT_OUTER)
            .where(HamsterActivity.id == activity_id)
            .get()
        )

        try:
            mapping = HamsterActivityKimaiMapping.get(
                hamster_activity=selected_activity
            )
        except HamsterActivityKimaiMapping.DoesNotExist:
            mapping = None

        def handle_mapping(mapping_data):
            # The modal dismisses with None when cancelled.
            if mapping_data is None:
                return
            if mapping is None:
                # create() already writes the row, so no extra save() follows.
                HamsterActivityKimaiMapping.create(
                    hamster_activity=selected_activity, **mapping_data
                )
            else:
                for key, value in mapping_data.items():
                    setattr(mapping, key, value)
                mapping.save()
            self._refresh()

        self.app.push_screen(
            ActivityMappingScreen(
                category=selected_activity.category_name,
                activity=selected_activity.name,
                mapping=mapping,
            ),
            handle_mapping,
        )
|
|
|
|
|
|
class CategoryListScreen(ListScreen):
    """Table of Hamster categories with the number of activities in each."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("/", "filter", "Search"),
        ("d", "delete", "Delete category"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self):
        """Rebuild the table rows from the database and re-apply the sort.

        The text in ``#search`` restricts rows to categories whose name
        contains it; a valid ``YYYY-MM-DD`` value in ``#date`` restricts the
        joined facts to those starting after that date.
        """
        self.table.clear()

        categories = (
            HamsterCategory.select(
                HamsterCategory,
                # The join with facts yields one row per fact, so count each
                # activity once rather than once per fact.
                fn.Count(fn.DISTINCT(HamsterActivity.id)).alias("activities_count"),
                HamsterFact.start_time,
            )
            .join(HamsterActivity, JOIN.LEFT_OUTER)
            .join(HamsterFact, JOIN.LEFT_OUTER)
            .group_by(HamsterCategory)
        )

        # Input.value is always a string, so test for non-empty text rather
        # than for None.
        filter_search = self.query_one("#filter #search").value
        if filter_search:
            categories = categories.where(HamsterCategory.name.contains(filter_search))

        filter_date = self.query_one("#filter #date").value
        if filter_date:
            try:
                after = datetime.strptime(filter_date, "%Y-%m-%d")
            except ValueError:
                # This runs on every keystroke; a partially typed date simply
                # leaves the date filter off.
                pass
            else:
                categories = categories.where(HamsterFact.start_time > after)

        self.table.add_rows(
            [
                [
                    category.id,
                    category.name,
                    category.activities_count,
                ]
                for category in categories
            ]
        )

        # self.sort is always a tuple of column keys (the inherited
        # column-selected handler stores one too), so unpack it.
        self.table.sort(*self.sort)

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns("category id", "category", "activities")
        # Kept as a tuple so it has the same shape as the value stored by
        # ListScreen.on_data_table_column_selected.
        self.sort = (self.columns[1],)
        self._refresh()

    def action_delete(self) -> None:
        """Delete the category under the cursor and remove its table row."""
        if not self.table.row_count:
            return  # empty table: there is no row under the cursor

        row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)

        # Column 0 holds the category id (see on_mount).
        category_id = self.table.get_cell_at(
            Coordinate(self.table.cursor_coordinate.row, 0),
        )
        category = HamsterCategory.get(id=category_id)
        category.delete_instance()

        self.table.remove_row(row_key)
|
|
|
|
|
|
class KimaiProjectListScreen(ListScreen):
    """Table of Kimai projects with their customer and activity count."""

    BINDINGS = [
        ("s", "sort", "Sort"),
        ("r", "refresh", "Refresh"),
        ("g", "get", "Get data"),
        ("/", "filter", "Search"),
        Binding(key="escape", action="cancelfilter", show=False),
    ]

    def _refresh(self, filter_query=None):
        """Rebuild the table rows from the database and re-apply the sort.

        Args:
            filter_query: Text that the project or customer name must
                contain. When omitted, the current ``#search`` input value is
                used, so the inherited filter handlers (which call
                ``_refresh()`` without arguments) take effect on this screen.
        """
        self.table.clear()

        if filter_query is None:
            filter_query = self.query_one("#filter #search").value

        projects = (
            KimaiProject.select(
                KimaiProject,
                KimaiCustomer,
                fn.Count(KimaiActivity.id).alias("activities_count"),
            )
            .join(KimaiCustomer, JOIN.LEFT_OUTER)
            .switch(KimaiProject)
            .join(KimaiActivity, JOIN.LEFT_OUTER)
            .where(KimaiActivity.project.is_null(False))
            .group_by(KimaiProject)
        )

        if filter_query:
            projects = projects.where(
                KimaiProject.name.contains(filter_query)
                | KimaiCustomer.name.contains(filter_query)
            )

        self.table.add_rows(
            [
                [
                    project.customer.id,
                    project.customer.name,
                    project.id,
                    project.name,
                    project.activities_count,
                ]
                for project in projects
            ]
        )

        # self.sort is always a tuple of column keys (the inherited
        # column-selected handler stores one too), so unpack it.
        self.table.sort(*self.sort)

    def action_get(self) -> None:
        """Run ``sync()`` and reload the table."""
        sync()
        self._refresh()

    def on_mount(self) -> None:
        """Set up the table columns and default sort, then load the rows."""
        self.table = self.query_one(DataTable)
        self.table.cursor_type = "row"
        self.columns = self.table.add_columns(
            "customer id", "customer", "project id", "project", "activities"
        )
        # Kept as a tuple so it has the same shape as the value stored by
        # ListScreen.on_data_table_column_selected.
        self.sort = (self.columns[1],)
        self._refresh()
|
|
|
|
|
|
class HamsterToolsApp(App):
    """Application with three modes: activities, categories and Kimai."""

    CSS_PATH = "app.tcss"
    BINDINGS = [
        ("a", "switch_mode('activities')", "Activities"),
        ("c", "switch_mode('categories')", "Categories"),
        ("k", "switch_mode('kimai')", "Kimai"),
        ("q", "quit", "Quit"),
    ]

    def __init__(self, db_path="hamster-testing.db"):
        """Initialise the database and register one screen per mode.

        Args:
            db_path: Value handed to ``db.init``. The default is the path
                that was previously hard-coded here.
        """
        db.init(db_path)

        self.MODES = {
            "categories": CategoryListScreen(),
            "activities": ActivityListScreen(),
            "kimai": KimaiProjectListScreen(),
        }

        super().__init__()

    def on_mount(self) -> None:
        """Start in the activities mode."""
        self.switch_mode("activities")

    def action_quit(self) -> None:
        """Close the database, then exit the application."""
        db.close()
        self.exit()
|