Blob


1 """Yet another http-to-gemini."""
import html
import socket
import ssl
import xml.etree.ElementTree as ET
from argparse import ArgumentParser
from contextlib import contextmanager
from email.message import Message
from http import HTTPStatus
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse, urljoin, urlencode, uses_relative, uses_netloc
# urljoin() only resolves relative references (and keeps the host part) for
# schemes listed in these two urllib.parse registries, so add "gemini" to both.
uses_netloc.append("gemini")
uses_relative.append("gemini")
def _build_navigation(url=None):
    """Render the URL-entry form placed at the top of each page.

    A truthy *url* is used to pre-fill the text field; otherwise the field is
    left empty. Returns the serialized ``<form>`` as bytes followed by CRLF.
    """
    text_field = {
        "title": "url",
        "type": "text",
        "name": "url",
        "placeholder": "gemini://",
        "autocomplete": "off",
        "size": "64",
    }
    if url:
        # Added last so it stays the final attribute of the text field.
        text_field["value"] = url

    form = ET.Element("form", {"method": "get"})
    ET.SubElement(form, "input", text_field)
    ET.SubElement(form, "input", {"type": "submit", "value": "go!"})
    return ET.tostring(form) + b"\r\n"
class _HTTPServer(HTTPServer):
    """HTTP server that preloads the static assets used by the request handler.

    The four files are read once at construction time and kept in memory as
    ``header_file_bytes``, ``footer_file_bytes``, ``icon_file_bytes`` and
    ``css_file_bytes``.
    """

    def __init__(
        self,
        *args,
        header_file_path=None,
        footer_file_path=None,
        icon_file_path=None,
        css_file_path=None,
        **kwargs
    ):
        """Load the asset files, then set up the underlying ``HTTPServer``.

        Positional arguments and remaining keyword arguments are forwarded
        unchanged to ``HTTPServer``.

        Raises:
            OSError: if one of the files cannot be opened or read.
            TypeError: if a path is left as ``None`` (``open`` rejects it).
        """
        # Read every file BEFORE the base class binds the listening socket, so
        # a bad path fails fast and does not leave a bound socket behind.
        header_bytes = self._read_bytes(header_file_path)
        footer_bytes = self._read_bytes(footer_file_path)
        icon_bytes = self._read_bytes(icon_file_path)
        css_bytes = self._read_bytes(css_file_path)

        super().__init__(*args, **kwargs)

        self.header_file_bytes = header_bytes
        self.footer_file_bytes = footer_bytes
        self.icon_file_bytes = icon_bytes
        self.css_file_bytes = css_bytes

    @staticmethod
    def _read_bytes(file_path):
        """Return the full binary content of *file_path*."""
        with open(file_path, "rb") as f:
            return f.read()
class _Elem:
    """Slot for one pending XML element that is serialized to a file on flush.

    Callers assign an element to ``elem``; ``flush`` writes it out and empties
    the slot again.
    """

    def __init__(self, file):
        self._file = file
        self.elem = None

    def flush(self):
        """Write the pending element (plus CRLF) to the file and clear the slot.

        Does nothing when no element is pending.
        """
        pending = self.elem
        if pending is None:
            return
        self._file.write(ET.tostring(pending) + b"\r\n")
        self.elem = None

    @contextmanager
    def __call__(self):
        """Context manager yielding this object and flushing it on normal exit.

        The flush is skipped when the ``with`` body raises.
        """
        yield self
        self.flush()
class _AutoFlush:
    """Flush a wrapped object when a ``with`` block ends, unless cancelled."""

    def __init__(self, elem):
        self._elem = elem

    @contextmanager
    def __call__(self):
        """Context manager yielding this object; flushes the target on normal exit.

        Nothing is flushed if ``cancel`` was called inside the block, or if
        the block raises.
        """
        yield self
        target = self._elem
        if target is None:
            return
        target.flush()

    def cancel(self):
        """Drop the target so that leaving the ``with`` block flushes nothing."""
        self._elem = None
