diff --git a/lumbunglib/templates/timeline.md b/lumbunglib/templates/timeline.md
new file mode 100644
index 0000000..0cc9c2b
--- /dev/null
+++ b/lumbunglib/templates/timeline.md
@@ -0,0 +1,14 @@
+---
+title: "{{ frontmatter.title }}"
+date: "{{ frontmatter.date }}" #2021-06-10T10:46:33+02:00
+draft: false
+summary: "{{ frontmatter.summary }}"
+authors: {% if frontmatter.author %} ["{{ frontmatter.author }}"] {% endif %}
+original_link: "{{ frontmatter.original_link }}"
+feed_name: "{{ frontmatter.feed_name}}"
+categories: ["timeline", "{{ frontmatter.feed_name}}"]
+timelines: {{ frontmatter.timelines }}
+hidden: true
+---
+
+{{ content }}
\ No newline at end of file
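The template above is plain Jinja2 that timeline.py fills in to produce Hugo frontmatter for each feed entry. A minimal rendering sketch, using a stripped-down copy of the template and invented sample values rather than real feed data:

    # Illustrative only: shows roughly what the generated frontmatter looks like.
    # The frontmatter values below are made up; in timeline.py they are built
    # from the RSS entry by create_frontmatter().
    import jinja2

    template = jinja2.Template(
        '---\n'
        'title: "{{ frontmatter.title }}"\n'
        'date: "{{ frontmatter.date }}"\n'
        'categories: ["timeline", "{{ frontmatter.feed_name }}"]\n'
        '---\n'
        '\n'
        '{{ content }}\n'
    )

    frontmatter = {
        "title": "An example post",              # hypothetical entry title
        "date": "2021-06-10T10:46:33+02:00",
        "feed_name": "example.org",
    }

    print(template.render(frontmatter=frontmatter, content="<p>Post body</p>"))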
diff --git a/lumbunglib/timeline.py b/lumbunglib/timeline.py
new file mode 100644
index 0000000..d1382de
--- /dev/null
+++ b/lumbunglib/timeline.py
@@ -0,0 +1,381 @@
+import os
+import shutil
+import time
+from hashlib import md5
+from ast import literal_eval as make_tuple
+from pathlib import Path
+from urllib.parse import urlparse
+from re import sub
+
+import arrow
+import feedparser
+import jinja2
+import requests
+from bs4 import BeautifulSoup
+from slugify import slugify
+from re import compile as re_compile
+yamlre = re_compile('"')
+
+
+def write_etag(feed_name, feed_data):
+    """
+    save timestamp of when feed was last modified
+    """
+    etag = ""
+    modified = ""
+
+    if "etag" in feed_data:
+        etag = feed_data.etag
+    if "modified" in feed_data:
+        modified = feed_data.modified
+
+    if etag or modified:
+        with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
+            f.write(str((etag, modified)))
+
+
+def get_etag(feed_name):
+    """
+    return timestamp of when feed was last modified
+    """
+    fn = os.path.join("etags", feed_name + ".txt")
+    etag = ""
+    modified = ""
+
+    if os.path.exists(fn):
+        etag, modified = make_tuple(open(fn, "r").read())
+
+    return etag, modified
+
+
+def create_frontmatter(entry):
+    """
+    parse RSS metadata and return as frontmatter
+    """
+    if 'published' in entry:
+        published = entry.published_parsed
+    if 'updated' in entry:
+        published = entry.updated_parsed
+
+    published = arrow.get(published)
+
+    if 'author' in entry:
+        author = entry.author
+    else:
+        author = ''
+
+    if 'authors' in entry:
+        authors = []
+        for a in entry.authors:
+            authors.append(a['name'])
+
+    if 'summary' in entry:
+        summary = entry.summary
+    else:
+        summary = ''
+
+    if 'publisher' in entry:
+        publisher = entry.publisher
+    else:
+        publisher = ''
+
+    tags = []
+    if 'tags' in entry:
+        #TODO finish categories
+        for t in entry.tags:
+            tags.append(t['term'])
+
+    frontmatter = {
+        'title':entry.title,
+        'date': published.format(),
+        'summary': '',
+        'author': author,
+        'original_link': entry.link,
+        'feed_name': entry['feed_name'],
+        'timelines': str(tags),
+    }
+
+    return frontmatter
+
+def sanitize_yaml(frontmatter):
+    """
+    Escapes any occurrences of double quotes
+    in any of the frontmatter fields
+    See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types
+    """
+    for k, v in frontmatter.items():
+        if type(v) == type([]):
+            #some fields are lists
+            l = []
+            for i in v:
+                i = yamlre.sub('\\"', i)
+                l.append(i)
+            frontmatter[k] = l
+
+        else:
+            v = yamlre.sub('\\"', v)
+            frontmatter[k] = v
+
+    return frontmatter
+
+
+def create_post(post_dir, entry):
+    """
+    write hugo post based on RSS entry
+    """
+    frontmatter = create_frontmatter(entry)
+
+    if not os.path.exists(post_dir):
+        os.makedirs(post_dir)
+
+    if "content" in entry:
+        post_content = entry.content[0].value
+    else:
+        post_content = entry.summary
+
+    parsed_content = parse_posts(post_dir, post_content)
+
+    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
+    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
+    template = env.get_template("timeline.md")
+    with open(os.path.join(post_dir, "index.html"), "w") as f:  # n.b. .html
+        post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content)
+        f.write(post)
+        print("created post for", entry.title, "({})".format(entry.link))
+
+
+def grab_media(post_directory, url, prefered_name=None):
+    """
+    download media linked in post to have local copy
+    if download succeeds return new local path otherwise return url
+    """
+    media_item = urlparse(url).path.split('/')[-1]
+
+    if prefered_name:
+        media_item = prefered_name
+
+    try:
+        if not os.path.exists(os.path.join(post_directory, media_item)):
+            #TODO: with stream=True we could check the headers (mimetype etc.) before saving
+            response = requests.get(url, stream=True)
+            if response.ok:
+                with open(os.path.join(post_directory, media_item), 'wb') as media_file:
+                    shutil.copyfileobj(response.raw, media_file)
+                print('Downloaded media item', media_item)
+                return media_item
+            return media_item
+        elif os.path.exists(os.path.join(post_directory, media_item)):
+            return media_item
+
+    except Exception as e:
+        print('Failed to download image', url)
+        print(e)
+        return url
+
+
+def parse_posts(post_dir, post_content):
+    """
+    parse the post content for media items
+    replace foreign images with local copies
+    filter out iframe sources not in allowlist
+    """
+    soup = BeautifulSoup(post_content, "html.parser")
+    allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
+
+    for img in soup(["img", "object"]):
+        if img.get("src") != None:
+            local_image = grab_media(post_dir, img["src"])
+            if img["src"] != local_image:
+                img["src"] = local_image
+
+    for iframe in soup(["iframe"]):
+        if not any(source in iframe["src"] for source in allowed_iframe_sources):
+            print("filtered iframe: {}...".format(iframe["src"][:25]))
+            iframe.decompose()
+    return soup.decode()
+
+
+def grab_feed(feed_url):
+    """
+    check whether feed has been updated
+    download & return it if it has
+    """
+    feed_name = urlparse(feed_url).netloc
+
+    etag, modified = get_etag(feed_name)
+
+    try:
+        if modified:
+            data = feedparser.parse(feed_url, modified=modified)
+        elif etag:
+            data = feedparser.parse(feed_url, etag=etag)
+        else:
+            data = feedparser.parse(feed_url)
+    except Exception as e:
+        print("Error grabbing feed")
+        print(feed_name)
+        print(e)
+        return False
+
+    print(data.status, feed_url)
+    if data.status == 200:
+        # 304 means the feed has not been modified since we last checked
+        write_etag(feed_name, data)
+        return data
+    return False
+
+def create_opds_post(post_dir, entry):
+    """
+    create a HUGO post based on OPDS entry
+    or update it if the timestamp is newer
+    Downloads the cover & file
+    """
+
+    frontmatter = create_frontmatter(entry)
+
+    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
+    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
+    template = env.get_template("feed.md")
+
+    if not os.path.exists(post_dir):
+        os.makedirs(post_dir)
+
+    if os.path.exists(os.path.join(post_dir, '.timestamp')):
+        old_timestamp = open(os.path.join(post_dir, '.timestamp')).read()
+        old_timestamp = arrow.get(float(old_timestamp))
+        current_timestamp = arrow.get(entry['updated_parsed'])
+
+        if current_timestamp > old_timestamp:
+            pass
+        else:
+            print('Book "{}..." already up to date'.format(entry['title'][:32]))
+            return
+
+    for item in entry.links:
+        ft = item['type'].split('/')[-1]
+        fn = item['rel'].split('/')[-1]
+
+        if fn == "acquisition":
+            fn = "publication"  #calling the publications acquisition is weird
+
+        prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)
+
+        grab_media(post_dir, item['href'], prefered_name)
+
+    if "summary" in entry:
+        summary = entry.summary
+    else:
+        summary = ""
+
+    with open(os.path.join(post_dir, 'index.md'), 'w') as f:
+        post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)
+        f.write(post)
+        print('created post for Book', entry.title)
+
+    with open(os.path.join(post_dir, '.timestamp'), 'w') as f:
+        timestamp = arrow.get(entry['updated_parsed'])
+        f.write(timestamp.format('X'))
+
+
+def main():
+    feed_urls = open("feeds_list_timeline.txt", "r").read().splitlines()
+
+    start = time.time()
+
+    if not os.path.exists("etags"):
+        os.mkdir("etags")
+
+    output_dir = os.environ.get("OUTPUT_DIR")
+
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+
+    feed_dict = dict()
+    for url in feed_urls:
+        feed_name = urlparse(url).netloc
+        feed_dict[url] = feed_name
+
+    feed_names = feed_dict.values()
+    content_dirs = os.listdir(output_dir)
+    for i in content_dirs:
+        if i not in feed_names:
+            shutil.rmtree(os.path.join(output_dir, i))
+            print("%s not in feeds_list_timeline.txt, removing local data" % (i))
+
+    # add iframe to the allowlist of feedparser's sanitizer,
+    # this is now handled in parse_post()
+    feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
+
+    for feed_url in feed_urls:
+
+        feed_name = feed_dict[feed_url]
+
+        feed_dir = os.path.join(output_dir, feed_name)
+
+        if not os.path.exists(feed_dir):
+            os.makedirs(feed_dir)
+
+        existing_posts = os.listdir(feed_dir)
+
+        data = grab_feed(feed_url)
+
+        if data:
+
+            opds_feed = False
+            for i in data.feed['links']:
+                if i['rel'] == 'self':
+                    if 'opds' in i['type']:
+                        opds_feed = True
+                        print("OPDS type feed!")
+
+
+            for entry in data.entries:
+                # if 'tags' in entry:
+                #     for tag in entry.tags:
+                #         for x in ['lumbung.space', 'D15', 'lumbung']:
+                #             if x in tag['term']:
+                #                 print(entry.title)
+                entry["feed_name"] = feed_name
+
+                post_name = slugify(entry.title)
+
+                # pixelfed returns the whole post text as the post name. max
+                # filename length is 255 on many systems. here we're shortening
+                # the name and adding a hash to it to avoid a conflict in a
+                # situation where 2 posts start with exactly the same text.
+                if len(post_name) > 150:
+                    post_hash = md5(bytes(post_name, "utf-8"))
+                    post_name = post_name[:150] + "-" + post_hash.hexdigest()
+
+                if opds_feed:
+                    entry['opds'] = True
+                    #format: Beyond-Debiasing-Report_Online-75535a4886e3
+                    post_name = slugify(entry['title']) + '-' + entry['id'].split('-')[-1]
+
+                post_dir = os.path.join(output_dir, feed_name, post_name)
+
+                if post_name not in existing_posts:
+                    # if there is a blog entry we don't already have, make it
+                    if opds_feed:
+                        create_opds_post(post_dir, entry)
+                    else:
+                        create_post(post_dir, entry)
+
+                elif post_name in existing_posts:
+                    # if we already have it, update it
+                    if opds_feed:
+                        create_opds_post(post_dir, entry)
+                    else:
+                        create_post(post_dir, entry)
+                    existing_posts.remove(
+                        post_name
+                    )  # create list of posts which have not been returned by the feed
+
+            for post in existing_posts:
+                # remove blog posts no longer returned by the RSS feed
+                print("deleted", post)
+                shutil.rmtree(os.path.join(feed_dir, slugify(post)))
+
+    end = time.time()
+
+    print(end - start)
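parse_posts() above replaces remote images with local copies and drops any iframe whose src is not on the allowlist. A stand-alone sketch of just the iframe filtering, run on an invented HTML fragment (not part of the patch):

    # Sketch of the allowlist filtering done in parse_posts(): iframes whose
    # src does not match an allowed host are removed. The fragment is made up.
    from bs4 import BeautifulSoup

    allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]

    html = (
        '<p>hello</p>'
        '<iframe src="https://tv.lumbung.space/videos/embed/abc"></iframe>'
        '<iframe src="https://tracker.example.com/widget"></iframe>'
    )

    soup = BeautifulSoup(html, "html.parser")
    for iframe in soup(["iframe"]):
        if not any(source in iframe["src"] for source in allowed_iframe_sources):
            iframe.decompose()

    print(soup.decode())  # only the tv.lumbung.space iframe survives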
diff --git a/pyproject.toml b/pyproject.toml
index 20ba70a..75e14df 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,4 +28,5 @@ build-backend = "poetry.core.masonry.api"
 lumbunglib-cal = "lumbunglib.cloudcal:main"
 lumbunglib-vid = "lumbunglib.video:main"
 lumbunglib-feed = "lumbunglib.feed:main"
+lumbunglib-timeline = "lumbunglib.timeline:main"
 lumbunglib-hash = "lumbunglib.hashtag:main"
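The new lumbunglib-timeline entry point simply runs lumbunglib.timeline:main. Roughly the equivalent call from Python, assuming the package is installed, a feeds_list_timeline.txt sits in the working directory, and OUTPUT_DIR points at the Hugo content tree; the path below is only a placeholder:

    # main() reads the feed list from ./feeds_list_timeline.txt and writes the
    # generated Hugo posts under $OUTPUT_DIR; "content/timeline" is an example.
    import os

    os.environ.setdefault("OUTPUT_DIR", "content/timeline")

    from lumbunglib import timeline

    timeline.main()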
diff --git a/setup.py b/setup.py
index e1e1af7..9aae1b7 100644
--- a/setup.py
+++ b/setup.py
@@ -22,6 +22,7 @@ install_requires = \
 entry_points = \
 {'console_scripts': ['lumbunglib-cal = lumbunglib.cloudcal:main',
                      'lumbunglib-feed = lumbunglib.feed:main',
+                     'lumbunglib-timeline = lumbunglib.timeline:main',
                      'lumbunglib-hash = lumbunglib.hashtag:main',
                      'lumbunglib-vid = lumbunglib.video:main']}
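Finally, a note on post naming in timeline.py's main(): pixelfed entries use the whole post text as the title, so slugs are capped at 150 characters and suffixed with an md5 hash to keep directory names unique. A stand-alone sketch of that shortening, with an invented caption:

    # Mirrors the shortening in main(): over-long slugs are cut to 150
    # characters and get an md5 suffix so two posts that start with the same
    # text still land in distinct directories. The caption is hypothetical.
    from hashlib import md5
    from slugify import slugify

    title = "a very long pixelfed caption that keeps going and going " * 10
    post_name = slugify(title)

    if len(post_name) > 150:
        post_hash = md5(bytes(post_name, "utf-8"))
        post_name = post_name[:150] + "-" + post_hash.hexdigest()

    print(len(post_name), post_name[:60] + "...")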