Compare commits

...

3 Commits

Author SHA1 Message Date
3wc 26b8b5f334 Re4matting 2023-11-18 11:23:54 +00:00
3wc b62eb5cb22 Yeet upload script into a command 2023-11-17 23:22:21 +00:00
3wc 1145f5e806 API improvements 2023-11-17 22:54:25 +00:00
5 changed files with 150 additions and 112 deletions

View File

@ -22,6 +22,7 @@ from .db import (
HamsterActivityKimaiMapping,
HamsterFactKimaiImport,
)
from .kimaiapi import KimaiAPI, Timesheet
from .sync import sync
HAMSTER_DIR = Path.home() / ".local/share/hamster"
@ -481,7 +482,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
found_activities.append(activity_str)
@kimai.command("import")
@kimai.command("csv")
@click.option(
"--mapping-path",
help="Mapping file (default ~/.local/share/hamster/mapping.kimai.csv)",
@ -492,7 +493,7 @@ def check(username, api_key, just_errors, ignore_activities, mapping_path=None):
@click.option("--after", help="Only show time entries after this date")
@click.option("--show-missing", help="Just report on the missing entries", is_flag=True)
@click.argument("username")
def _import(
def _csv(
username,
mapping_path=None,
output=None,
@ -637,6 +638,103 @@ def _import(
output_file.close()
@kimai.command("import")
@click.argument("search")
@click.argument("after")
@click.argument("before")
def _import(search, after, before):
api = KimaiAPI()
SEARCH = "auto"
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(after, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(before, "%Y-%m-%d"))
& HamsterCategory.name.contains(SEARCH)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if (
mappings[0].kimai_activity.project is None
and not mappings[0].kimai_project.allow_global_activities
):
click.secho(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity}",
fg="red",
)
has_errors = True
continue
if f.imports.count() > 0:
click.secho(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)",
fg="yellow",
)
continue
if has_errors:
sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = Timesheet(
api,
activity=mapping.kimai_activity,
project=mapping.kimai_project,
begin=f.start_time,
end=f.end_time,
description=f.description
if f.description != ""
else mapping.kimai_description,
# tags=f.tags if f.tags != '' else mapping.kimai_tags
)
r = t.upload().json()
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')
@kimai.group("db")
def db_():
pass

View File

@ -1,6 +1,7 @@
from textual.app import App
from .db import db
from .kimaiapi import KimaiAPI
from .screens.hamster import HamsterScreen
from .screens.kimai import KimaiScreen
@ -8,12 +9,21 @@ from .screens.kimai import KimaiScreen
class HamsterToolsApp(App):
CSS_PATH = "app.tcss"
BINDINGS = [
("h", "switch_mode('hamster')", "Hamster"),
("k", "switch_mode('kimai')", "Kimai"),
("q", "quit", "Quit"),
]
api_ = None
@property
def api(self) -> KimaiAPI:
if self.api_ is None:
self.api_ = KimaiAPI()
return self.api_
def __init__(self):
self.MODES = {
"hamster": HamsterScreen(),

View File

@ -21,16 +21,22 @@ class KimaiAPI(object):
auth_headers = {"X-AUTH-USER": KIMAI_USERNAME, "X-AUTH-TOKEN": KIMAI_API_KEY}
def __init__(self):
requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
# requests_cache.install_cache("kimai", backend="sqlite", expire_after=1800)
self.customers_json = self.get("customers", {"visible": 3})
self.projects_json = self.get("projects", {"visible": 3, "ignoreDates": 1})
self.activities_json = self.get("activities", {"visible": 3})
self.user_json = self.get("users/me")
def get(self, endpoint, params=None):
return requests.get(
result = requests.get(
f"{self.KIMAI_API_URL}/{endpoint}", params=params, headers=self.auth_headers
).json()
try:
if result["code"] != 200:
raise NotFound()
except (KeyError, TypeError):
pass
return result
def post(self, endpoint, data):
return requests.post(
@ -133,7 +139,7 @@ class Activity(BaseAPI):
api,
a["id"],
a["name"],
Project.get_by_id(api, a["project"]),
Project.get_by_id(api, a["project"], none),
a["visible"],
)
if not none:
@ -169,6 +175,23 @@ class Timesheet(BaseAPI):
)
]
@staticmethod
def list_by(api, **kwargs):
kwargs["size"] = 10000
return [
Timesheet(
api,
Activity.get_by_id(api, t["activity"], none=True),
Project.get_by_id(api, t["project"], none=True),
t["begin"],
t["end"],
t["id"],
t["description"],
t["tags"],
)
for t in api.get("timesheets", params=kwargs)
]
@staticmethod
def get_by_id(api, id, none=False):
t = api.get(

View File

@ -1,4 +1,5 @@
from textual.app import ComposeResult
from textual.coordinate import Coordinate
from textual.binding import Binding
from textual.screen import Screen
from textual.widgets import DataTable, TabbedContent, TabPane, Header, Footer
@ -12,6 +13,7 @@ from ..db import (
KimaiCustomer,
KimaiActivity,
)
from ..kimaiapi import Timesheet as KimaiAPITimesheet
from .list import ListPane
@ -89,6 +91,7 @@ class KimaiActivityList(ListPane):
("s", "sort", "Sort"),
("r", "refresh", "Refresh"),
("g", "get", "Get data"),
("#", "count", "Count"),
("/", "filter", "Search"),
Binding(key="escape", action="cancelfilter", show=False),
]
@ -122,6 +125,7 @@ class KimaiActivityList(ListPane):
activity.id,
truncate(activity.name, 40),
activity.visible,
"?",
]
for activity in activities
]
@ -144,10 +148,20 @@ class KimaiActivityList(ListPane):
"id",
"name",
"visible",
"times",
)
self.sort = (self.columns[1], self.columns[3])
self._refresh()
def action_count(self) -> None:
row_idx: int = self.table.cursor_row
row_cells = self.table.get_row_at(row_idx)
activity_id = row_cells[2]
count = len(KimaiAPITimesheet.list_by(self.app.api, activity=activity_id))
self.table.update_cell_at(Coordinate(row_idx, 5), count)
class KimaiScreen(Screen):
BINDINGS = [

View File

@ -1,107 +0,0 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from datetime import datetime
from peewee import JOIN
from hamstertools.db import (
HamsterCategory,
HamsterActivity,
HamsterFact,
HamsterFactKimaiImport,
)
from hamstertools.kimaiapi import KimaiAPI, Timesheet, Project, Activity
api = KimaiAPI()
DATE_FROM = "2023-10-01"
DATE_TO = "2023-11-01"
SEARCH = "auto"
facts = (
HamsterFact.select(HamsterFact, HamsterActivity, HamsterCategory)
.join(HamsterActivity, JOIN.LEFT_OUTER)
.join(HamsterCategory, JOIN.LEFT_OUTER)
.where(
(HamsterFact.start_time > datetime.strptime(DATE_FROM, "%Y-%m-%d"))
& (HamsterFact.start_time < datetime.strptime(DATE_TO, "%Y-%m-%d"))
& HamsterCategory.name.contains(SEARCH)
)
)
has_errors = False
# check data
for f in facts:
mappings = f.activity.mappings
if len(mappings) == 0:
print(
f"fact {f.id}: @{f.activity.category.id} {f.activity.category.name} » @{f.activity.id} {f.activity.name} has no mapping"
)
has_errors = True
continue
if len(mappings) > 1:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} has multiple mappings"
)
has_errors = True
continue
if (
mappings[0].kimai_activity.project is None
and not mappings[0].kimai_project.allow_global_activities
):
print(
f"fact {f.id}: project @{mappings[0].kimai_project.id} {mappings[0].kimai_project.name} does not allow global activity {mappings[0].kimai_activity}"
)
has_errors = True
continue
if f.imports.count() > 0:
print(
f"fact {f.id}: activity @{f.activity.id} {f.activity.name} was already imported {f.imports.count()} time(s)"
)
has_errors = True
continue
# if has_errors:
# sys.exit(1)
# upload data
for f in facts:
try:
mapping = f.activity.mappings[0]
except IndexError:
print(
f"no mapping, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
if f.imports.count() > 0:
print(
f"already imported, skipping {f.id} ({f.activity.category.name} » {f.activity.name})"
)
continue
t = Timesheet(
api,
activity=mapping.kimai_activity,
project=mapping.kimai_project,
begin=f.start_time,
end=f.end_time,
description=f.description if f.description != "" else mapping.kimai_description,
# tags=f.tags if f.tags != '' else mapping.kimai_tags
)
r = t.upload().json()
if r.get("code", 200) != 200:
print(r)
print(f"{f.id} ({f.activity.category.name} » {f.activity.name})")
from pdb import set_trace
set_trace()
else:
HamsterFactKimaiImport.create(hamster_fact=f, kimai_id=r["id"]).save()
print(f'Created Kimai timesheet {r["id"]}')