commit - 1f466cd2f73201e3170797899b9539fdafd95fb2
commit + f6854b294d578a287fa401e5fd4951174764cd3c
blob - 31c6a759b398e842535b7f69eab2147194f0f313
blob + 9b8070176a5e023393d7444c1ab34fa284de2558
--- lonk.py
+++ lonk.py
self.content.append(data.strip())
def flush(self):
- rv = " ".join(" ".join(data.split()) for data in self.content if data)
+ result = " ".join(" ".join(data.split()) for data in self.content if data)
footer = self.footer
self.content = []
self.footer = []
- return "\n".join([rv] + footer) + "\n" if rv else ""
+ return "\n".join([result] + footer) + "\n" if result else ""
class LinkTag(_BaseTag):
self.content += data
def flush(self):
- rv = self.content
+ result = self.content
self.content = ""
- return f"```\n{rv}\n```\n" if rv else ""
+ return f"```\n{result}\n```\n" if result else ""
class HtmlToGmi(HTMLParser):
self.base_url = base_url
self.fn_media_url = fn_media_url
- def feed(self, html_text):
- super().feed(html_text)
+ def feed(self, data):
+ super().feed(data)
while self.stack:
self.gmi_text.append(self.stack.pop().flush())
return "\n".join(gmi_text for gmi_text in self.gmi_text if gmi_text)
)
)
- def do_get(self, action, page, c=None, after=None):
- query = {"action": action, "page": page, "token": self._token}
- if c is not None:
- query["c"] = c
- if after is not None:
- query["after"] = after
- with urlopen(self.build_url(path="api", query=urlencode(query)), timeout=45) as f:
- return json_loads(f.read().decode("utf8"))
+ def do_get(self, action, answer_is_json=True, **kwargs):
+ query = {**{"action": action, "token": self._token}, **kwargs}
+ with urlopen(self.build_url(path="api", query=urlencode(query)), timeout=45) as response:
+ answer = response.read().decode("utf8")
+ return json_loads(answer) if answer_is_json else answer
def _create_schema(db_con):
def page_lonk(db_con, client_id, lonk_url, honk_url):
- gethonks_answer = honk_url.do_get(action="gethonks", page="home")
+ gethonks_answer = honk_url.do_get("gethonks", page="home")
lonk_page = {}
for honk in reversed(gethonks_answer.pop("honks", None) or []):
lonk_page[convoy].donks.append((donk_url, donk_mime, donk_text))
if honk.get("RID"):
- for honk_in_convoy in honk_url.do_get(action="gethonks", page="convoy", c=convoy)["honks"]:
+ for honk_in_convoy in honk_url.do_get("gethonks", page="convoy", c=convoy)["honks"]:
if not honk_in_convoy.get("RID"):
honker = honk_in_convoy.get("Oondle") or honk_in_convoy["Handle"]
_save_convoy(convoy, honker, honk_in_convoy)
print("51 Not found\r")
return
-
- gethonks_answer = honk_url.do_get(action="gethonks", page="convoy", c=query["c"])
+ gethonks_answer = honk_url.do_get("gethonks", page="convoy", c=query["c"])
gethonks_answer["honks"] = [
honk
for honk in (gethonks_answer.pop("honks", None) or [])
def page_atme(db_con, client_id, lonk_url, honk_url):
- gethonks_answer = honk_url.do_get(action="gethonks", page="atme")
+ gethonks_answer = honk_url.do_get("gethonks", page="atme")
print("20 text/gemini\r")
print("# 𝓗 onk: @me")
print("\r")
lines.append(f'=> {lonk_url.media(donk_mime, donk_url)} {donk_url}')
if donk_text:
lines.append(donk_text)
+ lines.append("")
+ lines.append(f'=> {lonk_url.build("bonk", urlencode({"w": honk["XID"]}))} bonk: Share honk with others')
print("\r\n".join(lines))
print("\r")
"password": unquote(lonk_url.query),
"gettoken": "1",
}
- with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as f:
- token = f.read().decode("utf8")
+ with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as response:
+ token = response.read().decode("utf8")
db_con = db_connect()
with db_con:
db_con.execute(
print(f'30 {lonk_url.build([])}\r')
+def bonk(db_con, client_id, lonk_url, honk_url):
+ what = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}.get("w")
+ if not what:
+ print("51 Not found\r")
+ return
+
+ bonk_answer = honk_url.do_get("zonkit", wherefore="bonk", what=what, answer_is_json=False)
+ print(f'30 {lonk_url.build([])}\r')
+
+
def authenticated(cert_hash, lonk_url, fn_impl):
db_con = db_connect()
row = db_con.execute("SELECT client_id, honk_url, token FROM client WHERE cert_hash=?", (cert_hash, )).fetchone()
def proxy(mime, url):
- with urlopen(url, timeout=10) as f:
+ with urlopen(url, timeout=10) as response:
stdout.buffer.write(b"20 " + mime.encode() + b"\r\n")
while True:
- content = f.read(512 * 1024)
+ content = response.read(512 * 1024)
if not content:
break
stdout.buffer.write(content)
authenticated(cert_hash, lonk_url, page_convoy)
elif lonk_url.page == "atme":
authenticated(cert_hash, lonk_url, page_atme)
+ elif lonk_url.page == "bonk":
+ authenticated(cert_hash, lonk_url, bonk)
elif lonk_url.page == "ask_server":
new_client_stage_1_ask_server(lonk_url)
elif lonk_url.page == "ask_username":