konfluks/lumbunglib/feed.py

392 lines
12 KiB
Python
Raw Normal View History

2021-12-15 10:30:10 +00:00
import os
import shutil
import time
2022-02-03 14:57:01 +00:00
from hashlib import md5
2021-12-15 10:41:35 +00:00
from ast import literal_eval as make_tuple
2021-12-15 10:55:51 +00:00
from pathlib import Path
2021-12-15 10:41:35 +00:00
from urllib.parse import urlparse
2021-12-15 10:30:10 +00:00
import arrow
2021-12-15 10:41:35 +00:00
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify
from re import sub
2021-12-15 10:30:10 +00:00
def write_etag(feed_name, feed_data):
    """
    save the HTTP cache validators (etag / last modified) of a feed

    The pair is stored as the text form of an ``(etag, modified)`` tuple in
    ``etags/<feed_name>.txt``, which is the format ``get_etag`` reads back.
    Nothing is written when the feed data carries neither value.

    :param feed_name: name used for the cache file
    :param feed_data: mapping with optional ``etag`` and ``modified`` keys,
        as returned by ``feedparser.parse``
    """
    etag = feed_data.get("etag") or ""
    modified = feed_data.get("modified") or ""
    if not (etag or modified):
        return

    # do not rely on the caller having created the cache directory
    os.makedirs("etags", exist_ok=True)
    etag_file = os.path.join("etags", feed_name + ".txt")
    with open(etag_file, "w", encoding="utf-8") as f:
        f.write(str((etag, modified)))
2021-12-15 10:41:35 +00:00
2021-12-15 10:30:10 +00:00
def get_etag(feed_name):
    """
    return the ``(etag, modified)`` pair stored for a feed

    Reads ``etags/<feed_name>.txt``. Both values are empty strings when the
    file does not exist or does not contain a two-item tuple.
    """
    fn = os.path.join("etags", feed_name + ".txt")
    try:
        with open(fn, "r", encoding="utf-8") as f:
            etag, modified = make_tuple(f.read())
    except FileNotFoundError:
        return "", ""
    except (ValueError, SyntaxError, TypeError):
        # unreadable cache file: behave as if nothing had been cached
        return "", ""
    return etag, modified
2021-12-15 10:41:35 +00:00
2021-12-15 10:30:10 +00:00
def create_frontmatter(entry):
    """
    parse RSS / OPDS entry metadata and return it as a frontmatter dict

    :param entry: feed entry; ``title`` and ``feed_name`` are required, plus
        ``link`` for regular entries or ``links`` for entries flagged with
        an ``opds`` key
    :return: dict of frontmatter fields
    """
    # the later key wins, so the update time overrides the publication time
    published = None
    for key in ("published_parsed", "updated_parsed"):
        if key in entry and entry[key]:
            published = entry[key]
    # entries without a usable date are stamped with the current time
    # instead of failing on an unbound variable
    published = arrow.get(published) if published else arrow.utcnow()

    author = entry.get("author", "")
    authors = [a["name"] for a in entry.get("authors", []) if "name" in a]
    summary = entry.get("summary", "")
    publisher = entry.get("publisher", "")

    # TODO finish categories
    tags = [t["term"] for t in entry.get("tags", [])]

    if "opds" in entry:
        return {
            "title": entry.title,
            "date": published.format(),
            "summary": summary,
            "author": ",".join(authors),
            "publisher": publisher,
            # link to the book page rather than to its cover
            "original_link": entry.links[0]["href"].replace("opds/cover/", "books/"),
            "feed_name": entry["feed_name"],
            "tags": str(tags),
            "category": "books",
        }

    return {
        "title": entry.title,
        "date": published.format(),
        # the summary is left empty for non-OPDS entries
        "summary": "",
        "author": author,
        "original_link": entry.link,
        "feed_name": entry["feed_name"],
        "tags": str(tags),
    }
