import ast import datetime import json import os import shutil from pathlib import Path import arrow import jinja2 import peertube import requests host = "https://tv.lumbung.space" configuration = peertube.Configuration(host=host + "/api/v1") client = peertube.ApiClient(configuration) # jinja filters & config def duration(n): """ convert '6655' in '1:50:55' """ return str(datetime.timedelta(seconds=n)) def linebreaks(text): if not text: return text else: import re br = re.compile(r"(\r\n|\r|\n)") return br.sub(r"
\n", text) def create_post(post_directory, video_metadata, host): global client # lazy if not os.path.exists(post_directory): os.mkdir(post_directory) preview_image = video_metadata["preview_path"].split("/")[-1] if not os.path.exists(os.path.join(post_directory, preview_image)): # download preview image response = requests.get(host + video_metadata["preview_path"], stream=True) with open(os.path.join(post_directory, preview_image), "wb") as img_file: shutil.copyfileobj(response.raw, img_file) print("Downloaded cover image") # replace the truncated description with the full video description # peertube api is some broken thing in between a py dict and a json file api_response = peertube.VideoApi(client).videos_id_description_get( video_metadata["uuid"] ) long_description = ast.literal_eval(api_response) video_metadata["description"] = long_description["description"] template_dir = os.path.join(Path(__file__).parent.resolve(), "templates") env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir)) env.filters["duration"] = duration env.filters["linebreaks"] = linebreaks template = env.get_template("video.md") with open(os.path.join(post_directory, "index.md"), "w") as f: post = template.render(v=video_metadata, host=host, preview_image=preview_image) f.write(post) with open(os.path.join(post_directory, ".timestamp"), "w") as f: timestamp = arrow.get(video_metadata["updated_at"]) f.write(timestamp.format("X")) def update_post(post_directory, video_metadata, host): if os.path.exists(post_directory): if os.path.exists(os.path.join(post_directory, ".timestamp")): old_timestamp = open(os.path.join(post_directory, ".timestamp")).read() # FIXME: this is ugly but I need to do this because arrow removes miliseconds current_timestamp = arrow.get(video_metadata["updated_at"]) current_timestamp = arrow.get(current_timestamp.format("X")) if current_timestamp > arrow.get(old_timestamp): print( "Updating", video_metadata["name"], "({})".format(video_metadata["uuid"]), ) create_post(post_directory, video_metadata, host) else: print( "Video current: ", video_metadata["name"], "({})".format(video_metadata["uuid"]), ) else: # compat for when there is no timestamp yet.. create_post(post_directory, video_metadata, host) def sanitize_name(name): sanitized = "".join([c if c.isalnum() or c == " " else "-" for c in name]) if len(sanitized) > 40: return sanitized[:40] return sanitized def main(): v = peertube.VideoApi(client) response = v.videos_get(count=100, filter="local", tags_one_of="publish") videos = response.to_dict() videos = videos["data"] output_dir = os.environ.get("OUTPUT_DIR") if not os.path.exists(output_dir): os.mkdir(output_dir) existing_posts = os.listdir(output_dir) for video_metadata in videos: post_dir = os.path.join(output_dir, sanitize_name(video_metadata["name"]) + "-" + video_metadata["uuid"]) if ( video_metadata["uuid"] not in existing_posts ): # if there is a video we dont already have, make it print( "New: ", video_metadata["name"], "({})".format(video_metadata["uuid"]) ) create_post(post_dir, video_metadata, host) elif ( video_metadata["uuid"] in existing_posts ): # if we already have the video do nothing, possibly update update_post(post_dir, video_metadata, host) existing_posts.remove( video_metadata["uuid"] ) # create list of posts which have not been returned by peertube for post in existing_posts: print("deleted", post) # rm posts not returned shutil.rmtree(os.path.join(output_dir, post))