konfluks/lumbunglib/hashtag.py
rra 0aaa711538 Update 'lumbunglib/hashtag.py'
added extra tags on request
2022-04-03 16:40:22 +02:00

158 lines
5.1 KiB
Python

import os
import shutil
from pathlib import Path
from re import sub
import jinja2
import requests
from mastodon import Mastodon
# Base URL of the Mastodon-compatible instance whose timelines are read.
instance = "https://social.lumbung.space"

# Account credentials placeholders. login_mastodon_bot() authenticates with
# the MASTODON_AUTH_TOKEN environment variable instead, so both stay empty.
email = ""
password = ""

# Hashtags whose timelines are pulled; main() creates one output
# sub-directory per entry.
hashtags = [
    "documentafifteen",
    "harvestedbyputra",
    "jalansesama",
    "lumbungdotspace",
    "majelisakakbar",
    "majelisakbar",
    "warungkopi",
    "lumbungkios",
    "kassel_ecosystem",
    "ruruhaus",
    "offbeatentrack_kassel",
]
def login_mastodon_bot():
    """
    Build the Mastodon API client used by this module.

    The access token is read from the MASTODON_AUTH_TOKEN environment
    variable (None is passed through when it is unset) and the client is
    pointed at the module-level `instance` URL.
    """
    token = os.environ.get("MASTODON_AUTH_TOKEN")
    return Mastodon(access_token=token, api_base_url=instance)
def create_frontmatter(post_metadata):
    """
    Turn post metadata into HUGO frontmatter.

    Currently a stub: `post_metadata` is ignored and an empty string is
    always returned.
    """
    return ""
def download_media(post_directory, media_attachments):
    """
    Download media attached to posts. N.b. currently only images
    See: https://mastodonpy.readthedocs.io/en/stable/#media-dicts

    Each attachment whose "type" is "image" is saved into `post_directory`
    under the last path segment of its "url". Files that already exist are
    left untouched. A non-successful HTTP status is reported and skipped, so
    an error page is never stored as if it were the image.

    Raises whatever `requests` raises on connection failure or timeout.
    """
    for item in media_attachments:
        if item["type"] != "image":
            continue

        image = localize_media_url(item["url"])
        image_path = os.path.join(post_directory, image)

        # TODO check whether this needs to handle delete & redraft with different images
        if os.path.exists(image_path):
            continue

        # The context manager releases the streamed connection; the timeout
        # keeps one stalled server from hanging the whole run.
        with requests.get(item["url"], stream=True, timeout=30) as response:
            if not response.ok:
                print(
                    "Could not download image",
                    image,
                    "- HTTP status",
                    response.status_code,
                )
                continue

            try:
                with open(image_path, "wb") as img_file:
                    # iter_content decodes any transfer/content encoding,
                    # which reading response.raw directly does not.
                    for chunk in response.iter_content(chunk_size=65536):
                        img_file.write(chunk)
            except Exception:
                # A truncated file would pass the exists() check above on
                # every later run and never be repaired, so remove it.
                if os.path.exists(image_path):
                    os.remove(image_path)
                raise

        print("Downloaded cover image", image)
def create_post(post_directory, post_metadata):
    """
    Create Hugo posts based on Toots/posts returned in timeline.
    See: https://mastodonpy.readthedocs.io/en/stable/#toot-dicts

    Renders the "hashtag.md" template from the "templates" directory next to
    this file, writes the result to `<post_directory>/index.html`, and then
    downloads the post's image attachments into the same directory.

    Note: `post_metadata["account"]["display_name"]` is modified in place
    (double quotes are backslash-escaped) before rendering.
    """
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(post_directory, exist_ok=True)

    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
    env.filters["localize_media_url"] = localize_media_url
    env.filters["filter_mastodon_urls"] = filter_mastodon_urls

    # Escape double quotes in the display name; presumably the template puts
    # it inside a double-quoted value (template not visible here, confirm).
    name = post_metadata["account"]["display_name"]
    post_metadata["account"]["display_name"] = name.replace('"', '\\"')

    # Render before opening the file so a template error does not leave an
    # empty index.html behind.
    template = env.get_template("hashtag.md")
    post = template.render(post_metadata=post_metadata)

    # Explicit UTF-8: toots routinely contain non-ASCII text and the default
    # encoding of open() depends on the locale.
    with open(
        os.path.join(post_directory, "index.html"), "w", encoding="utf-8"
    ) as f:
        f.write(post)

    download_media(post_directory, post_metadata["media_attachments"])