def sanitize_yaml(frontmatter):
    """
    Escapes any occurrences of double quotes
    in any of the frontmatter fields
    See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types

    String values, and strings inside list values, get every ``"`` replaced
    by ``\\"``. Values of any other type are left untouched. The dict is
    modified in place and also returned.
    """

    def escape(value):
        # only strings can contain quotes; pass everything else through
        if isinstance(value, str):
            return value.replace('"', '\\"')
        return value

    for key, value in frontmatter.items():
        if isinstance(value, list):
            # some fields are lists
            frontmatter[key] = [escape(item) for item in value]
        else:
            frontmatter[key] = escape(value)
    return frontmatter
2021-12-15 10:41:35 +00:00
2021-12-15 10:30:10 +00:00
def create_post(post_dir, entry):
    """
    write hugo post based on RSS entry

    The post body is taken from the entry's first content item, or from its
    summary when it has no content. It is run through ``parse_posts`` and
    rendered with the ``feed.md`` template into ``<post_dir>/index.html``.

    :param post_dir: directory of the post, created when missing
    :param entry: feed entry, must provide ``title`` and ``link``
    """
    frontmatter = create_frontmatter(entry)
    os.makedirs(post_dir, exist_ok=True)

    if "content" in entry:
        post_content = entry.content[0].value
    else:
        # entries may carry neither content nor summary
        post_content = entry.get("summary", "")
    parsed_content = parse_posts(post_dir, post_content)

    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
    template = env.get_template("feed.md")

    # render before opening the file so that a template error does not
    # leave an emptied post behind
    post = template.render(
        frontmatter=sanitize_yaml(frontmatter), content=parsed_content
    )
    post_file = os.path.join(post_dir, "index.html")  # n.b. .html
    with open(post_file, "w", encoding="utf-8") as f:
        f.write(post)
    print("created post for", entry.title, "({})".format(entry.link))
2021-12-15 10:30:10 +00:00
def grab_media(post_directory, url, prefered_name=None, timeout=30):
    """
    download media linked in post to have local copy
    if download succeeds return new local path otherwise return url

    :param post_directory: directory the media file is stored in
    :param url: address of the media item
    :param prefered_name: file name to use instead of the last segment of
        the url path
    :param timeout: seconds passed to ``requests.get`` as its timeout
    :return: the local file name when the file is already present or was
        downloaded, otherwise ``url``
    """
    media_item = prefered_name or urlparse(url).path.split("/")[-1]
    if media_item in ("", ".", ".."):
        # no usable file name (e.g. the url path ends in "/"); joining it
        # would point at a directory rather than a file
        return url

    local_path = os.path.join(post_directory, media_item)
    if os.path.exists(local_path):
        return media_item

    try:
        # TODO: stream is true is a conditional so we could check the
        # headers for things, mimetype etc
        with requests.get(url, stream=True, timeout=timeout) as response:
            if not response.ok:
                print("Failed to download image", url)
                print(response.status_code, response.reason)
                return url
            # have the raw stream undo any gzip/deflate transfer encoding
            response.raw.decode_content = True
            with open(local_path, "wb") as media_file:
                shutil.copyfileobj(response.raw, media_file)
    except Exception as e:
        # deliberately broad: a failed download must not abort the run
        print("Failed to download image", url)
        print(e)
        # the file did not exist before this call, so anything present now
        # is a partial download that would otherwise be taken as complete
        if os.path.exists(local_path):
            os.remove(local_path)
        return url

    print("Downloaded media item", media_item)
    return media_item
def _iframe_host(src):
    """return the lower-cased host name of an iframe source, '' if none"""
    try:
        parts = urlparse(src)
        if not parts.scheme and not parts.netloc:
            # scheme-less sources such as "www.youtube.com/embed/x"
            parts = urlparse("//" + src)
        return (parts.hostname or "").lower()
    except ValueError:
        # malformed address, e.g. an invalid IPv6 literal
        return ""


