commit 96d6ed3742822eccf1e51cd5a5a4a68fd2757fa8
from: Aleksey Ryndin
date: Thu Jan 16 14:42:44 2025 UTC

Add `updated` parser, HTTP server

commit - 3ac0752c060d3243f4fabbfb6627e2785a0ebc05
commit + 96d6ed3742822eccf1e51cd5a5a4a68fd2757fa8
blob - c9fff8bed243e98a938761c0d7bfa7e6515938ef
blob + e9250e9d1be065e3774bfb68f294a94a6fb82923
--- avk.py
+++ avk.py
@@ -1,9 +1,13 @@
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, timezone
 from http.cookiejar import CookieJar, Cookie
 from urllib.request import build_opener, HTTPCookieProcessor
+from urllib.parse import urljoin, urlunparse, parse_qs, urlparse
 from html.parser import HTMLParser
-from dataclasses import dataclass, field
-from urllib.parse import urljoin
-from datetime import datetime
+from http import HTTPStatus
+from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
+
+import xml.etree.ElementTree as ET
 
 from json import loads as json_loads
@@ -48,27 +52,67 @@ _MONTHS = {
     )
 }
 
+_NUMERAL_MINUTES = {
+    name: i + 1
+    for i, name in enumerate(
+        ("одна", "две", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять")
+    )
+}
+_NUMERAL_HOURS = {
+    name: i + 1
+    for i, name in enumerate(
+        ("один", "два", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять")
+    )
+}
+
+_TZ = timezone(timedelta(hours=3), name="Moscow")
+
+def _get_now():
+    return datetime.now(tz=_TZ)
+
+
 def _parse_date(value: str):
     splitted = value.split()
-    if splitted[0].isdecimal():
+
+    if splitted[0].isdecimal() and splitted[1] in _MONTHS:
         day = int(splitted[0])
         month = _MONTHS[splitted[1]]
+
         if len(splitted) == 3:
             year = int(splitted[2])
-            return datetime(year=year, month=month, day=day)
-        elif len(splitted) == 4 and splitted[2] == "в":
-            now = datetime.now()
+            return datetime(year=year, month=month, day=day, tzinfo=_TZ)
+
+        if len(splitted) == 4 and splitted[2] == "в":
+            now = _get_now()
             year = now.year if month <= now.month else (now.year - 1)
             hour, minute = [int(i) for i in splitted[3].split(":")]
-            return datetime(year=year, month=month, day=day, hour=hour, minute=minute)
-        else:
-            raise NotImplementedError()
-    else:
-        raise NotImplementedError()
-    return value
+            return datetime(year=year, month=month, day=day, hour=hour, minute=minute, tzinfo=_TZ)
+        raise NotImplementedError(value)
 
+    if splitted == ["минуту", "назад"]:
+        return _get_now() - timedelta(minutes=1)
+    if splitted[-1] == "назад" and splitted[-2].startswith("минут"):
+        return _get_now() - timedelta(minutes=int(splitted[0]) if splitted[0].isdecimal() else _NUMERAL_MINUTES[splitted[0]])
+    if splitted == ["час", "назад"]:
+        return _get_now() - timedelta(hours=1)
+    if splitted[-1] == "назад" and splitted[-2].startswith("час"):
+        return _get_now() - timedelta(hours=int(splitted[0]) if splitted[0].isdecimal() else _NUMERAL_HOURS[splitted[0]])
+
+    if len(splitted) == 3 and splitted[0:2] == ["сегодня", "в"]:
+        now = _get_now()
+        hour, minute = [int(i) for i in splitted[2].split(":")]
+        return datetime(year=now.year, month=now.month, day=now.day, hour=hour, minute=minute, tzinfo=_TZ)
+
+    if len(splitted) == 3 and splitted[0:2] == ["вчера", "в"]:
+        now = _get_now() - timedelta(days=1)
+        hour, minute = [int(i) for i in splitted[2].split(":")]
+        return datetime(year=now.year, month=now.month, day=now.day, hour=hour, minute=minute, tzinfo=_TZ)
+
+    raise NotImplementedError(value)
+
 
 @dataclass()
 class Text:
     data: str
@@ -111,9 +155,10 @@ class Entry:
 
 
 class _HTMLParser(HTMLParser):
-    def __init__(self, base_url, *args, **kwargs):
+    def __init__(self, base_url, *args, debug_print=False, **kwargs):
         super().__init__(*args, **kwargs)
         self.__base_url = base_url
+        self.__debug_print = debug_print
         self.__stack = []
         self.__harvested = []
         self.__entry = Entry()
@@ -121,18 +166,20 @@ class _HTMLParser(HTMLParser):
     def handle_starttag(self, tag, attrs):
         self.__current_attrs = {pair[0]: pair[1] for pair in attrs}
+
+        def _append():
+            self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
+            if self.__debug_print:
+                print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
+
         if not self.__stack:
             if tag == "div" and self.__current_attrs.get("class", "").startswith("wall_item "):
                 self.__entry = Entry()
-
-            self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
-            # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
+            _append()
         else:
             if tag == "a":
                 if self.__current_attrs.get("class", "").startswith("PostHeaderTime"):
-                    assert not self.__entry.updated, self.__entry
                     self.__entry.updated = _parse_date(self.__current_attrs["data-date"])
-                    assert not self.__entry.url, self.__entry
                     self.__entry.url = urljoin(self.__base_url, self.__current_attrs["href"])
                 elif self.__current_attrs.get("class", "") == "MediaGrid__interactive":
                     self.__entry.content.append(LineBreak())
                     self.__entry.content.append(
                         Link.from_(
                             self.__base_url,
                             self.__current_attrs,
                             urljoin(self.__base_url, self.__current_attrs["href"])
                         )
                     )
@@ -143,15 +190,16 @@
-                elif self.__stack[-1][0] == "div" and self.__stack[-1][1].startswith("thumb_map"):
-                    self.__entry.content.append(LineBreak())
-                    self.__entry.content.append(
-                        Link.from_(
-                            self.__base_url,
-                            self.__current_attrs,
-                            urljoin(self.__base_url, self.__current_attrs["href"])
+                elif self.__stack[-1][0] == "div":
+                    if self.__stack[-1][1].startswith("thumb_map") or self.__stack[-1][1] == "AttachmentsList":
+                        self.__entry.content.append(LineBreak())
+                        self.__entry.content.append(
+                            Link.from_(
+                                self.__base_url,
+                                self.__current_attrs,
+                                urljoin(self.__base_url, self.__current_attrs["href"])
+                            )
                         )
-                    )
             elif tag == "br":
                 self.__entry.content.append(LineBreak())
             elif tag == "img" and self.__current_attrs.get("class") in {"PhotoPrimaryAttachment__imageElement", "MediaGrid__imageElement"}:
                 self.__entry.content.append(LineBreak())
                 self.__entry.content.append(Image(url=self.__current_attrs["src"]))
             elif tag == "div" and self.__current_attrs.get("class") == "pic_body_wrap":
                 self.__entry.content.append(HorizontalRule())
+            elif tag == "img" and self.__current_attrs.get("class") == "emoji":
+                data = self.__current_attrs.get("alt")
+                if data:
+                    self._on_text(data)
             elif tag == "div" and self.__current_attrs.get("class", "").startswith("LinkSnippetPrimaryAttachmentReactBlockMVK__root"):
                 data_exec = self.__current_attrs.get("data-exec")
                 if data_exec:
@@ -168,10 +220,8 @@
                         self.__entry.content.append(LineBreak())
                         self.__entry.content.append(Link.from_(self.__base_url, {"href": url}, url))
                         break
+            _append()
 
-            self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
-            # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
-
     def handle_endtag(self, tag):
         if self.__stack:
             while self.__stack:
@@ -179,42 +229,112 @@
                 if stack_tag == tag:
                     break
             if not self.__stack:
-                assert self.__entry.updated, self.__entry
-                assert self.__entry.url, self.__entry
                 self.__harvested.append(self.__entry)
         self.__current_attrs = {}
 
     def handle_data(self, data):
         if self.__stack and data.strip():
             if self.__stack[-1:] == [("div", "pi_text")] or self.__stack[-2:] == [("div", "pi_text"), ("span", "")]:
== [("div", "pi_text"), ("span", "")]: - if self.__entry.content and isinstance(self.__entry.content[-1], Text): - self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data - else: - self.__entry.content.append(Text(data=data)) - elif self.__stack[-1][0] == "a": + self._on_text(data) + elif self.__stack[-1:] == [("img", "emoji")] or self.__stack[-2:] ==[("img", "emoji"), ("span", "")]: + self._on_text(data) + elif self.__stack[-1][0] in {"a", "img"}: if self.__stack[-2:-1] == [("div", "pi_text")] or self.__stack[-3:-1] == [("div", "pi_text"), ("span", "")]: title = self.__current_attrs.get("title", "") self.__entry.content.append(Link.from_(self.__base_url, self.__current_attrs, data)) - # print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}") + if self.__debug_print: + print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}") def feed(self, *args, **kwargs): super().feed(*args, **kwargs) + if not self.__harvested: + raise RuntimeError("No entries found in feed") return self.__harvested + def _on_text(self, data): + if self.__entry.content and isinstance(self.__entry.content[-1], Text): + self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data + else: + self.__entry.content.append(Text(data=data)) -for base_url in ( - "https://m.vk.com/2sort", - "https://m.vk.com/poreziknasobachke", - "https://m.vk.com/andrushaewa", - "https://m.vk.com/sagolik", - "https://m.vk.com/ryabovpetrvladimirovich", - "https://m.vk.com/id71430449", - "https://m.vk.com/doodletimeru", - "https://m.vk.com/shantynatty", -): - # print("*" * 80) + +class _HTTPRequestHandler(BaseHTTPRequestHandler): + def do_GET(self): + try: + _response_fn = self._do_GET() + except Exception as error: + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(error)) + return + + _response_fn() + + def _do_GET(self): + _, _, path, _, qs, _ = urlparse(self.path) + query = parse_qs(qs) if qs else {} + if path == "/robots.txt": + return self._return_robots_txt + + if path == "/": + name = query.get("name", [None])[0] + if name: + base_url = urlunparse(("https", "m.vk.com", name, "", "", "")) + feed = ET.Element("feed") + feed.attrib.update(xmlns="http://www.w3.org/2005/Atom") + ET.SubElement(feed, "title").text = base_url + ET.SubElement(feed, "id").text = base_url + ET.SubElement(feed, "link").attrib.update(rel="self", href=self.path) + harvested = _HTMLParser(base_url).feed(_get_html(base_url)) + ET.SubElement(feed, "updated").text = max(entry.updated for entry in harvested).isoformat() + return lambda: self._return_xml(ET.ElementTree(feed)) + + return self._return_not_found + + def _return_xml(self, tree): + self.send_response(HTTPStatus.OK) + self.send_header("Content-type", "text/xml") + self.end_headers() + ET.indent(tree) + tree.write(self.wfile, xml_declaration=True, encoding="utf-8") + + def _return_robots_txt(self): + self.send_response(HTTPStatus.OK) + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write(b"User-agent: *\nDisallow: /") + + def _return_not_found(self): + self.send_error(HTTPStatus.NOT_FOUND, "Path not found") + + +def _run_http_server(address: str, port: int): + with ThreadingHTTPServer((address, port), _HTTPRequestHandler) as http_server: + sock_host, sock_port = http_server.socket.getsockname()[:2] + print(f"HTTP server started ({sock_host}:{sock_port})...") + try: + http_server.serve_forever() + except KeyboardInterrupt: + print("\nKeyboard interrupt received, exiting.") + + +def _run_dump(name: str): 
+    base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
+    print("=" * 80)
     print(base_url)
-    harvested = _HTMLParser(base_url).feed(_get_html(base_url))
-    print("\n".join(str(entry.updated) for entry in harvested))
-    # print("\n".join(str(entry) for entry in harvested))
-    # print(base_url)
+    harvested = _HTMLParser(base_url, debug_print=True).feed(_get_html(base_url))
+    print("=" * 80)
+    print("\n".join(str(entry) for entry in harvested))
+    print(base_url)
+
+
+if __name__ == "__main__":
+    from argparse import ArgumentParser
+
+    parser = ArgumentParser()
+    parser.add_argument("--address", default="127.0.0.1", help="bind to this address (default: %(default)s)")
+    parser.add_argument("--port", default=8000, type=int, help="bind to this port (default: %(default)s)")
+    parser.add_argument("--dump", help="special mode: dump the parsed VK page specified by name")
    args = parser.parse_args()
+    if args.dump:
+        _run_dump(args.dump)
+    else:
+        _run_http_server(args.address, args.port)
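
The new `_parse_date` covers both the absolute and the relative timestamp
shapes that m.vk.com renders into post headers. A rough sketch of the
accepted inputs, assuming avk.py is importable as avk and that the elided
_MONTHS table maps genitive month names such as "января":

    from avk import _parse_date

    print(_parse_date("3 января 2024"))      # absolute date with explicit year
    print(_parse_date("16 января в 14:05"))  # absolute date; year inferred so it is never in the future
    print(_parse_date("пять минут назад"))   # relative, spelled-out numeral via _NUMERAL_MINUTES
    print(_parse_date("2 часа назад"))       # relative, decimal numeral
    print(_parse_date("вчера в 09:30"))      # yesterday at HH:MM
    # Anything else raises NotImplementedError(value); every result carries
    # the fixed UTC+3 ("Moscow") tzinfo from _TZ.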
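
The --dump mode is the old module-level loop reworked into a function: it
drives _HTMLParser by hand with debug_print=True, which routes the
previously commented-out stack traces through a flag instead of dead code.
Roughly equivalent, assuming any public page name:

    from avk import _HTMLParser, _get_html

    base_url = "https://m.vk.com/doodletimeru"  # one of the pages from the removed hardcoded list
    for entry in _HTMLParser(base_url, debug_print=True).feed(_get_html(base_url)):
        print(entry.updated, entry.url)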
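
With the server running (python avk.py binds 127.0.0.1:8000 by default),
the Atom document for a page is requested through the name query parameter;
any other path yields 404, and harvest failures surface as 500 through the
do_GET wrapper. A minimal client sketch:

    from urllib.request import urlopen

    with urlopen("http://127.0.0.1:8000/?name=doodletimeru") as response:
        print(response.headers["Content-type"])  # text/xml
        print(response.read().decode("utf-8"))   # <?xml ...?> <feed xmlns="http://www.w3.org/2005/Atom">...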
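
Note that _do_GET so far serializes only feed-level metadata (title, id,
link, and the maximum updated across harvested entries); the posts are
parsed but no <entry> elements are emitted yet. A hypothetical
continuation, not part of this commit, using only the Entry fields visible
in the diff:

    import xml.etree.ElementTree as ET

    def _append_entries(feed, harvested):
        # One <entry> per harvested post; id/link/updated map directly onto
        # Entry.url and Entry.updated, and content is flattened to text.
        for post in harvested:
            entry = ET.SubElement(feed, "entry")
            ET.SubElement(entry, "id").text = post.url
            ET.SubElement(entry, "link").attrib.update(rel="alternate", href=post.url)
            ET.SubElement(entry, "updated").text = post.updated.isoformat()
            ET.SubElement(entry, "title").text = post.url
            ET.SubElement(entry, "content").text = "\n".join(str(piece) for piece in post.content)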