commit - e8a7df2c837928a6969715705f51b1b20baf71e7
commit + f12c4927d4f9203400143e64d0074ea64f8740df
blob - /dev/null
blob + 0052b54b059288a6e4d1d27fe7fcee49d7430ef5 (mode 755)
--- /dev/null
+++ capsule/vostok/atom2gemfeed.py
+#!/usr/bin/env python3
+"""Generate Gemini feed by Atom.
+
+=> gemini://geminiprotocol.net/docs/companion/subscription.gmi Documentation
+=> gemini://any-key.press/vgi/atom2gemfeed/?gemini%3A%2F%2Fany-key.press%2Fatom.xml Instance
+"""
+import xml.etree.ElementTree as ET
+from datetime import datetime
+from email.message import Message
+from socket import create_connection
+from ssl import SSLContext, CERT_NONE, PROTOCOL_TLS_CLIENT
+from urllib.parse import urlsplit, urljoin, uses_relative, uses_netloc, unquote
+
+
+# for urljoin:
+uses_relative.append("gemini")
+uses_netloc.append("gemini")
+
+
+def _main(raw_url):
+ splitted_url = urlsplit(raw_url)
+ if splitted_url.scheme != "gemini":
+ print("59 Only Gemini links are allowed\r")
+ return
+
+ for _ in range(6):
+ with create_connection((splitted_url.hostname, splitted_url.port or 1965)) as raw_conn:
+ context = SSLContext(PROTOCOL_TLS_CLIENT)
+ context.check_hostname = False
+ context.verify_mode = CERT_NONE
+ with context.wrap_socket(raw_conn, server_hostname=splitted_url.hostname) as conn:
+ conn.sendall((raw_url + '\r\n').encode("UTF-8"))
+ fp = conn.makefile("rb")
+
+ splitted = fp.readline().decode("UTF-8").strip().split(maxsplit=1)
+ status = splitted[0]
+ if status.startswith("3") and len(splitted) == 2:
+ # redirect
+ raw_url = urljoin(raw_url, splitted[1])
+ continue
+
+ if not status.startswith("2"):
+ print(f"43 Remote server error: {' '.join(splitted)}\r")
+ return
+
+ mime = splitted[1].lower() if len(splitted) == 2 else "text/gemini"
+ mime_matched = (
+ mime.startswith("text/")
+ or mime.startswith("application/xml")
+ or mime.startswith("application/atom")
+ )
+ if not mime_matched:
+ print(f"43 Only links to `text/*` are allowed: {mime}\r")
+ return
+
+ # gemini://geminiprotocol.net/docs/companion/subscription.gmi
+
+ m = Message()
+ m['content-type'] = mime
+ root = {}
+ try:
+ for child in ET.fromstring(fp.read().decode(m.get_param('charset') or "UTF-8")):
+ _, _, tag = child.tag.rpartition('}')
+ root.setdefault(tag, []).append(child)
+ except ET.ParseError:
+ print("43 Parse Atom error\r")
+ return
+
+ print("20 text/gemini\r")
+ title = root.get("title")
+ print(f"# {title[0].text if title else raw_url}\r")
+ print("\r")
+ for entry in root.get("entry") or []:
+ entry_dict = {}
+ for child in entry:
+ _, _, tag = child.tag.rpartition('}')
+ entry_dict[tag] = child.text
+
+ gemini_link = entry_dict.get("id")
+ if not gemini_link:
+ continue
+
+ entry_date = ""
+ updated = entry_dict.get("updated")
+ if updated:
+ try:
+ entry_date = (
+ datetime.fromisoformat(updated)
+ .date().strftime("%Y-%m-%d - ")
+ )
+ except ValueError:
+ pass
+
+ title = entry_dict.get("title") or ""
+ print(f"=> {gemini_link} {entry_date}{entry_dict.get('title') or ''}\r")
+ return
+
+
+if __name__ == '__main__':
+ _main(unquote(urlsplit(input().strip()).query))