class _RequestHandler(BaseHTTPRequestHandler):
    """Serve the proxy pages and translate ``/?url=gemini://...`` requests.

    Handled paths:

    * ``/``, ``/index``, ``/index.htm``, ``/index.html`` -- without a ``url``
      query parameter the start page is returned; with one, the Gemini
      resource is fetched and relayed (``text/gemini`` is converted to HTML,
      every other media type is passed through unchanged).
    * ``/favicon.ico`` and ``/style.css`` -- static assets preloaded by the
      server object.

    Anything else yields ``404``.
    """

    # Upper bound on Gemini redirects (3x statuses) followed for one request.
    max_redirects = 32
    # Socket timeout in seconds for the upstream Gemini server, so a stalled
    # server cannot block the proxy forever. ``None`` waits indefinitely.
    upstream_timeout = 30

    def _parse_path(self):
        """Split ``self.path`` into ``(path, query)``.

        ``query`` is the ``parse_qs`` mapping (name -> list of values), or an
        empty dict when the request has no query string.
        """
        _, _, path, _, query, _ = urlparse(self.path)
        return path, parse_qs(query) if query else {}

    def _send_ok(self, content_type):
        """Send a ``200`` status line and a ``Content-type`` header."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", content_type)
        self.end_headers()

    def _page_header(self, title):
        """Return the header template with ``$URL`` replaced by *title*.

        *title* may come straight from the query string, so it is HTML-escaped
        (quotes included) before being inserted into the page.
        """
        return self.server.header_file_bytes.replace(b"$URL", html.escape(title).encode())

    def do_GET(self):
        """Dispatch a GET request to the start page, the proxy or a static asset."""
        path, query = self._parse_path()
        if path in {"/index.html", "/index.htm", "/index", "/"}:
            url = query.get("url", [None])[0]
            if url:
                self._proxy(url)
            else:
                self._send_ok("text/html")
                self.wfile.write(self._page_header("yet another http-to-gemini"))
                self.wfile.write(_build_navigation())
                self.wfile.write(self.server.footer_file_bytes)
            return

        if path == "/favicon.ico":
            self._send_ok("image/x-icon")
            self.wfile.write(self.server.icon_file_bytes)
            return

        if path == "/style.css":
            self._send_ok("text/css")
            self.wfile.write(self.server.css_file_bytes)
            return

        self.send_error(HTTPStatus.NOT_FOUND, "File not found")

    def _proxy(self, url):
        """Fetch *url* from its Gemini server and relay the answer over HTTP.

        Redirects are followed up to ``max_redirects`` times. A non-gemini or
        host-less URL is answered with ``400``; an unsupported Gemini status
        or any exception while talking to the server is answered with ``500``.
        """
        try:
            for _ in range(self.max_redirects):
                parsed = urlparse(url)
                if parsed.scheme != "gemini":
                    self.send_error(HTTPStatus.BAD_REQUEST, "Only gemini:// URLs are supported")
                    return
                if not parsed.hostname:
                    # create_connection() treats a missing host as "local
                    # machine"; refuse instead of silently probing localhost.
                    self.send_error(HTTPStatus.BAD_REQUEST, "gemini:// URL has no host")
                    return

                # Certificates are deliberately not verified (kept from the
                # original behaviour). check_hostname has to be switched off
                # before verify_mode may be set to CERT_NONE.
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE

                address = (parsed.hostname, parsed.port or 1965)
                with socket.create_connection(address, timeout=self.upstream_timeout) as raw_s:
                    with context.wrap_socket(raw_s, server_hostname=parsed.hostname) as s:
                        s.sendall((url + "\r\n").encode("UTF-8"))
                        with s.makefile("rb") as fp:
                            # Response header: "<status> <meta>"
                            splitted = fp.readline().decode("UTF-8").strip().split(maxsplit=1)
                            if not splitted:
                                raise RuntimeError("Empty response header")
                            status = splitted[0]
                            if status.startswith("3") and len(splitted) == 2:
                                # redirect (the target may be relative)
                                url = urljoin(url, splitted[1])
                                continue
                            if not status.startswith("2"):
                                self.send_error(
                                    HTTPStatus.INTERNAL_SERVER_ERROR,
                                    "Unsupported answer: " + str(splitted),
                                )
                                return
                            mime = splitted[1].lower() if len(splitted) == 2 else "text/gemini"
                            if not mime.startswith("text/gemini"):
                                # return as-is
                                self._send_ok(mime)
                                self.wfile.write(fp.read())
                                return
                            # Let the email parser pull "charset" out of the meta line.
                            m = Message()
                            m["content-type"] = mime
                            body = fp.read().decode(m.get_param("charset") or "UTF-8")
                            self._convert_gemini_to_html(url, body, mime)
                            return
            raise RuntimeError("Too many redirects")
        except Exception as error:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(error))

    @staticmethod
    def _render_line(url, line):
        """Convert one stand-alone gemtext line to serialized HTML bytes.

        Handles link (``=>``), heading (``#``), quote (``> ``) and plain text
        lines; list items and preformatted blocks are handled by the caller
        because they span several lines. *url* is the page URL that relative
        link targets are resolved against. Returns ``b""`` for an empty line.
        """
        if line.startswith("=>") and line[2:].strip():
            parts = line[2:].strip().split(maxsplit=1)
            target = urljoin(url, parts[0])
            elem = ET.Element("p")
            elem.text = "=> "
            a = ET.SubElement(elem, "a")
            if urlparse(target).scheme == "gemini":
                # Keep gemini links inside the proxy.
                a.attrib.update(href="/?" + urlencode({"url": target}))
            else:
                # NOTE(review): any other scheme is linked verbatim, including
                # ones like "javascript:" -- consider an allow-list.
                a.attrib.update(target="_blank", href=target)
            a.text = parts[1] if len(parts) > 1 else target
        elif line.startswith("#") and line.lstrip("#").strip():
            # The level is the number of leading "#" characters, not the length
            # of the first word, so "#Title" (no space) is a level-1 heading.
            # Capped at 6 because HTML has no deeper heading element.
            text = line.lstrip("#")
            level = len(line) - len(text)
            elem = ET.Element("h" + str(min(level, 6)))
            elem.text = text.strip()
        elif line.startswith("> ") and line[2:].strip():
            elem = ET.Element("blockquote")
            ET.SubElement(elem, "p").text = line[2:].strip()
        elif line:
            elem = ET.Element("p")
            elem.text = line
        else:
            return b""
        return ET.tostring(elem) + b"\r\n"

    def _convert_gemini_to_html(self, url, body, mime):
        """Send *body* (gemtext) to the client as an HTML page.

        *url* is shown in the page header and navigation form and is the base
        for relative links; *mime* is the Gemini media type, echoed back with
        ``gemini`` replaced by ``html``.
        """
        self._send_ok(mime.replace("gemini", "html"))
        self.wfile.write(self._page_header(url))
        self.wfile.write(_build_navigation(url))
        with _Elem(self.wfile)() as pre:
            with _Elem(self.wfile)() as ul:
                # Tracks whether the open <pre> already holds a line, so that
                # blank lines at the start of the block are not lost.
                pre_has_lines = False
                for line in body.splitlines():
                    # A pending <ul> is flushed after every line that is not
                    # itself a list item (list items cancel the flush).
                    with _AutoFlush(ul)() as auto_flush_ul:
                        if line.startswith("```"):
                            # toggle preformatted mode
                            if pre.elem is None:
                                pre.elem = ET.Element("pre")
                                pre.elem.text = ""
                                pre_has_lines = False
                            else:
                                pre.flush()
                        elif pre.elem is not None:
                            if pre_has_lines:
                                pre.elem.text += "\r\n"
                            pre.elem.text += line
                            pre_has_lines = True
                        elif line.startswith("* ") and line[2:].strip():
                            if ul.elem is None:
                                ul.elem = ET.Element("ul")
                            ET.SubElement(ul.elem, "li").text = line[2:].strip()
                            auto_flush_ul.cancel()
                        else:
                            # Emit a pending list first so the output keeps
                            # the document order.
                            ul.flush()
                            self.wfile.write(self._render_line(url, line))
        self.wfile.write(self.server.footer_file_bytes)
def _main():
    """Parse the command line and run the proxy until interrupted with Ctrl-C."""
    cli = ArgumentParser()
    cli.add_argument("--address", default="127.0.0.1", help="bind to this address (default: %(default)s)")
    cli.add_argument("--port", default=8000, type=int, help="bind to this port (default: %(default)s)")
    cli.add_argument("--header", required=True, help="path to `index.html.header`")
    cli.add_argument("--footer", required=True, help="path to `index.html.footer`")
    cli.add_argument("--icon", required=True, help="path to `favicon.ico`")
    cli.add_argument("--css", required=True, help="path to `style.css`")
    options = cli.parse_args()

    bind_to = (options.address, options.port)
    asset_paths = {
        "header_file_path": options.header,
        "footer_file_path": options.footer,
        "icon_file_path": options.icon,
        "css_file_path": options.css,
    }
    with _HTTPServer(bind_to, _RequestHandler, **asset_paths) as http_server:
        # Report the address actually bound (the port may have been given as 0).
        sock_host, sock_port = http_server.socket.getsockname()[:2]
        print(f"HTTP server started ({sock_host}:{sock_port})...")
        try:
            http_server.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
# Start the proxy only when this file is executed as a script, not on import.
if __name__ == "__main__":
    _main()