#!/usr/bin/env python3
"""Gemini front-end for a honk server.

The script reads one request URL from stdin, identifies the client by the
``VGI_CERT_HASH`` environment variable, talks to the honk server HTTP API and
writes a Gemini response (status line + ``text/gemini`` body) to stdout.
Diagnostics and timings go to stderr.
"""
from json import loads as json_loads
from mimetypes import guess_type
from os import environ
from pathlib import Path
from sys import stdout, stderr
from sqlite3 import connect as sqlite3_connect
from time import clock_gettime, CLOCK_MONOTONIC
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urlunsplit, urljoin, urlsplit, parse_qsl, unquote, quote
from urllib.request import urlopen
from html.parser import HTMLParser


class _BaseTag:
    """Base class of the HTML elements tracked on the converter stack."""

    def __init__(self, tag, attrs):
        # ``attrs`` is accepted for a uniform constructor signature but is not stored.
        self.tag = tag

    def on_data(self, data):
        """Consume a chunk of text found inside the element."""
        raise AssertionError("The method must be overridden")

    def flush(self):
        """Return the gemtext collected so far and reset the element."""
        raise AssertionError("The method must be overridden")


class ParagraphTag(_BaseTag):
    """Text block: whitespace-normalised content followed by link lines (footer)."""

    def __init__(self, tag, attrs):
        super().__init__(tag, attrs)
        self.content = []
        self.footer = []

    def on_data(self, data):
        self.content.append(data.strip())

    def flush(self):
        # Collapse every run of whitespace to a single space and drop empty chunks.
        result = " ".join(" ".join(data.split()) for data in self.content if data)
        footer = self.footer
        self.content = []
        self.footer = []
        # NOTE: when there is no text the pending footer lines are dropped as well.
        return "\n".join([result] + footer) + "\n" if result else ""


class LinkTag(_BaseTag):
    """``<a>`` element: text goes inline into the paragraph, the URL into its footer."""

    def __init__(self, href, paragraph, tag, attrs):
        super().__init__(tag, attrs)
        self.href = href
        self.paragraph = paragraph
        self.content = []

    def on_data(self, data):
        if not self.content:
            # Mark the start of the link text inside the paragraph.
            self.paragraph.on_data("↳")
        self.paragraph.on_data(data)
        self.content.append(data.strip())

    def flush(self):
        text = " ".join(" ".join(data.split()) for data in self.content if data)
        self.paragraph.footer.append(f"=> {self.href} {text}")
        return ""


class LiItemTag(ParagraphTag):
    """List item rendered as a gemtext ``* `` line."""

    def flush(self):
        content = super().flush()
        return f"* {content}" if content else ""


class QuoteTag(ParagraphTag):
    """Quotation rendered as a gemtext ``> `` line."""

    def flush(self):
        content = super().flush()
        return f"> {content}" if content else ""


class HeaderTag(ParagraphTag):
    """``h1``..``h6`` rendered with as many ``#`` as the heading level."""

    def flush(self):
        content = super().flush()
        if not content:
            return ""
        # The tag name is "h<level>", so everything after the first char is the level.
        return f"{'#' * int(self.tag[1:])} {content}"


class PreformattedTag(_BaseTag):
    """``<pre>`` element rendered as a fenced block; text is kept verbatim."""

    def __init__(self, tag, attrs):
        super().__init__(tag, attrs)
        self.content = ""

    def on_data(self, data):
        self.content += data

    def flush(self):
        result = self.content
        self.content = ""
        return f"```\n{result}\n```\n" if result else ""


class HtmlToGmi(HTMLParser):
    """Convert an HTML fragment to gemtext.

    ``base_url`` is used to resolve relative ``href``/``src`` values.
    ``fn_media_url(mime, url)`` maps an image URL to the URL that is emitted.
    """

    def __init__(self, base_url, fn_media_url):
        super().__init__()
        self.gmi_text = []
        self.stack = []
        self.base_url = base_url
        self.fn_media_url = fn_media_url

    def feed(self, data):
        """Parse ``data`` and return all gemtext collected so far as one string."""
        super().feed(data)
        # Flush elements that were never closed in the markup.
        while self.stack:
            self.gmi_text.append(self.stack.pop().flush())
        return "\n".join(gmi_text for gmi_text in self.gmi_text if gmi_text)

    def handle_starttag(self, tag, attrs):
        def _push(elem):
            # A new block element terminates the text of the current one.
            if self.stack:
                self.gmi_text.append(self.stack[-1].flush())
            self.stack.append(elem)

        if tag == "p":
            _push(ParagraphTag(tag, attrs))
        elif tag == "pre":
            _push(PreformattedTag(tag, attrs))
        elif tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
            _push(HeaderTag(tag, attrs))
        elif tag in {"li", "dt"}:
            _push(LiItemTag(tag, attrs))
        elif tag in {"blockquote", "q"}:
            _push(QuoteTag(tag, attrs))
        elif tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.stack.append(
                    LinkTag(urljoin(self.base_url, href), self._get_current_paragraph(), tag, attrs)
                )
        elif tag == "img":
            img = dict(attrs)
            title = img.get("title") or ""
            if img.get("class") == "emu" and title and self.stack:
                # Images of class "emu" are replaced by their title text.
                self.stack[-1].on_data(title)
            else:
                src = img.get("src")
                if src:
                    http_img_url = urljoin(self.base_url, src)
                    mime, _ = guess_type(http_img_url)
                    img_url = self.fn_media_url(mime, http_img_url)
                    self.gmi_text.append(f"=> {img_url} {title or http_img_url}")
        elif tag == "br":
            if self.stack:
                self.gmi_text.append(self.stack[-1].flush())

    def handle_data(self, data):
        if not self.stack:
            # Text outside of any known element is treated as a paragraph.
            self.stack.append(ParagraphTag("p", []))
        self.stack[-1].on_data(data)

    def handle_endtag(self, tag):
        if self.stack and tag == self.stack[-1].tag:
            self.gmi_text.append(self.stack.pop().flush())

    def _get_current_paragraph(self):
        """Return the innermost paragraph-like element, creating one at the bottom if needed."""
        for elem in reversed(self.stack):
            if isinstance(elem, ParagraphTag):
                return elem
        self.stack = [ParagraphTag("p", [])] + self.stack
        return self.stack[0]


class LonkUrl:
    """Parsed request URL plus a builder for links back into this application."""

    def __init__(self, raw_url):
        self._splitted_url = urlsplit(raw_url)
        self.splitted_path = [part for part in self._splitted_url.path.split("/") if part]
        # An empty path has no page name; "" matches no route, so the request ends as "not found".
        self.page = self.splitted_path[-1] if self.splitted_path else ""
        # Base path: everything up to and including the first "lonk" component.
        self._base_path = []
        for path_part in self.splitted_path:
            self._base_path.append(path_part)
            if path_part == "lonk":
                break

    def build(self, page, query=""):
        """Return a ``gemini://`` URL of ``page`` (a name or a list of path parts)."""
        page = page if isinstance(page, list) else [page]
        return urlunsplit(
            ("gemini", self._splitted_url.netloc, "/".join(self._base_path + page), query, "")
        )

    def media(self, mime, url):
        """Return a ``proxy`` page URL for ``url``, or ``url`` itself when ``mime`` is falsy."""
        return self.build("proxy", urlencode({"m": mime, "u": url})) if mime else url

    @property
    def query(self):
        """Raw (still percent-encoded) query string of the request."""
        return self._splitted_url.query


class HonkUrl:
    """Honk server base URL together with the API token of the client."""

    def __init__(self, raw_url, token):
        self._splitted_url = urlsplit(raw_url)
        self._token = token

    def build(self, scheme=None, netloc=None, path="", query="", fragment=""):
        """Return a URL on the honk server; scheme and netloc default to the server's own."""
        return urlunsplit(
            (
                scheme or self._splitted_url.scheme,
                netloc or self._splitted_url.netloc,
                path,
                query,
                fragment,
            )
        )

    def get(self, action, answer_is_json=True, **kwargs):
        """Call the ``api`` endpoint with ``action`` and extra query parameters.

        Returns the decoded JSON answer, or the raw text when ``answer_is_json``
        is false. The action, parameters and elapsed time are logged to stderr.
        """
        start_time = clock_gettime(CLOCK_MONOTONIC)
        try:
            query = {**{"action": action, "token": self._token}, **kwargs}
            with urlopen(self.build(path="api", query=urlencode(query)), timeout=45) as response:
                answer = response.read().decode("utf8")
                return json_loads(answer) if answer_is_json else answer
        finally:
            stderr.write(f"GET {action} {kwargs}|{clock_gettime(CLOCK_MONOTONIC) - start_time:.3f}sec.\n")


def db_create_schema(db_con):
    """Create the ``client`` table (certificate hash -> honk server URL and token)."""
    db_con.execute(
        """
        CREATE TABLE
            client (
                cert_hash TEXT PRIMARY KEY,
                honk_url TEXT NOT NULL,
                token TEXT NOT NULL
            )
        """
    )


def db_connect():
    """Open ``.local/db`` next to this script, creating file and schema on first use."""
    db_file_path = Path(__file__).parent / ".local" / "db"
    db_file_path.parent.mkdir(parents=True, exist_ok=True)
    db_exist = db_file_path.exists()
    db_con = sqlite3_connect(db_file_path)
    if not db_exist:
        with db_con:
            db_create_schema(db_con)
    return db_con


def print_header(page_name):
    """Print the success status line and the page title."""
    print("20 text/gemini\r")
    print(f"# 𝓗 onk: {page_name}\r")
    print("\r")


def print_menu(lonk_url, honk_url, gethonks_answer=None):
    """Print the navigation menu; counters are shown when ``gethonks_answer`` is given."""
    print("## 📝 Menu\r")
    print(f"=> {lonk_url.build('newhonk')} new honk\r")
    print(f"=> {lonk_url.build([])} lonk home\r")
    print(f"=> {lonk_url.build('first')} first class only\r")
    if gethonks_answer:
        line = f"=> {lonk_url.build('atme')} @me"
        if gethonks_answer["mecount"]:
            line += f' ({gethonks_answer["mecount"]})'
        print(line + "\r")
        line = f"=> {honk_url.build(path='chatter')} chatter"
        if gethonks_answer["chatcount"]:
            line += f' ({gethonks_answer["chatcount"]})'
        print(line + "\r")
    print(f"=> {lonk_url.build('search')} search\r")
    print(f"=> {lonk_url.build('longago')} long ago\r")
    print(f"=> {lonk_url.build('myhonks')} my honks\r")
    print(f"=> {lonk_url.build('gethonkers')} honkers\r")
    print(f"=> {lonk_url.build('addhonker')} add new honker\r")


def print_gethonks(gethonks_answer, lonk_url, honk_url):
    """Print the menu followed by every honk of a ``gethonks`` answer."""
    print_menu(lonk_url, honk_url, gethonks_answer)
    print("\r")
    for honk in gethonks_answer.get("honks") or []:
        convoy = honk["Convoy"]
        re_url = honk.get("RID")
        oondle = honk.get("Oondle")
        from_ = f'{oondle} (🔁 {honk["Handle"]})' if oondle else f'{honk["Handle"]}'
        lines = [
            f'##{"# ↱" if re_url else ""} From {from_} {honk["Date"]}',
            f'=> {lonk_url.build("convoy", urlencode({"c": convoy}))} Convoy {convoy}',
            f'=> {honk["XID"]}',
        ]
        if re_url:
            lines.append(f'=> {re_url} Re: {re_url}')
        lines.append("")
        lines.append(HtmlToGmi(honk_url.build(), lonk_url.media).feed(honk["HTML"]))
        for donk in honk.get("Donks") or []:
            if donk.get("XID"):
                donk_url = honk_url.build(path=f'/d/{donk["XID"]}')
            else:
                donk_url = urljoin(honk["XID"], donk["URL"])
            donk_mime = donk["Media"]
            lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}')
            donk_text = donk.get("Desc") or donk.get("Name") or None
            if donk_text:
                lines.append(donk_text)
            lines.append("")
        if honk.get("Public"):
            lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} ↺ bonk')
        # Handles and XID travel as path components so that the query stays free
        # for the answer text (see honkback()).
        honk_back_url = lonk_url.build(
            [
                quote(honk["Handles"] or " ", safe=""),
                quote(honk["XID"], safe=""),
                "honkback",
            ]
        )
        lines.append(f'=> {honk_back_url} ↱ honk back')
        for xonker in (honk.get("Honker"), honk.get("Oonker")):
            if xonker:
                lines.append(f'=> {lonk_url.build("honker", urlencode({"xid": xonker}))} honks of {xonker}')
        print("\r\n".join(lines))
        print("\r")
    if gethonks_answer.get("honks"):
        print("\r")
        print_menu(lonk_url, honk_url, gethonks_answer)


class _LonkTreeItem:
    """A thread: the starting honk (``None`` until known) and its answers."""

    def __init__(self, honk):
        self.honk = honk
        self.thread = []

    def iterate_honks(self):
        """Yield the starting honk (when known) followed by the answers."""
        if self.honk is not None:
            yield self.honk
        yield from self.thread


def page_lonk(lonk_url, honk_url):
    """Home page: honks of the home timeline grouped into threads by convoy."""
    gethonks_answer = honk_url.get("gethonks", page="home")
    # "honks" may be null in the answer; treat it as empty, like print_gethonks() does.
    home_honks = gethonks_answer.pop("honks", None) or []

    timeline = {}  # convoy -> ordering weight (index in the reversed answer)
    lonk_tree = {}  # convoy -> _LonkTreeItem
    for i, honk in enumerate(reversed(home_honks)):
        timeline.setdefault(honk["Convoy"], i)
        if honk.get("RID"):
            lonk_tree.setdefault(honk["Convoy"], _LonkTreeItem(None))
        else:
            lonk_tree[honk["Convoy"]] = _LonkTreeItem(honk)

    # fetch first 36 threads (without start honk)
    sorted_ = sorted(timeline.keys(), key=lambda convoy: timeline[convoy], reverse=True)
    correction_map = {}  # convoy seen on the home page -> convoy of the found thread start
    for convoy in [convoy for convoy in sorted_ if lonk_tree[convoy].honk is None][:36]:
        for honk in honk_url.get("gethonks", page="convoy", c=convoy).get("honks") or []:
            if not honk.get("RID"):
                if convoy != honk["Convoy"]:
                    # The thread start lives in another convoy: move the weight there.
                    correction_map[convoy] = honk["Convoy"]
                    tl_weight_1 = timeline.pop(convoy)
                    convoy = honk["Convoy"]
                    tl_weight_2 = timeline.get(convoy)
                    timeline[convoy] = tl_weight_1 if tl_weight_2 is None else min(tl_weight_1, tl_weight_2)
                item = lonk_tree.get(convoy)
                if item is None:
                    lonk_tree[convoy] = _LonkTreeItem(honk)
                elif item.honk is None:
                    item.honk = honk
                break
        else:
            # no thread start found
            timeline.pop(convoy)

    # link answers to thread
    for honk in reversed(home_honks):
        if honk.get("RID"):
            item = lonk_tree.get(correction_map.get(honk["Convoy"], honk["Convoy"]))
            if item is not None:
                if item.honk is not None:
                    item.thread.append(honk)

    # build honks for page
    gethonks_answer["honks"] = []
    for convoy in sorted(timeline.keys(), key=lambda convoy: timeline[convoy], reverse=True):
        item = lonk_tree[convoy]
        if item.honk is None:
            break  # first unfetched
        gethonks_answer["honks"] += list(item.iterate_honks())

    print_header("lonk home")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_first(lonk_url, honk_url):
    """Home timeline limited to honks whose ``What`` is ``bonked`` or ``honked``."""
    gethonks_answer = honk_url.get("gethonks", page="home")
    # "honks" may be null in the answer; treat it as empty, like print_gethonks() does.
    honks = gethonks_answer.pop("honks", None) or []
    gethonks_answer["honks"] = [honk for honk in honks if honk["What"] in {"bonked", "honked"}]
    print_header("first class only")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_convoy(lonk_url, honk_url):
    """Show the convoy given by the ``c`` query parameter."""
    query = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}
    if "c" not in query:
        print("51 Not found\r")
        return
    gethonks_answer = honk_url.get("gethonks", page="convoy", c=query["c"])
    print_header(f"convoy {query['c']}")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_search(lonk_url, honk_url):
    """Ask for a search phrase (status 10), then show the search result."""
    if not lonk_url.query:
        print("10 What are we looking for?\r")
        return
    q = unquote(lonk_url.query)
    gethonks_answer = honk_url.get("gethonks", page="search", q=q)
    print_header(f"search - {q}")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_atme(lonk_url, honk_url):
    """Show the ``atme`` page of the honk server."""
    gethonks_answer = honk_url.get("gethonks", page="atme")
    print_header("@me")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_longago(lonk_url, honk_url):
    """Show the ``longago`` page of the honk server."""
    gethonks_answer = honk_url.get("gethonks", page="longago")
    print_header("long ago")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_myhonks(lonk_url, honk_url):
    """Show the ``myhonks`` page of the honk server."""
    gethonks_answer = honk_url.get("gethonks", page="myhonks")
    print_header("my honks")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def page_honker(lonk_url, honk_url):
    """Show the honks of the honker given by the ``xid`` query parameter."""
    xid = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("xid")
    if not xid:
        print("51 Not found\r")
        return
    gethonks_answer = honk_url.get("gethonks", page="honker", xid=xid)
    print_header(f"honks of {xid}")
    print_gethonks(gethonks_answer, lonk_url, honk_url)


def bonk(lonk_url, honk_url):
    """Bonk the honk given by the ``w`` query parameter, then redirect to ``myhonks``."""
    what = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("w")
    if not what:
        print("51 Not found\r")
        return
    honk_url.get("zonkit", wherefore="bonk", what=what, answer_is_json=False)
    print(f'30 {lonk_url.build("myhonks")}\r')


def gethonkers(lonk_url, honk_url):
    """List the honkers with (un)subscribe links and links to their honks."""
    print_header("honkers")
    print_menu(lonk_url, honk_url)
    honkers = honk_url.get("gethonkers").get("honkers") or []
    for honker in honkers:
        print(f'## {honker.get("Name") or honker["XID"]}\r')
        for field_name, display_name in zip(("Name", "XID", "Flavor"), ("name", "url", "flavor")):
            value = honker.get(field_name)
            if value:
                print(f'{display_name}: {value}\r')
        if honker.get("Flavor") == "sub":
            print(f'=> {lonk_url.build("unsubscribe", urlencode({"honkerid": honker["ID"]}))} unsubscribe\r')
        else:
            print(f'=> {lonk_url.build("subscribe", urlencode({"honkerid": honker["ID"]}))} (re)subscribe\r')
        print(f'=> {lonk_url.build("honker", urlencode({"xid": honker["XID"]}))} honks of {honker["XID"]}\r')
        print('\r')
    if honkers:
        print("\r")
        print_menu(lonk_url, honk_url)


def addhonker(lonk_url, honk_url):
    """Ask for a honker URL (status 10), save it and redirect to the honkers list.

    An HTTP error answer of the honk server is shown as an error page.
    """
    if not lonk_url.query:
        print("10 honker url: \r")
        return
    url = unquote(lonk_url.query)
    try:
        honk_url.get("savehonker", url=url, answer_is_json=False)
        print(f'30 {lonk_url.build("gethonkers")}\r')
    except HTTPError as error:
        print_header("add new honker")
        print_menu(lonk_url, honk_url)
        print("\r")
        print('## Error\r')
        print(f'> {error.fp.read().decode("utf8")}\r')


def unsubscribe(lonk_url, honk_url):
    """Unsubscribe from the honker given by ``honkerid``, then redirect to the list."""
    honkerid = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("honkerid")
    if not honkerid:
        print("51 Not found\r")
        return
    honk_url.get("savehonker", honkerid=honkerid, unsub="unsub", answer_is_json=False)
    print(f'30 {lonk_url.build("gethonkers")}\r')


def subscribe(lonk_url, honk_url):
    """(Re)subscribe to the honker given by ``honkerid``, then redirect to the list."""
    honkerid = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("honkerid")
    if not honkerid:
        print("51 Not found\r")
        return
    honk_url.get("savehonker", honkerid=honkerid, sub="sub", answer_is_json=False)
    print(f'30 {lonk_url.build("gethonkers")}\r')


def newhonk(lonk_url, honk_url):
    """Ask for the honk text (status 10), post it and redirect to ``myhonks``."""
    if not lonk_url.query:
        print("10 let's make some noise: \r")
        return
    noise = unquote(lonk_url.query)
    honk_url.get("honk", noise=noise, answer_is_json=False)
    print(f'30 {lonk_url.build("myhonks")}\r')


def honkback(lonk_url, honk_url):
    """Answer a honk; the path is ``.../<handles>/<rid>/honkback``, the query is the text."""
    if not lonk_url.query:
        handles = unquote(lonk_url.splitted_path[-3]).strip()
        rid = unquote(lonk_url.splitted_path[-2])
        print(f"10 Answer to {handles or rid}:\r")
        return
    noise = unquote(lonk_url.query)
    rid = unquote(lonk_url.splitted_path[-2])
    honk_url.get("honk", noise=noise, rid=rid, answer_is_json=False)
    print(f'30 {lonk_url.build("myhonks")}\r')


def authenticated(cert_hash, lonk_url, fn_impl):
    """Run ``fn_impl(lonk_url, honk_url)`` for a registered certificate.

    A certificate without a ``client`` row is redirected to the ``ask_server``
    page, which starts the registration dialog.
    """
    db_con = db_connect()
    try:
        row = db_con.execute("SELECT honk_url, token FROM client WHERE cert_hash=?", (cert_hash, )).fetchone()
        if not row:
            print(f'30 {lonk_url.build("ask_server")}\r')
            return
        honk_url, token = row
        with db_con:
            fn_impl(lonk_url, HonkUrl(honk_url, token))
    finally:
        # "with db_con" only commits or rolls back; the connection is closed here.
        db_con.close()


def new_client_stage_1_ask_server(lonk_url):
    """Registration, step 1: ask for the honk server URL and pass it on in the path."""
    if not lonk_url.query:
        print("10 Honk server URL\r")
        return
    splitted = urlsplit(unquote(lonk_url.query))
    # Only scheme and host are kept; the value travels as one quoted path component.
    path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"]
    print(f'30 {lonk_url.build(path)}\r')


def new_client_stage_2_ask_username(lonk_url):
    """Registration, step 2: ask for the user name and pass it on in the path."""
    if not lonk_url.query:
        print("10 Honk user name\r")
        return
    if len(lonk_url.splitted_path) < 3:
        print('59 Bad request\r')
        return
    quoted_server = lonk_url.splitted_path[-2]
    path = [quoted_server, quote(unquote(lonk_url.query), safe=""), "ask_password"]
    print(f'30 {lonk_url.build(path)}\r')


