From 1f1bb7874e57b99cb3f04f011bb5c4d7c38f7bc0 Mon Sep 17 00:00:00 2001
From: rra
Date: Wed, 16 Feb 2022 13:02:28 +0100
Subject: [PATCH] feed: support the parsing of OPDS type feeds

---
 lumbunglib/feed.py | 166 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 132 insertions(+), 34 deletions(-)

diff --git a/lumbunglib/feed.py b/lumbunglib/feed.py
index abaf586..983a90a 100644
--- a/lumbunglib/feed.py
+++ b/lumbunglib/feed.py
@@ -48,40 +48,63 @@ def get_etag(feed_name):
 
 def create_frontmatter(entry):
     """
-        parse RSS metadata and return as frontmatter
+    parse RSS metadata and return as frontmatter
     """
-    if "published" in entry:
+    if 'published' in entry:
         published = entry.published_parsed
 
-    if "updated" in entry:
+    if 'updated' in entry:
         published = entry.updated_parsed
 
     published = arrow.get(published)
 
-    if "author" in entry:
+    if 'author' in entry:
         author = entry.author
     else:
-        author = ""
+        author = ''
+
+    authors = []
+    if 'authors' in entry:
+        for a in entry.authors:
+            authors.append(a['name'])
 
-    if "title" in entry:
-        title = entry.title
+    if 'summary' in entry:
+        summary = entry.summary
     else:
-        title = ""
+        summary = ''
+
+    if 'publisher' in entry:
+        publisher = entry.publisher
+    else:
+        publisher = ''
 
     tags = []
-    if "tags" in entry:
-        # TODO finish categories
+    if 'tags' in entry:
+        # TODO: finish categories
         for t in entry.tags:
-            tags.append(t["term"])
+            tags.append(t['term'])
 
-    frontmatter = {
-        "title": title,
-        "date": published.format(),
-        "summary": "",
-        "author": author,
-        "original_link": entry.link,
-        "feed_name": entry["feed_name"],
-        "tags": str(tags),
-    }
+    if "opds" in entry:
+        frontmatter = {
+            'title': entry.title,
+            'date': published.format(),
+            'summary': summary,
+            'author': ",".join(authors),
+            'publisher': publisher,
+            'original_link': entry.links[0]['href'].replace('opds/cover/', 'books/'),
+            'feed_name': entry['feed_name'],
+            'tags': str(tags),
+            'category': "books"
+        }
+    else:
+        frontmatter = {
+            'title': entry.title,
+            'date': published.format(),
+            'summary': '',
+            'author': author,
+            'original_link': entry.link,
+            'feed_name': entry['feed_name'],
+            'tags': str(tags)
+        }
 
     return frontmatter
 
@@ -132,28 +155,31 @@ def create_post(post_dir, entry):
     print("created post for", entry.title, "({})".format(entry.link))
 
 
-def grab_media(post_directory, url):
+def grab_media(post_directory, url, preferred_name=None):
     """
     download media linked in post to have local copy
     if download succeeds return new local path otherwise return url
     """
-    image = urlparse(url).path.split("/")[-1]
+    media_item = urlparse(url).path.split('/')[-1]
+
+    if preferred_name:
+        media_item = preferred_name
 
     try:
-        if not os.path.exists(os.path.join(post_directory, image)):
-            # TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
+        if not os.path.exists(os.path.join(post_directory, media_item)):
+            # TODO: with stream=True we could check the response headers for mimetype etc.
            response = requests.get(url, stream=True)
             if response.ok:
-                with open(os.path.join(post_directory, image), "wb") as img_file:
-                    shutil.copyfileobj(response.raw, img_file)
-                print("Downloaded cover image", image)
-                return image
-            return image
-        elif os.path.exists(os.path.join(post_directory, image)):
-            return image
+                with open(os.path.join(post_directory, media_item), 'wb') as media_file:
+                    shutil.copyfileobj(response.raw, media_file)
+                print('Downloaded media item', media_item)
+                return media_item
+            return media_item
+        elif os.path.exists(os.path.join(post_directory, media_item)):
+            return media_item
     except Exception as e:
-        print("Failed to download image", url)
+        print('Failed to download media item', url)
         print(e)
         return url
 
 
@@ -208,6 +234,56 @@ def grab_feed(feed_url):
         return data
     return False
 
+def create_opds_post(post_dir, entry):
+    """
+    create a HUGO post based on an OPDS entry,
+    or update it if the timestamp is newer.
+    Downloads the cover and the publication file.
+    """
+
+    frontmatter = create_frontmatter(entry)
+
+    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
+    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
+    template = env.get_template("feed.md")
+
+    if not os.path.exists(post_dir):
+        os.makedirs(post_dir)
+
+    if os.path.exists(os.path.join(post_dir, '.timestamp')):
+        old_timestamp = open(os.path.join(post_dir, '.timestamp')).read()
+        old_timestamp = arrow.get(float(old_timestamp))
+        current_timestamp = arrow.get(entry['updated_parsed'])
+
+        if current_timestamp <= old_timestamp:
+            print('Book "{}..." already up to date'.format(entry['title'][:32]))
+            return
+
+    for item in entry.links:
+        ft = item['type'].split('/')[-1]
+        fn = item['rel'].split('/')[-1]
+
+        if fn == "acquisition":
+            fn = "publication"  # calling the publication "acquisition" is weird
+
+        preferred_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)
+
+        grab_media(post_dir, item['href'], preferred_name)
+
+    if "summary" in entry:
+        summary = entry.summary
+    else:
+        summary = ""
+
+    with open(os.path.join(post_dir, 'index.md'), 'w') as f:
+        post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)
+        f.write(post)
+    print('created post for book', entry.title)
+
+    with open(os.path.join(post_dir, '.timestamp'), 'w') as f:
+        timestamp = arrow.get(entry['updated_parsed'])
+        f.write(timestamp.format('X'))
+
 
 def main():
     feed_urls = open("feeds_list.txt", "r").read().splitlines()
@@ -252,27 +328,49 @@ def main():
         data = grab_feed(feed_url)
 
         if data:
+
+            opds_feed = False
+            for i in data.feed['links']:
+                if i['rel'] == 'self':
+                    if 'opds' in i['type']:
+                        opds_feed = True
+                        print("OPDS type feed!")
+
+
             for entry in data.entries:
                 # if 'tags' in entry:
                 #     for tag in entry.tags:
                 entry["feed_name"] = feed_name
 
                 post_name = slugify(entry.title)
+
                 # pixelfed returns the whole post text as the post name. max
                 # filename length is 255 on many systems. here we're shortening
                 # the name and adding a hash to it to avoid a conflict in a
                 if len(post_name) > 150:
                     post_hash = md5(bytes(post_name, "utf-8"))
                     post_name = post_name[:150] + "-" + post_hash.hexdigest()
+
+                if opds_feed:
+                    entry['opds'] = True
+                    # format: Beyond-Debiasing-Report_Online-75535a4886e3
+                    post_name = slugify(entry['title']) + '-' + entry['id'].split('-')[-1]
+
                 post_dir = os.path.join(output_dir, feed_name, post_name)
 
                 if post_name not in existing_posts:
                     # if there is a blog entry we dont already have, make it
-                    create_post(post_dir, entry)
+                    if opds_feed:
+                        create_opds_post(post_dir, entry)
+                    else:
+                        create_post(post_dir, entry)
 
                 elif post_name in existing_posts:
                     # if we already have it, update it
-                    create_post(post_dir, entry)
+                    if opds_feed:
+                        create_opds_post(post_dir, entry)
+                    else:
+                        create_post(post_dir, entry)
                     existing_posts.remove(
                         post_name
                     )  # create list of posts which have not been returned by the feed
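
The feed-type detection added to main() keys off the MIME type of the
feed's rel="self" link: OPDS catalogs serve Atom with an opds-catalog
profile parameter. A minimal standalone sketch of the same check (the
catalog URL is a placeholder, not an address lumbunglib actually uses):

    import feedparser

    data = feedparser.parse("https://example.org/opds")

    # OPDS catalogs advertise themselves on the rel="self" link, e.g.
    # type="application/atom+xml;profile=opds-catalog;kind=acquisition"
    opds_feed = False
    for link in data.feed.get("links", []):
        if link.get("rel") == "self" and "opds" in link.get("type", ""):
            opds_feed = True

    print("OPDS type feed!" if opds_feed else "regular feed")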
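
create_opds_post() derives a local filename from each link's rel and
MIME type before handing it to grab_media() as preferred_name. A sketch
of that derivation against a hand-written entry (the dict mirrors the
shape feedparser returns for an OPDS acquisition feed; the values are
illustrative):

    from slugify import slugify  # python-slugify assumed here

    entry = {
        "title": "Beyond Debiasing Report",
        "links": [
            {"rel": "http://opds-spec.org/acquisition",
             "type": "application/epub+zip",
             "href": "https://example.org/opds/download/75.epub"},
            {"rel": "http://opds-spec.org/image",
             "type": "image/jpeg",
             "href": "https://example.org/opds/cover/75"},
        ],
    }

    for item in entry["links"]:
        ft = item["type"].split("/")[-1]  # extension from the MIME subtype
        fn = item["rel"].split("/")[-1]   # last segment of the rel URI
        if fn == "acquisition":
            fn = "publication"
        print("{}-{}.{}".format(fn, slugify(entry["title"]), ft))

    # publication-beyond-debiasing-report.epub+zip
    # image-beyond-debiasing-report.jpeg

Note that "application/epub+zip" yields the literal extension
"epub+zip"; mapping MIME subtypes to conventional file extensions would
take an extra lookup table.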
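
The .timestamp file is what lets create_opds_post() skip books that
have not changed: it stores the entry's updated time as a Unix
timestamp (arrow's "X" format token) and compares it against
updated_parsed on the next run. A standalone sketch of that round trip,
with a made-up time:

    import time
    import arrow

    updated_parsed = time.gmtime(1645012948)  # feedparser yields a struct_time

    # first run: persist the entry's updated time as a Unix timestamp
    stored = arrow.get(updated_parsed).format("X")

    # later run: read it back and compare against the entry again
    old_timestamp = arrow.get(float(stored))
    current_timestamp = arrow.get(updated_parsed)

    if current_timestamp <= old_timestamp:
        print("already up to date")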