Commit Diff


commit - e8a7df2c837928a6969715705f51b1b20baf71e7
commit + f12c4927d4f9203400143e64d0074ea64f8740df
blob - /dev/null
blob + 0052b54b059288a6e4d1d27fe7fcee49d7430ef5 (mode 755)
--- /dev/null
+++ capsule/vostok/atom2gemfeed.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+"""Generate Gemini feed by Atom.
+
+=> gemini://geminiprotocol.net/docs/companion/subscription.gmi Documentation
+=> gemini://any-key.press/vgi/atom2gemfeed/?gemini%3A%2F%2Fany-key.press%2Fatom.xml Instance
+"""
+import xml.etree.ElementTree as ET
+from datetime import datetime
+from email.message import Message
+from socket import create_connection
+from ssl import SSLContext, CERT_NONE, PROTOCOL_TLS_CLIENT
+from urllib.parse import urlsplit, urljoin, uses_relative, uses_netloc, unquote
+
+
+# for urljoin:
+uses_relative.append("gemini")
+uses_netloc.append("gemini")
+
+
+def _main(raw_url):
+    splitted_url = urlsplit(raw_url)
+    if splitted_url.scheme != "gemini":
+        print("59 Only Gemini links are allowed\r")
+        return
+
+    for _ in range(6):
+        with create_connection((splitted_url.hostname, splitted_url.port or 1965)) as raw_conn:
+            context = SSLContext(PROTOCOL_TLS_CLIENT)
+            context.check_hostname = False
+            context.verify_mode = CERT_NONE
+            with context.wrap_socket(raw_conn, server_hostname=splitted_url.hostname) as conn:
+                conn.sendall((raw_url + '\r\n').encode("UTF-8"))
+                fp = conn.makefile("rb")
+
+                splitted = fp.readline().decode("UTF-8").strip().split(maxsplit=1)
+                status = splitted[0]
+                if status.startswith("3") and len(splitted) == 2:
+                    # redirect
+                    raw_url = urljoin(raw_url, splitted[1])
+                    continue
+
+                if not status.startswith("2"):
+                    print(f"43 Remote server error: {' '.join(splitted)}\r")
+                    return
+
+                mime = splitted[1].lower() if len(splitted) == 2 else "text/gemini"
+                mime_matched = (
+                    mime.startswith("text/")
+                    or mime.startswith("application/xml")
+                    or mime.startswith("application/atom")
+                )
+                if not mime_matched:
+                    print(f"43 Only links to `text/*` are allowed: {mime}\r")
+                    return
+
+                # gemini://geminiprotocol.net/docs/companion/subscription.gmi
+
+                m = Message()
+                m['content-type'] = mime
+                root = {}
+                try:
+                    for child in ET.fromstring(fp.read().decode(m.get_param('charset') or "UTF-8")):
+                        _, _, tag = child.tag.rpartition('}')
+                        root.setdefault(tag, []).append(child)
+                except ET.ParseError:
+                    print("43 Parse Atom error\r")
+                    return
+
+                print("20 text/gemini\r")
+                title = root.get("title")
+                print(f"# {title[0].text if title else raw_url}\r")
+                print("\r")
+                for entry in root.get("entry") or []:
+                    entry_dict = {}
+                    for child in entry:
+                        _, _, tag = child.tag.rpartition('}')
+                        entry_dict[tag] = child.text
+
+                    gemini_link = entry_dict.get("id")
+                    if not gemini_link:
+                        continue
+
+                    entry_date = ""
+                    updated = entry_dict.get("updated")
+                    if updated:
+                        try:
+                            entry_date = (
+                                datetime.fromisoformat(updated)
+                                .date().strftime("%Y-%m-%d - ")
+                            )
+                        except ValueError:
+                            pass
+
+                    title = entry_dict.get("title") or ""
+                    print(f"=> {gemini_link} {entry_date}{entry_dict.get('title') or ''}\r")
+                return
+
+
+if __name__ == '__main__':
+    _main(unquote(urlsplit(input().strip()).query))