def parse_posts(post_dir, post_content):
    """
    parse the post content for media items
    replace foreign image with local copy
    filter out iframe sources not in allowlist

    :param post_dir: directory the media files are downloaded into
    :param post_content: HTML of the post
    :return: the processed HTML as a string
    """
    soup = BeautifulSoup(post_content, "html.parser")
    allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]

    for img in soup(["img", "object"]):
        src = img.get("src")
        if not src:
            # nothing to download for tags without a src attribute
            continue
        local_image = grab_media(post_dir, src)
        if src != local_image:
            img["src"] = local_image

    for iframe in soup(["iframe"]):
        src = iframe.get("src") or ""
        host = _iframe_host(src)
        # compare host names instead of searching the whole address, so an
        # allowed name inside a path or query string does not let it pass
        allowed = any(
            host == source or host.endswith("." + source)
            for source in allowed_iframe_sources
        )
        if not allowed:
            print("filtered iframe: {}...".format(src[:25]))
            iframe.decompose()

    return soup.decode()
2021-12-15 10:41:35 +00:00
2021-12-15 10:30:10 +00:00
def grab_feed(feed_url):
    """
    check whether feed has been updated
    download & return it if it has

    :param feed_url: address of the feed
    :return: the parsed feed when the request answered with status 200,
        otherwise ``False``
    """
    feed_name = urlparse(feed_url).netloc

    etag, modified = get_etag(feed_name)
    # send a single validator, preferring the modification time
    conditional = {}
    if modified:
        conditional["modified"] = modified
    elif etag:
        conditional["etag"] = etag

    try:
        data = feedparser.parse(feed_url, **conditional)
    except Exception as e:
        print("Error grabbing feed")
        print(feed_name)
        print(e)
        return False

    # the result has no status at all when no HTTP response was received
    status = data.get("status")
    print(status, feed_url)
    if status == 200:
        write_etag(feed_name, data)
        return data
    # e.g. 304 means the feed has not been modified since we last checked
    return False
def create_opds_post(post_dir, entry):
    """
    create a HUGO post based on OPDS entry
    or update it if the timestamp is newer
    Downloads the cover & file

    The entry's update time is kept in ``<post_dir>/.timestamp``; the post
    is left alone when the stored time is not older than the entry's.

    :param post_dir: directory of the post, created when missing
    :param entry: OPDS feed entry, must provide ``title``, ``links`` and
        ``updated_parsed``
    """
    frontmatter = create_frontmatter(entry)
    os.makedirs(post_dir, exist_ok=True)

    timestamp_file = os.path.join(post_dir, ".timestamp")
    current_timestamp = arrow.get(entry["updated_parsed"])

    if os.path.exists(timestamp_file):
        with open(timestamp_file) as f:
            stored = f.read()
        try:
            old_timestamp = arrow.get(float(stored))
        except (ValueError, OverflowError):
            # unreadable timestamp: treat the post as out of date
            old_timestamp = None
        if old_timestamp is not None and current_timestamp <= old_timestamp:
            print('Book "{}..." already up to date'.format(entry["title"][:32]))
            return

    title_slug = slugify(entry["title"])
    for item in entry.links:
        if not all(key in item for key in ("href", "type", "rel")):
            # a local file name cannot be built for this link
            continue
        ft = item["type"].split("/")[-1]
        fn = item["rel"].split("/")[-1]
        if fn == "acquisition":
            fn = "publication"  # calling the publications acquisition is weird
        prefered_name = "{}-{}.{}".format(fn, title_slug, ft)
        grab_media(post_dir, item["href"], prefered_name)

    summary = entry.get("summary", "")

    # the template is only needed once we know the post gets (re)written
    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
    template = env.get_template("feed.md")
    post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)

    with open(os.path.join(post_dir, "index.md"), "w", encoding="utf-8") as f:
        f.write(post)
    print("created post for Book", entry.title)

    with open(timestamp_file, "w") as f:
        f.write(current_timestamp.format("X"))