def new_client_stage_3_ask_password(cert_hash, lonk_url):
    """Registration, step 3: ask for the password, log in and store the token."""
    if not lonk_url.query:
        # Status 11 asks for input that should not be echoed by the client.
        print("11 Honk user password\r")
        return
    if len(lonk_url.splitted_path) < 4:
        print('59 Bad request\r')
        return
    honk_url = unquote(lonk_url.splitted_path[-3])
    post_data = {
        "username": unquote(lonk_url.splitted_path[-2]),
        "password": unquote(lonk_url.query),
        "gettoken": "1",
    }
    with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response:
        token = response.read().decode("utf8")
    db_con = db_connect()
    try:
        with db_con:
            # OR REPLACE: a certificate that logs in again gets its row updated
            # instead of failing on the primary key.
            db_con.execute(
                "INSERT OR REPLACE INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)",
                (cert_hash, honk_url, token)
            )
    finally:
        db_con.close()
    print(f'30 {lonk_url.build([])}\r')


def proxy(mime, url):
    """Stream the content of ``url`` to stdout with ``mime`` as the response type.

    Both values come from the request query, so they are validated first:
    only http(s) URLs are fetched (urlopen would also read ``file:`` URLs) and
    ``mime`` must not contain line breaks (it is written into the status line).
    NOTE(review): any http(s) host reachable from this machine can still be
    fetched through this page.
    """
    if urlsplit(url).scheme not in {"http", "https"} or "\r" in mime or "\n" in mime:
        print("59 Bad request\r")
        return
    with urlopen(url, timeout=10) as response:
        stdout.buffer.write(b"20 " + mime.encode() + b"\r\n")
        while True:
            content = response.read(512 * 1024)
            if not content:
                break
            stdout.buffer.write(content)


# Pages that need a registered client certificate: page name -> implementation.
_AUTHENTICATED_PAGES = {
    "lonk": page_lonk,
    "first": page_first,
    "convoy": page_convoy,
    "atme": page_atme,
    "search": page_search,
    "longago": page_longago,
    "myhonks": page_myhonks,
    "honker": page_honker,
    "bonk": bonk,
    "gethonkers": gethonkers,
    "addhonker": addhonker,
    "unsubscribe": unsubscribe,
    "subscribe": subscribe,
    "newhonk": newhonk,
    "honkback": honkback,
}


def vgi(cert_hash, raw_url):
    """Route the request: the last path component of ``raw_url`` selects the page."""
    lonk_url = LonkUrl(raw_url)
    page = lonk_url.page
    fn_impl = _AUTHENTICATED_PAGES.get(page)
    if fn_impl is not None:
        authenticated(cert_hash, lonk_url, fn_impl)
    elif page == "ask_server":
        new_client_stage_1_ask_server(lonk_url)
    elif page == "ask_username":
        new_client_stage_2_ask_username(lonk_url)
    elif page == "ask_password":
        new_client_stage_3_ask_password(cert_hash, lonk_url)
    elif page == "proxy":
        query = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}
        if "m" not in query or "u" not in query:
            print("51 Not found\r")
        else:
            proxy(mime=query["m"], url=query["u"])
    else:
        print("51 Not found\r")


def main():
    """Entry point: read the request URL from stdin and answer it on stdout."""
    cert_hash_ = environ.get("VGI_CERT_HASH")
    if cert_hash_:
        try:
            start_time = clock_gettime(CLOCK_MONOTONIC)
            # Bound before the try so that the log line below works even if input() fails.
            input_url = ""
            try:
                input_url = input().strip()
                vgi(cert_hash_, input_url)
            finally:
                stderr.write(f"{cert_hash_}|{input_url}|{clock_gettime(CLOCK_MONOTONIC) - start_time:.3f}sec.\n")
        except HTTPError as error:
            stderr.write(f"{error}\n")
            print(f"43 Remote server return {error.code}: {error.reason}\r")
        except URLError as error:
            stderr.write(f"{error}\n")
            print(f"43 Error while trying to access remote server: {error.reason}\r")
        except TimeoutError as error:
            stderr.write(f"{error}\n")
            print(f"43 Error while trying to access remote server: {error}\r")
    else:
        stderr.write("Certificate required\n")
        print("60 Certificate required\r")


if __name__ == '__main__':
    main()