rename project to konfluks for legibility, add docs

Author: rra
Date: 2022-06-02 09:23:58 +02:00
Parent: b0f77831bd
Commit: 00f795f16d
12 changed files with 96 additions and 15 deletions

konfluks/cloudcal.py Normal file
@@ -0,0 +1,208 @@
import os
import re
import shutil
from pathlib import Path
from urllib.parse import urlparse
from slugify import slugify
import arrow
import jinja2
import requests
from ics import Calendar
from natural import date
# a publicly accessible ICS calendar
calendar_url = os.environ.get("CALENDAR_URL")
# your Hugo content directory
output_dir = os.environ.get("OUTPUT_DIR")
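# illustrative values, not part of this commit:
#   CALENDAR_URL="https://cloud.example.org/calendars/lumbung.ics?export"
#   OUTPUT_DIR="content/events"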
cal = Calendar(requests.get(calendar_url).text)
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
if not os.path.exists(output_dir):
os.mkdir(output_dir)
template = env.get_template("calendar.md")
existing_posts = os.listdir(output_dir)
def findURLs(string):
"""
return all URLs in a given string
"""
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
url = re.findall(regex, string)
return [x[0] for x in url]
def find_imageURLS(string):
"""
return all image URLS in a given string
"""
regex = r"(?:http\:|https\:)?\/\/.*?\.(?:png|jpg|jpeg|gif|svg)"
img_urls = re.findall(regex, string, flags=re.IGNORECASE)
return img_urls
def create_metadata(event):
"""
construct a formatted dict of event metadata for use as frontmatter for HUGO post
"""
if event.location:
location_urls = findURLs(event.location)
if location_urls:
location_url = location_urls[0]
event.location = "[{}]({})".format(
urlparse(location_url).netloc, location_url
)
event_metadata = {
"name": event.name,
"created": event.created.format(),
"description": event.description,
"localized_begin": "           ".join(
localize_time(event.begin)
), # non-breaking space characters to defeat markdown
"begin": event.begin.format(),
"end": event.end.format(),
"duration": date.compress(event.duration),
"location": event.location,
"uid": event.uid,
"featured_image": "",
"images": find_imageURLS(event.description), # currently not used in template
}
return event_metadata
def localize_time(date):
"""
Turn a given date into various timezones
Takes arrow objects
"""
# 3 PM Kassel, Germany, 4 PM Ramallah/Jerusalem, Palestina (QoF),
# 8 AM Bogota, Colombia (MaMa), 8 PM Jakarta, Indonesia (Gudskul),
# 1 PM (+1day) Wellington, New Zealand (Fafswag), 9 AM Havana, Cuba (Instar).
tzs = [
("Kassel", "Europe/Berlin"),
("Bamako", "Europe/London"),
("Palestine", "Asia/Jerusalem"),
("Bogota", "America/Bogota"),
("Jakarta", "Asia/Jakarta"),
("Makassar", "Asia/Makassar"),
("Wellington", "Pacific/Auckland"),
]
localized_begins = []
for location, tz in tzs:
localized_begins.append( # javascript formatting because of string creation from hell
"__{}__ {}".format(
str(location), str(date.to(tz).format("YYYY-MM-DD __HH:mm__"))
)
)
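    # illustrative result for an event starting 15:00 Kassel time (CEST):
    #   ['__Kassel__ 2022-06-18 __15:00__', '__Bamako__ 2022-06-18 __14:00__', etc.]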
return localized_begins
def create_event_post(post_dir, event):
"""
Create HUGO post based on calendar event metadata
Searches for image URLS in description and downloads them
Function is also called when post is in need of updating
In that case it will also delete images no longer in metadata
TODO: split this up into more functions for legibility
"""
if not os.path.exists(post_dir):
os.mkdir(post_dir)
event_metadata = create_metadata(event)
# list already existing images
    # so we can later delete them if we don't find them in the event metadata anymore
existing_images = os.listdir(post_dir)
    for fn in ("index.md", ".timestamp"):
        if fn in existing_images:
            existing_images.remove(fn)
for img in event_metadata["images"]:
# parse img url to safe local image name
img_name = img.split("/")[-1]
        fn, ext = img_name.rsplit(".", 1)  # split on the final dot only; filenames may contain dots
img_name = slugify(fn) + "." + ext
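        # e.g. "Event Poster.PNG" -> "event-poster.PNG"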
local_image = os.path.join(post_dir, img_name)
if not os.path.exists(local_image):
# download preview image
response = requests.get(img, stream=True)
if response.status_code == 200:
with open(local_image, "wb") as img_file:
shutil.copyfileobj(response.raw, img_file)
print('Downloaded image for event "{}"'.format(event.name))
event_metadata["description"] = event_metadata["description"].replace(
img, "![]({})".format(img_name)
)
if event_metadata["featured_image"] == "":
event_metadata["featured_image"] = img_name
if img_name in existing_images:
existing_images.remove(img_name)
for left_over_image in existing_images:
# remove images we found, but which are no longer in remote event
os.remove(os.path.join(post_dir, left_over_image))
print("deleted image", left_over_image)
with open(os.path.join(post_dir, "index.md"), "w") as f:
post = template.render(event=event_metadata)
f.write(post)
print("created post for", event.name, "({})".format(event.uid))
with open(os.path.join(post_dir, ".timestamp"), "w") as f:
f.write(event_metadata["created"])
def update_event_post(post_dir, event):
"""
    Update a post based on the iCal event's 'created' field, which changes when the event is updated
"""
if os.path.exists(post_dir):
old_timestamp = open(os.path.join(post_dir, ".timestamp")).read()
if event.created > arrow.get(old_timestamp):
print("Updating", event.name, "({})".format(event.uid))
create_event_post(post_dir, event)
else:
print("Event current: ", event.name, "({})".format(event.uid))
def main():
for event in list(cal.events):
post_name = slugify(event.name) + "-" + event.uid
post_dir = os.path.join(output_dir, post_name)
if post_name not in existing_posts:
            # if there is an event we don't already have, make it
create_event_post(post_dir, event)
        else:
# if we already have it, update
update_event_post(post_dir, event)
existing_posts.remove(
post_name
) # create list of posts which have not been returned by the calendar
for post in existing_posts:
# remove events not returned by the calendar (deletion)
print("deleted", post)
shutil.rmtree(os.path.join(output_dir, post))

konfluks/feed.py Normal file
@@ -0,0 +1,435 @@
import os
import shutil
import time
from hashlib import md5
from ast import literal_eval as make_tuple
from pathlib import Path
from urllib.parse import urlparse
import arrow
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify
from re import compile as re_compile
yamlre = re_compile('"')
def write_etag(feed_name, feed_data):
"""
    save the etag and last-modified headers of a feed, so unchanged feeds are not re-downloaded
"""
etag = ""
modified = ""
if "etag" in feed_data:
etag = feed_data.etag
if "modified" in feed_data:
modified = feed_data.modified
if etag or modified:
with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
f.write(str((etag, modified)))
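            # the file then holds a stringified tuple, e.g. ('"1aab-5e2f"', 'Thu, 02 Jun 2022 07:00:00 GMT') (illustrative)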
def get_etag(feed_name):
"""
    return the saved etag and last-modified headers for a feed
"""
fn = os.path.join("etags", feed_name + ".txt")
etag = ""
modified = ""
if os.path.exists(fn):
etag, modified = make_tuple(open(fn, "r").read())
return etag, modified
def create_frontmatter(entry):
"""
parse RSS metadata and return as frontmatter
"""
if 'published' in entry:
published = entry.published_parsed
if 'updated' in entry:
published = entry.updated_parsed
published = arrow.get(published)
    if 'author' in entry:
        author = entry.author
    else:
        author = ''
    authors = []  # default so the OPDS branch below cannot hit a NameError
    if 'authors' in entry:
        for a in entry.authors:
            authors.append(a['name'])
if 'summary' in entry:
summary = entry.summary
else:
summary = ''
if 'publisher' in entry:
publisher = entry.publisher
else:
publisher = ''
tags = []
if 'tags' in entry:
#TODO finish categories
for t in entry.tags:
tags.append(t['term'])
if "featured_image" in entry:
featured_image = entry.featured_image
else:
featured_image = ''
card_type = "network"
if entry.feed_name == "pen.lumbung.space":
card_type = "pen"
if "opds" in entry:
frontmatter = {
'title':entry.title,
'date': published.format(),
'summary': summary,
'author': ",".join(authors),
'publisher': publisher,
'original_link': entry.links[0]['href'].replace('opds/cover/','books/'),
'feed_name': entry['feed_name'],
'tags': str(tags),
'category': "books"
}
else:
frontmatter = {
'title':entry.title,
'date': published.format(),
'summary': '',
'author': author,
'original_link': entry.link,
'feed_name': entry['feed_name'],
'tags': str(tags),
'card_type': card_type,
'featured_image': featured_image
}
return frontmatter
def sanitize_yaml(frontmatter):
    """
    Escapes any occurrences of double quotes
    in any of the frontmatter fields
    See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types
    """
    for k, v in frontmatter.items():
        if isinstance(v, list):
            # some fields are lists
            frontmatter[k] = [yamlre.sub('\\"', i) for i in v]
        else:
            frontmatter[k] = yamlre.sub('\\"', v)
return frontmatter
def parse_enclosures(post_dir, entry):
"""
Parses feed enclosures which are featured media
Can be featured image but also podcast entries
https://pythonhosted.org/feedparser/reference-entry-enclosures.html
"""
#TODO parse more than images
#TODO handle the fact it could be multiple items
for e in entry.enclosures:
if "type" in e:
print("found enclosed media", e.type)
if "image/" in e.type:
featured_image = grab_media(post_dir, e.href)
entry["featured_image"] = featured_image
else:
print("FIXME:ignoring enclosed", e.type)
return entry
def create_post(post_dir, entry):
"""
write hugo post based on RSS entry
"""
if "enclosures" in entry:
entry = parse_enclosures(post_dir, entry)
frontmatter = create_frontmatter(entry)
if not os.path.exists(post_dir):
os.makedirs(post_dir)
if "content" in entry:
post_content = entry.content[0].value
else:
post_content = entry.summary
parsed_content = parse_posts(post_dir, post_content)
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
template = env.get_template("feed.md")
with open(os.path.join(post_dir, "index.html"), "w") as f: # n.b. .html
post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content)
f.write(post)
print("created post for", entry.title, "({})".format(entry.link))
def grab_media(post_directory, url, prefered_name=None):
"""
download media linked in post to have local copy
if download succeeds return new local path otherwise return url
"""
media_item = urlparse(url).path.split('/')[-1]
headers = {
'User-Agent': 'https://git.autonomic.zone/ruangrupa/lumbunglib',
'From': 'info@lumbung.space' # This is another valid field
}
if prefered_name:
media_item = prefered_name
try:
if not os.path.exists(os.path.join(post_directory, media_item)):
            # TODO: since stream=True, we could inspect the response headers (mimetype etc.) before committing to the download
response = requests.get(url, headers=headers, stream=True)
if response.ok:
with open(os.path.join(post_directory, media_item), 'wb') as media_file:
shutil.copyfileobj(response.raw, media_file)
print('Downloaded media item', media_item)
return media_item
else:
print("Download failed", response.status_code)
return url
return media_item
elif os.path.exists(os.path.join(post_directory, media_item)):
return media_item
except Exception as e:
print('Failed to download image', url)
print(e)
return url
def parse_posts(post_dir, post_content):
"""
    parse the post content for media items
    replace foreign images with local copies
    filter out iframe sources not in the allowlist
"""
soup = BeautifulSoup(post_content, "html.parser")
allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
for img in soup(["img", "object"]):
        if img.get("src") is not None:
local_image = grab_media(post_dir, img["src"])
if img["src"] != local_image:
img["src"] = local_image
for iframe in soup(["iframe"]):
if not any(source in iframe["src"] for source in allowed_iframe_sources):
print("filtered iframe: {}...".format(iframe["src"][:25]))
iframe.decompose()
return soup.decode()
def grab_feed(feed_url):
"""
check whether feed has been updated
download & return it if it has
"""
feed_name = urlparse(feed_url).netloc
etag, modified = get_etag(feed_name)
try:
if modified:
data = feedparser.parse(feed_url, modified=modified)
elif etag:
data = feedparser.parse(feed_url, etag=etag)
else:
data = feedparser.parse(feed_url)
except Exception as e:
print("Error grabbing feed")
print(feed_name)
print(e)
return False
if "status" in data:
print(data.status, feed_url)
if data.status == 200:
# 304 means the feed has not been modified since we last checked
write_etag(feed_name, data)
return data
return False
def create_opds_post(post_dir, entry):
"""
create a HUGO post based on OPDS entry
or update it if the timestamp is newer
Downloads the cover & file
"""
frontmatter = create_frontmatter(entry)
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
template = env.get_template("feed.md")
if not os.path.exists(post_dir):
os.makedirs(post_dir)
if os.path.exists(os.path.join(post_dir, '.timestamp')):
old_timestamp = open(os.path.join(post_dir, '.timestamp')).read()
old_timestamp = arrow.get(float(old_timestamp))
current_timestamp = arrow.get(entry['updated_parsed'])
        if current_timestamp <= old_timestamp:
            print('Book "{}..." already up to date'.format(entry['title'][:32]))
            return
for item in entry.links:
ft = item['type'].split('/')[-1]
fn = item['rel'].split('/')[-1]
        if fn == "acquisition":
            fn = "publication"  # OPDS calls the publication an "acquisition", which reads oddly in a filename
prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)
grab_media(post_dir, item['href'], prefered_name)
if "summary" in entry:
summary = entry.summary
else:
summary = ""
with open(os.path.join(post_dir,'index.md'),'w') as f:
post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)
f.write(post)
print('created post for Book', entry.title)
with open(os.path.join(post_dir, '.timestamp'), 'w') as f:
timestamp = arrow.get(entry['updated_parsed'])
f.write(timestamp.format('X'))
def main():
feed_urls = open("feeds_list.txt", "r").read().splitlines()
start = time.time()
if not os.path.exists("etags"):
os.mkdir("etags")
output_dir = os.environ.get("OUTPUT_DIR")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
feed_dict = dict()
for url in feed_urls:
feed_name = urlparse(url).netloc
feed_dict[url] = feed_name
feed_names = feed_dict.values()
content_dirs = os.listdir(output_dir)
for i in content_dirs:
if i not in feed_names:
shutil.rmtree(os.path.join(output_dir, i))
print("%s not in feeds_list.txt, removing local data" %(i))
    # add iframe to the allowlist of feedparser's sanitizer;
    # filtering of iframe sources is handled in parse_posts()
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
for feed_url in feed_urls:
feed_name = feed_dict[feed_url]
feed_dir = os.path.join(output_dir, feed_name)
if not os.path.exists(feed_dir):
os.makedirs(feed_dir)
existing_posts = os.listdir(feed_dir)
data = grab_feed(feed_url)
if data:
opds_feed = False
for i in data.feed['links']:
if i['rel'] == 'self':
if 'opds' in i['type']:
opds_feed = True
print("OPDS type feed!")
for entry in data.entries:
# if 'tags' in entry:
# for tag in entry.tags:
# for x in ['lumbung.space', 'D15', 'lumbung']:
# if x in tag['term']:
# print(entry.title)
entry["feed_name"] = feed_name
post_name = slugify(entry.title)
# pixelfed returns the whole post text as the post name. max
# filename length is 255 on many systems. here we're shortening
# the name and adding a hash to it to avoid a conflict in a
# situation where 2 posts start with exactly the same text.
if len(post_name) > 150:
post_hash = md5(bytes(post_name, "utf-8"))
post_name = post_name[:150] + "-" + post_hash.hexdigest()
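                    # illustrative: a 400-char caption slug becomes "<first 150 chars>-<32-char md5 hexdigest>"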
if opds_feed:
entry['opds'] = True
#format: Beyond-Debiasing-Report_Online-75535a4886e3
post_name = slugify(entry['title'])+'-'+entry['id'].split('-')[-1]
post_dir = os.path.join(output_dir, feed_name, post_name)
if post_name not in existing_posts:
                    # if there is a blog entry we don't already have, make it
if opds_feed:
create_opds_post(post_dir, entry)
else:
create_post(post_dir, entry)
                else:
# if we already have it, update it
if opds_feed:
create_opds_post(post_dir, entry)
else:
create_post(post_dir, entry)
existing_posts.remove(
post_name
) # create list of posts which have not been returned by the feed
for post in existing_posts:
# remove blog posts no longer returned by the RSS feed
print("deleted", post)
shutil.rmtree(os.path.join(feed_dir, slugify(post)))
end = time.time()
print(end - start)

konfluks/hashtag.py Normal file
@@ -0,0 +1,163 @@
import os
import shutil
from pathlib import Path
from re import sub
import jinja2
import requests
from mastodon import Mastodon
instance = "https://social.lumbung.space"
email = ""
password = ""
hashtags = [
"documentafifteen",
"harvestedbyputra",
"jalansesama",
"lumbungdotspace",
"majelisakakbar",
"majelisakbar",
"warungkopi",
"lumbungkios",
"kassel_ecosystem",
"ruruhaus",
"offbeatentrack_kassel",
"lumbungofpublishers",
]
def login_mastodon_bot():
mastodon = Mastodon(
access_token=os.environ.get("MASTODON_AUTH_TOKEN"), api_base_url=instance
)
return mastodon
def create_frontmatter(post_metadata):
"""
Parse post metadata and return it as HUGO frontmatter
"""
frontmatter = ""
return frontmatter
def download_media(post_directory, media_attachments):
"""
Download media attached to posts. N.b. currently only images
See: https://mastodonpy.readthedocs.io/en/stable/#media-dicts
"""
for item in media_attachments:
if item["type"] == "image":
image = localize_media_url(item["url"])
# TODO check whether this needs to handle delete & redraft with different images
if not os.path.exists(os.path.join(post_directory, image)):
# download image
response = requests.get(item["url"], stream=True)
with open(os.path.join(post_directory, image), "wb") as img_file:
shutil.copyfileobj(response.raw, img_file)
print("Downloaded cover image", image)
def create_post(post_directory, post_metadata):
"""
    Create Hugo posts based on Toots/posts returned in the timeline.
See: https://mastodonpy.readthedocs.io/en/stable/#toot-dicts
"""
if not os.path.exists(post_directory):
os.mkdir(post_directory)
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
name = post_metadata["account"]["display_name"]
name = sub('"', '\\"', name)
post_metadata["account"]["display_name"] = name
env.filters["localize_media_url"] = localize_media_url
env.filters["filter_mastodon_urls"] = filter_mastodon_urls
template = env.get_template("hashtag.md")
with open(os.path.join(post_directory, "index.html"), "w") as f:
post = template.render(post_metadata=post_metadata)
f.write(post)
download_media(post_directory, post_metadata["media_attachments"])
def localize_media_url(url):
"""
Returns the filename, used also as custom jinja filter
"""
return url.split("/")[-1]
def filter_mastodon_urls(content):
"""
Filters out Mastodon generated URLS for tags
e.g. <a href="https://social.lumbung.space/tags/jalankita" class="mention hashtag" rel="tag">
Used also as custom jinja filter
"""
# TODO
return content
def main():
mastodon = login_mastodon_bot()
output_dir = os.environ.get("OUTPUT_DIR")
if not os.path.exists(output_dir):
os.mkdir(output_dir)
all_existing_posts = []
for i in os.listdir(output_dir):
all_existing_posts += os.listdir(os.path.join(output_dir, i))
for hashtag in hashtags:
hashtag_dir = os.path.join(output_dir, hashtag)
if not os.path.exists(hashtag_dir):
os.mkdir(hashtag_dir)
existing_posts = os.listdir(hashtag_dir) # list all existing posts
timeline = mastodon.timeline_hashtag(
hashtag, local=True, only_media=True
) # returns max 20 queries and only with media
timeline = mastodon.fetch_remaining(
timeline
) # returns all the rest n.b. can take a while because of rate limit
for post_metadata in timeline:
post_dir = os.path.join(hashtag_dir, str(post_metadata["id"]))
            # if there is a post in the feed we don't already have locally, make it
if str(post_metadata["id"]) not in all_existing_posts:
if not post_metadata[
"local_only"
]: # if you get an error here then you are using vanilla Mastodon, this is a Hometown or Glitch only feature
create_post(post_dir, post_metadata)
all_existing_posts.append(str(post_metadata["id"]))
else:
print(
"not pulling post %s (post is local only)"
% (post_metadata["id"])
)
# if we already have the post do nothing, possibly update
elif str(post_metadata["id"]) in existing_posts:
# update_post(post_dir, post_metadata)
existing_posts.remove(
str(post_metadata["id"])
) # create list of posts which have not been returned in the feed
elif str(post_metadata["id"]) in all_existing_posts:
print(
"skipping post %s as it was already pulled with a different hashtag."
% (str(post_metadata["id"]))
)
for post in existing_posts:
print(
"deleted", post
) # rm posts that exist but are no longer returned in feed
shutil.rmtree(os.path.join(hashtag_dir, post))

konfluks/templates/calendar.md Normal file
@@ -0,0 +1,23 @@
---
title: "{{ event.name }}"
date: "{{ event.begin }}" #2021-06-10T10:46:33+02:00
draft: false
categories: "calendar"
event_begin: "{{ event.begin }}"
event_end: "{{ event.end }}"
duration: "{{ event.duration }}"
localized_begin: "{{ event.localized_begin }}"
uid: "{{ event.uid }}"
{% if event.featured_image %}
featured_image: "{{ event.featured_image }}"
{% endif %}
{% if event.location %}
location: "{{ event.location }}"
{% endif %}
---
{% if event.description %}
{{ event.description }}
{% endif %}

konfluks/templates/feed.md Normal file
@@ -0,0 +1,15 @@
---
title: "{{ frontmatter.title }}"
date: "{{ frontmatter.date }}" #2021-06-10T10:46:33+02:00
draft: false
summary: "{{ frontmatter.summary }}"
authors: {% if frontmatter.author %} ["{{ frontmatter.author }}"] {% endif %}
original_link: "{{ frontmatter.original_link }}"
feed_name: "{{ frontmatter.feed_name}}"
categories: ["{{ frontmatter.card_type }}", "{{ frontmatter.feed_name}}"]
contributors: ["{{ frontmatter.feed_name}}"]
tags: {{ frontmatter.tags }}
{% if frontmatter.featured_image %}featured_image: "{{frontmatter.featured_image}}"{% endif %}
---
{{ content }}

konfluks/templates/hashtag.md Normal file
@@ -0,0 +1,17 @@
---
date: {{ post_metadata.created_at }} #2021-06-10T10:46:33+02:00
draft: false
authors: ["{{ post_metadata.account.display_name }}"]
contributors: ["{{ post_metadata.account.acct}}"]
avatar: {{ post_metadata.account.avatar }}
categories: ["shouts"]
images: [{% for i in post_metadata.media_attachments %} {{ i.url }}, {% endfor %}]
title: {{ post_metadata.account.display_name }}
tags: [{% for i in post_metadata.tags %} "{{ i.name }}", {% endfor %}]
---
{% for item in post_metadata.media_attachments %}
<img src="{{item.url | localize_media_url }}" alt="{{item.description}}">
{% endfor %}
{{ post_metadata.content | filter_mastodon_urls }}

konfluks/templates/video.md Normal file
@@ -0,0 +1,16 @@
---
title: "{{ v.name }}"
date: "{{ v.published_at }}" #2021-06-10T10:46:33+02:00
draft: false
uuid: "{{v.uuid}}"
video_duration: "{{ v.duration | duration }} "
video_channel: "{{ v.channel.display_name }}"
channel_url: "{{ v.channel.url }}"
contributors: ["{{ v.account.display_name }}"]
preview_image: "{{ preview_image }}"
images: ["./{{ preview_image }}"]
categories: ["tv","{{ v.channel.display_name }}"]
is_live: {{ v.is_live }}
---
{{ v.description }}

konfluks/video.py Normal file
@@ -0,0 +1,161 @@
import ast
import datetime
import json
import os
import shutil
from pathlib import Path
from slugify import slugify
import arrow
import jinja2
import peertube
import requests
host = "https://tv.lumbung.space"
configuration = peertube.Configuration(host=host + "/api/v1")
client = peertube.ApiClient(configuration)
# jinja filters & config
def duration(n):
"""
    convert e.g. 6655 (seconds) into '1:50:55'
"""
return str(datetime.timedelta(seconds=n))
def linebreaks(text):
if not text:
return text
else:
import re
br = re.compile(r"(\r\n|\r|\n)")
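        # e.g. linebreaks("a\nb") -> "a<br />\nb"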
return br.sub(r"<br />\n", text)
def create_post(post_directory, video_metadata, host):
global client # lazy
if not os.path.exists(post_directory):
os.mkdir(post_directory)
preview_image = video_metadata["preview_path"].split("/")[-1]
if not os.path.exists(os.path.join(post_directory, preview_image)):
# download preview image
response = requests.get(host + video_metadata["preview_path"], stream=True)
with open(os.path.join(post_directory, preview_image), "wb") as img_file:
shutil.copyfileobj(response.raw, img_file)
print("Downloaded cover image")
# replace the truncated description with the full video description
# peertube api is some broken thing in between a py dict and a json file
api_response = peertube.VideoApi(client).videos_id_description_get(
video_metadata["uuid"]
)
long_description = ast.literal_eval(api_response)
video_metadata["description"] = long_description["description"]
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
env.filters["duration"] = duration
env.filters["linebreaks"] = linebreaks
template = env.get_template("video.md")
with open(os.path.join(post_directory, "index.md"), "w") as f:
post = template.render(v=video_metadata, host=host, preview_image=preview_image)
f.write(post)
with open(os.path.join(post_directory, ".timestamp"), "w") as f:
timestamp = arrow.get(video_metadata["updated_at"])
f.write(timestamp.format("X"))
def update_post(post_directory, video_metadata, host):
if os.path.exists(post_directory):
if os.path.exists(os.path.join(post_directory, ".timestamp")):
old_timestamp = open(os.path.join(post_directory, ".timestamp")).read()
            # FIXME: this is ugly, but needed because arrow drops milliseconds when formatting
current_timestamp = arrow.get(video_metadata["updated_at"])
current_timestamp = arrow.get(current_timestamp.format("X"))
if current_timestamp > arrow.get(old_timestamp):
print(
"Updating",
video_metadata["name"],
"({})".format(video_metadata["uuid"]),
)
create_post(post_directory, video_metadata, host)
else:
print(
"Video current: ",
video_metadata["name"],
"({})".format(video_metadata["uuid"]),
)
else:
# compat for when there is no timestamp yet..
create_post(post_directory, video_metadata, host)
def main():
v = peertube.VideoApi(client)
count = 100
page = 0
try:
response = v.videos_get(count=count, filter="local", tags_one_of="publish", start=page)
videos = response.to_dict()
total = videos['total']
videos = videos['data']
total -= count
if total > 0:
to_download = total // count
last_page = total % count
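            # illustrative numbers: total=250, count=100 -> one more full page
            # (to_download=1) plus last_page=50 trailing items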
for i in range(to_download):
page += 1
response = v.videos_get(count=count, filter="local", tags_one_of="publish", start=page)
videos += response.to_dict()['data']
if last_page > 0:
page += 1
response = v.videos_get(count=count, filter="local", tags_one_of="publish", start=page)
videos += response.to_dict()['data'][-1*last_page:]
output_dir = os.environ.get("OUTPUT_DIR")
if not os.path.exists(output_dir):
os.mkdir(output_dir)
existing_posts = os.listdir(output_dir)
for video_metadata in videos:
post_name = slugify(video_metadata["name"]) + "-" + video_metadata["uuid"]
post_dir = os.path.join(output_dir, post_name)
if (
post_name not in existing_posts
            ):  # if there is a video we don't already have, make it
print(
"New: ", video_metadata["name"], "({})".format(video_metadata["uuid"])
)
create_post(post_dir, video_metadata, host)
            else:  # if we already have the video do nothing, possibly update
update_post(post_dir, video_metadata, host)
existing_posts.remove(
post_name
) # create list of posts which have not been returned by peertube
    except Exception:
print("didn't get a response from peertube, instance might have been taken down or made private. removing all posts.")
output_dir = os.environ.get("OUTPUT_DIR")
if not os.path.exists(output_dir):
os.mkdir(output_dir)
existing_posts = os.listdir(output_dir)
for post in existing_posts:
print("deleted", post) # rm posts not returned
shutil.rmtree(os.path.join(output_dir, post))