commit 280fbeb10c0d939863f473a995bfdc7ee1bfc242 from: Aleksey Ryndin date: Fri Oct 11 11:49:32 2024 UTC Fix: title formatting commit - 056d37098b58e2e53ae59e8de059960b5796b3c8 commit + 280fbeb10c0d939863f473a995bfdc7ee1bfc242 blob - e842129b7ab6db310097621fc975a31a9f92d3c4 blob + 81636d2b9a34d48eb86d7c79b9dd9d0481486213 --- lonk.py +++ lonk.py @@ -59,7 +59,7 @@ class LinkTag(_BaseTag): return "" -class LitItemTag(ParagraphTag): +class LiItemTag(ParagraphTag): def flush(self): content = super().flush() return f"* {content}" if content else "" @@ -120,7 +120,7 @@ class HtmlToGmi(HTMLParser): elif tag in {"h1", "h2", "h3", "h4", "h5", "h6"}: _push(HeaderTag(tag, attrs)) elif tag in {"li", "dt"}: - _push(LitItemTag(tag, attrs)) + _push(LiItemTag(tag, attrs)) elif tag in {"blockquote", "q"}: _push(QuoteTag(tag, attrs)) elif tag == "a": @@ -135,10 +135,10 @@ class HtmlToGmi(HTMLParser): else: src = img.get("src") if src: - img_url = urljoin(self.base_url, src) - mime, _ = guess_type(img_url) - img_url = self.fn_media_url(mime, img_url) - self.gmi_text.append(f"=> {img_url} {title or img_url}") + http_img_url = urljoin(self.base_url, src) + mime, _ = guess_type(http_img_url) + img_url = self.fn_media_url(mime, http_img_url) + self.gmi_text.append(f"=> {img_url} {title or http_img_url}") elif tag == "br": if self.stack: self.gmi_text.append(self.stack[-1].flush()) @@ -209,7 +209,7 @@ class HonkUrl: return json_loads(answer) if answer_is_json else answer -def _create_schema(db_con): +def db_create_schema(db_con): db_con.execute( """ CREATE TABLE @@ -239,8 +239,7 @@ def _create_schema(db_con): honker_url TEXT, oonker_url TEXT, FOREIGN KEY (client_id) REFERENCES client (client_id), - UNIQUE (convoy, client_id), - UNIQUE (honk_id, client_id) + UNIQUE (convoy, client_id) ) """ ) @@ -290,10 +289,85 @@ def db_connect(): db_con = sqlite3_connect(db_file_path) if not db_exist: with db_con: - _create_schema(db_con) + db_create_schema(db_con) return db_con + + +def print_header(page_name): + print("20 text/gemini\r") + print(f"# 𝓗 onk: {page_name}\r") + print("\r") + + +def print_menu(lonk_url, honk_url, gethonks_answer=None): + print(f"## 📝 Menu\r") + print(f"=> {lonk_url.build('newhonk')} new honk\r") + print(f"=> {lonk_url.build([])} lonk home\r") + + if gethonks_answer: + line = f"=> {lonk_url.build('atme')} @me" + if gethonks_answer["mecount"]: + line += f' ({gethonks_answer["mecount"]})' + print(line + "\r") + + line = f"=> {honk_url.build(path='chatter')} chatter" + if gethonks_answer["chatcount"]: + line += f' ({gethonks_answer["chatcount"]})' + print(line + "\r") + + print(f"=> {lonk_url.build('search')} search\r") + print(f"=> {lonk_url.build('longago')} long ago\r") + print(f"=> {lonk_url.build('myhonks')} my honks\r") + print(f"=> {lonk_url.build('gethonkers')} honkers\r") + print(f"=> {lonk_url.build('addhonker')} add new honker\r") + + +def print_gethonks(gethonks_answer, lonk_url, honk_url): + print_menu(lonk_url, honk_url, gethonks_answer) + print("\r") + + for honk in gethonks_answer.get("honks") or []: + convoy = honk["Convoy"] + re_url = honk.get("RID") + oondle = honk.get("Oondle") + from_ = f'{oondle} (🔁 {honk["Handle"]})' if oondle else f'{honk["Handle"]}' + lines = [ + f'##{"# ↱" if re_url else ""} From {from_} {honk["Date"]}', + f'=> {lonk_url.build("convoy", urlencode({"c": convoy}))} Convoy {convoy}', + f'=> {honk["XID"]}', + ] + if re_url: + lines.append(f'=> {re_url} Re: {re_url}') + lines.append("") + lines.append(HtmlToGmi(honk_url.build(), lonk_url.media).feed(honk["HTML"])) + for donk in honk.get("Donks") or []: + donk_url = honk_url.build(path=f'/d/{donk["XID"]}') if donk.get("XID") else donk["URL"] + donk_mime, donk_text = donk["Media"], donk.get("Desc") or donk.get("Name") or None + lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}') + if donk_text: + lines.append(donk_text) + lines.append("") + if honk.get("Public"): + lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} ↺ bonk') + honk_back_url = lonk_url.build( + [ + quote(honk["Handles"] or " ", safe=""), + quote(honk["XID"], safe=""), + "honkback", + ] + ) + lines.append(f'=> {honk_back_url} ↱ honk back') + for xonker in (honk.get("Honker"), honk.get("Oonker")): + if xonker: + lines.append(f'=> {lonk_url.build("honker", urlencode({"xid": xonker}))} honks of {xonker}') + print("\r\n".join(lines)) + print("\r") + if gethonks_answer.get("honks"): + print("\r") + print_menu(lonk_url, honk_url, gethonks_answer) + class _LonkTreeItem: def __init__(self, convoy_id, convoy, honk_id, handle, oondle, url, html, date, public, handles, honker, oonker): self.convoy_id = convoy_id @@ -411,9 +485,7 @@ def page_lonk(db_con, client_id, lonk_url, honk_url): for tree_item in reversed(lonk_page.values()): gethonks_answer["honks"] += list(tree_item.iterate_honks()) - print("20 text/gemini\r") - print("# 𝓗 onk: lonk\r") - print("\r") + print_header("lonk home") print_gethonks(gethonks_answer, lonk_url, honk_url) @@ -424,9 +496,7 @@ def page_convoy(db_con, client_id, lonk_url, honk_url) return gethonks_answer = honk_url.get("gethonks", page="convoy", c=query["c"]) - print("20 text/gemini\r") - print(f"# 𝓗 onk: convoy {query['c']}\r") - print("\r") + print_header(f"convoy {query['c']}") print_gethonks(gethonks_answer, lonk_url, honk_url) @@ -437,33 +507,25 @@ def page_search(db_con, client_id, lonk_url, honk_url) q = unquote(lonk_url.query) gethonks_answer = honk_url.get("gethonks", page="search", q=q) - print("20 text/gemini\r") - print(f"# 𝓗 onk: search - {q}\r") - print("\r") + print_header(f"search - {q}") print_gethonks(gethonks_answer, lonk_url, honk_url) def page_atme(db_con, client_id, lonk_url, honk_url): gethonks_answer = honk_url.get("gethonks", page="atme") - print("20 text/gemini\r") - print("# 𝓗 onk: @me") - print("\r") + print_header("@me") print_gethonks(gethonks_answer, lonk_url, honk_url) def page_longago(db_con, client_id, lonk_url, honk_url): gethonks_answer = honk_url.get("gethonks", page="longago") - print("20 text/gemini\r") - print("# 𝓗 onk: long ago") - print("\r") + print_header("long ago") print_gethonks(gethonks_answer, lonk_url, honk_url) def page_myhonks(db_con, client_id, lonk_url, honk_url): gethonks_answer = honk_url.get("gethonks", page="myhonks") - print("20 text/gemini\r") - print("# 𝓗 onk: my honks") - print("\r") + print_header("my honks") print_gethonks(gethonks_answer, lonk_url, honk_url) @@ -474,127 +536,10 @@ def page_honker(db_con, client_id, lonk_url, honk_url) return gethonks_answer = honk_url.get("gethonks", page="honker", xid=xid) - print("20 text/gemini\r") - print(f"# 𝓗 onk: honks of {xid}\r") - print("\r") + print_header(f"honks of {xid}") print_gethonks(gethonks_answer, lonk_url, honk_url) - - -def menu(lonk_url, honk_url, gethonks_answer=None): - print(f"## 📝 Menu\r") - print(f"=> {lonk_url.build('newhonk')} new honk\r") - print(f"=> {lonk_url.build([])} lonk home\r") - - if gethonks_answer: - line = f"=> {lonk_url.build('atme')} @me" - if gethonks_answer["mecount"]: - line += f' ({gethonks_answer["mecount"]})' - print(line + "\r") - - line = f"=> {honk_url.build(path='chatter')} chatter" - if gethonks_answer["chatcount"]: - line += f' ({gethonks_answer["chatcount"]})' - print(line + "\r") - - print(f"=> {lonk_url.build('search')} search\r") - print(f"=> {lonk_url.build('longago')} long ago\r") - print(f"=> {lonk_url.build('myhonks')} my honks\r") - print(f"=> {lonk_url.build('gethonkers')} honkers\r") - print(f"=> {lonk_url.build('addhonker')} add new honker\r") - - -def print_gethonks(gethonks_answer, lonk_url, honk_url): - menu(lonk_url, honk_url, gethonks_answer) - print("\r") - - for honk in gethonks_answer.get("honks") or []: - convoy = honk["Convoy"] - re_url = honk.get("RID") - oondle = honk.get("Oondle") - from_ = f'{oondle} (🔁 {honk["Handle"]})' if oondle else f'{honk["Handle"]}' - lines = [ - f'##{"# ↱" if re_url else ""} From {from_} {honk["Date"]}', - f'=> {lonk_url.build("convoy", urlencode({"c": convoy}))} Convoy {convoy}', - f'=> {honk["XID"]}', - ] - if re_url: - lines.append(f'=> {re_url} Re: {re_url}') - lines.append("") - lines.append(HtmlToGmi(honk_url.build(), lonk_url.media).feed(honk["HTML"])) - for donk in honk.get("Donks") or []: - donk_url = honk_url.build(path=f'/d/{donk["XID"]}') if donk.get("XID") else donk["URL"] - donk_mime, donk_text = donk["Media"], donk.get("Desc") or donk.get("Name") or None - lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}') - if donk_text: - lines.append(donk_text) - lines.append("") - if honk.get("Public"): - lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} ↺ bonk') - honk_back_url = lonk_url.build( - [ - quote(honk["Handles"] or " ", safe=""), - quote(honk["XID"], safe=""), - "honkback", - ] - ) - lines.append(f'=> {honk_back_url} ↱ honk back') - for xonker in (honk.get("Honker"), honk.get("Oonker")): - if xonker: - lines.append(f'=> {lonk_url.build("honker", urlencode({"xid": xonker}))} honks of {xonker}') - print("\r\n".join(lines)) - print("\r") - if gethonks_answer.get("honks"): - print("\r") - menu(lonk_url, honk_url, gethonks_answer) - -def new_client_stage_1_ask_server(lonk_url): - if not lonk_url.query: - print("10 Honk server URL\r") - return - splitted = urlsplit(unquote(lonk_url.query)) - path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"] - print(f'30 {lonk_url.build(path)}\r') - - -def new_client_stage_2_ask_username(lonk_url): - if not lonk_url.query: - print("10 Honk user name\r") - return - if len(lonk_url.splitted_path) < 3: - print('59 Bad request\r') - return - quoted_server = lonk_url.splitted_path[-2] - path = [quoted_server, quote(unquote(lonk_url.query), safe=""), "ask_password"] - print(f'30 {lonk_url.build(path)}\r') - - -def new_client_stage_3_ask_password(cert_hash, lonk_url): - if not lonk_url.query: - print("11 Honk user password\r") - return - if len(lonk_url.splitted_path) < 4: - print('59 Bad request\r') - return - - honk_url = unquote(lonk_url.splitted_path[-3]) - post_data = { - "username": unquote(lonk_url.splitted_path[-2]), - "password": unquote(lonk_url.query), - "gettoken": "1", - } - with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response: - token = response.read().decode("utf8") - db_con = db_connect() - with db_con: - db_con.execute( - "INSERT INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)", - (cert_hash, honk_url, token) - ) - print(f'30 {lonk_url.build([])}\r') - - def bonk(db_con, client_id, lonk_url, honk_url): what = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("w") if not what: @@ -606,13 +551,9 @@ def bonk(db_con, client_id, lonk_url, honk_url): def gethonkers(db_con, client_id, lonk_url, honk_url): - print("20 text/gemini\r") - print("# 𝓗 onk: honkers\r") - print("\r") + print_header("honkers") + print_menu(lonk_url, honk_url) - menu(lonk_url, honk_url) - print("\r") - honkers = honk_url.get("gethonkers").get("honkers") or [] for honker in honkers: print(f'## {honker.get("Name") or honker["XID"]}\r') @@ -624,11 +565,12 @@ def gethonkers(db_con, client_id, lonk_url, honk_url): print(f'=> {lonk_url.build("unsubscribe", urlencode({"honkerid": honker["ID"]}))} unsubscribe\r') else: print(f'=> {lonk_url.build("subscribe", urlencode({"honkerid": honker["ID"]}))} (re)subscribe\r') + print(f'=> {lonk_url.build("honker", urlencode({"xid": honker["XID"]}))} honks of {honker["XID"]}\r') print('\r') if honkers: print("\r") - menu(lonk_url, honk_url) + print_menu(lonk_url, honk_url) def addhonker(db_con, client_id, lonk_url, honk_url): @@ -641,11 +583,9 @@ def addhonker(db_con, client_id, lonk_url, honk_url): honk_url.get("savehonker", url=url, answer_is_json=False) print(f'30 {lonk_url.build("gethonkers")}\r') except HTTPError as error: - print("20 text/gemini\r") - print("# 𝓗 onk: add new honker\r") + print_header("add new honker") + print_menu(lonk_url, honk_url) print("\r") - menu(lonk_url, honk_url) - print("\r") print('## Error\r') print(f'> {error.fp.read().decode("utf8")}\r') @@ -706,6 +646,52 @@ def authenticated(cert_hash, lonk_url, fn_impl): fn_impl(db_con, client_id, lonk_url, HonkUrl(honk_url, token)) +def new_client_stage_1_ask_server(lonk_url): + if not lonk_url.query: + print("10 Honk server URL\r") + return + splitted = urlsplit(unquote(lonk_url.query)) + path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"] + print(f'30 {lonk_url.build(path)}\r') + + +def new_client_stage_2_ask_username(lonk_url): + if not lonk_url.query: + print("10 Honk user name\r") + return + if len(lonk_url.splitted_path) < 3: + print('59 Bad request\r') + return + quoted_server = lonk_url.splitted_path[-2] + path = [quoted_server, quote(unquote(lonk_url.query), safe=""), "ask_password"] + print(f'30 {lonk_url.build(path)}\r') + + +def new_client_stage_3_ask_password(cert_hash, lonk_url): + if not lonk_url.query: + print("11 Honk user password\r") + return + if len(lonk_url.splitted_path) < 4: + print('59 Bad request\r') + return + + honk_url = unquote(lonk_url.splitted_path[-3]) + post_data = { + "username": unquote(lonk_url.splitted_path[-2]), + "password": unquote(lonk_url.query), + "gettoken": "1", + } + with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response: + token = response.read().decode("utf8") + db_con = db_connect() + with db_con: + db_con.execute( + "INSERT INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)", + (cert_hash, honk_url, token) + ) + print(f'30 {lonk_url.build([])}\r') + + def proxy(mime, url): with urlopen(url, timeout=10) as response: stdout.buffer.write(b"20 " + mime.encode() + b"\r\n")