forked from ruangrupa/konfluks
shuffle paths, libs, templates
This commit is contained in:
207
lumbunglib/calendar.py
Normal file
207
lumbunglib/calendar.py
Normal file
@ -0,0 +1,207 @@
|
||||
#!/bin/python3
|
||||
|
||||
# lumbung.space calendar feed generator
|
||||
# © 2021 roel roscam abbing gplv3 etc
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import arrow
|
||||
import jinja2
|
||||
import requests
|
||||
from ics import Calendar
|
||||
from natural import date
|
||||
from slugify import slugify
|
||||
|
||||
# a publicly accessible ICS calendar
|
||||
calendar_url = os.environ.get("CALENDAR_URL", "")
|
||||
|
||||
# your Hugo content directory
|
||||
output_dir = os.environ.get("OUTPUT_DIR", "")
|
||||
|
||||
cal = Calendar(requests.get(calendar_url).text)
|
||||
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.curdir))
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
template = env.get_template("event_template.md")
|
||||
|
||||
existing_posts = os.listdir(output_dir)
|
||||
|
||||
|
||||
def findURLs(string):
|
||||
"""
|
||||
return all URLs in a given string
|
||||
"""
|
||||
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
|
||||
url = re.findall(regex, string)
|
||||
return [x[0] for x in url]
|
||||
|
||||
|
||||
def find_imageURLS(string):
|
||||
"""
|
||||
return all image URLS in a given string
|
||||
"""
|
||||
regex = r"(?:http\:|https\:)?\/\/.*?\.(?:png|jpg|jpeg|gif|svg)"
|
||||
|
||||
img_urls = re.findall(regex, string, flags=re.IGNORECASE)
|
||||
return img_urls
|
||||
|
||||
|
||||
def create_metadata(event):
|
||||
"""
|
||||
construct a formatted dict of event metadata for use as frontmatter for HUGO post
|
||||
"""
|
||||
|
||||
if event.location:
|
||||
location_urls = findURLs(event.location)
|
||||
|
||||
if location_urls:
|
||||
location_url = location_urls[0]
|
||||
event.location = "[{}]({})".format(
|
||||
urlparse(location_url).netloc, location_url
|
||||
)
|
||||
|
||||
event_metadata = {
|
||||
"name": event.name,
|
||||
"created": event.created.format(),
|
||||
"description": event.description,
|
||||
"localized_begin": " ".join(
|
||||
localize_time(event.begin)
|
||||
), # non-breaking space characters to defeat markdown
|
||||
"begin": event.begin.format(),
|
||||
"end": event.end.format(),
|
||||
"duration": date.compress(event.duration),
|
||||
"location": event.location,
|
||||
"uid": event.uid,
|
||||
"images": find_imageURLS(event.description), # currently not used in template
|
||||
}
|
||||
|
||||
return event_metadata
|
||||
|
||||
|
||||
def localize_time(date):
|
||||
"""
|
||||
Turn a given date into various timezones
|
||||
Takes arrow objects
|
||||
"""
|
||||
|
||||
# 3 PM Kassel, Germany, 4 PM Ramallah/Jerusalem, Palestina (QoF),
|
||||
# 8 AM Bogota, Colombia (MaMa), 8 PM Jakarta, Indonesia (Gudskul),
|
||||
# 1 PM (+1day) Wellington, New Zealand (Fafswag), 9 AM Havana, Cuba (Instar).
|
||||
|
||||
tzs = [
|
||||
("Kassel", "Europe/Berlin"),
|
||||
("Bamako", "Europe/London"),
|
||||
("Palestine", "Asia/Jerusalem"),
|
||||
("Bogota", "America/Bogota"),
|
||||
("Jakarta", "Asia/Jakarta"),
|
||||
("Makassar", "Asia/Makassar"),
|
||||
("Wellington", "Pacific/Auckland"),
|
||||
]
|
||||
|
||||
localized_begins = []
|
||||
for location, tz in tzs:
|
||||
localized_begins.append( # javascript formatting because of string creation from hell
|
||||
"__{}__ {}".format(
|
||||
str(location), str(date.to(tz).format("YYYY-MM-DD __HH:mm__"))
|
||||
)
|
||||
)
|
||||
return localized_begins
|
||||
|
||||
|
||||
def create_event_post(post_dir, event):
|
||||
"""
|
||||
Create HUGO post based on calendar event metadata
|
||||
Searches for image URLS in description and downloads them
|
||||
Function is also called when post is in need of updating
|
||||
In that case it will also delete images no longer in metadata
|
||||
TODO: split this up into more functions for legibility
|
||||
"""
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.mkdir(post_dir)
|
||||
|
||||
event_metadata = create_metadata(event)
|
||||
|
||||
# list already existing images
|
||||
# so we can later delete them if we dont find them in the event metadata anymore
|
||||
existing_images = os.listdir(post_dir)
|
||||
try:
|
||||
existing_images.remove("index.md")
|
||||
existing_images.remove(".timestamp")
|
||||
except:
|
||||
pass
|
||||
|
||||
for img in event_metadata["images"]:
|
||||
|
||||
# parse img url to safe local image name
|
||||
img_name = img.split("/")[-1]
|
||||
fn, ext = img_name.split(".")
|
||||
img_name = slugify(fn) + "." + ext
|
||||
|
||||
local_image = os.path.join(post_dir, img_name)
|
||||
|
||||
if not os.path.exists(local_image):
|
||||
# download preview image
|
||||
response = requests.get(img, stream=True)
|
||||
with open(local_image, "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print('Downloaded image for event "{}"'.format(event.name))
|
||||
event_metadata["description"] = event_metadata["description"].replace(
|
||||
img, "".format(img_name)
|
||||
)
|
||||
if img_name in existing_images:
|
||||
existing_images.remove(img_name)
|
||||
|
||||
for left_over_image in existing_images:
|
||||
# remove images we found, but which are no longer in remote event
|
||||
os.remove(os.path.join(post_dir, left_over_image))
|
||||
print("deleted image", left_over_image)
|
||||
|
||||
with open(os.path.join(post_dir, "index.md"), "w") as f:
|
||||
post = template.render(event=event_metadata)
|
||||
f.write(post)
|
||||
print("created post for", event.name, "({})".format(event.uid))
|
||||
|
||||
with open(os.path.join(post_dir, ".timestamp"), "w") as f:
|
||||
f.write(event_metadata["created"])
|
||||
|
||||
|
||||
def update_event_post(post_dir, event):
|
||||
"""
|
||||
Update a post based on the VCARD event 'created' field which changes when updated
|
||||
"""
|
||||
if os.path.exists(post_dir):
|
||||
old_timestamp = open(os.path.join(post_dir, ".timestamp")).read()
|
||||
if event.created > arrow.get(old_timestamp):
|
||||
print("Updating", event.name, "({})".format(event.uid))
|
||||
create_event_post(post_dir, event)
|
||||
else:
|
||||
print("Event current: ", event.name, "({})".format(event.uid))
|
||||
|
||||
|
||||
for event in list(cal.events):
|
||||
|
||||
post_dir = os.path.join(output_dir, event.uid)
|
||||
|
||||
if event.uid not in existing_posts:
|
||||
# if there is an event we dont already have, make it
|
||||
create_event_post(post_dir, event)
|
||||
|
||||
elif event.uid in existing_posts:
|
||||
# if we already have it, update
|
||||
update_event_post(post_dir, event)
|
||||
existing_posts.remove(
|
||||
event.uid
|
||||
) # create list of posts which have not been returned by the calendar
|
||||
|
||||
|
||||
for post in existing_posts:
|
||||
# remove events not returned by the calendar (deletion)
|
||||
print("deleted", post)
|
||||
shutil.rmtree(os.path.join(output_dir, post))
|
254
lumbunglib/feed.py
Normal file
254
lumbunglib/feed.py
Normal file
@ -0,0 +1,254 @@
|
||||
#!/bin/python3
|
||||
|
||||
# lumbung.space rss feed aggregator
|
||||
# © 2021 roel roscam abbing gplv3 etc
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from ast import literal_eval as make_tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import arrow
|
||||
import feedparser
|
||||
import jinja2
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from slugify import slugify
|
||||
|
||||
|
||||
def write_etag(feed_name, feed_data):
|
||||
"""
|
||||
save timestamp of when feed was last modified
|
||||
"""
|
||||
etag = ""
|
||||
modified = ""
|
||||
|
||||
if "etag" in feed_data:
|
||||
etag = feed_data.etag
|
||||
if "modified" in feed_data:
|
||||
modified = feed_data.modified
|
||||
|
||||
if etag or modified:
|
||||
with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
|
||||
f.write(str((etag, modified)))
|
||||
|
||||
|
||||
def get_etag(feed_name):
|
||||
"""
|
||||
return timestamp of when feed was last modified
|
||||
"""
|
||||
fn = os.path.join("etags", feed_name + ".txt")
|
||||
etag = ""
|
||||
modified = ""
|
||||
|
||||
if os.path.exists(fn):
|
||||
etag, modified = make_tuple(open(fn, "r").read())
|
||||
|
||||
return etag, modified
|
||||
|
||||
|
||||
def create_frontmatter(entry):
|
||||
"""
|
||||
parse RSS metadata and return as frontmatter
|
||||
"""
|
||||
if "published" in entry:
|
||||
published = entry.published_parsed
|
||||
if "updated" in entry:
|
||||
published = entry.updated_parsed
|
||||
|
||||
published = arrow.get(published)
|
||||
|
||||
if "author" in entry:
|
||||
author = entry.author
|
||||
else:
|
||||
author = ""
|
||||
|
||||
tags = []
|
||||
if "tags" in entry:
|
||||
# TODO finish categories
|
||||
for t in entry.tags:
|
||||
tags.append(t["term"])
|
||||
|
||||
frontmatter = {
|
||||
"title": entry.title,
|
||||
"date": published.format(),
|
||||
"summary": "",
|
||||
"author": author,
|
||||
"original_link": entry.link,
|
||||
"feed_name": entry["feed_name"],
|
||||
"tags": str(tags),
|
||||
}
|
||||
|
||||
return frontmatter
|
||||
|
||||
|
||||
def create_post(post_dir, entry):
|
||||
"""
|
||||
write hugo post based on RSS entry
|
||||
"""
|
||||
frontmatter = create_frontmatter(entry)
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.makedirs(post_dir)
|
||||
|
||||
if "content" in entry:
|
||||
post_content = entry.content[0].value
|
||||
else:
|
||||
post_content = entry.summary
|
||||
|
||||
parsed_content = parse_posts(post_dir, post_content)
|
||||
|
||||
with open(os.path.join(post_dir, "index.html"), "w") as f: # n.b. .html
|
||||
post = template.render(frontmatter=frontmatter, content=parsed_content)
|
||||
f.write(post)
|
||||
print("created post for", entry.title, "({})".format(entry.link))
|
||||
|
||||
|
||||
def grab_media(post_directory, url):
|
||||
"""
|
||||
download media linked in post to have local copy
|
||||
if download succeeds return new local path otherwise return url
|
||||
"""
|
||||
image = urlparse(url).path.split("/")[-1]
|
||||
|
||||
try:
|
||||
if not os.path.exists(os.path.join(post_directory, image)):
|
||||
# TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
|
||||
response = requests.get(url, stream=True)
|
||||
if response.ok:
|
||||
with open(os.path.join(post_directory, image), "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print("Downloaded cover image", image)
|
||||
return image
|
||||
return image
|
||||
elif os.path.exists(os.path.join(post_directory, image)):
|
||||
return image
|
||||
|
||||
except Exception as e:
|
||||
print("Failed to download image", url)
|
||||
print(e)
|
||||
return url
|
||||
|
||||
|
||||
def parse_posts(post_dir, post_content):
|
||||
"""
|
||||
parse the post content to for media items
|
||||
replace foreign image with local copy
|
||||
filter out iframe sources not in allowlist
|
||||
"""
|
||||
soup = BeautifulSoup(post_content, "html.parser")
|
||||
allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
|
||||
media = []
|
||||
|
||||
for img in soup(["img", "object"]):
|
||||
local_image = grab_media(post_dir, img["src"])
|
||||
if img["src"] != local_image:
|
||||
img["src"] = local_image
|
||||
|
||||
for iframe in soup(["iframe"]):
|
||||
if not any(source in iframe["src"] for source in allowed_iframe_sources):
|
||||
print("filtered iframe: {}...".format(iframe["src"][:25]))
|
||||
iframe.decompose()
|
||||
return soup.decode()
|
||||
|
||||
|
||||
def grab_feed(feed_url):
|
||||
"""
|
||||
check whether feed has been updated
|
||||
download & return it if it has
|
||||
"""
|
||||
feed_name = urlparse(feed_url).netloc
|
||||
|
||||
etag, modified = get_etag(feed_name)
|
||||
|
||||
try:
|
||||
if modified:
|
||||
data = feedparser.parse(feed_url, modified=modified)
|
||||
elif etag:
|
||||
data = feedparser.parse(feed_url, etag=etag)
|
||||
else:
|
||||
data = feedparser.parse(feed_url)
|
||||
except Exception as e:
|
||||
print("Error grabbing feed")
|
||||
print(feed_name)
|
||||
print(e)
|
||||
return False
|
||||
|
||||
print(data.status, feed_url)
|
||||
if data.status == 200:
|
||||
# 304 means the feed has not been modified since we last checked
|
||||
write_etag(feed_name, data)
|
||||
return data
|
||||
return False
|
||||
|
||||
|
||||
feed_urls = open("feeds_list.txt", "r").read().splitlines()
|
||||
|
||||
start = time.time()
|
||||
|
||||
if not os.path.exists("etags"):
|
||||
os.mkdir("etags")
|
||||
|
||||
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.curdir))
|
||||
|
||||
output_dir = os.environ.get(
|
||||
"OUTPUT_DIR", "/home/r/Programming/lumbung.space/lumbung.space-web/content/posts/"
|
||||
)
|
||||
# output_dir = os.environ.get('OUTPUT_DIR', 'network/')
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
template = env.get_template("post_template.md")
|
||||
|
||||
# add iframe to the allowlist of feedparser's sanitizer,
|
||||
# this is now handled in parse_post()
|
||||
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
|
||||
|
||||
for feed_url in feed_urls:
|
||||
|
||||
feed_name = urlparse(feed_url).netloc
|
||||
|
||||
feed_dir = os.path.join(output_dir, feed_name)
|
||||
|
||||
if not os.path.exists(feed_dir):
|
||||
os.makedirs(feed_dir)
|
||||
|
||||
existing_posts = os.listdir(feed_dir)
|
||||
|
||||
data = grab_feed(feed_url)
|
||||
|
||||
if data:
|
||||
for entry in data.entries:
|
||||
# if 'tags' in entry:
|
||||
# for tag in entry.tags:
|
||||
# for x in ['lumbung.space', 'D15', 'lumbung']:
|
||||
# if x in tag['term']:
|
||||
# print(entry.title)
|
||||
entry["feed_name"] = feed_name
|
||||
|
||||
post_name = slugify(entry.title)
|
||||
post_dir = os.path.join(output_dir, feed_name, post_name)
|
||||
|
||||
if post_name not in existing_posts:
|
||||
# if there is a blog entry we dont already have, make it
|
||||
create_post(post_dir, entry)
|
||||
|
||||
elif post_name in existing_posts:
|
||||
# if we already have it, update it
|
||||
create_post(post_dir, entry)
|
||||
existing_posts.remove(
|
||||
post_name
|
||||
) # create list of posts which have not been returned by the feed
|
||||
|
||||
for post in existing_posts:
|
||||
# remove blog posts no longer returned by the RSS feed
|
||||
print("deleted", post)
|
||||
shutil.rmtree(os.path.join(feed_dir, slugify(post)))
|
||||
|
||||
|
||||
end = time.time()
|
||||
|
||||
print(end - start)
|
160
lumbunglib/hashtag.py
Normal file
160
lumbunglib/hashtag.py
Normal file
@ -0,0 +1,160 @@
|
||||
# lumbung.space hashtag publishing bot
|
||||
# © 2021 roel roscam abbing agplv3
|
||||
# Makes Hugo posts out of hashtag feeds on Mastodon.
|
||||
# Requires an account on the Mastodon instance configured.
|
||||
# Currently does not do any thread recreation and only handles images
|
||||
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import jinja2
|
||||
import requests
|
||||
from mastodon import Mastodon
|
||||
|
||||
# Which instance to login to
|
||||
instance = "https://social.lumbung.space"
|
||||
|
||||
# n.b. if it is the first time you use this script
|
||||
# You need to register the app:
|
||||
# https://mastodonpy.readthedocs.io/en/stable/#module-mastodon
|
||||
|
||||
# Login credentials for bot account
|
||||
email = ""
|
||||
password = ""
|
||||
|
||||
# Which hashtags to publish
|
||||
hashtags = ["jalansesama"]
|
||||
|
||||
# your Hugo content directory
|
||||
output_dir = os.environ.get("OUTPUT_DIR", "path/to/hugo/content")
|
||||
|
||||
|
||||
def login_mastodon_bot():
|
||||
mastodon = Mastodon(
|
||||
client_id="publishbot_clientcred.secret",
|
||||
api_base_url=instance,
|
||||
)
|
||||
|
||||
mastodon.log_in(
|
||||
email,
|
||||
password,
|
||||
to_file="publishbot_usercred.secret",
|
||||
scopes=["read"],
|
||||
)
|
||||
|
||||
return mastodon
|
||||
|
||||
|
||||
def create_frontmatter(post_metadata):
|
||||
"""
|
||||
Parse post metadata and return it as HUGO frontmatter
|
||||
"""
|
||||
|
||||
frontmatter = ""
|
||||
return frontmatter
|
||||
|
||||
|
||||
def download_media(post_directory, media_attachments):
|
||||
"""
|
||||
Download media attached to posts. N.b. currently only images
|
||||
See: https://mastodonpy.readthedocs.io/en/stable/#media-dicts
|
||||
"""
|
||||
|
||||
for item in media_attachments:
|
||||
if item["type"] == "image":
|
||||
image = localize_media_url(item["url"])
|
||||
# TODO check whether this needs to handle delete & redraft with different images
|
||||
if not os.path.exists(os.path.join(post_directory, image)):
|
||||
# download image
|
||||
response = requests.get(item["url"], stream=True)
|
||||
with open(os.path.join(post_directory, image), "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print("Downloaded cover image", image)
|
||||
|
||||
|
||||
def create_post(post_directory, post_metadata):
|
||||
"""
|
||||
Create Hugo posts based on Toots/posts retuned in timeline.
|
||||
See: https://mastodonpy.readthedocs.io/en/stable/#toot-dicts
|
||||
"""
|
||||
|
||||
if not os.path.exists(post_directory):
|
||||
os.mkdir(post_directory)
|
||||
|
||||
with open(os.path.join(post_directory, "index.html"), "w") as f:
|
||||
post = template.render(post_metadata=post_metadata)
|
||||
f.write(post)
|
||||
|
||||
download_media(post_directory, post_metadata["media_attachments"])
|
||||
|
||||
|
||||
def localize_media_url(url):
|
||||
"""
|
||||
Returns the filename, used also as custom jinja filter
|
||||
"""
|
||||
return url.split("/")[-1]
|
||||
|
||||
|
||||
def filter_mastodon_urls(content):
|
||||
"""
|
||||
Filters out Mastodon generated URLS for tags
|
||||
e.g. <a href="https://social.lumbung.space/tags/jalankita" class="mention hashtag" rel="tag">
|
||||
Used also as custom jinja filter
|
||||
"""
|
||||
# TODO
|
||||
return content
|
||||
|
||||
|
||||
mastodon = login_mastodon_bot()
|
||||
|
||||
output_dir = output_dir
|
||||
|
||||
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.curdir))
|
||||
|
||||
env.filters["localize_media_url"] = localize_media_url
|
||||
env.filters["filter_mastodon_urls"] = filter_mastodon_urls
|
||||
|
||||
template = env.get_template("post_template.md")
|
||||
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
|
||||
for hashtag in hashtags:
|
||||
|
||||
hashtag_dir = os.path.join(output_dir, hashtag)
|
||||
if not os.path.exists(hashtag_dir):
|
||||
os.mkdir(hashtag_dir)
|
||||
|
||||
existing_posts = os.listdir(hashtag_dir) # list all existing posts
|
||||
|
||||
timeline = mastodon.timeline_hashtag(
|
||||
hashtag, local=True, only_media=True
|
||||
) # returns max 20 queries and only with media
|
||||
timeline = mastodon.fetch_remaining(
|
||||
timeline
|
||||
) # returns all the rest n.b. can take a while because of rate limit
|
||||
|
||||
for post_metadata in timeline:
|
||||
post_dir = os.path.join(hashtag_dir, str(post_metadata["id"]))
|
||||
|
||||
# if there is a post in the feed we dont already have locally, make it
|
||||
if str(post_metadata["id"]) not in existing_posts:
|
||||
|
||||
if not post_metadata[
|
||||
"local_only"
|
||||
]: # if you get an error here then you are using vanilla Mastodon, this is a Hometown or Glitch only feature
|
||||
create_post(post_dir, post_metadata)
|
||||
|
||||
# if we already have the post do nothing, possibly update
|
||||
elif str(post_metadata["id"]) in existing_posts:
|
||||
# update_post(post_dir, post_metadata)
|
||||
existing_posts.remove(
|
||||
str(post_metadata["id"])
|
||||
) # create list of posts which have not been returned in the feed
|
||||
|
||||
for post in existing_posts:
|
||||
print("deleted", post) # rm posts that exist but are no longer returned in feed
|
||||
shutil.rmtree(os.path.join(hashtag_dir, post))
|
20
lumbunglib/templates/calendar.md
Normal file
20
lumbunglib/templates/calendar.md
Normal file
@ -0,0 +1,20 @@
|
||||
---
|
||||
title: "{{ event.name }}"
|
||||
date: "{{ event.begin }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
categories: "calendar"
|
||||
event_begin: "{{ event.begin }}"
|
||||
event_end: "{{ event.end }}"
|
||||
duration: "{{ event.duration }}"
|
||||
localized_begin: "{{ event.localized_begin }}"
|
||||
uid: "{{ event.uid }}"
|
||||
{% if event.location %}
|
||||
location: "{{ event.location }}"
|
||||
{% endif %}
|
||||
---
|
||||
|
||||
{% if event.description %}
|
||||
|
||||
{{ event.description }}
|
||||
|
||||
{% endif %}
|
13
lumbunglib/templates/feed.md
Normal file
13
lumbunglib/templates/feed.md
Normal file
@ -0,0 +1,13 @@
|
||||
---
|
||||
title: "{{ frontmatter.title }}"
|
||||
date: "{{ frontmatter.date }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
summary: "{{ frontmatter.summary }}"
|
||||
author: "{{ frontmatter.author }}"
|
||||
original_link: "{{ frontmatter.original_link }}"
|
||||
feed_name: "{{ frontmatter.feed_name}}"
|
||||
categories: ["network", "{{ frontmatter.feed_name}}"]
|
||||
tags: { { frontmatter.tags } }
|
||||
---
|
||||
|
||||
{{ content }}
|
14
lumbunglib/templates/hashtag.md
Normal file
14
lumbunglib/templates/hashtag.md
Normal file
@ -0,0 +1,14 @@
|
||||
---
|
||||
date: "{{ post_metadata.created_at }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
author: "{{ post_metadata.account.display_name }}"
|
||||
avatar: "{{ post_metadata.account.avatar }}"
|
||||
categories: ["shouts"]
|
||||
tags: [{% for i in post_metadata.tags %} "{{ i.name }}", {% endfor %}]
|
||||
---
|
||||
|
||||
{% for item in post_metadata.media_attachments %}
|
||||
<img src="{{item.url | localize_media_url }}" alt="{{item.description}}">
|
||||
{% endfor %}
|
||||
|
||||
{{ post_metadata.content | filter_mastodon_urls }}
|
14
lumbunglib/templates/video.md
Normal file
14
lumbunglib/templates/video.md
Normal file
@ -0,0 +1,14 @@
|
||||
---
|
||||
title: "{{ v.name }}"
|
||||
date: "{{ v.published_at }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
uuid: "{{v.uuid}}"
|
||||
video_duration: "{{ v.duration | duration }} "
|
||||
video_channel: "{{ v.channel.display_name }}"
|
||||
channel_url: "{{ v.channel.url }}"
|
||||
preview_image: "{{ preview_image }}"
|
||||
categories: ["tv","{{ v.channel.display_name }}"]
|
||||
is_live: {{ v.is_live }}
|
||||
---
|
||||
|
||||
{{ v.description }}
|
144
lumbunglib/video.py
Normal file
144
lumbunglib/video.py
Normal file
@ -0,0 +1,144 @@
|
||||
#!/bin/python3
|
||||
|
||||
# lumbung.space video feed generator
|
||||
# c 2021 roel roscam abbing gpvl3 etc
|
||||
|
||||
import ast
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import arrow
|
||||
import jinja2
|
||||
import peertube
|
||||
import requests
|
||||
|
||||
|
||||
# jinja filters & config
|
||||
def duration(n):
|
||||
"""
|
||||
convert '6655' in '1:50:55'
|
||||
|
||||
"""
|
||||
return str(datetime.timedelta(seconds=n))
|
||||
|
||||
|
||||
def linebreaks(text):
|
||||
if not text:
|
||||
return text
|
||||
else:
|
||||
import re
|
||||
|
||||
br = re.compile(r"(\r\n|\r|\n)")
|
||||
return br.sub(r"<br />\n", text)
|
||||
|
||||
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.curdir))
|
||||
env.filters["duration"] = duration
|
||||
env.filters["linebreaks"] = linebreaks
|
||||
|
||||
host = "https://tv.lumbung.space"
|
||||
|
||||
configuration = peertube.Configuration(host=host + "/api/v1")
|
||||
|
||||
client = peertube.ApiClient(configuration)
|
||||
|
||||
v = peertube.VideoApi(client)
|
||||
|
||||
response = v.videos_get(count=100, filter="local", tags_one_of="publish")
|
||||
|
||||
videos = response.to_dict()
|
||||
videos = videos["data"]
|
||||
|
||||
|
||||
def create_post(post_directory, video_metadata):
|
||||
global client # lazy
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.mkdir(post_directory)
|
||||
|
||||
preview_image = video_metadata["preview_path"].split("/")[-1]
|
||||
|
||||
if not os.path.exists(os.path.join(post_directory, preview_image)):
|
||||
# download preview image
|
||||
response = requests.get(host + video_metadata["preview_path"], stream=True)
|
||||
with open(os.path.join(post_directory, preview_image), "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print("Downloaded cover image")
|
||||
|
||||
# replace the truncated description with the full video description
|
||||
# peertube api is some broken thing in between a py dict and a json file
|
||||
api_response = peertube.VideoApi(client).videos_id_description_get(
|
||||
video_metadata["uuid"]
|
||||
)
|
||||
long_description = ast.literal_eval(api_response)
|
||||
video_metadata["description"] = long_description["description"]
|
||||
|
||||
with open(os.path.join(post_directory, "index.md"), "w") as f:
|
||||
post = template.render(v=video_metadata, host=host, preview_image=preview_image)
|
||||
f.write(post)
|
||||
|
||||
with open(os.path.join(post_directory, ".timestamp"), "w") as f:
|
||||
timestamp = arrow.get(video_metadata["updated_at"])
|
||||
f.write(timestamp.format("X"))
|
||||
|
||||
|
||||
def update_post(post_directory, video_metadata):
|
||||
if os.path.exists(post_directory):
|
||||
if os.path.exists(os.path.join(post_directory, ".timestamp")):
|
||||
old_timestamp = open(os.path.join(post_directory, ".timestamp")).read()
|
||||
|
||||
# FIXME: this is ugly but I need to do this because arrow removes miliseconds
|
||||
current_timestamp = arrow.get(video_metadata["updated_at"])
|
||||
current_timestamp = arrow.get(current_timestamp.format("X"))
|
||||
|
||||
if current_timestamp > arrow.get(old_timestamp):
|
||||
print(
|
||||
"Updating",
|
||||
video_metadata["name"],
|
||||
"({})".format(video_metadata["uuid"]),
|
||||
)
|
||||
create_post(post_dir, video_metadata)
|
||||
else:
|
||||
print(
|
||||
"Video current: ",
|
||||
video_metadata["name"],
|
||||
"({})".format(video_metadata["uuid"]),
|
||||
)
|
||||
else:
|
||||
# compat for when there is no timestamp yet..
|
||||
create_post(post_dir, video_metadata)
|
||||
|
||||
|
||||
output_dir = os.environ.get(
|
||||
"OUTPUT_DIR", "/home/r/Programming/lumbung.space/lumbung.space-web/content/video"
|
||||
)
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
template = env.get_template("index_template.md")
|
||||
|
||||
existing_posts = os.listdir(output_dir)
|
||||
|
||||
for video_metadata in videos:
|
||||
post_dir = os.path.join(output_dir, video_metadata["uuid"])
|
||||
|
||||
if (
|
||||
video_metadata["uuid"] not in existing_posts
|
||||
): # if there is a video we dont already have, make it
|
||||
print("New: ", video_metadata["name"], "({})".format(video_metadata["uuid"]))
|
||||
create_post(post_dir, video_metadata)
|
||||
|
||||
elif (
|
||||
video_metadata["uuid"] in existing_posts
|
||||
): # if we already have the video do nothing, possibly update
|
||||
update_post(post_dir, video_metadata)
|
||||
existing_posts.remove(
|
||||
video_metadata["uuid"]
|
||||
) # create list of posts which have not been returned by peertube
|
||||
|
||||
for post in existing_posts:
|
||||
print("deleted", post) # rm posts not returned
|
||||
shutil.rmtree(os.path.join(output_dir, post))
|
Reference in New Issue
Block a user