# Forked from ruangrupa/konfluks (162 lines, 5.6 KiB, Python).
import ast
import datetime
import json
import os
import re
import shutil
from pathlib import Path

import arrow
import jinja2
import peertube
import requests
from slugify import slugify
|
|
|
|
# Base URL of the PeerTube instance; also used to build preview-image URLs.
host = "https://tv.lumbung.space"

# A single API client, pointed at the instance's REST root, shared module-wide.
configuration = peertube.Configuration(host="{}/api/v1".format(host))
client = peertube.ApiClient(configuration)
|
|
|
|
# jinja filters & config
|
|
def duration(n):
    """Format a length given in seconds as a clock-style string.

    Registered as a Jinja filter. ``n`` is a number of seconds (for example
    ``6655``); the result is the ``str()`` of the matching
    ``datetime.timedelta`` (for example ``'1:50:55'``).
    """
    elapsed = datetime.timedelta(seconds=n)
    return str(elapsed)
|
|
|
|
|
|
# Matches one line ending of any flavour: "\r\n", a lone "\r", or "\n".
# Compiled once at import time instead of on every filter call.
_LINE_ENDING = re.compile(r"(\r\n|\r|\n)")


def linebreaks(text):
    """Turn every line ending in ``text`` into an HTML ``<br />`` plus newline.

    Registered as a Jinja filter.

    Args:
        text: the string to convert. Falsy values (``None``, ``""``) are
            returned unchanged so templates can pipe optional fields
            through the filter.

    Returns:
        ``text`` with each ``\\r\\n``, ``\\r`` or ``\\n`` replaced by
        ``"<br />"`` followed by a single ``\\n``.
    """
    if not text:
        return text
    return _LINE_ENDING.sub(r"<br />\n", text)
|
|
|
|
|
|
def create_post(post_directory, video_metadata, host):
    """Write (or overwrite) the post for one video inside ``post_directory``.

    Steps, in order:

    1. create ``post_directory`` if it does not exist;
    2. download the preview image, unless a file with that name is already
       present in the directory;
    3. replace ``video_metadata["description"]`` with the full description
       fetched from the API (this mutates the caller's dict);
    4. render the ``video.md`` template to ``index.md``;
    5. store the video's ``updated_at`` as a timestamp in ``.timestamp``.

    Args:
        post_directory: directory that holds this video's post.
        video_metadata: dict describing the video; the keys read here are
            ``preview_path``, ``uuid`` and ``updated_at`` (the template
            receives the whole dict as ``v``).
        host: base URL of the instance, prepended to ``preview_path``.
    """
    os.makedirs(post_directory, exist_ok=True)

    preview_image = video_metadata["preview_path"].split("/")[-1]
    preview_file = os.path.join(post_directory, preview_image)

    if not os.path.exists(preview_file):
        # download preview image
        with requests.get(host + video_metadata["preview_path"], stream=True) as response:
            if response.ok:
                # The raw stream is not content-decoded by default; ask for
                # decoding so a compressed transfer is not written verbatim.
                response.raw.decode_content = True
                with open(preview_file, "wb") as img_file:
                    shutil.copyfileobj(response.raw, img_file)
                print("Downloaded cover image")
            else:
                # Do not save an error page as the image: leaving the file
                # absent means the download is retried on the next run.
                print(
                    "Could not download cover image (HTTP {})".format(
                        response.status_code
                    )
                )

    # replace the truncated description with the full video description
    # peertube api is some broken thing in between a py dict and a json file
    # NOTE(review): literal_eval only accepts Python literals, so a payload
    # containing JSON true/false/null would fail here -- confirm the format.
    api_response = peertube.VideoApi(client).videos_id_description_get(
        video_metadata["uuid"]
    )
    long_description = ast.literal_eval(api_response)
    video_metadata["description"] = long_description["description"]

    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
    env.filters["duration"] = duration
    env.filters["linebreaks"] = linebreaks
    template = env.get_template("video.md")

    # Render before opening the file so a template error cannot leave a
    # truncated index.md behind.
    post = template.render(v=video_metadata, host=host, preview_image=preview_image)
    with open(os.path.join(post_directory, "index.md"), "w", encoding="utf-8") as f:
        f.write(post)

    timestamp = arrow.get(video_metadata["updated_at"]).format("X")
    with open(os.path.join(post_directory, ".timestamp"), "w", encoding="utf-8") as f:
        f.write(timestamp)
