forked from ruangrupa/konfluks
Merge remote-tracking branch 'konfluks/konfluks-renaming'
This commit is contained in:
208
konfluks/cloudcal.py
Normal file
208
konfluks/cloudcal.py
Normal file
@ -0,0 +1,208 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
from slugify import slugify
|
||||
|
||||
import arrow
|
||||
import jinja2
|
||||
import requests
|
||||
from ics import Calendar
|
||||
from natural import date
|
||||
from slugify import slugify
|
||||
|
||||
# a publicly accessible ICS calendar
|
||||
calendar_url = os.environ.get("CALENDAR_URL")
|
||||
|
||||
# your Hugo content directory
|
||||
output_dir = os.environ.get("OUTPUT_DIR")
|
||||
|
||||
cal = Calendar(requests.get(calendar_url).text)
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
template = env.get_template("calendar.md")
|
||||
|
||||
existing_posts = os.listdir(output_dir)
|
||||
|
||||
|
||||
def findURLs(string):
|
||||
"""
|
||||
return all URLs in a given string
|
||||
"""
|
||||
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
|
||||
url = re.findall(regex, string)
|
||||
return [x[0] for x in url]
|
||||
|
||||
|
||||
def find_imageURLS(string):
|
||||
"""
|
||||
return all image URLS in a given string
|
||||
"""
|
||||
regex = r"(?:http\:|https\:)?\/\/.*?\.(?:png|jpg|jpeg|gif|svg)"
|
||||
|
||||
img_urls = re.findall(regex, string, flags=re.IGNORECASE)
|
||||
return img_urls
|
||||
|
||||
|
||||
def create_metadata(event):
|
||||
"""
|
||||
construct a formatted dict of event metadata for use as frontmatter for HUGO post
|
||||
"""
|
||||
|
||||
if event.location:
|
||||
location_urls = findURLs(event.location)
|
||||
|
||||
if location_urls:
|
||||
location_url = location_urls[0]
|
||||
event.location = "[{}]({})".format(
|
||||
urlparse(location_url).netloc, location_url
|
||||
)
|
||||
|
||||
event_metadata = {
|
||||
"name": event.name,
|
||||
"created": event.created.format(),
|
||||
"description": event.description,
|
||||
"localized_begin": " ".join(
|
||||
localize_time(event.begin)
|
||||
), # non-breaking space characters to defeat markdown
|
||||
"begin": event.begin.format(),
|
||||
"end": event.end.format(),
|
||||
"duration": date.compress(event.duration),
|
||||
"location": event.location,
|
||||
"uid": event.uid,
|
||||
"featured_image": "",
|
||||
"images": find_imageURLS(event.description), # currently not used in template
|
||||
}
|
||||
|
||||
return event_metadata
|
||||
|
||||
|
||||
def localize_time(date):
|
||||
"""
|
||||
Turn a given date into various timezones
|
||||
Takes arrow objects
|
||||
"""
|
||||
|
||||
# 3 PM Kassel, Germany, 4 PM Ramallah/Jerusalem, Palestina (QoF),
|
||||
# 8 AM Bogota, Colombia (MaMa), 8 PM Jakarta, Indonesia (Gudskul),
|
||||
# 1 PM (+1day) Wellington, New Zealand (Fafswag), 9 AM Havana, Cuba (Instar).
|
||||
|
||||
tzs = [
|
||||
("Kassel", "Europe/Berlin"),
|
||||
("Bamako", "Europe/London"),
|
||||
("Palestine", "Asia/Jerusalem"),
|
||||
("Bogota", "America/Bogota"),
|
||||
("Jakarta", "Asia/Jakarta"),
|
||||
("Makassar", "Asia/Makassar"),
|
||||
("Wellington", "Pacific/Auckland"),
|
||||
]
|
||||
|
||||
localized_begins = []
|
||||
for location, tz in tzs:
|
||||
localized_begins.append( # javascript formatting because of string creation from hell
|
||||
"__{}__ {}".format(
|
||||
str(location), str(date.to(tz).format("YYYY-MM-DD __HH:mm__"))
|
||||
)
|
||||
)
|
||||
return localized_begins
|
||||
|
||||
def create_event_post(post_dir, event):
|
||||
"""
|
||||
Create HUGO post based on calendar event metadata
|
||||
Searches for image URLS in description and downloads them
|
||||
Function is also called when post is in need of updating
|
||||
In that case it will also delete images no longer in metadata
|
||||
TODO: split this up into more functions for legibility
|
||||
"""
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.mkdir(post_dir)
|
||||
|
||||
event_metadata = create_metadata(event)
|
||||
|
||||
# list already existing images
|
||||
# so we can later delete them if we dont find them in the event metadata anymore
|
||||
existing_images = os.listdir(post_dir)
|
||||
try:
|
||||
existing_images.remove("index.md")
|
||||
existing_images.remove(".timestamp")
|
||||
except:
|
||||
pass
|
||||
|
||||
for img in event_metadata["images"]:
|
||||
|
||||
# parse img url to safe local image name
|
||||
img_name = img.split("/")[-1]
|
||||
fn, ext = img_name.split(".")
|
||||
img_name = slugify(fn) + "." + ext
|
||||
|
||||
local_image = os.path.join(post_dir, img_name)
|
||||
|
||||
if not os.path.exists(local_image):
|
||||
# download preview image
|
||||
response = requests.get(img, stream=True)
|
||||
if response.status_code == 200:
|
||||
with open(local_image, "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print('Downloaded image for event "{}"'.format(event.name))
|
||||
event_metadata["description"] = event_metadata["description"].replace(
|
||||
img, "".format(img_name)
|
||||
)
|
||||
if event_metadata["featured_image"] == "":
|
||||
event_metadata["featured_image"] = img_name
|
||||
if img_name in existing_images:
|
||||
existing_images.remove(img_name)
|
||||
|
||||
for left_over_image in existing_images:
|
||||
# remove images we found, but which are no longer in remote event
|
||||
os.remove(os.path.join(post_dir, left_over_image))
|
||||
print("deleted image", left_over_image)
|
||||
|
||||
with open(os.path.join(post_dir, "index.md"), "w") as f:
|
||||
post = template.render(event=event_metadata)
|
||||
f.write(post)
|
||||
print("created post for", event.name, "({})".format(event.uid))
|
||||
|
||||
with open(os.path.join(post_dir, ".timestamp"), "w") as f:
|
||||
f.write(event_metadata["created"])
|
||||
|
||||
|
||||
def update_event_post(post_dir, event):
|
||||
"""
|
||||
Update a post based on the VCARD event 'created' field which changes when updated
|
||||
"""
|
||||
if os.path.exists(post_dir):
|
||||
old_timestamp = open(os.path.join(post_dir, ".timestamp")).read()
|
||||
if event.created > arrow.get(old_timestamp):
|
||||
print("Updating", event.name, "({})".format(event.uid))
|
||||
create_event_post(post_dir, event)
|
||||
else:
|
||||
print("Event current: ", event.name, "({})".format(event.uid))
|
||||
|
||||
|
||||
def main():
|
||||
for event in list(cal.events):
|
||||
post_name = slugify(event.name) + "-" + event.uid
|
||||
post_dir = os.path.join(output_dir, post_name)
|
||||
|
||||
if post_name not in existing_posts:
|
||||
# if there is an event we dont already have, make it
|
||||
create_event_post(post_dir, event)
|
||||
|
||||
elif post_name in existing_posts:
|
||||
# if we already have it, update
|
||||
update_event_post(post_dir, event)
|
||||
existing_posts.remove(
|
||||
post_name
|
||||
) # create list of posts which have not been returned by the calendar
|
||||
|
||||
for post in existing_posts:
|
||||
# remove events not returned by the calendar (deletion)
|
||||
print("deleted", post)
|
||||
shutil.rmtree(os.path.join(output_dir, post))
|
435
konfluks/feed.py
Normal file
435
konfluks/feed.py
Normal file
@ -0,0 +1,435 @@
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from hashlib import md5
|
||||
from ast import literal_eval as make_tuple
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
from re import sub
|
||||
|
||||
import arrow
|
||||
import feedparser
|
||||
import jinja2
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from slugify import slugify
|
||||
from re import compile as re_compile
|
||||
yamlre = re_compile('"')
|
||||
|
||||
|
||||
def write_etag(feed_name, feed_data):
|
||||
"""
|
||||
save timestamp of when feed was last modified
|
||||
"""
|
||||
etag = ""
|
||||
modified = ""
|
||||
|
||||
if "etag" in feed_data:
|
||||
etag = feed_data.etag
|
||||
if "modified" in feed_data:
|
||||
modified = feed_data.modified
|
||||
|
||||
if etag or modified:
|
||||
with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
|
||||
f.write(str((etag, modified)))
|
||||
|
||||
|
||||
def get_etag(feed_name):
|
||||
"""
|
||||
return timestamp of when feed was last modified
|
||||
"""
|
||||
fn = os.path.join("etags", feed_name + ".txt")
|
||||
etag = ""
|
||||
modified = ""
|
||||
|
||||
if os.path.exists(fn):
|
||||
etag, modified = make_tuple(open(fn, "r").read())
|
||||
|
||||
return etag, modified
|
||||
|
||||
|
||||
def create_frontmatter(entry):
|
||||
"""
|
||||
parse RSS metadata and return as frontmatter
|
||||
"""
|
||||
if 'published' in entry:
|
||||
published = entry.published_parsed
|
||||
if 'updated' in entry:
|
||||
published = entry.updated_parsed
|
||||
|
||||
published = arrow.get(published)
|
||||
|
||||
if 'author' in entry:
|
||||
author = entry.author
|
||||
else:
|
||||
author = ''
|
||||
|
||||
if 'authors' in entry:
|
||||
authors = []
|
||||
for a in entry.authors:
|
||||
authors.append(a['name'])
|
||||
|
||||
if 'summary' in entry:
|
||||
summary = entry.summary
|
||||
else:
|
||||
summary = ''
|
||||
|
||||
if 'publisher' in entry:
|
||||
publisher = entry.publisher
|
||||
else:
|
||||
publisher = ''
|
||||
|
||||
tags = []
|
||||
if 'tags' in entry:
|
||||
#TODO finish categories
|
||||
for t in entry.tags:
|
||||
tags.append(t['term'])
|
||||
|
||||
if "featured_image" in entry:
|
||||
featured_image = entry.featured_image
|
||||
else:
|
||||
featured_image = ''
|
||||
|
||||
card_type = "network"
|
||||
if entry.feed_name == "pen.lumbung.space":
|
||||
card_type = "pen"
|
||||
|
||||
if "opds" in entry:
|
||||
frontmatter = {
|
||||
'title':entry.title,
|
||||
'date': published.format(),
|
||||
'summary': summary,
|
||||
'author': ",".join(authors),
|
||||
'publisher': publisher,
|
||||
'original_link': entry.links[0]['href'].replace('opds/cover/','books/'),
|
||||
'feed_name': entry['feed_name'],
|
||||
'tags': str(tags),
|
||||
'category': "books"
|
||||
}
|
||||
else:
|
||||
frontmatter = {
|
||||
'title':entry.title,
|
||||
'date': published.format(),
|
||||
'summary': '',
|
||||
'author': author,
|
||||
'original_link': entry.link,
|
||||
'feed_name': entry['feed_name'],
|
||||
'tags': str(tags),
|
||||
'card_type': card_type,
|
||||
'featured_image': featured_image
|
||||
}
|
||||
|
||||
return frontmatter
|
||||
|
||||
def sanitize_yaml (frontmatter):
|
||||
"""
|
||||
Escapes any occurences of double quotes
|
||||
in any of the frontmatter fields
|
||||
See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types
|
||||
"""
|
||||
for k, v in frontmatter.items():
|
||||
if type(v) == type([]):
|
||||
#some fields are lists
|
||||
l = []
|
||||
for i in v:
|
||||
i = yamlre.sub('\\"', i)
|
||||
l.append(i)
|
||||
frontmatter[k] = l
|
||||
|
||||
else:
|
||||
v = yamlre.sub('\\"', v)
|
||||
frontmatter[k] = v
|
||||
|
||||
return frontmatter
|
||||
|
||||
def parse_enclosures(post_dir, entry):
|
||||
"""
|
||||
Parses feed enclosures which are featured media
|
||||
Can be featured image but also podcast entries
|
||||
https://pythonhosted.org/feedparser/reference-entry-enclosures.html
|
||||
"""
|
||||
#TODO parse more than images
|
||||
#TODO handle the fact it could be multiple items
|
||||
|
||||
for e in entry.enclosures:
|
||||
if "type" in e:
|
||||
print("found enclosed media", e.type)
|
||||
if "image/" in e.type:
|
||||
featured_image = grab_media(post_dir, e.href)
|
||||
entry["featured_image"] = featured_image
|
||||
else:
|
||||
print("FIXME:ignoring enclosed", e.type)
|
||||
return entry
|
||||
|
||||
|
||||
def create_post(post_dir, entry):
|
||||
"""
|
||||
write hugo post based on RSS entry
|
||||
"""
|
||||
if "enclosures" in entry:
|
||||
entry = parse_enclosures(post_dir, entry)
|
||||
|
||||
frontmatter = create_frontmatter(entry)
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.makedirs(post_dir)
|
||||
|
||||
if "content" in entry:
|
||||
post_content = entry.content[0].value
|
||||
else:
|
||||
post_content = entry.summary
|
||||
|
||||
parsed_content = parse_posts(post_dir, post_content)
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
template = env.get_template("feed.md")
|
||||
with open(os.path.join(post_dir, "index.html"), "w") as f: # n.b. .html
|
||||
post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content)
|
||||
f.write(post)
|
||||
print("created post for", entry.title, "({})".format(entry.link))
|
||||
|
||||
|
||||
def grab_media(post_directory, url, prefered_name=None):
|
||||
"""
|
||||
download media linked in post to have local copy
|
||||
if download succeeds return new local path otherwise return url
|
||||
"""
|
||||
media_item = urlparse(url).path.split('/')[-1]
|
||||
|
||||
headers = {
|
||||
'User-Agent': 'https://git.autonomic.zone/ruangrupa/lumbunglib',
|
||||
'From': 'info@lumbung.space' # This is another valid field
|
||||
}
|
||||
if prefered_name:
|
||||
media_item = prefered_name
|
||||
|
||||
try:
|
||||
if not os.path.exists(os.path.join(post_directory, media_item)):
|
||||
#TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
|
||||
response = requests.get(url, headers=headers, stream=True)
|
||||
if response.ok:
|
||||
with open(os.path.join(post_directory, media_item), 'wb') as media_file:
|
||||
shutil.copyfileobj(response.raw, media_file)
|
||||
print('Downloaded media item', media_item)
|
||||
return media_item
|
||||
else:
|
||||
print("Download failed", response.status_code)
|
||||
return url
|
||||
return media_item
|
||||
elif os.path.exists(os.path.join(post_directory, media_item)):
|
||||
return media_item
|
||||
|
||||
except Exception as e:
|
||||
print('Failed to download image', url)
|
||||
print(e)
|
||||
return url
|
||||
|
||||
|
||||
def parse_posts(post_dir, post_content):
|
||||
"""
|
||||
parse the post content to for media items
|
||||
replace foreign image with local copy
|
||||
filter out iframe sources not in allowlist
|
||||
"""
|
||||
soup = BeautifulSoup(post_content, "html.parser")
|
||||
allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
|
||||
|
||||
for img in soup(["img", "object"]):
|
||||
if img.get("src") != None:
|
||||
local_image = grab_media(post_dir, img["src"])
|
||||
if img["src"] != local_image:
|
||||
img["src"] = local_image
|
||||
|
||||
for iframe in soup(["iframe"]):
|
||||
if not any(source in iframe["src"] for source in allowed_iframe_sources):
|
||||
print("filtered iframe: {}...".format(iframe["src"][:25]))
|
||||
iframe.decompose()
|
||||
return soup.decode()
|
||||
|
||||
|
||||
def grab_feed(feed_url):
|
||||
"""
|
||||
check whether feed has been updated
|
||||
download & return it if it has
|
||||
"""
|
||||
feed_name = urlparse(feed_url).netloc
|
||||
|
||||
etag, modified = get_etag(feed_name)
|
||||
|
||||
try:
|
||||
if modified:
|
||||
data = feedparser.parse(feed_url, modified=modified)
|
||||
elif etag:
|
||||
data = feedparser.parse(feed_url, etag=etag)
|
||||
else:
|
||||
data = feedparser.parse(feed_url)
|
||||
except Exception as e:
|
||||
print("Error grabbing feed")
|
||||
print(feed_name)
|
||||
print(e)
|
||||
return False
|
||||
|
||||
if "status" in data:
|
||||
print(data.status, feed_url)
|
||||
if data.status == 200:
|
||||
# 304 means the feed has not been modified since we last checked
|
||||
write_etag(feed_name, data)
|
||||
return data
|
||||
return False
|
||||
|
||||
def create_opds_post(post_dir, entry):
|
||||
"""
|
||||
create a HUGO post based on OPDS entry
|
||||
or update it if the timestamp is newer
|
||||
Downloads the cover & file
|
||||
"""
|
||||
|
||||
frontmatter = create_frontmatter(entry)
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
template = env.get_template("feed.md")
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.makedirs(post_dir)
|
||||
|
||||
if os.path.exists(os.path.join(post_dir, '.timestamp')):
|
||||
old_timestamp = open(os.path.join(post_dir, '.timestamp')).read()
|
||||
old_timestamp = arrow.get(float(old_timestamp))
|
||||
current_timestamp = arrow.get(entry['updated_parsed'])
|
||||
|
||||
if current_timestamp > old_timestamp:
|
||||
pass
|
||||
else:
|
||||
print('Book "{}..." already up to date'.format(entry['title'][:32]))
|
||||
return
|
||||
|
||||
for item in entry.links:
|
||||
ft = item['type'].split('/')[-1]
|
||||
fn = item['rel'].split('/')[-1]
|
||||
|
||||
if fn == "acquisition":
|
||||
fn = "publication" #calling the publications acquisition is weird
|
||||
|
||||
prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)
|
||||
|
||||
grab_media(post_dir, item['href'], prefered_name)
|
||||
|
||||
if "summary" in entry:
|
||||
summary = entry.summary
|
||||
else:
|
||||
summary = ""
|
||||
|
||||
with open(os.path.join(post_dir,'index.md'),'w') as f:
|
||||
post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)
|
||||
f.write(post)
|
||||
print('created post for Book', entry.title)
|
||||
|
||||
with open(os.path.join(post_dir, '.timestamp'), 'w') as f:
|
||||
timestamp = arrow.get(entry['updated_parsed'])
|
||||
f.write(timestamp.format('X'))
|
||||
|
||||
|
||||
def main():
|
||||
feed_urls = open("feeds_list.txt", "r").read().splitlines()
|
||||
|
||||
start = time.time()
|
||||
|
||||
if not os.path.exists("etags"):
|
||||
os.mkdir("etags")
|
||||
|
||||
output_dir = os.environ.get("OUTPUT_DIR")
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
feed_dict = dict()
|
||||
for url in feed_urls:
|
||||
feed_name = urlparse(url).netloc
|
||||
feed_dict[url] = feed_name
|
||||
|
||||
feed_names = feed_dict.values()
|
||||
content_dirs = os.listdir(output_dir)
|
||||
for i in content_dirs:
|
||||
if i not in feed_names:
|
||||
shutil.rmtree(os.path.join(output_dir, i))
|
||||
print("%s not in feeds_list.txt, removing local data" %(i))
|
||||
|
||||
# add iframe to the allowlist of feedparser's sanitizer,
|
||||
# this is now handled in parse_post()
|
||||
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
|
||||
|
||||
for feed_url in feed_urls:
|
||||
|
||||
feed_name = feed_dict[feed_url]
|
||||
|
||||
feed_dir = os.path.join(output_dir, feed_name)
|
||||
|
||||
if not os.path.exists(feed_dir):
|
||||
os.makedirs(feed_dir)
|
||||
|
||||
existing_posts = os.listdir(feed_dir)
|
||||
|
||||
data = grab_feed(feed_url)
|
||||
|
||||
if data:
|
||||
|
||||
opds_feed = False
|
||||
for i in data.feed['links']:
|
||||
if i['rel'] == 'self':
|
||||
if 'opds' in i['type']:
|
||||
opds_feed = True
|
||||
print("OPDS type feed!")
|
||||
|
||||
|
||||
for entry in data.entries:
|
||||
# if 'tags' in entry:
|
||||
# for tag in entry.tags:
|
||||
# for x in ['lumbung.space', 'D15', 'lumbung']:
|
||||
# if x in tag['term']:
|
||||
# print(entry.title)
|
||||
entry["feed_name"] = feed_name
|
||||
|
||||
post_name = slugify(entry.title)
|
||||
|
||||
# pixelfed returns the whole post text as the post name. max
|
||||
# filename length is 255 on many systems. here we're shortening
|
||||
# the name and adding a hash to it to avoid a conflict in a
|
||||
# situation where 2 posts start with exactly the same text.
|
||||
if len(post_name) > 150:
|
||||
post_hash = md5(bytes(post_name, "utf-8"))
|
||||
post_name = post_name[:150] + "-" + post_hash.hexdigest()
|
||||
|
||||
if opds_feed:
|
||||
entry['opds'] = True
|
||||
#format: Beyond-Debiasing-Report_Online-75535a4886e3
|
||||
post_name = slugify(entry['title'])+'-'+entry['id'].split('-')[-1]
|
||||
|
||||
post_dir = os.path.join(output_dir, feed_name, post_name)
|
||||
|
||||
if post_name not in existing_posts:
|
||||
# if there is a blog entry we dont already have, make it
|
||||
if opds_feed:
|
||||
create_opds_post(post_dir, entry)
|
||||
else:
|
||||
create_post(post_dir, entry)
|
||||
|
||||
elif post_name in existing_posts:
|
||||
# if we already have it, update it
|
||||
if opds_feed:
|
||||
create_opds_post(post_dir, entry)
|
||||
else:
|
||||
create_post(post_dir, entry)
|
||||
existing_posts.remove(
|
||||
post_name
|
||||
) # create list of posts which have not been returned by the feed
|
||||
|
||||
for post in existing_posts:
|
||||
# remove blog posts no longer returned by the RSS feed
|
||||
print("deleted", post)
|
||||
shutil.rmtree(os.path.join(feed_dir, slugify(post)))
|
||||
|
||||
end = time.time()
|
||||
|
||||
print(end - start)
|
164
konfluks/hashtag.py
Normal file
164
konfluks/hashtag.py
Normal file
@ -0,0 +1,164 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from re import sub
|
||||
|
||||
import jinja2
|
||||
import requests
|
||||
from mastodon import Mastodon
|
||||
|
||||
instance = "https://social.lumbung.space"
|
||||
email = ""
|
||||
password = ""
|
||||
hashtags = [
|
||||
"documentafifteen",
|
||||
"harvestedbyputra",
|
||||
"jalansesama",
|
||||
"lumbungdotspace",
|
||||
"majelisakakbar",
|
||||
"majelisakbar",
|
||||
"warungkopi",
|
||||
"lumbungkios",
|
||||
"kassel_ecosystem",
|
||||
"ruruhaus",
|
||||
"offbeatentrack_kassel",
|
||||
"lumbungofpublishers",
|
||||
"lumbungkiosproducts",
|
||||
]
|
||||
|
||||
|
||||
def login_mastodon_bot():
|
||||
mastodon = Mastodon(
|
||||
access_token=os.environ.get("MASTODON_AUTH_TOKEN"), api_base_url=instance
|
||||
)
|
||||
|
||||
return mastodon
|
||||
|
||||
|
||||
def create_frontmatter(post_metadata):
|
||||
"""
|
||||
Parse post metadata and return it as HUGO frontmatter
|
||||
"""
|
||||
|
||||
frontmatter = ""
|
||||
return frontmatter
|
||||
|
||||
|
||||
def download_media(post_directory, media_attachments):
|
||||
"""
|
||||
Download media attached to posts. N.b. currently only images
|
||||
See: https://mastodonpy.readthedocs.io/en/stable/#media-dicts
|
||||
"""
|
||||
|
||||
for item in media_attachments:
|
||||
if item["type"] == "image":
|
||||
image = localize_media_url(item["url"])
|
||||
# TODO check whether this needs to handle delete & redraft with different images
|
||||
if not os.path.exists(os.path.join(post_directory, image)):
|
||||
# download image
|
||||
response = requests.get(item["url"], stream=True)
|
||||
with open(os.path.join(post_directory, image), "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print("Downloaded cover image", image)
|
||||
|
||||
|
||||
def create_post(post_directory, post_metadata):
|
||||
"""
|
||||
Create Hugo posts based on Toots/posts retuned in timeline.
|
||||
See: https://mastodonpy.readthedocs.io/en/stable/#toot-dicts
|
||||
"""
|
||||
|
||||
if not os.path.exists(post_directory):
|
||||
os.mkdir(post_directory)
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
name = post_metadata["account"]["display_name"]
|
||||
name = sub('"', '\\"', name)
|
||||
post_metadata["account"]["display_name"] = name
|
||||
env.filters["localize_media_url"] = localize_media_url
|
||||
env.filters["filter_mastodon_urls"] = filter_mastodon_urls
|
||||
|
||||
template = env.get_template("hashtag.md")
|
||||
|
||||
with open(os.path.join(post_directory, "index.html"), "w") as f:
|
||||
post = template.render(post_metadata=post_metadata)
|
||||
f.write(post)
|
||||
|
||||
download_media(post_directory, post_metadata["media_attachments"])
|
||||
|
||||
|
||||
def localize_media_url(url):
|
||||
"""
|
||||
Returns the filename, used also as custom jinja filter
|
||||
"""
|
||||
return url.split("/")[-1]
|
||||
|
||||
|
||||
def filter_mastodon_urls(content):
|
||||
"""
|
||||
Filters out Mastodon generated URLS for tags
|
||||
e.g. <a href="https://social.lumbung.space/tags/jalankita" class="mention hashtag" rel="tag">
|
||||
Used also as custom jinja filter
|
||||
"""
|
||||
# TODO
|
||||
return content
|
||||
|
||||
|
||||
def main():
|
||||
mastodon = login_mastodon_bot()
|
||||
output_dir = os.environ.get("OUTPUT_DIR")
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
all_existing_posts = []
|
||||
for i in os.listdir(output_dir):
|
||||
all_existing_posts += os.listdir(os.path.join(output_dir, i))
|
||||
|
||||
for hashtag in hashtags:
|
||||
|
||||
hashtag_dir = os.path.join(output_dir, hashtag)
|
||||
if not os.path.exists(hashtag_dir):
|
||||
os.mkdir(hashtag_dir)
|
||||
|
||||
existing_posts = os.listdir(hashtag_dir) # list all existing posts
|
||||
|
||||
timeline = mastodon.timeline_hashtag(
|
||||
hashtag, local=True, only_media=True
|
||||
) # returns max 20 queries and only with media
|
||||
timeline = mastodon.fetch_remaining(
|
||||
timeline
|
||||
) # returns all the rest n.b. can take a while because of rate limit
|
||||
|
||||
for post_metadata in timeline:
|
||||
post_dir = os.path.join(hashtag_dir, str(post_metadata["id"]))
|
||||
# if there is a post in the feed we dont already have locally, make it
|
||||
if str(post_metadata["id"]) not in all_existing_posts:
|
||||
if not post_metadata[
|
||||
"local_only"
|
||||
]: # if you get an error here then you are using vanilla Mastodon, this is a Hometown or Glitch only feature
|
||||
create_post(post_dir, post_metadata)
|
||||
all_existing_posts.append(str(post_metadata["id"]))
|
||||
else:
|
||||
print(
|
||||
"not pulling post %s (post is local only)"
|
||||
% (post_metadata["id"])
|
||||
)
|
||||
|
||||
# if we already have the post do nothing, possibly update
|
||||
elif str(post_metadata["id"]) in existing_posts:
|
||||
# update_post(post_dir, post_metadata)
|
||||
existing_posts.remove(
|
||||
str(post_metadata["id"])
|
||||
) # create list of posts which have not been returned in the feed
|
||||
elif str(post_metadata["id"]) in all_existing_posts:
|
||||
print(
|
||||
"skipping post %s as it was already pulled with a different hashtag."
|
||||
% (str(post_metadata["id"]))
|
||||
)
|
||||
|
||||
for post in existing_posts:
|
||||
print(
|
||||
"deleted", post
|
||||
) # rm posts that exist but are no longer returned in feed
|
||||
shutil.rmtree(os.path.join(hashtag_dir, post))
|
23
konfluks/templates/calendar.md
Normal file
23
konfluks/templates/calendar.md
Normal file
@ -0,0 +1,23 @@
|
||||
---
|
||||
title: "{{ event.name }}"
|
||||
date: "{{ event.begin }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
categories: "calendar"
|
||||
event_begin: "{{ event.begin }}"
|
||||
event_end: "{{ event.end }}"
|
||||
duration: "{{ event.duration }}"
|
||||
localized_begin: "{{ event.localized_begin }}"
|
||||
uid: "{{ event.uid }}"
|
||||
{% if event.featured_image %}
|
||||
featured_image: "{{ event.featured_image }}"
|
||||
{% endif %}
|
||||
{% if event.location %}
|
||||
location: "{{ event.location }}"
|
||||
{% endif %}
|
||||
---
|
||||
|
||||
{% if event.description %}
|
||||
|
||||
{{ event.description }}
|
||||
|
||||
{% endif %}
|
15
konfluks/templates/feed.md
Normal file
15
konfluks/templates/feed.md
Normal file
@ -0,0 +1,15 @@
|
||||
---
|
||||
title: "{{ frontmatter.title }}"
|
||||
date: "{{ frontmatter.date }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
summary: "{{ frontmatter.summary }}"
|
||||
authors: {% if frontmatter.author %} ["{{ frontmatter.author }}"] {% endif %}
|
||||
original_link: "{{ frontmatter.original_link }}"
|
||||
feed_name: "{{ frontmatter.feed_name}}"
|
||||
categories: ["{{ frontmatter.card_type }}", "{{ frontmatter.feed_name}}"]
|
||||
contributors: ["{{ frontmatter.feed_name}}"]
|
||||
tags: {{ frontmatter.tags }}
|
||||
{% if frontmatter.featured_image %}featured_image: "{{frontmatter.featured_image}}"{% endif %}
|
||||
---
|
||||
|
||||
{{ content }}
|
17
konfluks/templates/hashtag.md
Normal file
17
konfluks/templates/hashtag.md
Normal file
@ -0,0 +1,17 @@
|
||||
---
|
||||
date: {{ post_metadata.created_at }} #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
authors: ["{{ post_metadata.account.display_name }}"]
|
||||
contributors: ["{{ post_metadata.account.acct}}"]
|
||||
avatar: {{ post_metadata.account.avatar }}
|
||||
categories: ["shouts"]
|
||||
images: [{% for i in post_metadata.media_attachments %} {{ i.url }}, {% endfor %}]
|
||||
title: {{ post_metadata.account.display_name }}
|
||||
tags: [{% for i in post_metadata.tags %} "{{ i.name }}", {% endfor %}]
|
||||
---
|
||||
|
||||
{% for item in post_metadata.media_attachments %}
|
||||
<img src="{{item.url | localize_media_url }}" alt="{{item.description}}">
|
||||
{% endfor %}
|
||||
|
||||
{{ post_metadata.content | filter_mastodon_urls }}
|
14
konfluks/templates/timeline.md
Normal file
14
konfluks/templates/timeline.md
Normal file
@ -0,0 +1,14 @@
|
||||
---
|
||||
title: "{{ frontmatter.title }}"
|
||||
date: "{{ frontmatter.date }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
summary: "{{ frontmatter.summary }}"
|
||||
authors: {% if frontmatter.author %} ["{{ frontmatter.author }}"] {% endif %}
|
||||
original_link: "{{ frontmatter.original_link }}"
|
||||
feed_name: "{{ frontmatter.feed_name}}"
|
||||
categories: ["timeline", "{{ frontmatter.feed_name}}"]
|
||||
timelines: {{ frontmatter.timelines }}
|
||||
hidden: true
|
||||
---
|
||||
|
||||
{{ content }}
|
16
konfluks/templates/video.md
Normal file
16
konfluks/templates/video.md
Normal file
@ -0,0 +1,16 @@
|
||||
---
|
||||
title: "{{ v.name }}"
|
||||
date: "{{ v.published_at }}" #2021-06-10T10:46:33+02:00
|
||||
draft: false
|
||||
uuid: "{{v.uuid}}"
|
||||
video_duration: "{{ v.duration | duration }} "
|
||||
video_channel: "{{ v.channel.display_name }}"
|
||||
channel_url: "{{ v.channel.url }}"
|
||||
contributors: ["{{ v.account.display_name }}"]
|
||||
preview_image: "{{ preview_image }}"
|
||||
images: ["./{{ preview_image }}"]
|
||||
categories: ["tv","{{ v.channel.display_name }}"]
|
||||
is_live: {{ v.is_live }}
|
||||
---
|
||||
|
||||
{{ v.description }}
|
381
konfluks/timeline.py
Normal file
381
konfluks/timeline.py
Normal file
@ -0,0 +1,381 @@
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from hashlib import md5
|
||||
from ast import literal_eval as make_tuple
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
from re import sub
|
||||
|
||||
import arrow
|
||||
import feedparser
|
||||
import jinja2
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from slugify import slugify
|
||||
from re import compile as re_compile
|
||||
yamlre = re_compile('"')
|
||||
|
||||
|
||||
def write_etag(feed_name, feed_data):
|
||||
"""
|
||||
save timestamp of when feed was last modified
|
||||
"""
|
||||
etag = ""
|
||||
modified = ""
|
||||
|
||||
if "etag" in feed_data:
|
||||
etag = feed_data.etag
|
||||
if "modified" in feed_data:
|
||||
modified = feed_data.modified
|
||||
|
||||
if etag or modified:
|
||||
with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
|
||||
f.write(str((etag, modified)))
|
||||
|
||||
|
||||
def get_etag(feed_name):
|
||||
"""
|
||||
return timestamp of when feed was last modified
|
||||
"""
|
||||
fn = os.path.join("etags", feed_name + ".txt")
|
||||
etag = ""
|
||||
modified = ""
|
||||
|
||||
if os.path.exists(fn):
|
||||
etag, modified = make_tuple(open(fn, "r").read())
|
||||
|
||||
return etag, modified
|
||||
|
||||
|
||||
def create_frontmatter(entry):
|
||||
"""
|
||||
parse RSS metadata and return as frontmatter
|
||||
"""
|
||||
if 'published' in entry:
|
||||
published = entry.published_parsed
|
||||
if 'updated' in entry:
|
||||
published = entry.updated_parsed
|
||||
|
||||
published = arrow.get(published)
|
||||
|
||||
if 'author' in entry:
|
||||
author = entry.author
|
||||
else:
|
||||
author = ''
|
||||
|
||||
if 'authors' in entry:
|
||||
authors = []
|
||||
for a in entry.authors:
|
||||
authors.append(a['name'])
|
||||
|
||||
if 'summary' in entry:
|
||||
summary = entry.summary
|
||||
else:
|
||||
summary = ''
|
||||
|
||||
if 'publisher' in entry:
|
||||
publisher = entry.publisher
|
||||
else:
|
||||
publisher = ''
|
||||
|
||||
tags = []
|
||||
if 'tags' in entry:
|
||||
#TODO finish categories
|
||||
for t in entry.tags:
|
||||
tags.append(t['term'])
|
||||
|
||||
frontmatter = {
|
||||
'title':entry.title,
|
||||
'date': published.format(),
|
||||
'summary': '',
|
||||
'author': author,
|
||||
'original_link': entry.link,
|
||||
'feed_name': entry['feed_name'],
|
||||
'timelines': str(tags),
|
||||
}
|
||||
|
||||
return frontmatter
|
||||
|
||||
def sanitize_yaml (frontmatter):
|
||||
"""
|
||||
Escapes any occurences of double quotes
|
||||
in any of the frontmatter fields
|
||||
See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types
|
||||
"""
|
||||
for k, v in frontmatter.items():
|
||||
if type(v) == type([]):
|
||||
#some fields are lists
|
||||
l = []
|
||||
for i in v:
|
||||
i = yamlre.sub('\\"', i)
|
||||
l.append(i)
|
||||
frontmatter[k] = l
|
||||
|
||||
else:
|
||||
v = yamlre.sub('\\"', v)
|
||||
frontmatter[k] = v
|
||||
|
||||
return frontmatter
|
||||
|
||||
|
||||
def create_post(post_dir, entry):
|
||||
"""
|
||||
write hugo post based on RSS entry
|
||||
"""
|
||||
frontmatter = create_frontmatter(entry)
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.makedirs(post_dir)
|
||||
|
||||
if "content" in entry:
|
||||
post_content = entry.content[0].value
|
||||
else:
|
||||
post_content = entry.summary
|
||||
|
||||
parsed_content = parse_posts(post_dir, post_content)
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
template = env.get_template("timeline.md")
|
||||
with open(os.path.join(post_dir, "index.html"), "w") as f: # n.b. .html
|
||||
post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content)
|
||||
f.write(post)
|
||||
print("created post for", entry.title, "({})".format(entry.link))
|
||||
|
||||
|
||||
def grab_media(post_directory, url, prefered_name=None):
|
||||
"""
|
||||
download media linked in post to have local copy
|
||||
if download succeeds return new local path otherwise return url
|
||||
"""
|
||||
media_item = urlparse(url).path.split('/')[-1]
|
||||
|
||||
if prefered_name:
|
||||
media_item = prefered_name
|
||||
|
||||
try:
|
||||
if not os.path.exists(os.path.join(post_directory, media_item)):
|
||||
#TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
|
||||
response = requests.get(url, stream=True)
|
||||
if response.ok:
|
||||
with open(os.path.join(post_directory, media_item), 'wb') as media_file:
|
||||
shutil.copyfileobj(response.raw, media_file)
|
||||
print('Downloaded media item', media_item)
|
||||
return media_item
|
||||
return media_item
|
||||
elif os.path.exists(os.path.join(post_directory, media_item)):
|
||||
return media_item
|
||||
|
||||
except Exception as e:
|
||||
print('Failed to download image', url)
|
||||
print(e)
|
||||
return url
|
||||
|
||||
|
||||
def parse_posts(post_dir, post_content):
|
||||
"""
|
||||
parse the post content to for media items
|
||||
replace foreign image with local copy
|
||||
filter out iframe sources not in allowlist
|
||||
"""
|
||||
soup = BeautifulSoup(post_content, "html.parser")
|
||||
allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
|
||||
|
||||
for img in soup(["img", "object"]):
|
||||
if img.get("src") != None:
|
||||
local_image = grab_media(post_dir, img["src"])
|
||||
if img["src"] != local_image:
|
||||
img["src"] = local_image
|
||||
|
||||
for iframe in soup(["iframe"]):
|
||||
if not any(source in iframe["src"] for source in allowed_iframe_sources):
|
||||
print("filtered iframe: {}...".format(iframe["src"][:25]))
|
||||
iframe.decompose()
|
||||
return soup.decode()
|
||||
|
||||
|
||||
def grab_feed(feed_url):
|
||||
"""
|
||||
check whether feed has been updated
|
||||
download & return it if it has
|
||||
"""
|
||||
feed_name = urlparse(feed_url).netloc
|
||||
|
||||
etag, modified = get_etag(feed_name)
|
||||
|
||||
try:
|
||||
if modified:
|
||||
data = feedparser.parse(feed_url, modified=modified)
|
||||
elif etag:
|
||||
data = feedparser.parse(feed_url, etag=etag)
|
||||
else:
|
||||
data = feedparser.parse(feed_url)
|
||||
except Exception as e:
|
||||
print("Error grabbing feed")
|
||||
print(feed_name)
|
||||
print(e)
|
||||
return False
|
||||
|
||||
print(data.status, feed_url)
|
||||
if data.status == 200:
|
||||
# 304 means the feed has not been modified since we last checked
|
||||
write_etag(feed_name, data)
|
||||
return data
|
||||
return False
|
||||
|
||||
def create_opds_post(post_dir, entry):
|
||||
"""
|
||||
create a HUGO post based on OPDS entry
|
||||
or update it if the timestamp is newer
|
||||
Downloads the cover & file
|
||||
"""
|
||||
|
||||
frontmatter = create_frontmatter(entry)
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
template = env.get_template("feed.md")
|
||||
|
||||
if not os.path.exists(post_dir):
|
||||
os.makedirs(post_dir)
|
||||
|
||||
if os.path.exists(os.path.join(post_dir, '.timestamp')):
|
||||
old_timestamp = open(os.path.join(post_dir, '.timestamp')).read()
|
||||
old_timestamp = arrow.get(float(old_timestamp))
|
||||
current_timestamp = arrow.get(entry['updated_parsed'])
|
||||
|
||||
if current_timestamp > old_timestamp:
|
||||
pass
|
||||
else:
|
||||
print('Book "{}..." already up to date'.format(entry['title'][:32]))
|
||||
return
|
||||
|
||||
for item in entry.links:
|
||||
ft = item['type'].split('/')[-1]
|
||||
fn = item['rel'].split('/')[-1]
|
||||
|
||||
if fn == "acquisition":
|
||||
fn = "publication" #calling the publications acquisition is weird
|
||||
|
||||
prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)
|
||||
|
||||
grab_media(post_dir, item['href'], prefered_name)
|
||||
|
||||
if "summary" in entry:
|
||||
summary = entry.summary
|
||||
else:
|
||||
summary = ""
|
||||
|
||||
with open(os.path.join(post_dir,'index.md'),'w') as f:
|
||||
post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)
|
||||
f.write(post)
|
||||
print('created post for Book', entry.title)
|
||||
|
||||
with open(os.path.join(post_dir, '.timestamp'), 'w') as f:
|
||||
timestamp = arrow.get(entry['updated_parsed'])
|
||||
f.write(timestamp.format('X'))
|
||||
|
||||
|
||||
def main():
|
||||
feed_urls = open("feeds_list_timeline.txt", "r").read().splitlines()
|
||||
|
||||
start = time.time()
|
||||
|
||||
if not os.path.exists("etags"):
|
||||
os.mkdir("etags")
|
||||
|
||||
output_dir = os.environ.get("OUTPUT_DIR")
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
feed_dict = dict()
|
||||
for url in feed_urls:
|
||||
feed_name = urlparse(url).netloc
|
||||
feed_dict[url] = feed_name
|
||||
|
||||
feed_names = feed_dict.values()
|
||||
content_dirs = os.listdir(output_dir)
|
||||
for i in content_dirs:
|
||||
if i not in feed_names:
|
||||
shutil.rmtree(os.path.join(output_dir, i))
|
||||
print("%s not in feeds_list.txt, removing local data" %(i))
|
||||
|
||||
# add iframe to the allowlist of feedparser's sanitizer,
|
||||
# this is now handled in parse_post()
|
||||
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
|
||||
|
||||
for feed_url in feed_urls:
|
||||
|
||||
feed_name = feed_dict[feed_url]
|
||||
|
||||
feed_dir = os.path.join(output_dir, feed_name)
|
||||
|
||||
if not os.path.exists(feed_dir):
|
||||
os.makedirs(feed_dir)
|
||||
|
||||
existing_posts = os.listdir(feed_dir)
|
||||
|
||||
data = grab_feed(feed_url)
|
||||
|
||||
if data:
|
||||
|
||||
opds_feed = False
|
||||
for i in data.feed['links']:
|
||||
if i['rel'] == 'self':
|
||||
if 'opds' in i['type']:
|
||||
opds_feed = True
|
||||
print("OPDS type feed!")
|
||||
|
||||
|
||||
for entry in data.entries:
|
||||
# if 'tags' in entry:
|
||||
# for tag in entry.tags:
|
||||
# for x in ['lumbung.space', 'D15', 'lumbung']:
|
||||
# if x in tag['term']:
|
||||
# print(entry.title)
|
||||
entry["feed_name"] = feed_name
|
||||
|
||||
post_name = slugify(entry.title)
|
||||
|
||||
# pixelfed returns the whole post text as the post name. max
|
||||
# filename length is 255 on many systems. here we're shortening
|
||||
# the name and adding a hash to it to avoid a conflict in a
|
||||
# situation where 2 posts start with exactly the same text.
|
||||
if len(post_name) > 150:
|
||||
post_hash = md5(bytes(post_name, "utf-8"))
|
||||
post_name = post_name[:150] + "-" + post_hash.hexdigest()
|
||||
|
||||
if opds_feed:
|
||||
entry['opds'] = True
|
||||
#format: Beyond-Debiasing-Report_Online-75535a4886e3
|
||||
post_name = slugify(entry['title'])+'-'+entry['id'].split('-')[-1]
|
||||
|
||||
post_dir = os.path.join(output_dir, feed_name, post_name)
|
||||
|
||||
if post_name not in existing_posts:
|
||||
# if there is a blog entry we dont already have, make it
|
||||
if opds_feed:
|
||||
create_opds_post(post_dir, entry)
|
||||
else:
|
||||
create_post(post_dir, entry)
|
||||
|
||||
elif post_name in existing_posts:
|
||||
# if we already have it, update it
|
||||
if opds_feed:
|
||||
create_opds_post(post_dir, entry)
|
||||
else:
|
||||
create_post(post_dir, entry)
|
||||
existing_posts.remove(
|
||||
post_name
|
||||
) # create list of posts which have not been returned by the feed
|
||||
|
||||
for post in existing_posts:
|
||||
# remove blog posts no longer returned by the RSS feed
|
||||
print("deleted", post)
|
||||
shutil.rmtree(os.path.join(feed_dir, slugify(post)))
|
||||
|
||||
end = time.time()
|
||||
|
||||
print(end - start)
|
161
konfluks/video.py
Normal file
161
konfluks/video.py
Normal file
@ -0,0 +1,161 @@
|
||||
import ast
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from slugify import slugify
|
||||
|
||||
import arrow
|
||||
import jinja2
|
||||
import peertube
|
||||
import requests
|
||||
|
||||
host = "https://tv.lumbung.space"
|
||||
configuration = peertube.Configuration(host=host + "/api/v1")
|
||||
client = peertube.ApiClient(configuration)
|
||||
|
||||
# jinja filters & config
|
||||
def duration(n):
|
||||
"""
|
||||
convert '6655' in '1:50:55'
|
||||
|
||||
"""
|
||||
return str(datetime.timedelta(seconds=n))
|
||||
|
||||
|
||||
def linebreaks(text):
|
||||
if not text:
|
||||
return text
|
||||
else:
|
||||
import re
|
||||
|
||||
br = re.compile(r"(\r\n|\r|\n)")
|
||||
return br.sub(r"<br />\n", text)
|
||||
|
||||
|
||||
def create_post(post_directory, video_metadata, host):
|
||||
global client # lazy
|
||||
|
||||
if not os.path.exists(post_directory):
|
||||
os.mkdir(post_directory)
|
||||
|
||||
preview_image = video_metadata["preview_path"].split("/")[-1]
|
||||
|
||||
if not os.path.exists(os.path.join(post_directory, preview_image)):
|
||||
# download preview image
|
||||
response = requests.get(host + video_metadata["preview_path"], stream=True)
|
||||
with open(os.path.join(post_directory, preview_image), "wb") as img_file:
|
||||
shutil.copyfileobj(response.raw, img_file)
|
||||
print("Downloaded cover image")
|
||||
|
||||
# replace the truncated description with the full video description
|
||||
# peertube api is some broken thing in between a py dict and a json file
|
||||
api_response = peertube.VideoApi(client).videos_id_description_get(
|
||||
video_metadata["uuid"]
|
||||
)
|
||||
long_description = ast.literal_eval(api_response)
|
||||
video_metadata["description"] = long_description["description"]
|
||||
|
||||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||||
env.filters["duration"] = duration
|
||||
env.filters["linebreaks"] = linebreaks
|
||||
template = env.get_template("video.md")
|
||||
|
||||
with open(os.path.join(post_directory, "index.md"), "w") as f:
|
||||
post = template.render(v=video_metadata, host=host, preview_image=preview_image)
|
||||
f.write(post)
|
||||
|
||||
with open(os.path.join(post_directory, ".timestamp"), "w") as f:
|
||||
timestamp = arrow.get(video_metadata["updated_at"])
|
||||
f.write(timestamp.format("X"))
|
||||
|
||||
|
||||
def update_post(post_directory, video_metadata, host):
|
||||
if os.path.exists(post_directory):
|
||||
if os.path.exists(os.path.join(post_directory, ".timestamp")):
|
||||
old_timestamp = open(os.path.join(post_directory, ".timestamp")).read()
|
||||
|
||||
# FIXME: this is ugly but I need to do this because arrow removes miliseconds
|
||||
current_timestamp = arrow.get(video_metadata["updated_at"])
|
||||
current_timestamp = arrow.get(current_timestamp.format("X"))
|
||||
|
||||
if current_timestamp > arrow.get(old_timestamp):
|
||||
print(
|
||||
"Updating",
|
||||
video_metadata["name"],
|
||||
"({})".format(video_metadata["uuid"]),
|
||||
)
|
||||
create_post(post_directory, video_metadata, host)
|
||||
else:
|
||||
print(
|
||||
"Video current: ",
|
||||
video_metadata["name"],
|
||||
"({})".format(video_metadata["uuid"]),
|
||||
)
|
||||
else:
|
||||
# compat for when there is no timestamp yet..
|
||||
create_post(post_directory, video_metadata, host)
|
||||
|
||||
def main():
|
||||
v = peertube.VideoApi(client)
|
||||
count = 100
|
||||
page = 0
|
||||
try:
|
||||
response = v.videos_get(count=count, filter="local", tags_one_of="publish", start=page)
|
||||
videos = response.to_dict()
|
||||
total = videos['total']
|
||||
videos = videos['data']
|
||||
total -= count
|
||||
if total > 0:
|
||||
to_download = total // count
|
||||
last_page = total % count
|
||||
for i in range(to_download):
|
||||
page += 1
|
||||
response = v.videos_get(count=count, filter="local", tags_one_of="publish", start=page)
|
||||
videos += response.to_dict()['data']
|
||||
if last_page > 0:
|
||||
page += 1
|
||||
response = v.videos_get(count=count, filter="local", tags_one_of="publish", start=page)
|
||||
videos += response.to_dict()['data'][-1*last_page:]
|
||||
|
||||
|
||||
output_dir = os.environ.get("OUTPUT_DIR")
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
|
||||
existing_posts = os.listdir(output_dir)
|
||||
|
||||
for video_metadata in videos:
|
||||
post_name = slugify(video_metadata["name"]) + "-" + video_metadata["uuid"]
|
||||
post_dir = os.path.join(output_dir, post_name)
|
||||
|
||||
if (
|
||||
post_name not in existing_posts
|
||||
): # if there is a video we dont already have, make it
|
||||
print(
|
||||
"New: ", video_metadata["name"], "({})".format(video_metadata["uuid"])
|
||||
)
|
||||
create_post(post_dir, video_metadata, host)
|
||||
|
||||
elif (
|
||||
post_name in existing_posts
|
||||
): # if we already have the video do nothing, possibly update
|
||||
update_post(post_dir, video_metadata, host)
|
||||
existing_posts.remove(
|
||||
post_name
|
||||
) # create list of posts which have not been returned by peertube
|
||||
|
||||
except:
|
||||
print("didn't get a response from peertube, instance might have been taken down or made private. removing all posts.")
|
||||
output_dir = os.environ.get("OUTPUT_DIR")
|
||||
if not os.path.exists(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
existing_posts = os.listdir(output_dir)
|
||||
|
||||
for post in existing_posts:
|
||||
print("deleted", post) # rm posts not returned
|
||||
shutil.rmtree(os.path.join(output_dir, post))
|
||||
|
Reference in New Issue
Block a user