forked from ruangrupa/konfluks
deps and autoformat
This commit is contained in:
@ -1,27 +0,0 @@
|
||||
# video feed prototypes
|
||||
|
||||
These scripts poll a peertube instance to return a list of videos and construct a static page for it using jinja2.
|
||||
|
||||
See it in action on <https://roelof.info/lumbung/>
|
||||
|
||||
## video-feed.py
|
||||
|
||||
Utility that returns Peertube videos tagged as `publish` and turns them in to `hugo` page bundles. Videos no longer tagged as `publish` are deleted.
|
||||
|
||||
### index-template.md
|
||||
|
||||
Jinja2 template of a hugo post for use with the above.
|
||||
|
||||
## streams-feed.py
|
||||
|
||||
Returns only livestreams and displays them differently depending on the tags associated with the video. E.g. audio stream or video stream. WIP.
|
||||
|
||||
### video-feed.html
|
||||
The jinja template for creating video feeds. This is now used in the HUGO theme.
|
||||
|
||||
### video-feed-prototype.html
|
||||
rendered example of above
|
||||
|
||||
|
||||
|
||||
|
@ -9,7 +9,6 @@ channel_url: "{{ v.channel.url }}"
|
||||
preview_image: "{{ preview_image }}"
|
||||
categories: ["tv","{{ v.channel.display_name }}"]
|
||||
is_live: {{ v.is_live }}
|
||||
|
||||
---
|
||||
|
||||
{{ v.description }}
|
||||
|
@ -1,12 +0,0 @@
|
||||
# Automatically generated by https://github.com/damnever/pigar.
|
||||
|
||||
# video_feed/streams-feed.py: 7
|
||||
# video_feed/video-feed.py: 7
|
||||
Jinja2 == 2.10
|
||||
|
||||
# video_feed/streams-feed.py: 6
|
||||
# video_feed/video-feed.py: 6
|
||||
git+https://framagit.org/framasoft/peertube/clients/python.git
|
||||
|
||||
# video_feed/video-feed.py: 12
|
||||
requests == 2.21.0
|
@ -1,131 +1,144 @@
|
||||
#!/bin/python3
|
||||
|
||||
#lumbung.space video feed generator
|
||||
#c 2021 roel roscam abbing gpvl3 etc
|
||||
# lumbung.space video feed generator
|
||||
# c 2021 roel roscam abbing gpvl3 etc
|
||||
|
||||
import peertube
|
||||
import jinja2
|
||||
import ast
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import datetime
|
||||
import shutil
|
||||
import requests
|
||||
import ast
|
||||
|
||||
import arrow
|
||||
import jinja2
|
||||
import peertube
|
||||
import requests
|
||||
|
||||
|
||||
#jinja filters & config
|
||||
# jinja filters & config
|
||||
def duration(n):
|
||||
"""
|
||||
convert '6655' in '1:50:55'
|
||||
|
||||
"""
|
||||
return str(datetime.timedelta(seconds = n))
|
||||
return str(datetime.timedelta(seconds=n))
|
||||
|
||||
|
||||
def linebreaks(text):
|
||||
if not text:
|
||||
return text
|
||||
else:
|
||||
import re
|
||||
|
||||
br = re.compile(r"(\r\n|\r|\n)")
|
||||
return br.sub(r"<br />\n", text)
|
||||
|
||||
|
||||
env = jinja2.Environment(
|
||||
loader=jinja2.FileSystemLoader(os.path.curdir)
|
||||
)
|
||||
env.filters['duration'] = duration
|
||||
env.filters['linebreaks'] = linebreaks
|
||||
|
||||
host = 'https://tv.lumbung.space'
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.curdir))
|
||||
env.filters["duration"] = duration
|
||||
env.filters["linebreaks"] = linebreaks
|
||||
|
||||
configuration = peertube.Configuration(
|
||||
host = host+"/api/v1"
|
||||
)
|
||||
host = "https://tv.lumbung.space"
|
||||
|
||||
client = peertube.ApiClient(configuration)
|
||||
configuration = peertube.Configuration(host=host + "/api/v1")
|
||||
|
||||
client = peertube.ApiClient(configuration)
|
||||
|
||||
v = peertube.VideoApi(client)
|
||||
|
||||
response = v.videos_get(count=100, filter='local', tags_one_of='publish')
|
||||
response = v.videos_get(count=100, filter="local", tags_one_of="publish")
|
||||
|
||||
videos = response.to_dict()
|
||||
videos = videos['data']
|
||||
videos = videos["data"]
|
||||
|
||||
|
||||
def create_post(post_directory, video_metadata):
|
||||
global client #lazy
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.mkdir(post_directory)
|
||||
|
||||
preview_image = video_metadata['preview_path'].split('/')[-1]
|
||||
|
||||
if not os.path.exists(os.path.join(post_directory, preview_image)):
|
||||
#download preview image
|
||||
response = requests.get(host+video_metadata['preview_path'], stream=True)
|
||||
with open(os.path.join(post_directory, preview_image), 'wb') as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print('Downloaded cover image')
|
||||
global client # lazy
|
||||
|
||||
#replace the truncated description with the full video description
|
||||
#peertube api is some broken thing in between a py dict and a json file
|
||||
api_response = peertube.VideoApi(client).videos_id_description_get(video_metadata['uuid'])
|
||||
long_description = ast.literal_eval(api_response)
|
||||
video_metadata['description'] = long_description['description']
|
||||
if not os.path.exists(post_dir):
|
||||
os.mkdir(post_directory)
|
||||
|
||||
preview_image = video_metadata["preview_path"].split("/")[-1]
|
||||
|
||||
with open(os.path.join(post_directory,'index.md'),'w') as f:
|
||||
post = template.render(v=video_metadata, host=host, preview_image=preview_image)
|
||||
f.write(post)
|
||||
|
||||
if not os.path.exists(os.path.join(post_directory, preview_image)):
|
||||
# download preview image
|
||||
response = requests.get(host + video_metadata["preview_path"], stream=True)
|
||||
with open(os.path.join(post_directory, preview_image), "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print("Downloaded cover image")
|
||||
|
||||
# replace the truncated description with the full video description
|
||||
# peertube api is some broken thing in between a py dict and a json file
|
||||
api_response = peertube.VideoApi(client).videos_id_description_get(
|
||||
video_metadata["uuid"]
|
||||
)
|
||||
long_description = ast.literal_eval(api_response)
|
||||
video_metadata["description"] = long_description["description"]
|
||||
|
||||
with open(os.path.join(post_directory, "index.md"), "w") as f:
|
||||
post = template.render(v=video_metadata, host=host, preview_image=preview_image)
|
||||
f.write(post)
|
||||
|
||||
with open(os.path.join(post_directory, ".timestamp"), "w") as f:
|
||||
timestamp = arrow.get(video_metadata["updated_at"])
|
||||
f.write(timestamp.format("X"))
|
||||
|
||||
with open(os.path.join(post_directory, '.timestamp'), 'w') as f:
|
||||
timestamp = arrow.get(video_metadata['updated_at'])
|
||||
f.write(timestamp.format('X'))
|
||||
|
||||
def update_post(post_directory, video_metadata):
|
||||
if os.path.exists(post_directory):
|
||||
if os.path.exists(os.path.join(post_directory,'.timestamp')):
|
||||
old_timestamp = open(os.path.join(post_directory,'.timestamp')).read()
|
||||
if os.path.exists(os.path.join(post_directory, ".timestamp")):
|
||||
old_timestamp = open(os.path.join(post_directory, ".timestamp")).read()
|
||||
|
||||
#FIXME: this is ugly but I need to do this because arrow removes miliseconds
|
||||
current_timestamp = arrow.get(video_metadata['updated_at'])
|
||||
current_timestamp = arrow.get(current_timestamp.format('X'))
|
||||
# FIXME: this is ugly but I need to do this because arrow removes miliseconds
|
||||
current_timestamp = arrow.get(video_metadata["updated_at"])
|
||||
current_timestamp = arrow.get(current_timestamp.format("X"))
|
||||
|
||||
if current_timestamp > arrow.get(old_timestamp):
|
||||
print('Updating', video_metadata['name'], '({})'.format(video_metadata['uuid']))
|
||||
print(
|
||||
"Updating",
|
||||
video_metadata["name"],
|
||||
"({})".format(video_metadata["uuid"]),
|
||||
)
|
||||
create_post(post_dir, video_metadata)
|
||||
else:
|
||||
print('Video current: ', video_metadata['name'], '({})'.format(video_metadata['uuid']))
|
||||
print(
|
||||
"Video current: ",
|
||||
video_metadata["name"],
|
||||
"({})".format(video_metadata["uuid"]),
|
||||
)
|
||||
else:
|
||||
#compat for when there is no timestamp yet..
|
||||
# compat for when there is no timestamp yet..
|
||||
create_post(post_dir, video_metadata)
|
||||
|
||||
|
||||
output_dir = os.environ.get('OUTPUT_DIR', '/home/r/Programming/lumbung.space/lumbung.space-web/content/video')
|
||||
output_dir = os.environ.get(
|
||||
"OUTPUT_DIR", "/home/r/Programming/lumbung.space/lumbung.space-web/content/video"
|
||||
)
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
template = env.get_template('index_template.md')
|
||||
template = env.get_template("index_template.md")
|
||||
|
||||
existing_posts = os.listdir(output_dir)
|
||||
|
||||
for video_metadata in videos:
|
||||
post_dir = os.path.join(output_dir, video_metadata['uuid'])
|
||||
post_dir = os.path.join(output_dir, video_metadata["uuid"])
|
||||
|
||||
if video_metadata['uuid'] not in existing_posts: #if there is a video we dont already have, make it
|
||||
print('New: ', video_metadata['name'], '({})'.format(video_metadata['uuid']))
|
||||
if (
|
||||
video_metadata["uuid"] not in existing_posts
|
||||
): # if there is a video we dont already have, make it
|
||||
print("New: ", video_metadata["name"], "({})".format(video_metadata["uuid"]))
|
||||
create_post(post_dir, video_metadata)
|
||||
|
||||
elif video_metadata['uuid'] in existing_posts: # if we already have the video do nothing, possibly update
|
||||
elif (
|
||||
video_metadata["uuid"] in existing_posts
|
||||
): # if we already have the video do nothing, possibly update
|
||||
update_post(post_dir, video_metadata)
|
||||
existing_posts.remove(video_metadata['uuid']) # create list of posts which have not been returned by peertube
|
||||
|
||||
for post in existing_posts:
|
||||
print('deleted', post) #rm posts not returned
|
||||
shutil.rmtree(os.path.join(output_dir,post))
|
||||
|
||||
|
||||
existing_posts.remove(
|
||||
video_metadata["uuid"]
|
||||
) # create list of posts which have not been returned by peertube
|
||||
|
||||
for post in existing_posts:
|
||||
print("deleted", post) # rm posts not returned
|
||||
shutil.rmtree(os.path.join(output_dir, post))
|
||||
|
Reference in New Issue
Block a user