|
|
|
|
|
|
def update_post(post_directory, video_metadata, host):
    """Regenerate an existing post if the video changed since it was written.

    The post is rebuilt with ``create_post`` when any of these holds:

    * the directory has no ``.timestamp`` file yet;
    * the stored timestamp cannot be read as a number;
    * the video's ``updated_at`` is newer than the stored timestamp.

    Nothing happens when ``post_directory`` does not exist.

    Args:
        post_directory: directory that holds this video's post.
        video_metadata: dict describing the video; ``updated_at``, ``name``
            and ``uuid`` are read here.
        host: base URL of the instance, passed through to ``create_post``.
    """
    if not os.path.exists(post_directory):
        return

    timestamp_file = os.path.join(post_directory, ".timestamp")
    if not os.path.exists(timestamp_file):
        # compat for when there is no timestamp yet..
        create_post(post_directory, video_metadata, host)
        return

    with open(timestamp_file) as f:
        stored_text = f.read().strip()

    # Format the current value exactly the way create_post stored the old one
    # ("X" token) so both sides have the same precision, then compare them as
    # plain numbers rather than relying on arrow to parse a timestamp string.
    current = float(arrow.get(video_metadata["updated_at"]).format("X"))
    try:
        stored = float(stored_text)
    except ValueError:
        # Empty or corrupt timestamp file: treat the post as out of date.
        stored = None

    label = "({})".format(video_metadata["uuid"])
    if stored is None or current > stored:
        print("Updating", video_metadata["name"], label)
        create_post(post_directory, video_metadata, host)
    else:
        print("Video current: ", video_metadata["name"], label)
|
|
|
|
def main():
    """Synchronise the posts in ``$OUTPUT_DIR`` with the published videos.

    * Every local video tagged ``publish`` gets a post directory named
      ``<slugified name>-<uuid>``: created if missing, otherwise refreshed
      through ``update_post``.
    * Every entry of the output directory that does not correspond to a
      returned video is deleted.
    * If the video listing cannot be fetched at all, the instance is assumed
      to be gone or private and every existing post is deleted.

    Only the listing request is guarded: an error while generating an
    individual post propagates to the caller instead of wiping all posts.

    Raises:
        RuntimeError: if the ``OUTPUT_DIR`` environment variable is unset
            or empty.
    """
    output_dir = os.environ.get("OUTPUT_DIR")
    if not output_dir:
        raise RuntimeError("OUTPUT_DIR environment variable is not set")

    v = peertube.VideoApi(client)
    try:
        videos = _fetch_published_videos(v)
    except Exception as error:  # boundary: any fetch failure means "no videos"
        print("didn't get a response from peertube, instance might have been taken down or made private. removing all posts.")
        print("peertube error:", repr(error))
        videos = []

    os.makedirs(output_dir, exist_ok=True)

    existing_posts = set(os.listdir(output_dir))
    # Starts as everything on disk; whatever is left after the loop was not
    # returned by peertube and gets removed.
    stale_posts = set(existing_posts)

    for video_metadata in videos:
        post_name = slugify(video_metadata["name"]) + "-" + video_metadata["uuid"]
        post_dir = os.path.join(output_dir, post_name)

        if post_name in existing_posts:
            # if we already have the video do nothing, possibly update
            update_post(post_dir, video_metadata, host)
            stale_posts.discard(post_name)
        else:
            # if there is a video we dont already have, make it
            print(
                "New: ", video_metadata["name"], "({})".format(video_metadata["uuid"])
            )
            create_post(post_dir, video_metadata, host)

    for post in sorted(stale_posts):
        print("deleted", post)  # rm posts not returned
        shutil.rmtree(os.path.join(output_dir, post))


def _fetch_published_videos(api, page_size=100):
    """Return the metadata dicts of all local videos tagged ``publish``.

    ``start`` is passed as an item offset (the number of videos fetched so
    far), not as a page index, so consecutive requests do not overlap.
    Fetching stops once the reported ``total`` is reached or a request
    returns no items.
    """
    videos = []
    while True:
        listing = api.videos_get(
            count=page_size,
            filter="local",
            tags_one_of="publish",
            start=len(videos),
        ).to_dict()
        batch = listing["data"] or []
        videos.extend(batch)
        if not batch or len(videos) >= listing["total"]:
            return videos
|
|
|