commit - 056d37098b58e2e53ae59e8de059960b5796b3c8
commit + 280fbeb10c0d939863f473a995bfdc7ee1bfc242
blob - e842129b7ab6db310097621fc975a31a9f92d3c4
blob + 81636d2b9a34d48eb86d7c79b9dd9d0481486213
--- lonk.py
+++ lonk.py
return ""
-class LitItemTag(ParagraphTag):
+class LiItemTag(ParagraphTag):
def flush(self):
content = super().flush()
return f"* {content}" if content else ""
elif tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
_push(HeaderTag(tag, attrs))
elif tag in {"li", "dt"}:
- _push(LitItemTag(tag, attrs))
+ _push(LiItemTag(tag, attrs))
elif tag in {"blockquote", "q"}:
_push(QuoteTag(tag, attrs))
elif tag == "a":
else:
src = img.get("src")
if src:
- img_url = urljoin(self.base_url, src)
- mime, _ = guess_type(img_url)
- img_url = self.fn_media_url(mime, img_url)
- self.gmi_text.append(f"=> {img_url} {title or img_url}")
+ http_img_url = urljoin(self.base_url, src)
+ mime, _ = guess_type(http_img_url)
+ img_url = self.fn_media_url(mime, http_img_url)
+ self.gmi_text.append(f"=> {img_url} {title or http_img_url}")
elif tag == "br":
if self.stack:
self.gmi_text.append(self.stack[-1].flush())
return json_loads(answer) if answer_is_json else answer
-def _create_schema(db_con):
+def db_create_schema(db_con):
db_con.execute(
"""
CREATE TABLE
honker_url TEXT,
oonker_url TEXT,
FOREIGN KEY (client_id) REFERENCES client (client_id),
- UNIQUE (convoy, client_id),
- UNIQUE (honk_id, client_id)
+ UNIQUE (convoy, client_id)
)
"""
)
db_con = sqlite3_connect(db_file_path)
if not db_exist:
with db_con:
- _create_schema(db_con)
+ db_create_schema(db_con)
return db_con
+
+
+def print_header(page_name):
+ print("20 text/gemini\r")
+ print(f"# 𝓗 onk: {page_name}\r")
+ print("\r")
+
+
+def print_menu(lonk_url, honk_url, gethonks_answer=None):
+ print(f"## 📝 Menu\r")
+ print(f"=> {lonk_url.build('newhonk')} new honk\r")
+ print(f"=> {lonk_url.build([])} lonk home\r")
+
+ if gethonks_answer:
+ line = f"=> {lonk_url.build('atme')} @me"
+ if gethonks_answer["mecount"]:
+ line += f' ({gethonks_answer["mecount"]})'
+ print(line + "\r")
+
+ line = f"=> {honk_url.build(path='chatter')} chatter"
+ if gethonks_answer["chatcount"]:
+ line += f' ({gethonks_answer["chatcount"]})'
+ print(line + "\r")
+
+ print(f"=> {lonk_url.build('search')} search\r")
+ print(f"=> {lonk_url.build('longago')} long ago\r")
+ print(f"=> {lonk_url.build('myhonks')} my honks\r")
+ print(f"=> {lonk_url.build('gethonkers')} honkers\r")
+ print(f"=> {lonk_url.build('addhonker')} add new honker\r")
+
+
+def print_gethonks(gethonks_answer, lonk_url, honk_url):
+ print_menu(lonk_url, honk_url, gethonks_answer)
+ print("\r")
+
+ for honk in gethonks_answer.get("honks") or []:
+ convoy = honk["Convoy"]
+ re_url = honk.get("RID")
+ oondle = honk.get("Oondle")
+ from_ = f'{oondle} (🔁 {honk["Handle"]})' if oondle else f'{honk["Handle"]}'
+ lines = [
+ f'##{"# ↱" if re_url else ""} From {from_} {honk["Date"]}',
+ f'=> {lonk_url.build("convoy", urlencode({"c": convoy}))} Convoy {convoy}',
+ f'=> {honk["XID"]}',
+ ]
+ if re_url:
+ lines.append(f'=> {re_url} Re: {re_url}')
+ lines.append("")
+ lines.append(HtmlToGmi(honk_url.build(), lonk_url.media).feed(honk["HTML"]))
+ for donk in honk.get("Donks") or []:
+ donk_url = honk_url.build(path=f'/d/{donk["XID"]}') if donk.get("XID") else donk["URL"]
+ donk_mime, donk_text = donk["Media"], donk.get("Desc") or donk.get("Name") or None
+ lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}')
+ if donk_text:
+ lines.append(donk_text)
+ lines.append("")
+ if honk.get("Public"):
+ lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} ↺ bonk')
+ honk_back_url = lonk_url.build(
+ [
+ quote(honk["Handles"] or " ", safe=""),
+ quote(honk["XID"], safe=""),
+ "honkback",
+ ]
+ )
+ lines.append(f'=> {honk_back_url} ↱ honk back')
+ for xonker in (honk.get("Honker"), honk.get("Oonker")):
+ if xonker:
+ lines.append(f'=> {lonk_url.build("honker", urlencode({"xid": xonker}))} honks of {xonker}')
+ print("\r\n".join(lines))
+ print("\r")
+ if gethonks_answer.get("honks"):
+ print("\r")
+ print_menu(lonk_url, honk_url, gethonks_answer)
+
class _LonkTreeItem:
def __init__(self, convoy_id, convoy, honk_id, handle, oondle, url, html, date, public, handles, honker, oonker):
self.convoy_id = convoy_id
for tree_item in reversed(lonk_page.values()):
gethonks_answer["honks"] += list(tree_item.iterate_honks())
- print("20 text/gemini\r")
- print("# 𝓗 onk: lonk\r")
- print("\r")
+ print_header("lonk home")
print_gethonks(gethonks_answer, lonk_url, honk_url)
return
gethonks_answer = honk_url.get("gethonks", page="convoy", c=query["c"])
- print("20 text/gemini\r")
- print(f"# 𝓗 onk: convoy {query['c']}\r")
- print("\r")
+ print_header(f"convoy {query['c']}")
print_gethonks(gethonks_answer, lonk_url, honk_url)
q = unquote(lonk_url.query)
gethonks_answer = honk_url.get("gethonks", page="search", q=q)
- print("20 text/gemini\r")
- print(f"# 𝓗 onk: search - {q}\r")
- print("\r")
+ print_header(f"search - {q}")
print_gethonks(gethonks_answer, lonk_url, honk_url)
def page_atme(db_con, client_id, lonk_url, honk_url):
gethonks_answer = honk_url.get("gethonks", page="atme")
- print("20 text/gemini\r")
- print("# 𝓗 onk: @me")
- print("\r")
+ print_header("@me")
print_gethonks(gethonks_answer, lonk_url, honk_url)
def page_longago(db_con, client_id, lonk_url, honk_url):
gethonks_answer = honk_url.get("gethonks", page="longago")
- print("20 text/gemini\r")
- print("# 𝓗 onk: long ago")
- print("\r")
+ print_header("long ago")
print_gethonks(gethonks_answer, lonk_url, honk_url)
def page_myhonks(db_con, client_id, lonk_url, honk_url):
gethonks_answer = honk_url.get("gethonks", page="myhonks")
- print("20 text/gemini\r")
- print("# 𝓗 onk: my honks")
- print("\r")
+ print_header("my honks")
print_gethonks(gethonks_answer, lonk_url, honk_url)
return
gethonks_answer = honk_url.get("gethonks", page="honker", xid=xid)
- print("20 text/gemini\r")
- print(f"# 𝓗 onk: honks of {xid}\r")
- print("\r")
+ print_header(f"honks of {xid}")
print_gethonks(gethonks_answer, lonk_url, honk_url)
-
-
-def menu(lonk_url, honk_url, gethonks_answer=None):
- print(f"## 📝 Menu\r")
- print(f"=> {lonk_url.build('newhonk')} new honk\r")
- print(f"=> {lonk_url.build([])} lonk home\r")
-
- if gethonks_answer:
- line = f"=> {lonk_url.build('atme')} @me"
- if gethonks_answer["mecount"]:
- line += f' ({gethonks_answer["mecount"]})'
- print(line + "\r")
-
- line = f"=> {honk_url.build(path='chatter')} chatter"
- if gethonks_answer["chatcount"]:
- line += f' ({gethonks_answer["chatcount"]})'
- print(line + "\r")
-
- print(f"=> {lonk_url.build('search')} search\r")
- print(f"=> {lonk_url.build('longago')} long ago\r")
- print(f"=> {lonk_url.build('myhonks')} my honks\r")
- print(f"=> {lonk_url.build('gethonkers')} honkers\r")
- print(f"=> {lonk_url.build('addhonker')} add new honker\r")
-
-
-def print_gethonks(gethonks_answer, lonk_url, honk_url):
- menu(lonk_url, honk_url, gethonks_answer)
- print("\r")
-
- for honk in gethonks_answer.get("honks") or []:
- convoy = honk["Convoy"]
- re_url = honk.get("RID")
- oondle = honk.get("Oondle")
- from_ = f'{oondle} (🔁 {honk["Handle"]})' if oondle else f'{honk["Handle"]}'
- lines = [
- f'##{"# ↱" if re_url else ""} From {from_} {honk["Date"]}',
- f'=> {lonk_url.build("convoy", urlencode({"c": convoy}))} Convoy {convoy}',
- f'=> {honk["XID"]}',
- ]
- if re_url:
- lines.append(f'=> {re_url} Re: {re_url}')
- lines.append("")
- lines.append(HtmlToGmi(honk_url.build(), lonk_url.media).feed(honk["HTML"]))
- for donk in honk.get("Donks") or []:
- donk_url = honk_url.build(path=f'/d/{donk["XID"]}') if donk.get("XID") else donk["URL"]
- donk_mime, donk_text = donk["Media"], donk.get("Desc") or donk.get("Name") or None
- lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}')
- if donk_text:
- lines.append(donk_text)
- lines.append("")
- if honk.get("Public"):
- lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} ↺ bonk')
- honk_back_url = lonk_url.build(
- [
- quote(honk["Handles"] or " ", safe=""),
- quote(honk["XID"], safe=""),
- "honkback",
- ]
- )
- lines.append(f'=> {honk_back_url} ↱ honk back')
- for xonker in (honk.get("Honker"), honk.get("Oonker")):
- if xonker:
- lines.append(f'=> {lonk_url.build("honker", urlencode({"xid": xonker}))} honks of {xonker}')
- print("\r\n".join(lines))
- print("\r")
- if gethonks_answer.get("honks"):
- print("\r")
- menu(lonk_url, honk_url, gethonks_answer)
-
-def new_client_stage_1_ask_server(lonk_url):
- if not lonk_url.query:
- print("10 Honk server URL\r")
- return
- splitted = urlsplit(unquote(lonk_url.query))
- path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"]
- print(f'30 {lonk_url.build(path)}\r')
-
-
-def new_client_stage_2_ask_username(lonk_url):
- if not lonk_url.query:
- print("10 Honk user name\r")
- return
- if len(lonk_url.splitted_path) < 3:
- print('59 Bad request\r')
- return
- quoted_server = lonk_url.splitted_path[-2]
- path = [quoted_server, quote(unquote(lonk_url.query), safe=""), "ask_password"]
- print(f'30 {lonk_url.build(path)}\r')
-
-
-def new_client_stage_3_ask_password(cert_hash, lonk_url):
- if not lonk_url.query:
- print("11 Honk user password\r")
- return
- if len(lonk_url.splitted_path) < 4:
- print('59 Bad request\r')
- return
-
- honk_url = unquote(lonk_url.splitted_path[-3])
- post_data = {
- "username": unquote(lonk_url.splitted_path[-2]),
- "password": unquote(lonk_url.query),
- "gettoken": "1",
- }
- with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response:
- token = response.read().decode("utf8")
- db_con = db_connect()
- with db_con:
- db_con.execute(
- "INSERT INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)",
- (cert_hash, honk_url, token)
- )
- print(f'30 {lonk_url.build([])}\r')
-
-
def bonk(db_con, client_id, lonk_url, honk_url):
what = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("w")
if not what:
def gethonkers(db_con, client_id, lonk_url, honk_url):
- print("20 text/gemini\r")
- print("# 𝓗 onk: honkers\r")
- print("\r")
+ print_header("honkers")
+ print_menu(lonk_url, honk_url)
- menu(lonk_url, honk_url)
- print("\r")
-
honkers = honk_url.get("gethonkers").get("honkers") or []
for honker in honkers:
print(f'## {honker.get("Name") or honker["XID"]}\r')
print(f'=> {lonk_url.build("unsubscribe", urlencode({"honkerid": honker["ID"]}))} unsubscribe\r')
else:
print(f'=> {lonk_url.build("subscribe", urlencode({"honkerid": honker["ID"]}))} (re)subscribe\r')
+ print(f'=> {lonk_url.build("honker", urlencode({"xid": honker["XID"]}))} honks of {honker["XID"]}\r')
print('\r')
if honkers:
print("\r")
- menu(lonk_url, honk_url)
+ print_menu(lonk_url, honk_url)
def addhonker(db_con, client_id, lonk_url, honk_url):
honk_url.get("savehonker", url=url, answer_is_json=False)
print(f'30 {lonk_url.build("gethonkers")}\r')
except HTTPError as error:
- print("20 text/gemini\r")
- print("# 𝓗 onk: add new honker\r")
+ print_header("add new honker")
+ print_menu(lonk_url, honk_url)
print("\r")
- menu(lonk_url, honk_url)
- print("\r")
print('## Error\r')
print(f'> {error.fp.read().decode("utf8")}\r')
fn_impl(db_con, client_id, lonk_url, HonkUrl(honk_url, token))
+def new_client_stage_1_ask_server(lonk_url):
+ if not lonk_url.query:
+ print("10 Honk server URL\r")
+ return
+ splitted = urlsplit(unquote(lonk_url.query))
+ path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"]
+ print(f'30 {lonk_url.build(path)}\r')
+
+
+def new_client_stage_2_ask_username(lonk_url):
+ if not lonk_url.query:
+ print("10 Honk user name\r")
+ return
+ if len(lonk_url.splitted_path) < 3:
+ print('59 Bad request\r')
+ return
+ quoted_server = lonk_url.splitted_path[-2]
+ path = [quoted_server, quote(unquote(lonk_url.query), safe=""), "ask_password"]
+ print(f'30 {lonk_url.build(path)}\r')
+
+
+def new_client_stage_3_ask_password(cert_hash, lonk_url):
+ if not lonk_url.query:
+ print("11 Honk user password\r")
+ return
+ if len(lonk_url.splitted_path) < 4:
+ print('59 Bad request\r')
+ return
+
+ honk_url = unquote(lonk_url.splitted_path[-3])
+ post_data = {
+ "username": unquote(lonk_url.splitted_path[-2]),
+ "password": unquote(lonk_url.query),
+ "gettoken": "1",
+ }
+ with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response:
+ token = response.read().decode("utf8")
+ db_con = db_connect()
+ with db_con:
+ db_con.execute(
+ "INSERT INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)",
+ (cert_hash, honk_url, token)
+ )
+ print(f'30 {lonk_url.build([])}\r')
+
+
def proxy(mime, url):
with urlopen(url, timeout=10) as response:
stdout.buffer.write(b"20 " + mime.encode() + b"\r\n")