#!/bin/python3

# lumbung.space rss feed aggregator
# © 2021 roel roscam abbing gplv3 etc

import os
import shutil
import time
from ast import literal_eval as make_tuple
from pathlib import Path
from urllib.parse import urlparse

import arrow
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify

# seconds to wait on a media server before giving up on a download
REQUEST_TIMEOUT = 30


def write_etag(feed_name, feed_data):
    """
    save timestamp of when feed was last modified

    feed_name: used as the file name inside the "etags" directory
    feed_data: parsed feed; its "etag" and "modified" values are stored
    nothing is written when the feed provides neither value
    """
    etag = feed_data.etag if "etag" in feed_data else ""
    modified = feed_data.modified if "modified" in feed_data else ""

    if etag or modified:
        with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
            # stored as the repr of a tuple so get_etag() can literal_eval it
            f.write(str((etag, modified)))


def get_etag(feed_name):
    """
    return timestamp of when feed was last modified

    returns an (etag, modified) tuple; both are "" when nothing usable
    has been stored for this feed
    """
    fn = os.path.join("etags", feed_name + ".txt")

    if not os.path.exists(fn):
        return "", ""

    with open(fn, "r") as f:
        stored = f.read()

    try:
        etag, modified = make_tuple(stored)
    except (ValueError, SyntaxError, TypeError):
        # unreadable cache file: behave as if the feed was never fetched
        return "", ""

    return etag, modified


def create_frontmatter(entry):
    """
    parse RSS metadata and return as frontmatter

    the "updated" date wins over the "published" date when both are usable;
    when the entry carries no usable date the current UTC time is used
    """
    published = None
    if "published" in entry:
        published = entry.get("published_parsed")
    if "updated" in entry and entry.get("updated_parsed"):
        published = entry.get("updated_parsed")

    published = arrow.get(published) if published else arrow.utcnow()

    author = entry.author if "author" in entry else ""

    tags = []
    if "tags" in entry:
        # TODO finish categories
        tags = [t["term"] for t in entry.tags]

    frontmatter = {
        "title": entry.title,
        "date": published.format(),
        "summary": "",
        "author": author,
        "original_link": entry.link,
        "feed_name": entry["feed_name"],
        "tags": str(tags),
    }

    return frontmatter


def create_post(post_dir, entry):
    """
    write hugo post based on RSS entry

    post_dir: directory of the post, created when missing
    entry: feed entry; must already carry the "feed_name" key
    """
    frontmatter = create_frontmatter(entry)

    os.makedirs(post_dir, exist_ok=True)

    if "content" in entry:
        post_content = entry.content[0].value
    else:
        post_content = entry.get("summary", "")

    parsed_content = parse_posts(post_dir, post_content)

    # render first, so a template error does not leave a truncated file behind
    post = template.render(frontmatter=frontmatter, content=parsed_content)

    # n.b. .html
    with open(os.path.join(post_dir, "index.html"), "w", encoding="utf-8") as f:
        f.write(post)

    print("created post for", entry.title, "({})".format(entry.link))


def grab_media(post_directory, url):
    """
    download media linked in post to have local copy
    if download succeeds return new local path otherwise return url

    a file that already exists in post_directory is not downloaded again
    """
    image = urlparse(url).path.split("/")[-1]

    if not image:
        # url has no file name component, nothing sensible to store locally
        return url

    local_path = os.path.join(post_directory, image)

    if os.path.exists(local_path):
        return image

    try:
        # TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
        with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
            if not response.ok:
                print("Failed to download image", url)
                print(response.status_code)
                return url

            # have the raw stream undo gzip/deflate transfer encoding
            response.raw.decode_content = True
            with open(local_path, "wb") as img_file:
                shutil.copyfileobj(response.raw, img_file)

    except Exception as e:
        print("Failed to download image", url)
        print(e)
        # a partial file would be mistaken for a finished download next run
        if os.path.exists(local_path):
            os.remove(local_path)
        return url

    print("Downloaded cover image", image)
    return image


def _iframe_allowed(src, allowed_sources):
    """
    tell whether the host of src is one of allowed_sources or a subdomain of one
    """
    try:
        host = (urlparse(src).hostname or "").lower()
    except ValueError:
        return False

    return any(
        host == source or host.endswith("." + source) for source in allowed_sources
    )


def parse_posts(post_dir, post_content):
    """
    parse the post content to for media items
    replace foreign image with local copy
    filter out iframe sources not in allowlist

    returns the rewritten html as a string
    """
    soup = BeautifulSoup(post_content, "html.parser")
    allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]

    for img in soup(["img", "object"]):
        src = img.get("src")
        if not src:
            # e.g. <object data="..."> has no src attribute
            continue

        local_image = grab_media(post_dir, src)

        if src != local_image:
            img["src"] = local_image

    for iframe in soup(["iframe"]):
        src = iframe.get("src", "")
        # compare the host name rather than the whole url, otherwise
        # "https://elsewhere.example/?youtube.com" would pass the allowlist
        if not _iframe_allowed(src, allowed_iframe_sources):
            print("filtered iframe: {}...".format(src[:25]))
            iframe.decompose()

    return soup.decode()


def grab_feed(feed_url):
    """
    check whether feed has been updated
    download & return it if it has

    returns the parsed feed on HTTP 200, False otherwise
    """
    feed_name = urlparse(feed_url).netloc

    etag, modified = get_etag(feed_name)

    try:
        if modified:
            data = feedparser.parse(feed_url, modified=modified)
        elif etag:
            data = feedparser.parse(feed_url, etag=etag)
        else:
            data = feedparser.parse(feed_url)
    except Exception as e:
        print("Error grabbing feed")
        print(feed_name)
        print(e)
        return False

    # the result has no status when no HTTP response was received at all
    status = data.get("status")
    print(status, feed_url)

    if status is None and "bozo_exception" in data:
        print(data["bozo_exception"])

    if status == 200:
        # 304 means the feed has not been modified since we last checked
        write_etag(feed_name, data)
        return data

    return False


with open("feeds_list.txt", "r") as feeds_file:
    # one feed url per line, blank lines are ignored
    feed_urls = [line.strip() for line in feeds_file.read().splitlines() if line.strip()]

start = time.time()

os.makedirs("etags", exist_ok=True)

template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))

output_dir = os.environ.get("OUTPUT_DIR")

if not output_dir:
    raise SystemExit("OUTPUT_DIR environment variable must be set")

os.makedirs(output_dir, exist_ok=True)

template = env.get_template("feed.md")

# add iframe to the allowlist of feedparser's sanitizer,
# this is now handled in parse_posts()
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}

for feed_url in feed_urls:

    feed_name = urlparse(feed_url).netloc

    if not feed_name:
        # without a host name the feed directory would be output_dir itself
        print("skipping feed without host name:", feed_url)
        continue

    feed_dir = os.path.join(output_dir, feed_name)

    os.makedirs(feed_dir, exist_ok=True)

    existing_posts = os.listdir(feed_dir)

    data = grab_feed(feed_url)

    if not data:
        continue

    for entry in data.entries:
        # if 'tags' in entry:
        #     for tag in entry.tags:
        #         for x in ['lumbung.space', 'D15', 'lumbung']:
        #             if x in tag['term']:
        #                 print(entry.title)
        entry["feed_name"] = feed_name

        post_name = slugify(entry.title)
        post_dir = os.path.join(output_dir, feed_name, post_name)

        # new posts are created, posts we already have are updated
        create_post(post_dir, entry)

        if post_name in existing_posts:
            # create list of posts which have not been returned by the feed
            existing_posts.remove(post_name)

    for post in existing_posts:
        # remove blog posts no longer returned by the RSS feed
        stale_dir = os.path.join(feed_dir, post)

        if not os.path.isdir(stale_dir):
            # plain files in the feed directory are not posts
            continue

        print("deleted", post)
        shutil.rmtree(stale_dir)

end = time.time()

print(end - start)