cli commands

cellarspoon 2021-12-15 12:23:37 +01:00
parent 30dbc6212f
commit b385833cbe
No known key found for this signature in database
GPG Key ID: 03789458B3D0C410
6 changed files with 154 additions and 190 deletions

.gitignore (vendored)

@@ -1 +1,2 @@
 test
+__pycache__

lumbunglib.cloudcal (calendar feed generator)

@@ -1,8 +1,3 @@
-#!/bin/python3
-
-# lumbung.space calendar feed generator
-# © 2021 roel roscam abbing gplv3 etc
-
 import os
 import re
 import shutil
@@ -187,22 +182,23 @@ def update_event_post(post_dir, event):
         print("Event current: ", event.name, "({})".format(event.uid))


-for event in list(cal.events):
-    post_dir = os.path.join(output_dir, event.uid)
-
-    if event.uid not in existing_posts:
-        # if there is an event we dont already have, make it
-        create_event_post(post_dir, event)
-
-    elif event.uid in existing_posts:
-        # if we already have it, update
-        update_event_post(post_dir, event)
-        existing_posts.remove(
-            event.uid
-        )  # create list of posts which have not been returned by the calendar
-
-for post in existing_posts:
-    # remove events not returned by the calendar (deletion)
-    print("deleted", post)
-    shutil.rmtree(os.path.join(output_dir, post))
+def main():
+    for event in list(cal.events):
+        post_dir = os.path.join(output_dir, event.uid)
+
+        if event.uid not in existing_posts:
+            # if there is an event we dont already have, make it
+            create_event_post(post_dir, event)
+
+        elif event.uid in existing_posts:
+            # if we already have it, update
+            update_event_post(post_dir, event)
+            existing_posts.remove(
+                event.uid
+            )  # create list of posts which have not been returned by the calendar
+
+    for post in existing_posts:
+        # remove events not returned by the calendar (deletion)
+        print("deleted", post)
+        shutil.rmtree(os.path.join(output_dir, post))
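
Note: all four generators refactored in this commit share the loop that main() now wraps: list the existing post directories, create or update a post for everything the source returns, and delete whatever is left over. A condensed sketch of that pattern; the names sync, items, write_post and item.uid are illustrative, not identifiers from the commit:

import os
import shutil


def sync(output_dir, items, write_post):
    """Mirror `items` into per-item post directories under output_dir."""
    existing = set(os.listdir(output_dir))
    for item in items:
        write_post(os.path.join(output_dir, item.uid), item)
        existing.discard(item.uid)  # whatever remains was not returned this run
    for stale in existing:
        # the source no longer returns these posts, so remove them
        shutil.rmtree(os.path.join(output_dir, stale))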

lumbung.space rss feed aggregator

@@ -1,8 +1,3 @@
-#!/bin/python3
-
-# lumbung.space rss feed aggregator
-# © 2021 roel roscam abbing gplv3 etc
-
 import os
 import shutil
 import time
@@ -100,6 +95,9 @@ def create_post(post_dir, entry):

     parsed_content = parse_posts(post_dir, post_content)

+    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
+    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
+    template = env.get_template("feed.md")
     with open(os.path.join(post_dir, "index.html"), "w") as f:  # n.b. .html
         post = template.render(frontmatter=frontmatter, content=parsed_content)
         f.write(post)
@@ -140,7 +138,6 @@ def parse_posts(post_dir, post_content):
     """
     soup = BeautifulSoup(post_content, "html.parser")
     allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]
-    media = []

     for img in soup(["img", "object"]):
         local_image = grab_media(post_dir, img["src"])
@@ -184,70 +181,64 @@ def grab_feed(feed_url):
         return False


-feed_urls = open("feeds_list.txt", "r").read().splitlines()
-
-start = time.time()
-
-if not os.path.exists("etags"):
-    os.mkdir("etags")
-
-template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
-env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
-
-output_dir = os.environ.get("OUTPUT_DIR")
-
-if not os.path.exists(output_dir):
-    os.makedirs(output_dir)
-
-template = env.get_template("feed.md")
-
-# add iframe to the allowlist of feedparser's sanitizer,
-# this is now handled in parse_post()
-feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
-
-for feed_url in feed_urls:
-
-    feed_name = urlparse(feed_url).netloc
-    feed_dir = os.path.join(output_dir, feed_name)
-
-    if not os.path.exists(feed_dir):
-        os.makedirs(feed_dir)
-
-    existing_posts = os.listdir(feed_dir)
-
-    data = grab_feed(feed_url)
-
-    if data:
-        for entry in data.entries:
-            # if 'tags' in entry:
-            #     for tag in entry.tags:
-            #         for x in ['lumbung.space', 'D15', 'lumbung']:
-            #             if x in tag['term']:
-            #                 print(entry.title)
-            entry["feed_name"] = feed_name
-
-            post_name = slugify(entry.title)
-            post_dir = os.path.join(output_dir, feed_name, post_name)
-
-            if post_name not in existing_posts:
-                # if there is a blog entry we dont already have, make it
-                create_post(post_dir, entry)
-
-            elif post_name in existing_posts:
-                # if we already have it, update it
-                create_post(post_dir, entry)
-                existing_posts.remove(
-                    post_name
-                )  # create list of posts which have not been returned by the feed
-
-    for post in existing_posts:
-        # remove blog posts no longer returned by the RSS feed
-        print("deleted", post)
-        shutil.rmtree(os.path.join(feed_dir, slugify(post)))
-
-end = time.time()
-
-print(end - start)
+def main():
+    feed_urls = open("feeds_list.txt", "r").read().splitlines()
+
+    start = time.time()
+
+    if not os.path.exists("etags"):
+        os.mkdir("etags")
+
+    output_dir = os.environ.get("OUTPUT_DIR")
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+
+    # add iframe to the allowlist of feedparser's sanitizer,
+    # this is now handled in parse_post()
+    feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}
+
+    for feed_url in feed_urls:
+
+        feed_name = urlparse(feed_url).netloc
+        feed_dir = os.path.join(output_dir, feed_name)
+
+        if not os.path.exists(feed_dir):
+            os.makedirs(feed_dir)
+
+        existing_posts = os.listdir(feed_dir)
+
+        data = grab_feed(feed_url)
+
+        if data:
+            for entry in data.entries:
+                # if 'tags' in entry:
+                #     for tag in entry.tags:
+                #         for x in ['lumbung.space', 'D15', 'lumbung']:
+                #             if x in tag['term']:
+                #                 print(entry.title)
+                entry["feed_name"] = feed_name
+
+                post_name = slugify(entry.title)
+                post_dir = os.path.join(output_dir, feed_name, post_name)
+
+                if post_name not in existing_posts:
+                    # if there is a blog entry we dont already have, make it
+                    create_post(post_dir, entry)
+
+                elif post_name in existing_posts:
+                    # if we already have it, update it
+                    create_post(post_dir, entry)
+                    existing_posts.remove(
+                        post_name
+                    )  # create list of posts which have not been returned by the feed
+
+        for post in existing_posts:
+            # remove blog posts no longer returned by the RSS feed
+            print("deleted", post)
+            shutil.rmtree(os.path.join(feed_dir, slugify(post)))
+
+    end = time.time()
+
+    print(end - start)
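
Note: with the hard-coded defaults gone, os.environ.get("OUTPUT_DIR") returns None whenever the variable is unset, and the os.path.exists(output_dir) calls in these main() functions then raise a TypeError. A small hardening sketch (an addition of this review, not part of the commit):

import os
import sys


def required_env(name):
    # fail with a readable message instead of a TypeError downstream
    value = os.environ.get(name)
    if not value:
        sys.exit("error: {} must be set".format(name))
    return value


# e.g. in main(): output_dir = required_env("OUTPUT_DIR")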

lumbung.space hashtag publishing bot

@@ -1,9 +1,3 @@
-# lumbung.space hashtag publishing bot
-# © 2021 roel roscam abbing agplv3
-# Makes Hugo posts out of hashtag feeds on Mastodon.
-# Requires an account on the Mastodon instance configured.
-# Currently does not do any thread recreation and only handles images
-
 import os
 import shutil
 from pathlib import Path
@@ -12,23 +6,11 @@ import jinja2
 import requests
 from mastodon import Mastodon

-# Which instance to login to
 instance = "https://social.lumbung.space"

-# n.b. if it is the first time you use this script
-# You need to register the app:
-# https://mastodonpy.readthedocs.io/en/stable/#module-mastodon
-
-# Login credentials for bot account
 email = ""
 password = ""

-# Which hashtags to publish
 hashtags = ["jalansesama"]
-
-# your Hugo content directory
-output_dir = os.environ.get("OUTPUT_DIR", "path/to/hugo/content")


 def login_mastodon_bot():
     mastodon = Mastodon(
@@ -82,6 +64,14 @@ def create_post(post_directory, post_metadata):
     if not os.path.exists(post_directory):
         os.mkdir(post_directory)

+    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
+    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
+
+    env.filters["localize_media_url"] = localize_media_url
+    env.filters["filter_mastodon_urls"] = filter_mastodon_urls
+
+    template = env.get_template("hashtag.md")
+
     with open(os.path.join(post_directory, "index.html"), "w") as f:
         post = template.render(post_metadata=post_metadata)
         f.write(post)
@@ -106,57 +96,48 @@ def filter_mastodon_urls(content):
     return content


-mastodon = login_mastodon_bot()
-
-output_dir = output_dir
-
-template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
-env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
-
-env.filters["localize_media_url"] = localize_media_url
-env.filters["filter_mastodon_urls"] = filter_mastodon_urls
-
-template = env.get_template("hashtag.md")
-
-if not os.path.exists(output_dir):
-    os.mkdir(output_dir)
-
-for hashtag in hashtags:
-
-    hashtag_dir = os.path.join(output_dir, hashtag)
-    if not os.path.exists(hashtag_dir):
-        os.mkdir(hashtag_dir)
-
-    existing_posts = os.listdir(hashtag_dir)  # list all existing posts
-
-    timeline = mastodon.timeline_hashtag(
-        hashtag, local=True, only_media=True
-    )  # returns max 20 queries and only with media
-    timeline = mastodon.fetch_remaining(
-        timeline
-    )  # returns all the rest n.b. can take a while because of rate limit
-
-    for post_metadata in timeline:
-        post_dir = os.path.join(hashtag_dir, str(post_metadata["id"]))
-
-        # if there is a post in the feed we dont already have locally, make it
-        if str(post_metadata["id"]) not in existing_posts:
-            if not post_metadata[
-                "local_only"
-            ]:  # if you get an error here then you are using vanilla Mastodon, this is a Hometown or Glitch only feature
-                create_post(post_dir, post_metadata)
-
-        # if we already have the post do nothing, possibly update
-        elif str(post_metadata["id"]) in existing_posts:
-            # update_post(post_dir, post_metadata)
-            existing_posts.remove(
-                str(post_metadata["id"])
-            )  # create list of posts which have not been returned in the feed
-
-    for post in existing_posts:
-        print("deleted", post)  # rm posts that exist but are no longer returned in feed
-        shutil.rmtree(os.path.join(hashtag_dir, post))
+def main():
+    mastodon = login_mastodon_bot()
+
+    output_dir = os.environ.get("OUTPUT_DIR")
+
+    if not os.path.exists(output_dir):
+        os.mkdir(output_dir)
+
+    for hashtag in hashtags:
+
+        hashtag_dir = os.path.join(output_dir, hashtag)
+        if not os.path.exists(hashtag_dir):
+            os.mkdir(hashtag_dir)
+
+        existing_posts = os.listdir(hashtag_dir)  # list all existing posts
+
+        timeline = mastodon.timeline_hashtag(
+            hashtag, local=True, only_media=True
+        )  # returns max 20 queries and only with media
+        timeline = mastodon.fetch_remaining(
+            timeline
+        )  # returns all the rest n.b. can take a while because of rate limit
+
+        for post_metadata in timeline:
+            post_dir = os.path.join(hashtag_dir, str(post_metadata["id"]))
+
+            # if there is a post in the feed we dont already have locally, make it
+            if str(post_metadata["id"]) not in existing_posts:
+                if not post_metadata[
+                    "local_only"
+                ]:  # if you get an error here then you are using vanilla Mastodon, this is a Hometown or Glitch only feature
+                    create_post(post_dir, post_metadata)

+            # if we already have the post do nothing, possibly update
+            elif str(post_metadata["id"]) in existing_posts:
+                # update_post(post_dir, post_metadata)
+                existing_posts.remove(
+                    str(post_metadata["id"])
+                )  # create list of posts which have not been returned in the feed
+
+        for post in existing_posts:
+            print(
+                "deleted", post
+            )  # rm posts that exist but are no longer returned in feed
+            shutil.rmtree(os.path.join(hashtag_dir, post))
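
Note: the post_metadata["local_only"] subscript relies on a Hometown/Glitch-specific field, as the inline comment itself warns; on vanilla Mastodon the key is absent and the lookup raises KeyError. A tolerant drop-in variant of those two lines (a sketch, not in the commit; assumes a missing field should mean "not local-only"):

if not post_metadata.get("local_only", False):
    create_post(post_dir, post_metadata)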

lumbunglib.video (video feed generator)

@@ -1,8 +1,3 @@
-#!/bin/python3
-
-# lumbung.space video feed generator
-# c 2021 roel roscam abbing gpvl3 etc
-
 import ast
 import datetime
 import json
@@ -15,6 +10,9 @@ import jinja2
 import peertube
 import requests

+host = "https://tv.lumbung.space"
+configuration = peertube.Configuration(host=host + "/api/v1")
+client = peertube.ApiClient(configuration)

 # jinja filters & config
 def duration(n):
@@ -35,29 +33,10 @@ def linebreaks(text):
         return br.sub(r"<br />\n", text)


-template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
-env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
-env.filters["duration"] = duration
-env.filters["linebreaks"] = linebreaks
-
-host = "https://tv.lumbung.space"
-configuration = peertube.Configuration(host=host + "/api/v1")
-client = peertube.ApiClient(configuration)
-
-v = peertube.VideoApi(client)
-response = v.videos_get(count=100, filter="local", tags_one_of="publish")
-videos = response.to_dict()
-videos = videos["data"]
-
-
-def create_post(post_directory, video_metadata):
+def create_post(post_directory, video_metadata, host):
     global client  # lazy

-    if not os.path.exists(post_dir):
+    if not os.path.exists(post_directory):
         os.mkdir(post_directory)

     preview_image = video_metadata["preview_path"].split("/")[-1]
@@ -77,6 +56,12 @@ def create_post(post_directory, video_metadata):
     long_description = ast.literal_eval(api_response)
     video_metadata["description"] = long_description["description"]

+    template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
+    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
+    env.filters["duration"] = duration
+    env.filters["linebreaks"] = linebreaks
+
+    template = env.get_template("video.md")
     with open(os.path.join(post_directory, "index.md"), "w") as f:
         post = template.render(v=video_metadata, host=host, preview_image=preview_image)
         f.write(post)
@@ -86,7 +71,7 @@ def create_post(post_directory, video_metadata):
         f.write(timestamp.format("X"))


-def update_post(post_directory, video_metadata):
+def update_post(post_directory, video_metadata, host):
     if os.path.exists(post_directory):
         if os.path.exists(os.path.join(post_directory, ".timestamp")):
             old_timestamp = open(os.path.join(post_directory, ".timestamp")).read()
@@ -101,7 +86,7 @@ def update_post(post_directory, video_metadata):
                 video_metadata["name"],
                 "({})".format(video_metadata["uuid"]),
             )
-            create_post(post_dir, video_metadata)
+            create_post(post_directory, video_metadata, host)
         else:
             print(
                 "Video current: ",
@@ -110,37 +95,43 @@ def update_post(post_directory, video_metadata):
             )
     else:
         # compat for when there is no timestamp yet..
-        create_post(post_dir, video_metadata)
+        create_post(post_directory, video_metadata, host)


-output_dir = os.environ.get(
-    "OUTPUT_DIR", "/home/r/Programming/lumbung.space/lumbung.space-web/content/video"
-)
-
-if not os.path.exists(output_dir):
-    os.mkdir(output_dir)
-
-template = env.get_template("video.md")
-
-existing_posts = os.listdir(output_dir)
-
-for video_metadata in videos:
-    post_dir = os.path.join(output_dir, video_metadata["uuid"])
-
-    if (
-        video_metadata["uuid"] not in existing_posts
-    ):  # if there is a video we dont already have, make it
-        print("New: ", video_metadata["name"], "({})".format(video_metadata["uuid"]))
-        create_post(post_dir, video_metadata)
-
-    elif (
-        video_metadata["uuid"] in existing_posts
-    ):  # if we already have the video do nothing, possibly update
-        update_post(post_dir, video_metadata)
-        existing_posts.remove(
-            video_metadata["uuid"]
-        )  # create list of posts which have not been returned by peertube
-
-for post in existing_posts:
-    print("deleted", post)  # rm posts not returned
-    shutil.rmtree(os.path.join(output_dir, post))
+def main():
+    v = peertube.VideoApi(client)
+
+    response = v.videos_get(count=100, filter="local", tags_one_of="publish")
+
+    videos = response.to_dict()
+    videos = videos["data"]
+
+    output_dir = os.environ.get("OUTPUT_DIR")
+
+    if not os.path.exists(output_dir):
+        os.mkdir(output_dir)
+
+    existing_posts = os.listdir(output_dir)
+
+    for video_metadata in videos:
+        post_dir = os.path.join(output_dir, video_metadata["uuid"])
+
+        if (
+            video_metadata["uuid"] not in existing_posts
+        ):  # if there is a video we dont already have, make it
+            print(
+                "New: ", video_metadata["name"], "({})".format(video_metadata["uuid"])
+            )
+            create_post(post_dir, video_metadata, host)
+
+        elif (
+            video_metadata["uuid"] in existing_posts
+        ):  # if we already have the video do nothing, possibly update
+            update_post(post_dir, video_metadata, host)
+            existing_posts.remove(
+                video_metadata["uuid"]
+            )  # create list of posts which have not been returned by peertube
+
+    for post in existing_posts:
+        print("deleted", post)  # rm posts not returned
+        shutil.rmtree(os.path.join(output_dir, post))
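
Note: with the module body moved into main(), importing lumbunglib.video no longer talks to the PeerTube API at import time, which is what the console scripts below rely on. Direct execution (python video.py) only keeps working if the module also carries the usual guard; that guard is assumed here, it is not visible in this diff:

if __name__ == "__main__":
    main()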

pyproject.toml

@@ -19,3 +19,7 @@ peertube = {git = "https://framagit.org/framasoft/peertube/clients/python.git"}

 [build-system]
 requires = ["poetry-core>=1.0.0"]
 build-backend = "poetry.core.masonry.api"
+
+[tool.poetry.scripts]
+cal = "lumbunglib.cloudcal:main"
+vid = "lumbunglib.video:main"