Blob


"""Yet another HTTP-to-Gemini proxy: fetch gemini:// pages and serve them as HTML over HTTP."""
import html
import socket
import ssl
import xml.etree.ElementTree as ET
from argparse import ArgumentParser
from contextlib import contextmanager
from email.message import Message
from http import HTTPStatus
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse, urljoin, urlencode, uses_relative, uses_netloc
# urljoin() only resolves relative references for schemes listed in these two
# urllib registries, so "gemini" is added to both.
for _scheme_registry in (uses_relative, uses_netloc):
    _scheme_registry.append("gemini")
del _scheme_registry
def _build_navigation(url=None):
    """Return the serialized navigation form (URL text field plus submit button).

    :param url: optional URL used to pre-fill the text field; a falsy value
        leaves the field empty.
    :returns: the ``<form>`` markup as bytes, terminated by CRLF.
    """
    text_field = {
        "title": "url",
        "type": "text",
        "name": "url",
        "placeholder": "gemini://",
        "autocomplete": "off",
        "size": "64",
    }
    if url:
        text_field["value"] = url

    form = ET.Element("form", {"method": "get"})
    ET.SubElement(form, "input", text_field)
    ET.SubElement(form, "input", {"type": "submit", "value": "go!"})
    return ET.tostring(form) + b"\r\n"
class _HTTPServer(ThreadingHTTPServer):
    """Threading HTTP server that preloads the proxy's static assets.

    The contents of the header, footer, icon, stylesheet and robots files are
    read once at construction time and exposed to request handlers as
    ``header_file_bytes``, ``footer_file_bytes``, ``icon_file_bytes``,
    ``css_file_bytes`` and ``robots_file_bytes``.
    """

    def __init__(
        self,
        *args,
        header_file_path=None,
        footer_file_path=None,
        icon_file_path=None,
        css_file_path=None,
        robots_file_path=None,
        **kwargs
    ):
        """Load the assets, then create the server.

        :param header_file_path: path to the HTML header template, or ``None``.
        :param footer_file_path: path to the HTML footer, or ``None``.
        :param icon_file_path: path to the favicon, or ``None``.
        :param css_file_path: path to the stylesheet, or ``None``.
        :param robots_file_path: path to ``robots.txt``, or ``None``.
        :raises OSError: if a given path cannot be read.

        A path of ``None`` yields empty content for that asset. Remaining
        positional and keyword arguments go to ``ThreadingHTTPServer``.
        """
        # Read everything *before* the base class creates and binds the
        # listening socket, so an unreadable path fails without leaving a
        # bound socket behind.
        self.header_file_bytes = self._read_asset(header_file_path)
        self.footer_file_bytes = self._read_asset(footer_file_path)
        self.icon_file_bytes = self._read_asset(icon_file_path)
        self.css_file_bytes = self._read_asset(css_file_path)
        self.robots_file_bytes = self._read_asset(robots_file_path)
        super().__init__(*args, **kwargs)

    @staticmethod
    def _read_asset(path):
        """Return the bytes stored at *path*, or ``b""`` when *path* is ``None``."""
        if path is None:
            return b""
        with open(path, "rb") as stream:
            return stream.read()
class _Elem:
    """Holder for one pending ElementTree element bound to an output file.

    ``elem`` is ``None`` while nothing is pending. Callers assign an element
    to it and later flush it, which serializes the element and resets the
    holder to empty.
    """

    def __init__(self, file):
        """Create an empty holder that flushes into *file* (a binary writer)."""
        self.elem = None
        self.file = file

    @contextmanager
    def __call__(self):
        """Context manager yielding this holder and flushing it on normal exit.

        If the ``with`` body raises, the pending element is not flushed.
        """
        yield self
        self.flush()

    def flush_bytes(self):
        """Serialize and clear the pending element.

        :returns: the element's markup followed by CRLF, or ``b""`` when
            nothing is pending.
        """
        if self.elem is None:
            return b""

        # short_empty_elements=False: an element without content must be
        # written as "<pre></pre>", not "<pre />" -- HTML parsers ignore the
        # trailing slash on non-void elements and would leave the tag open.
        rv = ET.tostring(self.elem, short_empty_elements=False) + b"\r\n"
        self.elem = None
        return rv

    def flush(self):
        """Write the pending element (if any) to the file and clear it."""
        payload = self.flush_bytes()
        if payload:  # skip the pointless empty write when nothing is pending
            self.file.write(payload)
class _FlushBeforeWrite:
    """Guard that flushes a pending element holder before its file is written.

    Wraps an object exposing ``file`` and ``flush()``. ``commit()`` flushes
    that object at most once and hands back the file; ``cancel()`` disarms
    the guard so that no flush happens.
    """

    def __init__(self, elem):
        """Arm the guard for *elem* and remember the file it writes to."""
        self._file = elem.file
        self._elem = elem

    def cancel(self):
        """Disarm the guard: later commits will not flush the holder."""
        self._elem = None

    def commit(self):
        """Flush the holder if still armed, disarm, and return the file."""
        pending = self._elem
        if pending is not None:
            pending.flush()
            self._elem = None
        return self._file

    @contextmanager
    def __call__(self):
        """Context manager yielding this guard and committing on normal exit."""
        yield self
        self.commit()
class _RequestHandler(BaseHTTPRequestHandler):
    """Serve the proxy's static assets and translate gemini:// pages to HTML.

    ``/`` (and its ``index`` aliases) without a ``url`` query parameter renders
    the start page; with one, the gemini resource is fetched and either relayed
    verbatim or, for ``text/gemini``, converted to HTML. ``self.server`` must
    expose ``header_file_bytes``, ``footer_file_bytes``, ``icon_file_bytes``,
    ``css_file_bytes`` and ``robots_file_bytes``.
    """

    # Seconds to wait on the gemini server before giving up on the request.
    upstream_timeout = 30
    # First request + 5 consecutive redirects.
    max_hops = 6

    _CRAWLER_PREFIXES = ("facebook", "meta")
    _INDEX_PATHS = frozenset({"/index.html", "/index.htm", "/index", "/"})
    # path -> (Content-type, name of the server attribute holding the bytes)
    _STATIC_FILES = {
        "/favicon.ico": ("image/x-icon", "icon_file_bytes"),
        "/style.css": ("text/css", "css_file_bytes"),
        "/robots.txt": ("text/plain", "robots_file_bytes"),
    }
    # Longest prefix first so "###" is not mistaken for "#".
    _HEADINGS = (("###", "h3"), ("##", "h2"), ("#", "h1"))
    # Link schemes that would run or embed attacker-supplied content in the
    # proxy's own origin when clicked; such links are rendered without href.
    _UNSAFE_LINK_SCHEMES = frozenset({"javascript", "vbscript", "data"})

    def _parse_path(self):
        """Split the request target into ``(path, query)``.

        :returns: the URL path and the ``parse_qs`` dict of the query string
            (an empty dict when there is no query).
        """
        _, _, path, _, query, _ = urlparse(self.path)
        return path, parse_qs(query) if query else {}

    def _render_header(self, title):
        """Return the header template with ``$URL`` replaced by *title*.

        *title* may come straight from the query string, so it is HTML-escaped
        before being substituted into the page.
        """
        safe_title = html.escape(title).encode()
        return self.server.header_file_bytes.replace(b"$URL", safe_title)

    def _send_bytes(self, content_type, payload):
        """Send a complete ``200 OK`` response carrying *payload*."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", content_type)
        self.end_headers()
        self.wfile.write(payload)

    def _send_index(self):
        """Send the start page: header, empty navigation form, footer."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(self._render_header("yet another http-to-gemini"))
        self.wfile.write(_build_navigation())
        self.wfile.write(self.server.footer_file_bytes)

    def do_GET(self):
        """Handle GET: refuse listed crawlers, then route by path."""
        user_agent = self.headers.get("User-Agent", "")
        if user_agent.startswith(self._CRAWLER_PREFIXES):
            self.send_error(HTTPStatus.FORBIDDEN, "Crawlers are not allowed (see robots.txt)")
            return

        path, query = self._parse_path()
        if path in self._INDEX_PATHS:
            url = query.get("url", [None])[0]
            if url:
                self._proxy(url)
            else:
                self._send_index()
            return

        static = self._STATIC_FILES.get(path)
        if static is None:
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return
        content_type, attribute = static
        self._send_bytes(content_type, getattr(self.server, attribute))

    def _proxy(self, url):
        """Fetch *url* over gemini and answer the HTTP request with the result.

        Follows up to ``max_hops - 1`` redirects. Non-``text/gemini`` bodies
        are relayed unchanged; ``text/gemini`` is converted to HTML. Failures
        before the response has started are reported as HTTP errors.
        """
        responding = False  # True once our own status line may be on the wire
        try:
            # Certificates are deliberately not verified (as in the original
            # code); check_hostname must be cleared before CERT_NONE is set.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            for _ in range(self.max_hops):
                parsed = urlparse(url)
                if parsed.scheme != "gemini":
                    self.send_error(HTTPStatus.BAD_REQUEST, "Only gemini:// URLs are supported")
                    return
                if not parsed.hostname:
                    # create_connection() would treat a missing host as
                    # loopback, so reject it explicitly.
                    self.send_error(HTTPStatus.BAD_REQUEST, "Missing host in gemini:// URL")
                    return

                address = (parsed.hostname, parsed.port or 1965)
                with socket.create_connection(address, timeout=self.upstream_timeout) as raw_sock:
                    with context.wrap_socket(raw_sock, server_hostname=parsed.hostname) as tls_sock:
                        tls_sock.sendall((url + "\r\n").encode("UTF-8"))
                        with tls_sock.makefile("rb") as reply:
                            header = reply.readline().decode("UTF-8").strip().split(maxsplit=1)
                            if not header:
                                raise RuntimeError("Empty response from gemini server")
                            status = header[0]
                            meta = header[1] if len(header) == 2 else None

                            if status.startswith("3") and meta is not None:
                                # redirect
                                url = urljoin(url, meta)
                                continue
                            if not status.startswith("2"):
                                self.send_error(
                                    HTTPStatus.INTERNAL_SERVER_ERROR,
                                    f"Unsupported answer: {status}",
                                    meta,
                                )
                                return

                            mime = meta.lower() if meta is not None else "text/gemini"
                            if not mime.startswith("text/gemini"):
                                # return as-is
                                responding = True
                                self.send_response(HTTPStatus.OK)
                                self.send_header("Content-type", mime)
                                self.end_headers()
                                for chunk in iter(lambda: reply.read(64 * 1024), b""):
                                    self.wfile.write(chunk)
                                return

                            # Let the email parser pull the charset parameter
                            # out of the MIME type.
                            message = Message()
                            message["content-type"] = mime
                            body = reply.read().decode(message.get_param("charset") or "UTF-8")
                            responding = True
                            self._convert_gemini_to_html(url, body, mime)
                            return

            raise RuntimeError("Too many redirects")
        except Exception as error:
            if responding:
                # Headers are already sent; a second response would only
                # corrupt the body, so log and drop the connection instead.
                self.log_error("Aborted response for %s: %s", url, error)
                self.close_connection = True
            else:
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(error))

    @staticmethod
    def _to_html(element):
        """Serialize *element* followed by CRLF, never as a self-closing tag.

        HTML parsers ignore the slash in ``<h1 />`` and would leave the tag
        open, so empty elements are written with an explicit end tag.
        """
        return ET.tostring(element, short_empty_elements=False) + b"\r\n"

    @classmethod
    def _render_link(cls, base_url, line):
        """Render one ``=>`` link line (with a non-empty target) as HTML bytes.

        gemini targets are routed back through this proxy; other targets open
        in a new tab, except schemes in ``_UNSAFE_LINK_SCHEMES`` which get no
        ``href`` at all.
        """
        parts = line[2:].strip().split(maxsplit=1)
        target = urljoin(base_url, parts[0])
        paragraph = ET.Element("p")
        paragraph.text = "=> "
        anchor = ET.SubElement(paragraph, "a")
        scheme = urlparse(target).scheme
        if scheme == "gemini":
            anchor.attrib.update(href="/?" + urlencode({"url": target}))
        elif scheme not in cls._UNSAFE_LINK_SCHEMES:
            anchor.attrib.update(target="_blank", href=target)
        anchor.text = parts[1] if len(parts) > 1 else target
        return cls._to_html(paragraph)

    @classmethod
    def _render_line(cls, base_url, line):
        """Render a link, heading, quote or paragraph line as HTML bytes.

        List items and preformatted text are handled by the caller because
        they span several lines. Returns ``b""`` for an empty line.
        """
        if line.startswith("=>") and line[2:].strip():
            return cls._render_link(base_url, line)

        for prefix, tag in cls._HEADINGS:
            if line.startswith(prefix):
                heading = ET.Element(tag)
                heading.text = line[len(prefix):]
                return cls._to_html(heading)

        if line.startswith("> ") and line[2:].strip():
            blockquote = ET.Element("blockquote")
            ET.SubElement(blockquote, "p").text = line[2:].strip()
            return cls._to_html(blockquote)

        if not line:
            return b""
        paragraph = ET.Element("p")
        paragraph.text = line
        return cls._to_html(paragraph)

    def _convert_gemini_to_html(self, url, body, mime):
        """Send *body* (gemtext fetched from *url*) as an HTML response.

        :param url: the gemini URL the body came from; used for the page
            header, the navigation form and for resolving relative links.
        :param body: the decoded gemtext document.
        :param mime: the upstream MIME type; "gemini" is replaced by "html"
            to form the response Content-type.
        """
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", mime.replace("gemini", "html"))
        self.end_headers()
        self.wfile.write(self._render_header(url))
        self.wfile.write(_build_navigation(url))

        # Lines of the currently open preformatted block. Collected in a list
        # so that leading blank lines survive and joining stays linear.
        pre_lines = []
        with _Elem(self.wfile)() as pre:
            with _Elem(self.wfile)() as ul:
                for line in body.splitlines():
                    # Leaving this ``with`` flushes a pending <ul> unless the
                    # line was itself a list item (which cancels the flush).
                    with _FlushBeforeWrite(ul)() as flush_before:
                        if line.startswith("```"):
                            if pre.elem is None:
                                pre.elem = ET.Element("pre")
                                pre.elem.text = ""
                                pre_lines = []
                            else:
                                pre.elem.text = "\r\n".join(pre_lines)
                                flush_before.commit().write(pre.flush_bytes())
                            continue
                        if pre.elem is not None:
                            pre_lines.append(line)
                            continue
                        if line.startswith("* ") and line[2:].strip():
                            if ul.elem is None:
                                ul.elem = ET.Element("ul")
                            ET.SubElement(ul.elem, "li").text = line[2:].strip()
                            flush_before.cancel()
                            continue
                        rendered = self._render_line(url, line)
                        if rendered:
                            flush_before.commit().write(rendered)
            if pre.elem is not None:
                # Unterminated block: hand its text over before the enclosing
                # holder flushes it on exit.
                pre.elem.text = "\r\n".join(pre_lines)
        self.wfile.write(self.server.footer_file_bytes)
