commit - 577b12da144a505da89ff3710bedb70d3e67175e
commit + e03b8dd39559fe4a0f3416fe33ecbc0145c45ee9
blob - da97511c010f4a47ae6bd8b745a3e771351d49ed
blob + 86ee11075e51b794dabf4d95f03481e944ed0506
--- lonk.py
+++ lonk.py
from sys import stdout
from sqlite3 import connect as sqlite3_connect
from urllib.error import HTTPError, URLError
-from urllib.parse import urlencode, urlunsplit, urljoin, urlsplit, parse_qsl, unquote
+from urllib.parse import urlencode, urlunsplit, urljoin, urlsplit, parse_qsl, unquote, quote
from urllib.request import urlopen
from html.parser import HTMLParser
self.tag = tag
def on_data(self, data):
- raise NotImplementedError()
+ raise AssertionError("The method must be overridden")
def flush(self):
- raise NotImplementedError()
+ raise AssertionError("The method must be overridden")
class ParagraphTag(_BaseTag):
return "\n".join(lines)
+class LonkUrl:
+    """Parsed view of the request URL, used to build sibling Gemini URLs.
+
+    Attributes:
+        splitted_path: the non-empty "/"-separated segments of the URL path.
+        page: the last path segment, or "" when the path has no segments.
+    """
+
+    def __init__(self, raw_url):
+        """Split *raw_url* and derive the page name and the base path."""
+        self._splitted_url = urlsplit(raw_url)
+        self.splitted_path = [part for part in self._splitted_url.path.split("/") if part]
+        # A URL without path segments (e.g. "gemini://host/") has no last
+        # segment; fall back to "" instead of raising IndexError.
+        self.page = self.splitted_path[-1] if self.splitted_path else ""
+        # Base path: every segment up to and including the first "lonk"
+        # (the whole path when no segment equals "lonk").
+        self._base_path = []
+        for path_part in self.splitted_path:
+            self._base_path.append(path_part)
+            if path_part == "lonk":
+                break
+
+    def build(self, page, query=""):
+        """Return a gemini:// URL on the same host, below the base path.
+
+        page: one path segment, or a list of segments, appended to the base
+            path (an empty list yields the base path itself).
+        query: query string, inserted as given.
+        """
+        page = page if isinstance(page, list) else [page]
+        return urlunsplit(
+            ("gemini", self._splitted_url.netloc, "/".join(self._base_path + page), query, "")
+        )
+
+    @property
+    def query(self):
+        """Query string of the request URL, exactly as urlsplit() returned it."""
+        return self._splitted_url.query
+
+
def _proxy_url_factory(proxy_media_enabled, lonk_url):
def _get_proxy_url(mime, url):
- return urlunsplit(
- (
- "gemini",
- lonk_url.netloc,
- lonk_url.path + "/proxy",
- urlencode({"m": mime, "u": url}),
- "",
- )
- ) if mime and proxy_media_enabled else url
+ return (
+ lonk_url.build("proxy", urlencode({"m": mime, "u": url}))
+ if mime and proxy_media_enabled else
+ url
+ )
return _get_proxy_url
db_exist = db_file_path.exists()
db_con = sqlite3_connect(db_file_path)
if not db_exist:
- _create_schema(db_con)
+ with db_con:
+ _create_schema(db_con)
return db_con
db_con = db_connect()
row = db_con.execute("SELECT client_id, honk_url, token FROM client WHERE cert_hash = ?", (cert_hash, )).fetchone()
if not row:
- new_client_url = urlunsplit(
- ("gemini", lonk_url.netloc, lonk_url.path + "/new_client", "", "")
- )
- print(f"30 {new_client_url}\r")
+ print(f'30 {lonk_url.build("ask_server")}\r')
return
- raise RuntimeError(row)
+ client_id, honk_url, token = row
+ raise NotImplementedError(client_id, honk_url, token)
-def new_client(cert_hash, lonk_url):
-    db_con = db_connect()
+def new_client_stage_1_ask_server(lonk_url):
+    """Registration step 1: ask for the honk server URL.
+
+    With an empty query, reply with a Gemini status-10 input prompt.
+    Otherwise reduce the submitted URL to its scheme and host, percent-encode
+    it into a single path segment and redirect (status 30) to the
+    "ask_username" page below it.
+    """
     if not lonk_url.query:
-        print(f"10 Honk server URL (for example: https://honk.any-key.press)\r")
+        print("10 Honk server URL (for example: https://honk.any-key.press)\r")
         return
     splitted = urlsplit(unquote(lonk_url.query))
     honk_url = urlunsplit((splitted.scheme, splitted.netloc, "", "", ""))
-    raise RuntimeError(honk_url)
+    # safe="" makes quote() encode "/" too, so the URL stays one path segment.
+    path = [quote(honk_url, safe=""), "ask_username"]
+    print(f'30 {lonk_url.build(path)}\r')
+
+def new_client_stage_2_ask_username(lonk_url):
+    """Registration step 2: ask for the honk user name.
+
+    With an empty query, reply with a Gemini status-10 input prompt.
+    A path with fewer than three segments is answered with status 59.
+    Otherwise redirect (status 30) to "ask_password", carrying the
+    still-encoded server segment and the re-encoded user name in the path.
+    """
+    raw_query = lonk_url.query
+    if not raw_query:
+        print("10 Honk user name\r")
+        return
+
+    segments = lonk_url.splitted_path
+    if len(segments) < 3:
+        print('59 Bad request\r')
+        return
+
+    # The segment before the page name is the percent-encoded server URL.
+    quoted_server = segments[-2]
+    # Decode, then re-encode with safe="" so "/" cannot split the segment.
+    quoted_user = quote(unquote(raw_query), safe="")
+    redirect_url = lonk_url.build([quoted_server, quoted_user, "ask_password"])
+    print(f'30 {redirect_url}\r')
+
+
+def new_client_stage_3_ask_password(cert_hash, lonk_url):
+    """Registration step 3: ask for the password, log in and store the token.
+
+    With an empty query, reply with a Gemini sensitive-input prompt.
+    A path with fewer than four segments, or a server URL that is not
+    http/https, is answered with status 59. Otherwise POST the user name and
+    password (with "gettoken") to "<honk_url>/dologin", store the response
+    body as the token for *cert_hash* in the client table, and redirect
+    (status 30) to the base page.
+
+    Errors raised by urlopen() or by the database insert are not handled
+    here and propagate to the caller.
+    """
+    if not lonk_url.query:
+        # Status 11 (sensitive input) tells the client not to echo the password.
+        print("11 Honk user password\r")
+        return
+    if len(lonk_url.splitted_path) < 4:
+        print('59 Bad request\r')
+        return
+
+    honk_url = unquote(lonk_url.splitted_path[-3])
+    # The server URL comes from a user-controlled path segment; urlopen() also
+    # accepts schemes such as file: and ftp:, so allow only http(s).
+    if urlsplit(honk_url).scheme not in ("http", "https"):
+        print('59 Bad request\r')
+        return
+    post_data = {
+        "username": unquote(lonk_url.splitted_path[-2]),
+        "password": unquote(lonk_url.query),
+        "gettoken": "1",
+    }
+    with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as f:
+        token = f.read().decode("utf8")
+    db_con = db_connect()
+    # Using the connection as a context manager wraps the insert in a transaction.
+    with db_con:
+        db_con.execute(
+            "INSERT INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)",
+            (cert_hash, honk_url, token)
+        )
+    print(f'30 {lonk_url.build([])}\r')
+
+
def proxy(mime, url):
with urlopen(url, timeout=10) as f:
stdout.buffer.write(b"20 " + mime.encode() + b"\r\n")
def vgi(cert_hash):
- url = urlsplit(input().strip())
- page = [part for part in url.path.split("/") if part][-1]
- if page == "lonk":
- # demo(url)
- lonk(cert_hash, url)
- elif page == "new_client":
- new_client(cert_hash, url)
- elif page == "proxy":
- query = {pair[0]: pair[1] for pair in parse_qsl(url.query)}
+ lonk_url = LonkUrl(input().strip())
+ if lonk_url.page == "lonk":
+ lonk(cert_hash, lonk_url)
+ elif lonk_url.page == "demo":
+ demo(lonk_url)
+ elif lonk_url.page == "ask_server":
+ new_client_stage_1_ask_server(lonk_url)
+ elif lonk_url.page == "ask_username":
+ new_client_stage_2_ask_username(lonk_url)
+ elif lonk_url.page == "ask_password":
+ new_client_stage_3_ask_password(cert_hash, lonk_url)
+ elif lonk_url.page == "proxy":
+ query = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)}
if "m" not in query or "u" not in query:
print("51 Not found\r")
else: