Compare commits

...

4 Commits

Author SHA1 Message Date
3wc fd28955da0 Add kimai "visible" field, moar kimai screens 2023-11-03 23:42:11 +00:00
3wc d3c2da74e6 Switch to tabs 2023-11-03 21:52:08 +00:00
3wc ff013525af Refucktoring 2023-11-03 00:13:54 +00:00
3wc ce68a03ea9 Mapping editing 2023-11-02 22:43:11 +00:00
10 changed files with 870 additions and 657 deletions

View File

@ -25,8 +25,7 @@ from .db import (
from .sync import sync
HAMSTER_DIR = Path.home() / ".local/share/hamster"
# HAMSTER_FILE = HAMSTER_DIR / 'hamster.db'
HAMSTER_FILE = "hamster-testing.db"
HAMSTER_FILE = HAMSTER_DIR / "hamster.db"
db.init(HAMSTER_FILE)
@ -370,7 +369,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
mapping_files = []
for mapping_path_item in mapping_path:
if not Path(mapping_path_item).exists():
raise click.UsageError(f'{mapping_path_item} does not exist')
raise click.UsageError(f"{mapping_path_item} does not exist")
mapping_file = _get_kimai_mapping_file(mapping_path_item)
next(mapping_file)

View File

@ -1,643 +1,29 @@
from datetime import datetime
from textual.app import App
from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Grid
from textual.events import DescendantBlur
from textual.widgets import Header, Footer, DataTable, Input, Button, Label, Checkbox
from textual.containers import Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.screen import Screen, ModalScreen
from .db import db
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from peewee import fn, JOIN
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .kimai import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
Activity as KimaiAPIActivity,
)
from .sync import sync
class ListScreen(Screen):
def compose(self) -> ComposeResult:
yield Header()
with Vertical():
yield DataTable()
with Horizontal(id="filter"):
yield Input(
id="search", placeholder="Category/activity name contains text"
)
yield Input(
id="date",
placeholder="After date, in {0} format".format(
datetime.now().strftime("%Y-%m-%d")
),
)
yield Footer()
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
self.query_one("#filter").display = True
self._refresh()
self.query_one("#filter #search").focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
self.query_one("#filter").display = False
self.query_one("#filter #search").clear()
self.query_one("#filter #date").clear()
self.table.focus()
self._refresh()
@on(Input.Changed, "#filter Input")
def filter(self, event):
self._refresh()
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
category_id = None
category_name = ""
def _get_categories(self, input_state):
categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)
def __init__(self, category, activity):
if category is not None:
self.category_id = category.id
self.category_name = category.name
self.activity_name = activity.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Category:"),
AutoComplete(
Input(
placeholder="Type to search...",
id="category",
value=self.category_name,
),
Dropdown(items=self._get_categories),
),
),
Horizontal(
Label("Activity:"), Input(value=self.activity_name, id="activity")
),
)
@on(Input.Submitted, "#category")
def category_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#category")
def category_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"category": self.category_id,
"activity": self.query_one("#activity").value,
}
)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel"),
]
customer_id = None
project_id = None
activity_id = None
def __init__(self, category, activity):
self.category = category
self.activity = activity
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one("#global").value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(KimaiActivity.project_id == self.project_id)
return ActivityMappingScreen._filter_dropdowns(
[DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.activity}@{self.category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(placeholder="Type to search...", id="customer"),
Dropdown(items=self._get_customers),
),
),
Horizontal(
Label("Project"),
AutoComplete(
Input(placeholder="Type to search...", id="project"),
Dropdown(items=self._get_projects),
),
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(placeholder="Type to search...", id="activity"),
Dropdown(items=self._get_activities),
),
),
Horizontal(
Label("Description"),
Input(id="description"),
),
Horizontal(
Label("Tags"),
Input(id="tags"),
),
Horizontal(Checkbox("Global", id="global")),
)
@on(Input.Submitted, "#customer")
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#project").focus()
@on(DescendantBlur, "#customer")
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
@on(Input.Submitted, "#project")
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one("#activity").focus()
@on(DescendantBlur, "#project")
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, "#activity")
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#activity")
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_global(self):
self.query_one("#global").value = not self.query_one("#global").value
def action_save(self):
self.dismiss(
{
"kimai_customer_id": self.customer_id,
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"global": self.query_one("#global").value,
}
)
def action_cancel(self):
self.dismiss(None)
class ActivityListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move facts"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterActivityKimaiMapping.select(
HamsterActivityKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterActivityKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
HamsterFact.start_time,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
activities = activities.where(
HamsterActivity.name.contains(filter_search)
| HamsterCategory.name.contains(filter_search)
)
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
activities = activities.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
self._refresh()
del self.move_from_activity
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
try:
category = HamsterCategory.get(id=row_cells[0])
except HamsterCategory.DoesNotExist:
category = None
activity = HamsterActivity.get(id=row_cells[2])
def handle_edit(properties):
if properties is None:
return
activity.name = properties["activity"]
activity.category_id = properties["category"]
activity.save()
self._refresh()
self.app.push_screen(
ActivityEditScreen(category=category, activity=activity), handle_edit
)
def action_mapping(self):
selected_activity = (
HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
HamsterActivity.id
== self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
)
.get()
)
def handle_mapping(mapping):
if mapping is None:
return
m = HamsterActivityKimaiMapping.create(
hamster_activity=selected_activity, **mapping
)
m.save()
filter_search = self.query_one("#search")
self._refresh()
self.app.push_screen(
ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name,
),
handle_mapping,
)
class CategoryListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class KimaiProjectListScreen(ListScreen):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self, filter_query=None):
self.table.clear()
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_query:
projects = projects.where(
KimaiProject.name.contains(filter_query)
| KimaiCustomer.name.contains(filter_query)
)
self.table.add_rows(
[
[
project.customer.id,
project.customer.name,
project.id,
project.name,
project.activities_count,
]
for project in projects
]
)
self.table.sort(self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities"
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[1]
self._refresh()
from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen
class HamsterToolsApp(App):
CSS_PATH = "app.tcss"
BINDINGS = [
("a", "switch_mode('activities')", "Activities"),
("c", "switch_mode('categories')", "Categories"),
("h", "switch_mode('hamster')", "Hamster"),
("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
def __init__(self):
db.init("hamster-testing.db")
self.MODES = {
"categories": CategoryListScreen(),
"activities": ActivityListScreen(),
"kimai": KimaiProjectListScreen(),
"hamster": HamsterScreen(),
"kimai": KimaiScreen(),
}
super().__init__()
def on_mount(self) -> None:
self.switch_mode("activities")
self.switch_mode("hamster")
def action_quit(self) -> None:
db.close()

View File

@ -1,4 +1,5 @@
from datetime import datetime
from peewee import (
SqliteDatabase,
Model,
@ -42,6 +43,7 @@ class HamsterFact(Model):
class KimaiCustomer(Model):
visible = BooleanField(default=True)
name = CharField()
class Meta:
@ -52,6 +54,7 @@ class KimaiCustomer(Model):
class KimaiProject(Model):
name = CharField()
customer = ForeignKeyField(KimaiCustomer, backref="projects")
visible = BooleanField(default=True)
allow_global_activities = BooleanField(default=True)
class Meta:
@ -62,6 +65,7 @@ class KimaiProject(Model):
class KimaiActivity(Model):
name = CharField()
project = ForeignKeyField(KimaiProject, backref="activities", null=True)
visible = BooleanField(default=True)
class Meta:
database = db

View File

@ -12,8 +12,8 @@ class NotFound(Exception):
class KimaiAPI(object):
# temporary hardcoded config
# KIMAI_API_URL = "https://kimai.autonomic.zone/api"
KIMAI_API_URL = "https://kimaitest.autonomic.zone/api"
KIMAI_API_URL = "https://kimai.autonomic.zone/api"
# KIMAI_API_URL = "https://kimaitest.autonomic.zone/api"
KIMAI_USERNAME = "3wordchant"
KIMAI_API_KEY = os.environ["KIMAI_API_KEY"]
@ -23,7 +23,7 @@ class KimaiAPI(object):
def __init__(self):
requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3})
self.user_json = self.get("users/me")
@ -44,24 +44,26 @@ class BaseAPI(object):
setattr(self, key, value)
@dataclass
class Customer(BaseAPI):
def __init__(self, api, id, name):
super().__init__(api, id=id, name=name)
api: KimaiAPI = field(repr=False)
id: int
name: str
visible: bool = field(default=True)
@staticmethod
def list(api):
return [Customer(api, c["id"], c["name"]) for c in api.customers_json]
return [
Customer(api, c["id"], c["name"], c["visible"]) for c in api.customers_json
]
@staticmethod
def get_by_id(api, id):
for value in api.customers_json:
if value["id"] == id:
return Customer(api, value["id"], value["name"])
for c in api.customers_json:
if c["id"] == id:
return Customer(api, c["id"], c["name"], c["visible"])
raise NotFound()
def __repr__(self):
return f"Customer (id={self.id}, name={self.name})"
@dataclass
class Project(BaseAPI):
@ -70,6 +72,7 @@ class Project(BaseAPI):
name: str
customer: Customer
allow_global_activities: bool = field(default=True)
visible: bool = field(default=True)
@staticmethod
def list(api):
@ -80,20 +83,22 @@ class Project(BaseAPI):
p["name"],
Customer.get_by_id(api, p["customer"]),
p["globalActivities"],
p["visible"],
)
for p in api.projects_json
]
@staticmethod
def get_by_id(api, id, none=False):
for value in api.projects_json:
if value["id"] == id:
for p in api.projects_json:
if p["id"] == id:
return Project(
api,
value["id"],
value["name"],
Customer.get_by_id(api, value["customer"]),
value["globalActivities"],
p["id"],
p["name"],
Customer.get_by_id(api, p["customer"]),
p["globalActivities"],
p["visible"],
)
if not none:
raise NotFound()
@ -105,25 +110,31 @@ class Activity(BaseAPI):
id: int
name: str
project: Project
visible: bool = field(default=True)
@staticmethod
def list(api):
return [
Activity(
api, a["id"], a["name"], Project.get_by_id(api, a["project"], none=True)
api,
a["id"],
a["name"],
Project.get_by_id(api, a["project"], none=True),
a["visible"],
)
for a in api.activities_json
]
@staticmethod
def get_by_id(api, id, none=False):
for value in api.activities_json:
if value["id"] == id:
for a in api.activities_json:
if a["id"] == id:
return Activity(
api,
value["id"],
value["name"],
Project.get_by_id(api, value["project"]),
a["id"],
a["name"],
Project.get_by_id(api, a["project"]),
a["visible"],
)
if not none:
raise NotFound()

