commit e03b8dd39559fe4a0f3416fe33ecbc0145c45ee9 from: Aleksey Ryndin date: Wed Sep 25 10:15:21 2024 UTC Add: authorization flow commit - 577b12da144a505da89ff3710bedb70d3e67175e commit + e03b8dd39559fe4a0f3416fe33ecbc0145c45ee9 blob - da97511c010f4a47ae6bd8b745a3e771351d49ed blob + 86ee11075e51b794dabf4d95f03481e944ed0506 --- lonk.py +++ lonk.py @@ -6,7 +6,7 @@ from pathlib import Path from sys import stdout from sqlite3 import connect as sqlite3_connect from urllib.error import HTTPError, URLError -from urllib.parse import urlencode, urlunsplit, urljoin, urlsplit, parse_qsl, unquote +from urllib.parse import urlencode, urlunsplit, urljoin, urlsplit, parse_qsl, unquote, quote from urllib.request import urlopen from html.parser import HTMLParser @@ -44,10 +44,10 @@ class _BaseTag: self.tag = tag def on_data(self, data): - raise NotImplementedError() + raise AssertionError("The method must be overridden") def flush(self): - raise NotImplementedError() + raise AssertionError("The method must be overridden") class ParagraphTag(_BaseTag): @@ -206,17 +206,35 @@ def _format_honk(honk, header, fn_media_url): return "\n".join(lines) +class LonkUrl: + def __init__(self, raw_url): + self._splitted_url = urlsplit(raw_url) + self.splitted_path = [part for part in self._splitted_url.path.split("/") if part] + self.page = self.splitted_path[-1] + self._base_path = [] + for path_part in self.splitted_path: + self._base_path.append(path_part) + if path_part == "lonk": + break + + def build(self, page, query=""): + page = page if isinstance(page, list) else [page] + return urlunsplit( + ("gemini", self._splitted_url.netloc, "/".join(self._base_path + page), query, "") + ) + + @property + def query(self): + return self._splitted_url.query + + def _proxy_url_factory(proxy_media_enabled, lonk_url): def _get_proxy_url(mime, url): - return urlunsplit( - ( - "gemini", - lonk_url.netloc, - lonk_url.path + "/proxy", - urlencode({"m": mime, "u": url}), - "", - ) - ) if mime and proxy_media_enabled else url + return ( + lonk_url.build("proxy", urlencode({"m": mime, "u": url})) + if mime and proxy_media_enabled else + url + ) return _get_proxy_url @@ -279,7 +297,8 @@ def db_connect(): db_exist = db_file_path.exists() db_con = sqlite3_connect(db_file_path) if not db_exist: - _create_schema(db_con) + with db_con: + _create_schema(db_con) return db_con @@ -287,24 +306,60 @@ def lonk(cert_hash, lonk_url): db_con = db_connect() row = db_con.execute("SELECT client_id, honk_url, token FROM client WHERE cert_hash = ?", (cert_hash, )).fetchone() if not row: - new_client_url = urlunsplit( - ("gemini", lonk_url.netloc, lonk_url.path + "/new_client", "", "") - ) - print(f"30 {new_client_url}\r") + print(f'30 {lonk_url.build("ask_server")}\r') return - raise RuntimeError(row) + client_id, honk_url, token = row + raise NotImplementedError(client_id, honk_url, token) -def new_client(cert_hash, lonk_url): - db_con = db_connect() +def new_client_stage_1_ask_server(lonk_url): if not lonk_url.query: - print(f"10 Honk server URL (for example: https://honk.any-key.press)\r") + print("10 Honk server URL (for example: https://honk.any-key.press)\r") return splitted = urlsplit(unquote(lonk_url.query)) honk_url = urlunsplit((splitted.scheme, splitted.netloc, "", "", "")) - raise RuntimeError(honk_url) + path = [quote(urlunsplit((splitted.scheme, splitted.netloc, "", "", "")), safe=""), "ask_username"] + print(f'30 {lonk_url.build(path)}\r') + +def new_client_stage_2_ask_username(lonk_url): + if not lonk_url.query: + print("10 Honk user name\r") + return + if len(lonk_url.splitted_path) < 3: + print('59 Bad request\r') + return + quoted_server = lonk_url.splitted_path[-2] + path = [quoted_server, quote(unquote(lonk_url.query), safe=""), "ask_password"] + print(f'30 {lonk_url.build(path)}\r') + + +def new_client_stage_3_ask_password(cert_hash, lonk_url): + if not lonk_url.query: + print("10 Honk user password\r") + return + if len(lonk_url.splitted_path) < 4: + print('59 Bad request\r') + return + + honk_url = unquote(lonk_url.splitted_path[-3]) + post_data = { + "username": unquote(lonk_url.splitted_path[-2]), + "password": unquote(lonk_url.query), + "gettoken": "1", + } + with urlopen(honk_url + "/dologin", data=urlencode(post_data).encode(), timeout=15) as f: + token = f.read().decode("utf8") + db_con = db_connect() + with db_con: + db_con.execute( + "INSERT INTO client (cert_hash, honk_url, token) VALUES (?, ?, ?)", + (cert_hash, honk_url, token) + ) + print(f'30 {lonk_url.build([])}\r') + + def proxy(mime, url): with urlopen(url, timeout=10) as f: stdout.buffer.write(b"20 " + mime.encode() + b"\r\n") @@ -316,15 +371,19 @@ def proxy(mime, url): def vgi(cert_hash): - url = urlsplit(input().strip()) - page = [part for part in url.path.split("/") if part][-1] - if page == "lonk": - # demo(url) - lonk(cert_hash, url) - elif page == "new_client": - new_client(cert_hash, url) - elif page == "proxy": - query = {pair[0]: pair[1] for pair in parse_qsl(url.query)} + lonk_url = LonkUrl(input().strip()) + if lonk_url.page == "lonk": + lonk(cert_hash, lonk_url) + elif lonk_url.page == "demo": + demo(lonk_url) + elif lonk_url.page == "ask_server": + new_client_stage_1_ask_server(lonk_url) + elif lonk_url.page == "ask_username": + new_client_stage_2_ask_username(lonk_url) + elif lonk_url.page == "ask_password": + new_client_stage_3_ask_password(cert_hash, lonk_url) + elif lonk_url.page == "proxy": + query = {pair[0]: pair[1] for pair in parse_qsl(lonk_url.query)} if "m" not in query or "u" not in query: print("51 Not found\r") else: