commit 895356d09ddb4aff4833a1f4c60769613d920125 from: Aleksey Ryndin date: Tue Oct 01 11:16:03 2024 UTC Add convoy and atme pages commit - 6c43924a03e4d318b0e627f6228b3af226a4bbad commit + 895356d09ddb4aff4833a1f4c60769613d920125 blob - 7b305f84c4fc56b2a9cf62f4d3b19fd73a3ee5cc blob + b267403ccffd560db9af6198df291193e555fbec --- lonk.py +++ lonk.py @@ -167,18 +167,6 @@ class HtmlToGmi(HTMLParser): return self.stack[0] -def _format_honk(honk, header, honk_url, fn_media_url): - assert honk["Noise"] == honk["HTML"], honk - lines = [header, f'=> {honk["XID"]}', HtmlToGmi(honk_url.build_url(), fn_media_url).feed(honk["HTML"])] - for donk in (honk.get("Donks") or []): - donk_url = honk_url.build_url(path=f'/d/{donk["XID"]}') if donk.get("XID") else donk["URL"] - lines.append(f'=> {fn_media_url(donk["Media"], donk_url)}') - desc = donk.get("Desc") or donk.get("Name") - if desc: - lines.append(desc) - return "\n".join(lines) - - class LonkUrl: def __init__(self, raw_url): self._splitted_url = urlsplit(raw_url) @@ -351,12 +339,10 @@ class _Collected: return rv - -def _lonk_impl(db_con, client_id, lonk_url, honk_url): - fn_media_url = _proxy_url_factory(environ.get("LONK_PROXY_MEDIA"), lonk_url) - home = honk_url.do_get(action="gethonks", page="home") +def lonk(db_con, client_id, lonk_url, honk_url, fn_media_url): + gethonks_answer = honk_url.do_get(action="gethonks", page="home") lonk_page = {} - for honk in reversed(home.get("honks") or []): + for honk in reversed(gethonks_answer.get("honks") or []): if honk.get("RID"): reason_fn = _Collected.add_reason_thread reason = honk["Handle"] @@ -437,49 +423,92 @@ def _lonk_impl(db_con, client_id, lonk_url, honk_url): reason_fn(lonk_page[convoy], reason) print("20 text/gemini\r") - print("# 𝓗 onk\r") + print("# 𝓗 onk: lonk\r") print("\r") - line = f"=> {honk_url.build_url(path='atme')} @me" - if home["mecount"]: - line += f' ({home["mecount"]})' + line = f"=> {lonk_url.build('atme')} @me" + if gethonks_answer["mecount"]: + line += f' ({gethonks_answer["mecount"]})' print(line + "\r") line = f"=> {honk_url.build_url(path='chatter')} chatter" - if home["chatcount"]: - line += f' ({home["chatcount"]})' + if gethonks_answer["chatcount"]: + line += f' ({gethonks_answer["chatcount"]})' print(line + "\r") print("\r") for collected in reversed(lonk_page.values()): if collected.honker is None: continue + lines = [ f"## From {collected.honker}{collected.format_reason()} {collected.date}", - f"=> {collected.url}", + f'=> {lonk_url.build("convoy", urlencode({"c": collected.convoy}))} Convoy {collected.convoy}', HtmlToGmi(honk_url.build_url(), fn_media_url).feed(collected.html) ] - for donk_url, donk_mime, donk_text in collected.iterate_donks(db_con): lines.append(f'=> {fn_media_url(donk_mime, donk_url)}') if donk_text: lines.append(donk_text) - print("\r\n".join(lines)) print("\r") -def lonk(cert_hash, lonk_url): - db_con = db_connect() - row = db_con.execute("SELECT client_id, honk_url, token FROM client WHERE cert_hash=?", (cert_hash, )).fetchone() - if not row: - print(f'30 {lonk_url.build("ask_server")}\r') +def convoy(db_con, client_id, lonk_url, honk_url, fn_media_url): + query = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)} + if "c" not in query: + print("51 Not found\r") return - client_id, honk_url, token = row - with db_con: - _lonk_impl(db_con, client_id, lonk_url, HonkUrl(honk_url, token)) + gethonks_answer = honk_url.do_get(action="gethonks", page="convoy", c=query["c"]) + print("20 text/gemini\r") + print(f"# 𝓗 onk: convoy {query['c']}\r") + print("\r") + _gethonks(gethonks_answer, lonk_url, honk_url, fn_media_url) + +def atme(db_con, client_id, lonk_url, honk_url, fn_media_url): + gethonks_answer = honk_url.do_get(action="gethonks", page="atme") + print("20 text/gemini\r") + print("# 𝓗 onk: @me") + print("\r") + _gethonks(gethonks_answer, lonk_url, honk_url, fn_media_url) + + +def _gethonks(gethonks_answer, lonk_url, honk_url, fn_media_url): + line = f"=> {lonk_url.build('atme')} @me" + if gethonks_answer["mecount"]: + line += f' ({gethonks_answer["mecount"]})' + print(line + "\r") + + line = f"=> {honk_url.build_url(path='chatter')} chatter" + if gethonks_answer["chatcount"]: + line += f' ({gethonks_answer["chatcount"]})' + print(line + "\r") + print("\r") + + for honk in gethonks_answer.get("honks") or []: + if honk.get("What") == "bonked": + continue + + re_url = honk.get("RID") + lines = [ + f'##{"#" if re_url else ""} From {honk["Handle"]} {honk["Date"]}', + f'=> {honk["XID"]}', + ] + if re_url: + lines.append(f'=> {re_url} Re: {re_url}') + lines.append(HtmlToGmi(honk_url.build_url(), fn_media_url).feed(honk["HTML"])) + for donk in honk.get("Donks") or []: + donk_url = honk_url.build_url(path=f'/d/{donk["XID"]}') if donk.get("XID") else donk["URL"] + donk_mime, donk_text = donk["Media"], donk.get("Desc") or donk.get("Name") or None + lines.append(f'=> {fn_media_url(donk_mime, donk_url)}') + if donk_text: + lines.append(donk_text) + print("\r\n".join(lines)) + print("\r") + + def new_client_stage_1_ask_server(lonk_url): if not lonk_url.query: print("10 Honk server URL\r") @@ -526,6 +555,18 @@ def new_client_stage_3_ask_password(cert_hash, lonk_ur print(f'30 {lonk_url.build([])}\r') +def authenticated(cert_hash, lonk_url, fn_impl): + db_con = db_connect() + row = db_con.execute("SELECT client_id, honk_url, token FROM client WHERE cert_hash=?", (cert_hash, )).fetchone() + if not row: + print(f'30 {lonk_url.build("ask_server")}\r') + return + fn_media_url = _proxy_url_factory(environ.get("LONK_PROXY_MEDIA"), lonk_url) + client_id, honk_url, token = row + with db_con: + fn_impl(db_con, client_id, lonk_url, HonkUrl(honk_url, token), fn_media_url) + + def proxy(mime, url): with urlopen(url, timeout=10) as f: stdout.buffer.write(b"20 " + mime.encode() + b"\r\n") @@ -539,7 +580,11 @@ def proxy(mime, url): def vgi(cert_hash, raw_url): lonk_url = LonkUrl(raw_url) if lonk_url.page == "lonk": - lonk(cert_hash, lonk_url) + authenticated(cert_hash, lonk_url, lonk) + elif lonk_url.page == "convoy": + authenticated(cert_hash, lonk_url, convoy) + elif lonk_url.page == "atme": + authenticated(cert_hash, lonk_url, atme) elif lonk_url.page == "ask_server": new_client_stage_1_ask_server(lonk_url) elif lonk_url.page == "ask_username":