def localize_media_url(url):
    """
    Return the part of `url` after its last "/" (the whole string when it
    contains no "/"). Also registered as a custom jinja filter.
    """
    _, _, filename = url.rpartition("/")
    return filename
def filter_mastodon_urls(content):
    """
    Intended to strip the links Mastodon generates for hashtags, e.g.
    <a href="https://social.lumbung.space/tags/jalankita" class="mention hashtag" rel="tag">

    Not implemented yet: `content` is returned unchanged.
    Also registered as a custom jinja filter.
    """
    # TODO: implement the actual filtering
    unfiltered = content
    return unfiltered
def main():
    """
    Mirror the media posts of every configured hashtag into OUTPUT_DIR.

    For each entry in `hashtags` a sub-directory of OUTPUT_DIR is used. Posts
    returned by the hashtag timeline that are not on disk yet are created
    with create_post(), unless they are marked local-only. A post already
    stored under another hashtag is not pulled a second time. Entries of a
    hashtag directory that are no longer returned by the timeline are deleted.

    Raises:
        RuntimeError: if the OUTPUT_DIR environment variable is unset or empty.
    """
    mastodon = login_mastodon_bot()

    output_dir = os.environ.get("OUTPUT_DIR")
    if not output_dir:
        # Fail with a clear message instead of an opaque error from os.path.
        raise RuntimeError("the OUTPUT_DIR environment variable must be set")
    os.makedirs(output_dir, exist_ok=True)

    # Ids of every post already on disk across all hashtag directories. A set
    # keeps the per-post membership tests below constant-time.
    all_existing_posts = set()
    for entry in os.listdir(output_dir):
        entry_path = os.path.join(output_dir, entry)
        # Skip stray files sitting next to the hashtag directories; calling
        # listdir on them would raise NotADirectoryError.
        if os.path.isdir(entry_path):
            all_existing_posts.update(os.listdir(entry_path))

    for hashtag in hashtags:
        hashtag_dir = os.path.join(output_dir, hashtag)
        os.makedirs(hashtag_dir, exist_ok=True)

        existing_posts = os.listdir(hashtag_dir)  # list all existing posts

        timeline = mastodon.timeline_hashtag(
            hashtag, local=True, only_media=True
        )  # returns max 20 queries and only with media
        timeline = mastodon.fetch_remaining(
            timeline
        )  # returns all the rest n.b. can take a while because of rate limit

        for post_metadata in timeline:
            post_id = str(post_metadata["id"])
            post_dir = os.path.join(hashtag_dir, post_id)

            # if there is a post in the feed we dont already have locally, make it
            if post_id not in all_existing_posts:
                # if you get an error here then you are using vanilla Mastodon,
                # this is a Hometown or Glitch only feature
                if not post_metadata["local_only"]:
                    create_post(post_dir, post_metadata)
                    all_existing_posts.add(post_id)
                else:
                    print("not pulling post %s (post is local only)" % (post_metadata["id"]))

            # if we already have the post do nothing, possibly update
            elif post_id in existing_posts:
                # update_post(post_dir, post_metadata)
                # what is left in existing_posts afterwards are the posts
                # which have not been returned in the feed
                existing_posts.remove(post_id)

            # known on disk, but not under this hashtag
            else:
                print("skipping post %s as it was already pulled with a different hashtag." % (post_id))

        # rm posts that exist but are no longer returned in feed
        for post in existing_posts:
            print("deleted", post)
            shutil.rmtree(os.path.join(hashtag_dir, post))