def _main():
    """Parse the command line, start the HTTP server and serve until Ctrl-C."""
    cli = ArgumentParser()
    cli.add_argument("--address", default="127.0.0.1", help="bind to this address (default: %(default)s)")
    cli.add_argument("--port", default=8000, type=int, help="bind to this port (default: %(default)s)")
    # Every static asset is a mandatory option with the same help wording.
    for flag, asset_name in (
        ("--header", "index.html.header"),
        ("--footer", "index.html.footer"),
        ("--icon", "favicon.ico"),
        ("--css", "style.css"),
        ("--robots", "robots.txt"),
    ):
        cli.add_argument(flag, required=True, help=f"path to `{asset_name}`")
    options = cli.parse_args()

    server = _HTTPServer(
        (options.address, options.port),
        _RequestHandler,
        header_file_path=options.header,
        footer_file_path=options.footer,
        icon_file_path=options.icon,
        css_file_path=options.css,
        robots_file_path=options.robots,
    )
    with server as http_server:
        # Report the address actually bound (relevant when port 0 was given).
        bound = http_server.socket.getsockname()
        sock_host, sock_port = bound[0], bound[1]
        print(f"HTTP server started ({sock_host}:{sock_port})...")
        try:
            http_server.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
# Run the proxy only when executed as a script, not when imported.
if __name__ == "__main__":
    _main()