View File

View File

@ -0,0 +1,577 @@
from datetime import datetime
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.coordinate import Coordinate
from textual.containers import Horizontal, Vertical
from textual.events import DescendantBlur
from textual.screen import Screen, ModalScreen
from textual.widgets import (
Header,
Footer,
DataTable,
Input,
Label,
Checkbox,
TabbedContent,
TabPane,
)
from peewee import fn, JOIN
from textual_autocomplete import AutoComplete, Dropdown, DropdownItem
from ..db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
from .list import ListPane
class ActivityEditScreen(ModalScreen):
BINDINGS = [
("escape", "cancel", "Cancel"),
("ctrl+s", "save", "Save"),
]
category_id = None
category_name = ""
def _get_categories(self, input_state):
categories = [DropdownItem(c.name, str(c.id)) for c in HamsterCategory.select()]
return ActivityMappingScreen._filter_dropdowns(categories, input_state.value)
def __init__(self, category, activity):
if category is not None:
self.category_id = category.id
self.category_name = category.name
self.activity_name = activity.name
super().__init__()
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label("Category:"),
AutoComplete(
Input(
placeholder="Type to search...",
id="category",
value=self.category_name,
),
Dropdown(items=self._get_categories),
),
),
Horizontal(
Label("Activity:"), Input(value=self.activity_name, id="activity")
),
)
@on(Input.Submitted, "#category")
def category_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#category")
def category_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.category_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_cancel(self):
self.dismiss(None)
def action_save(self):
self.dismiss(
{
"category": self.category_id,
"activity": self.query_one("#activity").value,
}
)
class ActivityMappingScreen(ModalScreen):
BINDINGS = [
("ctrl+g", "global", "Toggle global"),
("ctrl+s", "save", "Save"),
("escape", "cancel", "Cancel"),
]
customer_id = None
project_id = None
activity_id = None
customer = ""
project = ""
activity = ""
description = ""
tags = ""
def __init__(self, category, activity, mapping=None):
self.hamster_category = category
self.hamster_activity = activity
if mapping is not None:
self.customer_id = mapping.kimai_customer_id
self.customer = mapping.kimai_customer.name
self.project_id = mapping.kimai_project_id
self.project = mapping.kimai_project.name
self.activity_id = mapping.kimai_activity_id
self.activity = mapping.kimai_activity.name
self.description = mapping.kimai_description
self.tags = mapping.kimai_tags
super().__init__()
@staticmethod
def _filter_dropdowns(options, value):
matches = [c for c in options if value.lower() in c.main.plain.lower()]
return sorted(matches, key=lambda v: v.main.plain.startswith(value.lower()))
def _get_customers(self, input_state):
customers = [DropdownItem(c.name, str(c.id)) for c in KimaiCustomer.select()]
return ActivityMappingScreen._filter_dropdowns(customers, input_state.value)
def _get_projects(self, input_state):
projects = [
DropdownItem(p.name, str(p.id))
for p in KimaiProject.select().where(
KimaiProject.customer_id == self.customer_id
)
]
return ActivityMappingScreen._filter_dropdowns(projects, input_state.value)
def _get_activities(self, input_state):
activities = KimaiActivity.select()
if self.query_one("#global").value:
activities = activities.where(
KimaiActivity.project_id.is_null(),
)
else:
activities = activities.where(KimaiActivity.project_id == self.project_id)
return ActivityMappingScreen._filter_dropdowns(
[DropdownItem(a.name, str(a.id)) for a in activities], input_state.value
)
def compose(self) -> ComposeResult:
yield Vertical(
Horizontal(
Label(f"Mapping for {self.hamster_activity}@{self.hamster_category}"),
),
Horizontal(
Label("Customer"),
AutoComplete(
Input(
placeholder="Type to search...",
id="customer",
value=self.customer,
),
Dropdown(items=self._get_customers),
),
),
Horizontal(
Label("Project"),
AutoComplete(
Input(
placeholder="Type to search...",
id="project",
value=self.project,
),
Dropdown(items=self._get_projects),
),
),
Horizontal(
Label("Activity"),
AutoComplete(
Input(
placeholder="Type to search...",
id="activity",
value=self.activity,
),
Dropdown(items=self._get_activities),
),
),
Horizontal(
Label("Description"),
Input(id="description", value=self.description),
),
Horizontal(
Label("Tags"),
Input(id="tags", value=self.tags),
),
Horizontal(Checkbox("Global", id="global")),
)
@on(Input.Submitted, "#customer")
def customer_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#project").focus()
@on(DescendantBlur, "#customer")
def customer_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.customer_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
@on(Input.Submitted, "#project")
def project_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
self.query_one("#activity").focus()
@on(DescendantBlur, "#project")
def project_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.project_id = str(event.control.parent.dropdown.selected_item.left_meta)
@on(Input.Submitted, "#activity")
def activity_submitted(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
self.query_one("#activity").focus()
@on(DescendantBlur, "#activity")
def activity_blur(self, event):
if event.control.parent.dropdown.selected_item is not None:
self.activity_id = str(
event.control.parent.dropdown.selected_item.left_meta
)
def action_global(self):
self.query_one("#global").value = not self.query_one("#global").value
def action_save(self):
self.dismiss(
{
"kimai_customer_id": self.customer_id,
"kimai_project_id": self.project_id,
"kimai_activity_id": self.activity_id,
"kimai_description": self.query_one("#description").value,
"kimai_tags": self.query_one("#tags").value,
"global": self.query_one("#global").value,
}
)
def action_cancel(self):
self.dismiss(None)
class ActivityList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete"),
("f", "move_facts", "Move"),
("e", "edit", "Edit"),
("m", "mapping", "Mapping"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
facts_count_query = (
HamsterFact.select(
HamsterFact.activity_id, fn.COUNT(HamsterFact.id).alias("facts_count")
)
.group_by(HamsterFact.activity_id)
.alias("facts_count_query")
)
mappings_count_query = (
HamsterActivityKimaiMapping.select(
HamsterActivityKimaiMapping.hamster_activity_id,
fn.COUNT(HamsterActivityKimaiMapping.id).alias("mappings_count"),
)
.group_by(HamsterActivityKimaiMapping.hamster_activity_id)
.alias("mappings_count_query")
)
activities = (
HamsterActivity.select(
HamsterActivity,
HamsterCategory.id,
HamsterFact.start_time,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
fn.COALESCE(facts_count_query.c.facts_count, 0).alias("facts_count"),
fn.COALESCE(mappings_count_query.c.mappings_count, 0).alias(
"mappings_count"
),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(HamsterFact, JOIN.LEFT_OUTER)
.switch(HamsterActivity)
.join(
facts_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == facts_count_query.c.activity_id),
)
.switch(HamsterActivity)
.join(
mappings_count_query,
JOIN.LEFT_OUTER,
on=(HamsterActivity.id == mappings_count_query.c.hamster_activity_id),
)
.group_by(HamsterActivity)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
activities = activities.where(
HamsterActivity.name.contains(filter_search)
| HamsterCategory.name.contains(filter_search)
)
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
activities = activities.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
activity.category_id,
activity.category_name,
activity.id,
activity.name,
activity.facts_count,
activity.mappings_count,
]
for activity in activities
]
)
self.table.sort(*self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"category id", "category", "activity id", "activity", "entries", "mappings"
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_delete(self) -> None:
# get the keys for the row and column under the cursor.
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
activity_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
activity = HamsterActivity.get(id=activity_id)
activity.delete_instance()
# supply the row key to `remove_row` to delete the row.
self.table.remove_row(row_key)
def action_move_facts(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
self.move_from_activity = HamsterActivity.get(id=row_cells[2])
for col_idx, cell_value in enumerate(row_cells):
cell_coordinate = Coordinate(row_idx, col_idx)
self.table.update_cell_at(
cell_coordinate,
f"[red]{cell_value}[/red]",
)
self.table.move_cursor(row=self.table.cursor_coordinate[0] + 1)
def on_data_table_row_selected(self, event):
if getattr(self, "move_from_activity", None) is not None:
move_to_activity = HamsterActivity.get(
self.table.get_cell_at(Coordinate(event.cursor_row, 2))
)
HamsterFact.update({HamsterFact.activity: move_to_activity}).where(
HamsterFact.activity == self.move_from_activity
).execute()
self._refresh()
del self.move_from_activity
def action_edit(self):
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
try:
category = HamsterCategory.get(id=row_cells[0])
except HamsterCategory.DoesNotExist:
category = None
activity = HamsterActivity.get(id=row_cells[2])
def handle_edit(properties):
if properties is None:
return
activity.name = properties["activity"]
activity.category_id = properties["category"]
activity.save()
self._refresh()
self.app.push_screen(
ActivityEditScreen(category=category, activity=activity), handle_edit
)
def action_mapping(self):
selected_activity = (
HamsterActivity.select(
HamsterActivity,
fn.COALESCE(HamsterCategory.name, "None").alias("category_name"),
)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
HamsterActivity.id
== self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 2),
)
)
.get()
)
mapping = None
try:
mapping = HamsterActivityKimaiMapping.get(
hamster_activity=selected_activity
)
except HamsterActivityKimaiMapping.DoesNotExist:
pass
def handle_mapping(mapping_data):
if mapping_data is None:
return
if mapping is not None:
mapping_ = mapping
for key, value in mapping_data.items():
setattr(mapping_, key, value)
else:
mapping_ = HamsterActivityKimaiMapping.create(
hamster_activity=selected_activity, **mapping_data
)
mapping_.save()
self._refresh()
self.app.push_screen(
ActivityMappingScreen(
category=selected_activity.category_name,
activity=selected_activity.name,
mapping=mapping,
),
handle_mapping,
)
class CategoryList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("/", "filter", "Search"),
("d", "delete", "Delete category"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
categories = (
HamsterCategory.select(
HamsterCategory,
fn.Count(HamsterActivity.id).alias("activities_count"),
HamsterFact.start_time,
)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterFact, JOIN.LEFT_OUTER)
.group_by(HamsterCategory)
)
filter_search = self.query_one("#filter #search").value
if filter_search is not None:
categories = categories.where(HamsterCategory.name.contains(filter_search))
filter_date = self.query_one("#filter #date").value
if filter_date is not None:
try:
categories = categories.where(
HamsterFact.start_time > datetime.strptime(filter_date, "%Y-%m-%d")
)
except ValueError:
pass
self.table.add_rows(
[
[
category.id,
category.name,
category.activities_count,
]
for category in categories
]
)
self.table.sort(self.sort)
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns("category id", "category", "activities")
self.sort = self.columns[1]
self._refresh()
def action_delete(self) -> None:
row_key, _ = self.table.coordinate_to_cell_key(self.table.cursor_coordinate)
category_id = self.table.get_cell_at(
Coordinate(self.table.cursor_coordinate.row, 0),
)
category = HamsterCategory.get(id=category_id)
category.delete_instance()
self.table.remove_row(row_key)
class HamsterScreen(Screen):
BINDINGS = [
("c", "show_tab('categories')", "Categories"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Hamster"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="activities"):
with TabPane("Categories", id="categories"):
yield CategoryList()
with TabPane("Activities", id="activities"):
yield ActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#activities DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,179 @@
from textual.app import ComposeResult
from textual.binding import Binding
from textual.screen import Screen
from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
from peewee import fn, JOIN
from ..sync import sync
from ..db import (
KimaiProject,
KimaiCustomer,
KimaiActivity,
)
from .list import ListPane
class KimaiCustomerList(ListPane):
pass
class KimaiProjectList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
filter_search = self.query_one("#filter #search").value
projects = (
KimaiProject.select(
KimaiProject,
KimaiCustomer,
fn.Count(KimaiActivity.id).alias("activities_count"),
)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.switch(KimaiProject)
.join(KimaiActivity, JOIN.LEFT_OUTER)
.where(KimaiActivity.project.is_null(False))
.group_by(KimaiProject)
)
if filter_search:
projects = projects.where(
KimaiProject.name.contains(filter_search)
| KimaiCustomer.name.contains(filter_search)
)
self.table.add_rows(
[
[
project.customer.id,
project.customer.name,
project.id,
project.name,
project.activities_count,
project.visible,
]
for project in projects
]
)
self.table.sort(self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
"customer id", "customer", "project id", "project", "activities", "visible"
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[1]
self._refresh()
class KimaiActivityList(ListPane):
BINDINGS = [
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
def _refresh(self):
self.table.clear()
filter_search = self.query_one("#filter #search").value
activities = (
KimaiActivity.select(
KimaiActivity,
fn.COALESCE(KimaiProject.name, "None").alias("project_name"),
fn.COALESCE(KimaiCustomer.name, "None").alias("customer_name"),
)
.join(KimaiProject, JOIN.LEFT_OUTER)
.join(KimaiCustomer, JOIN.LEFT_OUTER)
.group_by(KimaiActivity)
)
if filter_search:
activities = activities.where(KimaiActivity.name.contains(filter_search))
self.table.add_rows(
[
[
# activity.project.customer_id if activity.project is not None else '',
# activity.customer_name,
str(activity.project_id) if activity.project_id is not None else "",
activity.project_name,
activity.id,
activity.name,
activity.visible,
]
for activity in activities
]
)
self.table.sort(self.sort)
def action_get(self) -> None:
sync()
self._refresh()
def on_mount(self) -> None:
self.table = self.query_one(DataTable)
self.table.cursor_type = "row"
self.columns = self.table.add_columns(
# "customer id",
# "customer",
"project id",
"project",
"id",
"name",
"visible",
)
# self.sort = (self.columns[1], self.columns[3])
self.sort = self.columns[3]
self._refresh()
class KimaiScreen(Screen):
BINDINGS = [
("c", "show_tab('customers')", "Customers"),
("p", "show_tab('projects')", "Projects"),
("a", "show_tab('activities')", "Activities"),
]
SUB_TITLE = "Kimai"
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent(initial="projects"):
with TabPane("Customers", id="customers"):
yield KimaiCustomerList()
with TabPane("Projects", id="projects"):
yield KimaiProjectList()
with TabPane("Activities", id="activities"):
yield KimaiActivityList()
yield Footer()
def on_mount(self) -> None:
self.query_one("TabbedContent Tabs").can_focus = False
self.query_one("#projects DataTable").focus()
def action_show_tab(self, tab: str) -> None:
"""Switch to a new tab."""
self.get_child_by_type(TabbedContent).active = tab
self.query_one(f"#{tab} DataTable").focus()

View File

@ -0,0 +1,52 @@
from datetime import datetime
from textual import on
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical, Container
from textual.widgets import DataTable, Input
class ListPane(Container):
def compose(self) -> ComposeResult:
with Vertical():
yield DataTable()
with Horizontal(id="filter"):
yield Input(
id="search", placeholder="Category/activity name contains text"
)
yield Input(
id="date",
placeholder="After date, in {0} format".format(
datetime.now().strftime("%Y-%m-%d")
),
)
def action_refresh(self) -> None:
self._refresh()
def action_sort(self) -> None:
self.table.cursor_type = "column"
def on_data_table_column_selected(self, event):
self.sort = (event.column_key,)
event.data_table.sort(*self.sort)
event.data_table.cursor_type = "row"
def action_filter(self) -> None:
self.query_one("#filter").display = True
self._refresh()
self.query_one("#filter #search").focus()
def on_input_submitted(self, event):
self.table.focus()
def action_cancelfilter(self) -> None:
self.query_one("#filter").display = False
self.query_one("#filter #search").clear()
self.query_one("#filter #date").clear()
self.table.focus()
self._refresh()
@on(Input.Changed, "#filter Input")
def filter(self, event):
self._refresh()

View File

@ -1,4 +1,4 @@
from .kimai import (
from .kimaiapi import (
KimaiAPI,
Customer as KimaiAPICustomer,
Project as KimaiAPIProject,
@ -6,13 +6,9 @@ from .kimai import (
)
from .db import (
db,
HamsterCategory,
HamsterActivity,
HamsterFact,
KimaiProject,
KimaiCustomer,
KimaiActivity,
HamsterActivityKimaiMapping,
)
@ -26,7 +22,14 @@ def sync() -> None:
customers = KimaiAPICustomer.list(api)
with db.atomic():
KimaiCustomer.insert_many(
[{"id": customer.id, "name": customer.name} for customer in customers]
[
{
"id": customer.id,
"name": customer.name,
"visible": customer.visible,
}
for customer in customers
]
).execute()
projects = KimaiAPIProject.list(api)
@ -38,6 +41,7 @@ def sync() -> None:
"name": project.name,
"customer_id": project.customer.id,
"allow_global_activities": project.allow_global_activities,
"visible": project.visible,
}
for project in projects
]
@ -50,7 +54,8 @@ def sync() -> None:
{
"id": activity.id,
"name": activity.name,
"project_id": (activity.project and activity.project.id or None),
"project_id": (activity.project.id if activity.project else None),
"visible": activity.visible,
}
for activity in activities
]

View File

@ -13,7 +13,7 @@ from hamstertools.db import (
HamsterFact,
HamsterFactKimaiImport,
)
from hamstertools.kimai import KimaiAPI, Timesheet, Project, Activity
from hamstertools.kimaiapi import KimaiAPI, Timesheet, Project, Activity
api = KimaiAPI()