lumbunglib/lumbung-feed-aggregator/rss_aggregator.py
2021-12-15 11:41:35 +01:00

255 lines
6.8 KiB
Python

#!/bin/python3
# lumbung.space rss feed aggregator
# © 2021 roel roscam abbing gplv3 etc
import os
import shutil
import time
from ast import literal_eval as make_tuple
from urllib.parse import urlparse
import arrow
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify
def write_etag(feed_name, feed_data):
"""
save timestamp of when feed was last modified
"""
etag = ""
modified = ""
if "etag" in feed_data:
etag = feed_data.etag
if "modified" in feed_data:
modified = feed_data.modified
if etag or modified:
with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
f.write(str((etag, modified)))
def get_etag(feed_name):
"""
return timestamp of when feed was last modified
"""
fn = os.path.join("etags", feed_name + ".txt")
etag = ""
modified = ""
if os.path.exists(fn):
etag, modified = make_tuple(open(fn, "r").read())
return etag, modified
def create_frontmatter(entry):
"""
parse RSS metadata and return as frontmatter
"""
if "published" in entry:
published = entry.published_parsed
if "updated" in entry:
published = entry.updated_parsed
published = arrow.get(published)
if "author" in entry:
author = entry.author
else:
author = ""
tags = []
if "tags" in entry:
# TODO finish categories
for t in entry.tags:
tags.append(t["term"])
frontmatter = {
"title": entry.title,
"date": published.format(),
"summary": "",
"author": author,
"original_link": entry.link,
"feed_name": entry["feed_name"],
"tags": str(tags),
}
return frontmatter
def create_post(post_dir, entry):
"""
write hugo post based on RSS entry
"""
frontmatter = create_frontmatter(entry)
if not os.path.exists(post_dir):
os.makedirs(post_dir)
if "content" in entry:
post_content = entry.content[0].value
else:
post_content = entry.summary
parsed_content = parse_posts(post_dir, post_content)
with open(os.path.join(post_dir, "index.html"), "w") as f: # n.b. .html
post = template.render(frontmatter=frontmatter, content=parsed_content)
f.write(post)
print("created post for", entry.title, "({})".format(entry.link))
def grab_media(post_directory, url):
"""
download media linked in post to have local copy
if download succeeds return new local path otherwise return url
"""
image = urlparse(url).path.split("/")[-1]
try:
if not os.path.exists(os.path.join(post_directory, image)):
# TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
response = requests.get(url, stream=True)
if response.ok:
with open(os.path.join(post_directory, image), "wb") as img_file:
shutil.copyfileobj(response.raw, img_file)
print("Downloaded cover image", image)
return image
return image
elif os.path.exists(os.path.join(post_directory, image)):
return image
except Exception as e:
print("Failed to download image", url)
print(e)
return url
def parse_posts(post_dir, post_content):
"""
parse the post content to for media items
replace foreign image with local copy
filter out iframe sources not in allowlist
"""
soup = BeautifulSoup(post_content, "html.parser")
allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
media = []
for img in soup(["img", "object"]):
local_image = grab_media(post_dir, img["src"])
if img["src"] != local_image:
img["src"] = local_image
for iframe in soup(["iframe"]):
if not any(source in iframe["src"] for source in allowed_iframe_sources):
print("filtered iframe: {}...".format(iframe["src"][:25]))
iframe.decompose()
return soup.decode()
def grab_feed(feed_url):
"""
check whether feed has been updated
download & return it if it has
"""
feed_name = urlparse(feed_url).netloc
etag, modified = get_etag(feed_name)
try:
if modified:
data = feedparser.parse(feed_url, modified=modified)
elif etag:
data = feedparser.parse(feed_url, etag=etag)
else:
data = feedparser.parse(feed_url)
except Exception as e:
print("Error grabbing feed")
print(feed_name)
print(e)
return False
print(data.status, feed_url)
if data.status == 200:
# 304 means the feed has not been modified since we last checked
write_etag(feed_name, data)
return data
return False
feed_urls = open("feeds_list.txt", "r").read().splitlines()
start = time.time()
if not os.path.exists("etags"):
os.mkdir("etags")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.curdir))
output_dir = os.environ.get(
"OUTPUT_DIR", "/home/r/Programming/lumbung.space/lumbung.space-web/content/posts/"
)
# output_dir = os.environ.get('OUTPUT_DIR', 'network/')
if not os.path.exists(output_dir):
os.makedirs(output_dir)
template = env.get_template("post_template.md")
# add iframe to the allowlist of feedparser's sanitizer,
# this is now handled in parse_post()
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
for feed_url in feed_urls:
feed_name = urlparse(feed_url).netloc
feed_dir = os.path.join(output_dir, feed_name)
if not os.path.exists(feed_dir):
os.makedirs(feed_dir)
existing_posts = os.listdir(feed_dir)
data = grab_feed(feed_url)
if data:
for entry in data.entries:
# if 'tags' in entry:
# for tag in entry.tags:
# for x in ['lumbung.space', 'D15', 'lumbung']:
# if x in tag['term']:
# print(entry.title)
entry["feed_name"] = feed_name
post_name = slugify(entry.title)
post_dir = os.path.join(output_dir, feed_name, post_name)
if post_name not in existing_posts:
# if there is a blog entry we dont already have, make it
create_post(post_dir, entry)
elif post_name in existing_posts:
# if we already have it, update it
create_post(post_dir, entry)
existing_posts.remove(
post_name
) # create list of posts which have not been returned by the feed
for post in existing_posts:
# remove blog posts no longer returned by the RSS feed
print("deleted", post)
shutil.rmtree(os.path.join(feed_dir, slugify(post)))
end = time.time()
print(end - start)