forked from ruangrupa/konfluks
209 lines
6.8 KiB
Python
209 lines
6.8 KiB
Python
import os
|
||
import re
|
||
import shutil
|
||
from pathlib import Path
|
||
from urllib.parse import urlparse
|
||
from slugify import slugify
|
||
|
||
import arrow
|
||
import jinja2
|
||
import requests
|
||
from ics import Calendar
|
||
from natural import date
|
||
from slugify import slugify
|
||
|
||
# a publicly accessible ICS calendar
|
||
calendar_url = os.environ.get("CALENDAR_URL")
|
||
|
||
# your Hugo content directory
|
||
output_dir = os.environ.get("OUTPUT_DIR")
|
||
|
||
cal = Calendar(requests.get(calendar_url).text)
|
||
|
||
template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
|
||
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
|
||
|
||
if not os.path.exists(output_dir):
|
||
os.mkdir(output_dir)
|
||
|
||
template = env.get_template("calendar.md")
|
||
|
||
existing_posts = os.listdir(output_dir)
|
||
|
||
|
||
def findURLs(string):
|
||
"""
|
||
return all URLs in a given string
|
||
"""
|
||
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
|
||
url = re.findall(regex, string)
|
||
return [x[0] for x in url]
|
||
|
||
|
||
def find_imageURLS(string):
|
||
"""
|
||
return all image URLS in a given string
|
||
"""
|
||
regex = r"(?:http\:|https\:)?\/\/.*?\.(?:png|jpg|jpeg|gif|svg)"
|
||
|
||
img_urls = re.findall(regex, string, flags=re.IGNORECASE)
|
||
return img_urls
|
||
|
||
|
||
def create_metadata(event):
|
||
"""
|
||
construct a formatted dict of event metadata for use as frontmatter for HUGO post
|
||
"""
|
||
|
||
if event.location:
|
||
location_urls = findURLs(event.location)
|
||
|
||
if location_urls:
|
||
location_url = location_urls[0]
|
||
event.location = "[{}]({})".format(
|
||
urlparse(location_url).netloc, location_url
|
||
)
|
||
|
||
event_metadata = {
|
||
"name": event.name,
|
||
"created": event.created.format(),
|
||
"description": event.description,
|
||
"localized_begin": " ".join(
|
||
localize_time(event.begin)
|
||
), # non-breaking space characters to defeat markdown
|
||
"begin": event.begin.format(),
|
||
"end": event.end.format(),
|
||
"duration": date.compress(event.duration),
|
||
"location": event.location,
|
||
"uid": event.uid,
|
||
"featured_image": "",
|
||
"images": find_imageURLS(event.description), # currently not used in template
|
||
}
|
||
|
||
return event_metadata
|
||
|
||
|
||
def localize_time(date):
|
||
"""
|
||
Turn a given date into various timezones
|
||
Takes arrow objects
|
||
"""
|
||
|
||
# 3 PM Kassel, Germany, 4 PM Ramallah/Jerusalem, Palestina (QoF),
|
||
# 8 AM Bogota, Colombia (MaMa), 8 PM Jakarta, Indonesia (Gudskul),
|
||
# 1 PM (+1day) Wellington, New Zealand (Fafswag), 9 AM Havana, Cuba (Instar).
|
||
|
||
tzs = [
|
||
("Kassel", "Europe/Berlin"),
|
||
("Bamako", "Europe/London"),
|
||
("Palestine", "Asia/Jerusalem"),
|
||
("Bogota", "America/Bogota"),
|
||
("Jakarta", "Asia/Jakarta"),
|
||
("Makassar", "Asia/Makassar"),
|
||
("Wellington", "Pacific/Auckland"),
|
||
]
|
||
|
||
localized_begins = []
|
||
for location, tz in tzs:
|
||
localized_begins.append( # javascript formatting because of string creation from hell
|
||
"__{}__ {}".format(
|
||
str(location), str(date.to(tz).format("YYYY-MM-DD __HH:mm__"))
|
||
)
|
||
)
|
||
return localized_begins
|
||
|
||
def create_event_post(post_dir, event):
|
||
"""
|
||
Create HUGO post based on calendar event metadata
|
||
Searches for image URLS in description and downloads them
|
||
Function is also called when post is in need of updating
|
||
In that case it will also delete images no longer in metadata
|
||
TODO: split this up into more functions for legibility
|
||
"""
|
||
|
||
if not os.path.exists(post_dir):
|
||
os.mkdir(post_dir)
|
||
|
||
event_metadata = create_metadata(event)
|
||
|
||
# list already existing images
|
||
# so we can later delete them if we dont find them in the event metadata anymore
|
||
existing_images = os.listdir(post_dir)
|
||
try:
|
||
existing_images.remove("index.md")
|
||
existing_images.remove(".timestamp")
|
||
except:
|
||
pass
|
||
|
||
for img in event_metadata["images"]:
|
||
|
||
# parse img url to safe local image name
|
||
img_name = os.path.basename(img)
|
||
fn, ext = os.path.splitext(img_name)
|
||
img_name = slugify(fn) + '.' + ext
|
||
|
||
local_image = os.path.join(post_dir, img_name)
|
||
|
||
if not os.path.exists(local_image):
|
||
# download preview image
|
||
response = requests.get(img, stream=True)
|
||
if response.status_code == 200:
|
||
with open(local_image, "wb") as img_file:
|
||
shutil.copyfileobj(response.raw, img_file)
|
||
print('Downloaded image for event "{}"'.format(event.name))
|
||
event_metadata["description"] = event_metadata["description"].replace(
|
||
img, "![]({})".format(img_name)
|
||
)
|
||
if event_metadata["featured_image"] == "":
|
||
event_metadata["featured_image"] = img_name
|
||
if img_name in existing_images:
|
||
existing_images.remove(img_name)
|
||
|
||
for left_over_image in existing_images:
|
||
# remove images we found, but which are no longer in remote event
|
||
os.remove(os.path.join(post_dir, left_over_image))
|
||
print("deleted image", left_over_image)
|
||
|
||
with open(os.path.join(post_dir, "index.md"), "w") as f:
|
||
post = template.render(event=event_metadata)
|
||
f.write(post)
|
||
print("created post for", event.name, "({})".format(event.uid))
|
||
|
||
with open(os.path.join(post_dir, ".timestamp"), "w") as f:
|
||
f.write(event_metadata["created"])
|
||
|
||
|
||
def update_event_post(post_dir, event):
|
||
"""
|
||
Update a post based on the VCARD event 'created' field which changes when updated
|
||
"""
|
||
if os.path.exists(post_dir):
|
||
old_timestamp = open(os.path.join(post_dir, ".timestamp")).read()
|
||
if event.created > arrow.get(old_timestamp):
|
||
print("Updating", event.name, "({})".format(event.uid))
|
||
create_event_post(post_dir, event)
|
||
else:
|
||
print("Event current: ", event.name, "({})".format(event.uid))
|
||
|
||
|
||
def main():
|
||
for event in list(cal.events):
|
||
post_name = slugify(event.name) + "-" + event.uid
|
||
post_dir = os.path.join(output_dir, post_name)
|
||
|
||
if post_name not in existing_posts:
|
||
# if there is an event we dont already have, make it
|
||
create_event_post(post_dir, event)
|
||
|
||
elif post_name in existing_posts:
|
||
# if we already have it, update
|
||
update_event_post(post_dir, event)
|
||
existing_posts.remove(
|
||
post_name
|
||
) # create list of posts which have not been returned by the calendar
|
||
|
||
for post in existing_posts:
|
||
# remove events not returned by the calendar (deletion)
|
||
print("deleted", post)
|
||
shutil.rmtree(os.path.join(output_dir, post))
|