Blob


1 #!/usr/bin/env python3
2 from json import loads as json_loads
3 from mimetypes import guess_type
4 from os import environ
5 from pathlib import Path
6 from sys import stdout, stderr
7 from sqlite3 import connect as sqlite3_connect
8 from time import clock_gettime, CLOCK_MONOTONIC
9 from urllib.error import HTTPError, URLError
10 from urllib.parse import urlencode, urlunsplit, urljoin, urlsplit, parse_qsl, unquote, quote
11 from urllib.request import urlopen
12 from html.parser import HTMLParser
15 class _BaseTag:
16 def __init__(self, tag, attrs):
17 self.tag = tag
19 def on_data(self, data):
20 raise AssertionError("The method must be overridden")
22 def flush(self):
23 raise AssertionError("The method must be overridden")
class ParagraphTag(_BaseTag):
    """Collects the text of a paragraph plus footer lines (e.g. its links)."""

    def __init__(self, tag, attrs):
        super().__init__(tag, attrs)
        self.content = []  # stripped text chunks, in arrival order
        self.footer = []   # extra lines emitted right after the paragraph text

    def on_data(self, data):
        """Store one chunk of text with surrounding whitespace removed."""
        self.content.append(data.strip())

    def flush(self):
        """Return the paragraph as text and reset the collected state.

        Whitespace runs inside each chunk collapse to single spaces. The
        footer lines follow the text, one per line. An empty paragraph
        yields ``""`` and its footer is discarded.
        """
        pieces = [" ".join(chunk.split()) for chunk in self.content if chunk]
        trailer = self.footer
        self.content, self.footer = [], []
        body = " ".join(pieces)
        if not body:
            return ""
        return "\n".join([body, *trailer]) + "\n"
class LinkTag(_BaseTag):
    """Handler for ``<a>``: marks link text inline and adds a footer link line."""

    def __init__(self, href, paragraph, tag, attrs):
        super().__init__(tag, attrs)
        self.href = href
        self.paragraph = paragraph  # paragraph that shows the link text
        self.content = []

    def on_data(self, data):
        """Forward link text to the owning paragraph and remember it."""
        is_first_chunk = not self.content
        if is_first_chunk:
            # An arrow in the running text marks where the link starts.
            self.paragraph.on_data("↳")
        self.paragraph.on_data(data)
        self.content.append(data.strip())

    def flush(self):
        """Append the ``=> url text`` line to the paragraph footer.

        Returns ``""`` because the link itself produces no standalone text.
        """
        label = " ".join(" ".join(chunk.split()) for chunk in self.content if chunk)
        self.paragraph.footer.append(f"=> {self.href} {label}")
        return ""
class LiItemTag(ParagraphTag):
    """Paragraph rendered as a list item (``* `` prefix)."""

    def flush(self):
        """Return the paragraph text prefixed with ``* ``, or ``""`` if empty."""
        rendered = super().flush()
        if not rendered:
            return ""
        return "* " + rendered
class QuoteTag(ParagraphTag):
    """Paragraph rendered as a quote line (``> `` prefix)."""

    def flush(self):
        """Return the paragraph text prefixed with ``> ``, or ``""`` if empty."""
        rendered = super().flush()
        if not rendered:
            return ""
        return "> " + rendered
class HeaderTag(ParagraphTag):
    """Paragraph rendered as a heading line for ``h1``..``h6`` elements."""

    # Gemtext defines only "#", "##" and "###"; deeper HTML headings are
    # clamped to the deepest level so they still render as headings.
    _MAX_LEVEL = 3

    def flush(self):
        """Return the text prefixed with ``#`` marks, or ``""`` if empty.

        The level is the digit in the tag name (``h2`` -> 2), capped at
        ``_MAX_LEVEL``.
        """
        content = super().flush()
        if not content:
            return ""
        level = min(int(self.tag[1:]), self._MAX_LEVEL)
        return f"{'#' * level} {content}"
class PreformattedTag(_BaseTag):
    """Handler for ``<pre>``: keeps the text verbatim inside a fenced block."""

    def __init__(self, tag, attrs):
        super().__init__(tag, attrs)
        self.content = ""

    def on_data(self, data):
        """Append the raw text unchanged (no stripping or collapsing)."""
        self.content = self.content + data

    def flush(self):
        """Return the collected text wrapped in ``` fences and reset it.

        Returns ``""`` when nothing was collected.
        """
        collected, self.content = self.content, ""
        if not collected:
            return ""
        return f"```\n{collected}\n```\n"
class HtmlToGmi(HTMLParser):
    """Convert an HTML fragment to gemtext.

    Open elements are tracked on ``self.stack`` as tag handler objects;
    finished pieces of text are collected in ``self.gmi_text``.

    Args:
        base_url: URL against which relative ``href``/``src`` values are resolved.
        fn_media_url: callable ``(mime, url) -> url`` used to build image links.
    """

    def __init__(self, base_url, fn_media_url):
        super().__init__()
        self.gmi_text = []
        self.stack = []
        self.base_url = base_url
        self.fn_media_url = fn_media_url

    def feed(self, data):
        """Parse ``data`` and return the gemtext accumulated by this instance."""
        super().feed(data)
        # Flush everything still open, innermost element first.
        while self.stack:
            self.gmi_text.append(self.stack.pop().flush())
        return "\n".join(gmi_text for gmi_text in self.gmi_text if gmi_text)

    def handle_starttag(self, tag, attrs):
        def _push(elem):
            # A new block element first flushes the text of the current one.
            if self.stack:
                self.gmi_text.append(self.stack[-1].flush())
            self.stack.append(elem)

        if tag == "p":
            _push(ParagraphTag(tag, attrs))
        elif tag == "pre":
            _push(PreformattedTag(tag, attrs))
        elif tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
            _push(HeaderTag(tag, attrs))
        elif tag in {"li", "dt"}:
            _push(LiItemTag(tag, attrs))
        elif tag in {"blockquote", "q"}:
            _push(QuoteTag(tag, attrs))
        elif tag == "a":
            href = dict(attrs).get("href")
            if href:
                # Resolve the paragraph BEFORE touching self.stack: looking it
                # up may add a paragraph to the stack, and the link must land
                # on that same stack or it is never flushed.
                paragraph = self._get_current_paragraph()
                self.stack.append(LinkTag(urljoin(self.base_url, href), paragraph, tag, attrs))
        elif tag == "img":
            img = dict(attrs)
            title = img.get("title") or ""
            if img.get("class") == "emu" and title and self.stack:
                # Images of class "emu" with a title are inlined as that title.
                self.stack[-1].on_data(title)
            else:
                src = img.get("src")
                if src:
                    http_img_url = urljoin(self.base_url, src)
                    mime, _ = guess_type(http_img_url)
                    img_url = self.fn_media_url(mime, http_img_url)
                    self.gmi_text.append(f"=> {img_url} {title or http_img_url}")
        elif tag == "br":
            if self.stack:
                self.gmi_text.append(self.stack[-1].flush())

    def handle_data(self, data):
        # Text outside any known element goes into an implicit paragraph.
        if not self.stack:
            self.stack.append(ParagraphTag("p", []))
        self.stack[-1].on_data(data)

    def handle_endtag(self, tag):
        # Only a tag matching the innermost open element closes it.
        if self.stack and tag == self.stack[-1].tag:
            self.gmi_text.append(self.stack.pop().flush())

    def _get_current_paragraph(self):
        """Return the innermost open paragraph-like element.

        When none is open, a new paragraph is inserted at the bottom of the
        stack (in place, so existing references to the list stay valid).
        """
        for elem in reversed(self.stack):
            if isinstance(elem, ParagraphTag):
                return elem

        paragraph = ParagraphTag("p", [])
        self.stack.insert(0, paragraph)
        return paragraph
class LonkUrl:
    """Parsed request URL plus helpers to build links back into this app.

    Attributes:
        splitted_path: non-empty path components of the request URL.
        page: last path component, or ``""`` when the path is empty.
    """

    def __init__(self, raw_url):
        self._splitted_url = urlsplit(raw_url)
        self.splitted_path = [part for part in self._splitted_url.path.split("/") if part]
        # A URL without any path component has no page; "" matches no route.
        self.page = self.splitted_path[-1] if self.splitted_path else ""
        # Base path: everything up to and including the first "lonk" part
        # (the whole path when there is no such part).
        self._base_path = []
        for path_part in self.splitted_path:
            self._base_path.append(path_part)
            if path_part == "lonk":
                break

    def build(self, page, query=""):
        """Return a ``gemini://`` URL under the base path.

        Args:
            page: one path component, or a list of components to append.
            query: already-encoded query string.
        """
        page = page if isinstance(page, list) else [page]
        return urlunsplit(
            ("gemini", self._splitted_url.netloc, "/".join(self._base_path + page), query, "")
        )

    def media(self, mime, url):
        """Return a link to the ``proxy`` page for ``url``, or ``url`` itself when ``mime`` is falsy."""
        return self.build("proxy", urlencode({"m": mime, "u": url})) if mime else url

    @property
    def query(self):
        """Raw (still percent-encoded) query string of the request URL."""
        return self._splitted_url.query
class HonkUrl:
    """Address of a honk server together with the API token used to query it."""

    def __init__(self, raw_url, token):
        self._splitted_url = urlsplit(raw_url)
        self._token = token

    def build(self, scheme=None, netloc=None, path="", query="", fragment=""):
        """Compose a URL; scheme and netloc default to those of the server."""
        parts = (
            scheme or self._splitted_url.scheme,
            netloc or self._splitted_url.netloc,
            path,
            query,
            fragment,
        )
        return urlunsplit(parts)

    def get(self, action, answer_is_json=True, **kwargs):
        """Call the server's ``api`` endpoint and return its answer.

        ``action``, the token and any extra keyword arguments are sent as
        query parameters. The answer is decoded as UTF-8 and parsed as JSON
        unless ``answer_is_json`` is false. The elapsed time is written to
        stderr whether or not the call succeeds.
        """
        started_at = clock_gettime(CLOCK_MONOTONIC)
        try:
            params = {"action": action, "token": self._token}
            params.update(kwargs)
            api_url = self.build(path="api", query=urlencode(params))
            with urlopen(api_url, timeout=45) as response:
                body = response.read().decode("utf8")
                if answer_is_json:
                    return json_loads(body)
                return body
        finally:
            stderr.write(f"GET {action} {kwargs}|{clock_gettime(CLOCK_MONOTONIC) - started_at:.3f}sec.\n")
def db_create_schema(db_con):
    """Create the ``client`` table on ``db_con`` if it does not exist yet.

    The table maps a client certificate hash to a honk server URL and API
    token. ``IF NOT EXISTS`` makes the call idempotent, so a second caller
    (for example a concurrent first request) does not fail with
    "table client already exists".
    """
    db_con.execute(
        """
        CREATE TABLE IF NOT EXISTS
            client (
                cert_hash TEXT PRIMARY KEY,
                honk_url TEXT NOT NULL,
                token TEXT NOT NULL
            )
        """
    )
def db_connect():
    """Open the SQLite database stored in ``.local/db`` next to this file.

    The directory is created when missing, and the schema is created when
    the database file did not exist before connecting.
    """
    db_path = Path(__file__).parent / ".local" / "db"
    db_path.parent.mkdir(parents=True, exist_ok=True)
    # Must be checked before connecting: connecting creates the file.
    is_fresh = not db_path.exists()
    connection = sqlite3_connect(db_path)
    if is_fresh:
        with connection:
            db_create_schema(connection)
    return connection
def print_header(page_name):
    """Print the success status line, the page title heading and a blank line."""
    print("20 text/gemini\r", f"# 𝓗 onk: {page_name}\r", "\r", sep="\n")
def print_menu(lonk_url, honk_url, gethonks_answer=None):
    """Print the navigation menu as gemtext link lines.

    When ``gethonks_answer`` is given, the "@me" and "chatter" entries are
    included, each followed by its counter if that counter is non-zero.
    """
    def emit(text):
        print(text + "\r")

    emit("## 📝 Menu")
    emit(f"=> {lonk_url.build('newhonk')} new honk")
    emit(f"=> {lonk_url.build([])} lonk home")

    if gethonks_answer:
        atme_entry = f"=> {lonk_url.build('atme')} @me"
        if gethonks_answer["mecount"]:
            atme_entry += f' ({gethonks_answer["mecount"]})'
        emit(atme_entry)

        # The chatter link points at the honk server itself.
        chatter_entry = f"=> {honk_url.build(path='chatter')} chatter"
        if gethonks_answer["chatcount"]:
            chatter_entry += f' ({gethonks_answer["chatcount"]})'
        emit(chatter_entry)

    static_entries = (
        ("search", "search"),
        ("longago", "long ago"),
        ("myhonks", "my honks"),
        ("gethonkers", "honkers"),
        ("addhonker", "add new honker"),
    )
    for page, label in static_entries:
        emit(f"=> {lonk_url.build(page)} {label}")
def print_gethonks(gethonks_answer, lonk_url, honk_url):
    """Print the menu followed by every honk in ``gethonks_answer["honks"]``.

    For each honk this prints a heading, a convoy link, the honk's own URL,
    an optional "Re:" link, the HTML body converted to gemtext, attachment
    ("donk") links, and action links. The menu is printed again at the end
    when at least one honk was listed.

    NOTE(review): the reviewed copy of this file had lost its indentation and
    bracket-only lines; nesting below is reconstructed and should be confirmed
    against the original where marked.
    """
    print_menu(lonk_url, honk_url, gethonks_answer)
    print("\r")

    # "honks" may be missing or None; both are treated as an empty list.
    for honk in gethonks_answer.get("honks") or []:
        convoy = honk["Convoy"]
        re_url = honk.get("RID")
        oondle = honk.get("Oondle")
        from_ = f'{oondle} (🔁 {honk["Handle"]})' if oondle else f'{honk["Handle"]}'
        lines = [
            # Honks with an "RID" get a deeper heading and an arrow marker.
            f'##{"# ↱" if re_url else ""} From {from_} {honk["Date"]}',
            f'=> {lonk_url.build("convoy", urlencode({"c": convoy}))} Convoy {convoy}',
            f'=> {honk["XID"]}',
        ]
        if re_url:
            lines.append(f'=> {re_url} Re: {re_url}')
        lines.append("")
        lines.append(HtmlToGmi(honk_url.build(), lonk_url.media).feed(honk["HTML"]))
        for donk in honk.get("Donks") or []:
            # A donk with an "XID" is addressed under /d/ on the honk server;
            # otherwise its "URL" is resolved relative to the honk's XID.
            if donk.get("XID"):
                donk_url = honk_url.build(path=f'/d/{donk["XID"]}')
            else:
                donk_url = urljoin(honk["XID"], donk["URL"])
            donk_mime = donk["Media"]
            lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}')
            donk_text = donk.get("Desc") or donk.get("Name") or None
            if donk_text:
                lines.append(donk_text)
            # NOTE(review): assumed to be inside the donk loop — confirm.
            lines.append("")
        if honk.get("Public"):
            lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} ↺ bonk')
        # NOTE(review): assumed to apply to every honk, not only public ones — confirm.
        honk_back_url = lonk_url.build(
            [
                # A single space stands in for empty "Handles" so the path
                # component is never empty.
                quote(honk["Handles"] or " ", safe=""),
                quote(honk["XID"], safe=""),
                "honkback",
            ]
        )
        lines.append(f'=> {honk_back_url} ↱ honk back')
        for xonker in (honk.get("Honker"), honk.get("Oonker")):
            if xonker:
                lines.append(f'=> {lonk_url.build("honker", urlencode({"xid": xonker}))} honks of {xonker}')
        print("\r\n".join(lines))
        print("\r")

    if gethonks_answer.get("honks"):
        print("\r")
        print_menu(lonk_url, honk_url, gethonks_answer)
319 class _LonkTreeItem:
320 def __init__(self, honk=None):
321 self.honk = honk
322 self.thread = []
324 def iterate_honks(self):
325 if self.honk is not None:
326 yield self.honk
327 yield from self.thread
def page_lonk(lonk_url, honk_url):
    """Render the home page with honks grouped into threads by convoy.

    Honks that have an "RID" are attached to the thread of their convoy.
    When the first honk of such a convoy is not on the home page, it is
    looked up through the convoy page, for at most 36 convoys per request.
    """
    gethonks_answer = honk_url.get("gethonks", page="home")
    # "honks" may be missing or None; treat both as an empty list, the same
    # way print_gethonks does, instead of failing in reversed().
    honks = gethonks_answer.pop("honks", None) or []

    # convoy -> thread item; None marks a convoy seen only through replies.
    lonk_page = {}
    for honk in reversed(honks):
        if honk.get("RID"):
            lonk_page.setdefault(honk["Convoy"], None)
        else:
            lonk_page[honk["Convoy"]] = _LonkTreeItem(honk)

    # Maps a convoy id seen on a reply to the convoy id of the thread's
    # first honk when the two differ.
    correction_map = {}
    for convoy in list(reversed([convoy for convoy, item in lonk_page.items() if item is None]))[:36]:
        convoy = correction_map.get(convoy, convoy)
        if lonk_page.get(convoy) is not None:
            continue
        for honk in honk_url.get("gethonks", page="convoy", c=convoy)["honks"]:
            if honk["What"] == "honked":
                if convoy != honk["Convoy"]:
                    correction_map[convoy] = honk["Convoy"]
                    convoy = honk["Convoy"]
                if lonk_page.get(convoy) is None:
                    lonk_page[convoy] = _LonkTreeItem(honk)
                break
        else:
            # No "honked" entry in the convoy: keep the replies without a head.
            lonk_page[convoy] = _LonkTreeItem(None)

    for honk in reversed(honks):
        if honk.get("RID"):
            item = lonk_page.get(correction_map.get(honk["Convoy"], honk["Convoy"]))
            if item is not None:
                item.thread.append(honk)

    gethonks_answer["honks"] = []
    for item in reversed(lonk_page.values()):
        # Output stops at the first convoy that is still unresolved.
        if item is None:
            break
        gethonks_answer["honks"] += list(item.iterate_honks())

    print_header("lonk home")
    print_gethonks(gethonks_answer, lonk_url, honk_url)
def page_convoy(lonk_url, honk_url):
    """Render the honks of the convoy named by the ``c`` query parameter.

    Answers ``51 Not found`` when the parameter is absent.
    """
    convoy = dict(parse_qsl(lonk_url.query)).get("c")
    if convoy is None:
        print("51 Not found\r")
        return

    answer = honk_url.get("gethonks", page="convoy", c=convoy)
    print_header(f"convoy {convoy}")
    print_gethonks(answer, lonk_url, honk_url)
def page_search(lonk_url, honk_url):
    """Prompt for a search term, or render the results for the given one."""
    raw_query = lonk_url.query
    if not raw_query:
        print("10 What are we looking for?\r")
        return

    term = unquote(raw_query)
    answer = honk_url.get("gethonks", page="search", q=term)
    print_header(f"search - {term}")
    print_gethonks(answer, lonk_url, honk_url)
def page_atme(lonk_url, honk_url):
    """Render the "@me" page."""
    # Fetch first: if the request fails, no success header has been printed yet.
    answer = honk_url.get("gethonks", page="atme")
    print_header("@me")
    print_gethonks(answer, lonk_url, honk_url)
def page_longago(lonk_url, honk_url):
    """Render the "long ago" page."""
    # Fetch first: if the request fails, no success header has been printed yet.
    answer = honk_url.get("gethonks", page="longago")
    print_header("long ago")
    print_gethonks(answer, lonk_url, honk_url)
def page_myhonks(lonk_url, honk_url):
    """Render the "my honks" page."""
    # Fetch first: if the request fails, no success header has been printed yet.
    answer = honk_url.get("gethonks", page="myhonks")
    print_header("my honks")
    print_gethonks(answer, lonk_url, honk_url)
def page_honker(lonk_url, honk_url):
    """Render the honks of the honker named by the ``xid`` query parameter.

    Answers ``51 Not found`` when the parameter is absent or empty.
    """
    params = dict(parse_qsl(lonk_url.query))
    xid = params.get("xid")
    if not xid:
        print("51 Not found\r")
        return

    answer = honk_url.get("gethonks", page="honker", xid=xid)
    print_header(f"honks of {xid}")
    print_gethonks(answer, lonk_url, honk_url)
def bonk(lonk_url, honk_url):
    """Bonk the honk named by the ``w`` query parameter, then redirect to "my honks".

    Answers ``51 Not found`` when the parameter is absent or empty.
    """
    target = dict(parse_qsl(lonk_url.query)).get("w")
    if target:
        honk_url.get("zonkit", wherefore="bonk", what=target, answer_is_json=False)
        print(f'30 {lonk_url.build("myhonks")}\r')
    else:
        print("51 Not found\r")
def gethonkers(lonk_url, honk_url):
    """Render the list of honkers with subscribe/unsubscribe and "honks of" links."""
    print_header("honkers")
    print_menu(lonk_url, honk_url)

    labelled_fields = (("Name", "name"), ("XID", "url"), ("Flavor", "flavor"))
    listing = honk_url.get("gethonkers").get("honkers") or []
    for entry in listing:
        print(f'## {entry.get("Name") or entry["XID"]}\r')
        for field_name, display_name in labelled_fields:
            value = entry.get(field_name)
            if value:
                print(f'{display_name}: {value}\r')
        # Honkers with flavor "sub" get an unsubscribe link, all others a
        # (re)subscribe link.
        if entry.get("Flavor") == "sub":
            action, label = "unsubscribe", "unsubscribe"
        else:
            action, label = "subscribe", "(re)subscribe"
        print(f'=> {lonk_url.build(action, urlencode({"honkerid": entry["ID"]}))} {label}\r')
        print(f'=> {lonk_url.build("honker", urlencode({"xid": entry["XID"]}))} honks of {entry["XID"]}\r')
        print('\r')

    if listing:
        print("\r")
        print_menu(lonk_url, honk_url)
def addhonker(lonk_url, honk_url):
    """Prompt for a honker URL, or save the given one.

    On success the client is redirected to the honkers list. When the server
    answers with an HTTP error, its response body is shown on an error page.
    """
    raw_query = lonk_url.query
    if not raw_query:
        print("10 honker url: \r")
        return

    honker_address = unquote(raw_query)
    try:
        honk_url.get("savehonker", url=honker_address, answer_is_json=False)
        print(f'30 {lonk_url.build("gethonkers")}\r')
    except HTTPError as error:
        print_header("add new honker")
        print_menu(lonk_url, honk_url)
        print("\r")
        print('## Error\r')
        reason = error.fp.read().decode("utf8")
        print(f'> {reason}\r')
def unsubscribe(lonk_url, honk_url):
    """Unsubscribe from the honker named by the ``honkerid`` query parameter.

    Answers ``51 Not found`` when the parameter is absent or empty; otherwise
    redirects to the honkers list after the server call.
    """
    honkerid = dict(parse_qsl(lonk_url.query)).get("honkerid")
    if not honkerid:
        print("51 Not found\r")
        return

    honk_url.get("savehonker", honkerid=honkerid, unsub="unsub", answer_is_json=False)
    print(f'30 {lonk_url.build("gethonkers")}\r')
def subscribe(lonk_url, honk_url):
    """(Re)subscribe to the honker named by the ``honkerid`` query parameter.

    Answers ``51 Not found`` when the parameter is absent or empty; otherwise
    redirects to the honkers list after the server call.
    """
    honkerid = dict(parse_qsl(lonk_url.query)).get("honkerid")
    if not honkerid:
        print("51 Not found\r")
        return

    honk_url.get("savehonker", honkerid=honkerid, sub="sub", answer_is_json=False)
    print(f'30 {lonk_url.build("gethonkers")}\r')
def newhonk(lonk_url, honk_url):
    """Prompt for the text of a new honk, or post the given text.

    After posting, the client is redirected to "my honks".
    """
    raw_query = lonk_url.query
    if raw_query:
        honk_url.get("honk", noise=unquote(raw_query), answer_is_json=False)
        print(f'30 {lonk_url.build("myhonks")}\r')
    else:
        print("10 let's make some noise: \r")
def honkback(lonk_url, honk_url):
    """Prompt for a reply to a honk, or post the given reply.

    The path is expected to end with ``<handles>/<xid>/honkback``, both
    percent-encoded. Shorter paths are answered with ``59 Bad request``
    instead of failing on the path lookup. After posting, the client is
    redirected to "my honks".
    """
    if len(lonk_url.splitted_path) < 3:
        print('59 Bad request\r')
        return

    rid = unquote(lonk_url.splitted_path[-2])
    if not lonk_url.query:
        # A blank handles component falls back to the honk id in the prompt.
        handles = unquote(lonk_url.splitted_path[-3]).strip()
        print(f"10 Answer to {handles or rid}:\r")
        return

    noise = unquote(lonk_url.query)
    honk_url.get("honk", noise=noise, rid=rid, answer_is_json=False)
    print(f'30 {lonk_url.build("myhonks")}\r')
def authenticated(cert_hash, lonk_url, fn_impl):
    """Run ``fn_impl(lonk_url, honk_url)`` for a registered client certificate.

    The honk server URL and token are looked up by ``cert_hash``. Unknown
    certificates are redirected to the ``ask_server`` page.
    """
    connection = db_connect()
    cursor = connection.execute("SELECT honk_url, token FROM client WHERE cert_hash=?", (cert_hash, ))
    credentials = cursor.fetchone()
    if credentials:
        server_address, api_token = credentials
        with connection:
            fn_impl(lonk_url, HonkUrl(server_address, api_token))
    else:
        print(f'30 {lonk_url.build("ask_server")}\r')
def new_client_stage_1_ask_server(lonk_url):
    """Registration step 1: ask for the honk server URL.

    Without a query the client is prompted for input. The answer must be an
    ``http``/``https`` URL with a host, otherwise ``59 Bad request`` is
    returned. Only scheme and host are kept and passed, percent-encoded, as
    a path component to the ``ask_username`` step.
    """
    if not lonk_url.query:
        print("10 Honk server URL\r")
        return
    splitted = urlsplit(unquote(lonk_url.query))
    # Reject input such as "honk.example" (no scheme) or "file:///..." here,
    # rather than redirecting to a URL the later steps cannot use.
    if splitted.scheme not in ("http", "https") or not splitted.netloc:
        print('59 Bad request\r')
        return
    path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"]
    print(f'30 {lonk_url.build(path)}\r')
def new_client_stage_2_ask_username(lonk_url):
    """Registration step 2: ask for the user name.

    The encoded server from the path and the entered name (re-encoded) are
    passed on to the ``ask_password`` step as path components.
    """
    raw_name = lonk_url.query
    if not raw_name:
        print("10 Honk user name\r")
        return

    segments = lonk_url.splitted_path
    if len(segments) < 3:
        print('59 Bad request\r')
        return

    encoded_server = segments[-2]
    encoded_name = quote(unquote(raw_name), safe="")
    print(f'30 {lonk_url.build([encoded_server, encoded_name, "ask_password"])}\r')
def new_client_stage_3_ask_password(cert_hash, lonk_url):
    """Registration step 3: ask for the password, log in and store the token.

    The path is expected to end with ``<server>/<username>/ask_password``,
    both percent-encoded. The credentials are posted to ``<server>/dologin``
    and the answer is stored as the token for ``cert_hash``. On success the
    client is redirected to the home page.
    """
    if not lonk_url.query:
        print("11 Honk user password\r")
        return
    if len(lonk_url.splitted_path) < 4:
        print('59 Bad request\r')
        return

    honk_url = unquote(lonk_url.splitted_path[-3])
    # The server address comes from the request path, so check it before
    # urlopen() gets it: only plain web URLs are acceptable.
    if urlsplit(honk_url).scheme not in ("http", "https"):
        print('59 Bad request\r')
        return
    post_data = {
        "username": unquote(lonk_url.splitted_path[-2]),
        "password": unquote(lonk_url.query),
        "gettoken": "1",
    }
    with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response:
        token = response.read().decode("utf8")
    db_con = db_connect()
    with db_con:
        # OR REPLACE lets an already registered certificate log in again
        # instead of failing on the primary key.
        db_con.execute(
            "INSERT OR REPLACE INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)",
            (cert_hash, honk_url, token)
        )
    print(f'30 {lonk_url.build([])}\r')
def proxy(mime, url):
    """Fetch ``url`` and stream it to stdout as a response of type ``mime``.

    Both values come from the request query. ``urlopen`` also handles
    non-web schemes such as ``file:``, so only ``http``/``https`` URLs are
    fetched. A ``mime`` containing CR or LF is refused because it is written
    into the response header line. Refused requests get ``59 Bad request``.
    """
    if urlsplit(url).scheme not in ("http", "https") or "\r" in mime or "\n" in mime:
        print("59 Bad request\r")
        return

    with urlopen(url, timeout=10) as response:
        stdout.buffer.write(b"20 " + mime.encode() + b"\r\n")
        # Copy in 512 KiB chunks so large media is never held in memory whole.
        while content := response.read(512 * 1024):
            stdout.buffer.write(content)
def vgi(cert_hash, raw_url):
    """Route a request to its page handler by the last path component.

    Most pages need a registered certificate and run through
    ``authenticated``. The three registration steps and the media proxy are
    handled directly. Unknown pages get ``51 Not found``.
    """
    lonk_url = LonkUrl(raw_url)
    page = lonk_url.page

    authenticated_pages = {
        "lonk": page_lonk,
        "convoy": page_convoy,
        "atme": page_atme,
        "search": page_search,
        "longago": page_longago,
        "myhonks": page_myhonks,
        "honker": page_honker,
        "bonk": bonk,
        "gethonkers": gethonkers,
        "addhonker": addhonker,
        "unsubscribe": unsubscribe,
        "subscribe": subscribe,
        "newhonk": newhonk,
        "honkback": honkback,
    }

    handler = authenticated_pages.get(page)
    if handler is not None:
        authenticated(cert_hash, lonk_url, handler)
        return

    if page == "ask_server":
        new_client_stage_1_ask_server(lonk_url)
    elif page == "ask_username":
        new_client_stage_2_ask_username(lonk_url)
    elif page == "ask_password":
        new_client_stage_3_ask_password(cert_hash, lonk_url)
    elif page == "proxy":
        params = dict(parse_qsl(lonk_url.query))
        if "m" in params and "u" in params:
            proxy(mime=params["m"], url=params["u"])
        else:
            print("51 Not found\r")
    else:
        print("51 Not found\r")
if __name__ == '__main__':
    # Entry point: the certificate hash comes from the VGI_CERT_HASH
    # environment variable and the request URL from the first line of stdin.
    cert_hash_ = environ.get("VGI_CERT_HASH")
    if cert_hash_:
        # Bound before the try so the timing log in ``finally`` cannot fail
        # with NameError (hiding the real error) when input() raises.
        input_url = ""
        try:
            start_time = clock_gettime(CLOCK_MONOTONIC)
            try:
                input_url = input().strip()
                vgi(cert_hash_, input_url)
            finally:
                stderr.write(f"{cert_hash_}|{input_url}|{clock_gettime(CLOCK_MONOTONIC) - start_time:.3f}sec.\n")
        # Failures talking to the remote server are reported to the client
        # as status 43; HTTPError is caught first because it is a URLError.
        except HTTPError as error:
            stderr.write(f"{error}\n")
            print(f"43 Remote server return {error.code}: {error.reason}\r")
        except URLError as error:
            stderr.write(f"{error}\n")
            print(f"43 Error while trying to access remote server: {error.reason}\r")
        except TimeoutError as error:
            stderr.write(f"{error}\n")
            print(f"43 Error while trying to access remote server: {error}\r")
    else:
        stderr.write("Certificate required\n")
        print("60 Certificate required\r")