2021-12-15 10:41:35 +00:00
2021-12-15 11:23:37 +00:00
def _read_feed_urls(path="feeds_list.txt"):
    """return the non-blank lines of the feed list file"""
    with open(path, "r") as f:
        return [line.strip() for line in f if line.strip()]


def _is_opds_feed(data):
    """tell whether the feed's 'self' link has a type mentioning opds"""
    for link in data.feed.get("links", []):
        if link.get("rel") == "self" and "opds" in link.get("type", ""):
            return True
    return False


def _post_name(entry, opds_feed):
    """return the directory name for an entry, '' if it has no usable title"""
    title_slug = slugify(entry.get("title") or "")
    if not title_slug:
        return ""
    if opds_feed:
        # format: Beyond-Debiasing-Report_Online-75535a4886e3
        return title_slug + "-" + entry.get("id", "").split("-")[-1]
    # pixelfed returns the whole post text as the post name. max
    # filename length is 255 on many systems. here we're shortening
    # the name and adding a hash to it to avoid a conflict in a
    # situation where 2 posts start with exactly the same text.
    if len(title_slug) > 150:
        post_hash = md5(bytes(title_slug, "utf-8"))
        return title_slug[:150] + "-" + post_hash.hexdigest()
    return title_slug


def main():
    """
    mirror every feed listed in feeds_list.txt as hugo posts

    Posts are written to ``$OUTPUT_DIR/<feed host>/<post name>/``.
    Directories of feeds that are no longer listed are removed, and so are
    posts that a successfully fetched feed no longer returns.
    """
    feed_urls = _read_feed_urls()

    start = time.time()

    os.makedirs("etags", exist_ok=True)

    output_dir = os.environ.get("OUTPUT_DIR")
    if not output_dir:
        raise SystemExit("OUTPUT_DIR environment variable is not set")
    os.makedirs(output_dir, exist_ok=True)

    feed_dict = {url: urlparse(url).netloc for url in feed_urls}
    feed_names = set(feed_dict.values())

    for i in os.listdir(output_dir):
        content_dir = os.path.join(output_dir, i)
        # only directories hold feed data; plain files are left alone
        if i not in feed_names and os.path.isdir(content_dir):
            shutil.rmtree(content_dir)
            print("%s not in feeds_list.txt, removing local data" % (i))

    # add iframe to the allowlist of feedparser's sanitizer,
    # this is now handled in parse_posts()
    feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}

    for feed_url in feed_urls:
        feed_name = feed_dict[feed_url]
        if not feed_name:
            # without a host name the feed directory would be the output
            # directory itself and the cleanup below would hit other feeds
            print("skipping feed without host name:", feed_url)
            continue

        feed_dir = os.path.join(output_dir, feed_name)
        os.makedirs(feed_dir, exist_ok=True)

        existing_posts = os.listdir(feed_dir)

        data = grab_feed(feed_url)
        if not data:
            # nothing was fetched, so nothing is known about stale posts
            continue

        opds_feed = _is_opds_feed(data)
        if opds_feed:
            print("OPDS type feed!")

        for entry in data.entries:
            entry["feed_name"] = feed_name
            if opds_feed:
                entry["opds"] = True

            post_name = _post_name(entry, opds_feed)
            if not post_name:
                print("skipping entry without usable title in", feed_name)
                continue

            post_dir = os.path.join(feed_dir, post_name)

            # create the post, or update it if we already have it
            if opds_feed:
                create_opds_post(post_dir, entry)
            else:
                create_post(post_dir, entry)

            # what remains in the list afterwards are the posts which
            # have not been returned by the feed
            if post_name in existing_posts:
                existing_posts.remove(post_name)

        for post in existing_posts:
            # remove blog posts no longer returned by the RSS feed;
            # the name is used as listed, re-slugifying it could change it
            stale_dir = os.path.join(feed_dir, post)
            if os.path.isdir(stale_dir):
                shutil.rmtree(stale_dir)
                print("deleted", post)

    end = time.time()

    print(end - start)