Commit Diff


commit - 3ac0752c060d3243f4fabbfb6627e2785a0ebc05
commit + 96d6ed3742822eccf1e51cd5a5a4a68fd2757fa8
blob - c9fff8bed243e98a938761c0d7bfa7e6515938ef
blob + e9250e9d1be065e3774bfb68f294a94a6fb82923
--- avk.py
+++ avk.py
@@ -1,9 +1,13 @@
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, timezone
 from http.cookiejar import CookieJar, Cookie
 from urllib.request import build_opener, HTTPCookieProcessor
+from urllib.parse import urljoin, urlunparse, parse_qs, urlparse
 from html.parser import HTMLParser
-from dataclasses import dataclass, field
-from urllib.parse import urljoin
-from datetime import datetime
+from http import HTTPStatus
+from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
+
+import xml.etree.ElementTree as ET
 from json import loads as json_loads
 
 
@@ -48,27 +52,67 @@ _MONTHS = {
     )
 }
 
+# Spelled-out Russian numerals seen in VK relative timestamps: feminine forms
+# ("одна", "две") agree with "минута"; the masculine set below agrees with "час".
+_NUMERAL_MINUTES = {
+    name: i + 1
+    for i, name in enumerate(
+        ("одна", "две", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять")
+    )
+}
+
+_NUMERAL_HOURS = {
+    name: i + 1
+    for i, name in enumerate(
+        ("один", "два", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять")
+    )
+}
+
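+# VK appears to render timestamps in Moscow time; assume a fixed UTC+3 offset
+# (Russia has not observed DST since 2014), which _parse_date relies on below.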
+_TZ = timezone(timedelta(hours=3), name="Moscow")
+
+def _get_now():
+    return datetime.now(tz=_TZ)
+
+
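+# Parse VK's human-readable Russian timestamps. Shapes handled below, roughly:
+#   "<day> <month> [<year>]", "<day> <month> в HH:MM",
+#   "[N|numeral] минут(у/ы) назад", "[N|numeral] час(а/ов) назад",
+#   "сегодня в HH:MM" (today), "вчера в HH:MM" (yesterday).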
 def _parse_date(value: str):
     splitted = value.split()
-    if splitted[0].isdecimal():
+
+    if splitted[0].isdecimal() and splitted[1] in _MONTHS:
         day = int(splitted[0])
         month = _MONTHS[splitted[1]]
+
         if len(splitted) == 3:
             year = int(splitted[2])
-            return datetime(year=year, month=month, day=day)
-        elif len(splitted) == 4 and splitted[2] == "в":
-            now = datetime.now()
+            return datetime(year=year, month=month, day=day, tzinfo=_TZ)
+
+        if len(splitted) == 4 and splitted[2] == "в":
+            now = _get_now()
             year = now.year if month <= now.month else (now.year - 1)
             hour, minute = [int(i) for i in  splitted[3].split(":")]
-            return datetime(year=year, month=month, day=day, hour=hour, minute=minute)
-        else:
-            raise NotImplementedError()
-    else:
-        raise NotImplementedError()
-    return value
+            return datetime(year=year, month=month, day=day, hour=hour, minute=minute, tzinfo=_TZ)
 
+        raise NotImplementedError(value)
 
+    if splitted == ["минуту", "назад"]:
+        return _get_now() - timedelta(minutes=1)
+    if splitted[-1] == "назад" and splitted[-2].startswith("минут"):
+        return _get_now() - timedelta(minutes=int(splitted[0]) if splitted[0].isdecimal() else _NUMERAL_MINUTES[splitted[0]])
+    if splitted == ["час", "назад"]:
+        return _get_now() - timedelta(hours=1)
+    if splitted[-1] == "назад" and splitted[-2].startswith("час"):
+        return _get_now() - timedelta(hours=int(splitted[0]) if splitted[0].isdecimal() else _NUMERAL_HOURS[splitted[0]])
+
+    if len(splitted) == 3 and splitted[0:2] == ["сегодня", "в"]:
+        now = _get_now()
+        hour, minute = [int(i) for i in splitted[2].split(":")]
+        return datetime(year=now.year, month=now.month, day=now.day, hour=hour, minute=minute, tzinfo=_TZ)
+
+    if len(splitted) == 3 and splitted[0:2] == ["вчера", "в"]:
+        now = _get_now() - timedelta(days=1)
+        hour, minute = [int(i) for i in splitted[2].split(":")]
+        return datetime(year=now.year, month=now.month, day=now.day, hour=hour, minute=minute, tzinfo=_TZ)
+
+    raise NotImplementedError(value)
+
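+# Rough sketch of the expected behaviour (assumes _MONTHS, defined above, maps
+# genitive month names such as "января" to month numbers):
+#   _parse_date("1 января 2024")    -> 2024-01-01 00:00 +03:00
+#   _parse_date("две минуты назад") -> _get_now() - timedelta(minutes=2)
+#   _parse_date("вчера в 09:30")    -> yesterday at 09:30 +03:00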
+
 @dataclass()
 class Text:
     data: str
@@ -111,9 +155,10 @@ class Entry:
 
 
 class _HTMLParser(HTMLParser):
-    def __init__(self, base_url, *args, **kwargs):
+    def __init__(self, base_url, *args, debug_print=False, **kwargs):
         super().__init__(*args, **kwargs)
         self.__base_url = base_url
+        self.__debug_print = debug_print
         self.__stack = []
         self.__harvested = []
         self.__entry = Entry()
@@ -121,18 +166,20 @@ class _HTMLParser(HTMLParser):
 
     def handle_starttag(self, tag, attrs):
         self.__current_attrs = {pair[0]: pair[1] for pair in attrs}
+
+        def _append():
+            self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
+            if self.__debug_print:
+                print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
+
         if not self.__stack:
             if tag == "div" and self.__current_attrs.get("class", "").startswith("wall_item "):
                 self.__entry = Entry()
-
-                self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
-                # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
+                _append()
         else:
             if tag == "a":
                 if self.__current_attrs.get("class", "").startswith("PostHeaderTime"):
-                    assert not self.__entry.updated, self.__entry
                     self.__entry.updated = _parse_date(self.__current_attrs["data-date"])
-                    assert not self.__entry.url, self.__entry
                     self.__entry.url = urljoin(self.__base_url, self.__current_attrs["href"])
                 elif self.__current_attrs.get("class", "") == "MediaGrid__interactive":
                     self.__entry.content.append(LineBreak())
@@ -143,15 +190,16 @@ class _HTMLParser(HTMLParser):
                             urljoin(self.__base_url, self.__current_attrs["href"])
                         )
                     )
-                elif self.__stack[-1][0] == "div" and self.__stack[-1][1].startswith("thumb_map"):
-                    self.__entry.content.append(LineBreak())
-                    self.__entry.content.append(
-                        Link.from_(
-                            self.__base_url,
-                            self.__current_attrs,
-                            urljoin(self.__base_url, self.__current_attrs["href"])
+                elif self.__stack[-1][0] == "div":
+                    if self.__stack[-1][1].startswith("thumb_map") or self.__stack[-1][1] == "AttachmentsList":
+                        self.__entry.content.append(LineBreak())
+                        self.__entry.content.append(
+                            Link.from_(
+                                self.__base_url,
+                                self.__current_attrs,
+                                urljoin(self.__base_url, self.__current_attrs["href"])
+                            )
                         )
-                    )
             elif tag == "br":
                 self.__entry.content.append(LineBreak())
             elif tag == "img" and self.__current_attrs.get("class") in {"PhotoPrimaryAttachment__imageElement", "MediaGrid__imageElement"}:
@@ -159,6 +207,10 @@ class _HTMLParser(HTMLParser):
                 self.__entry.content.append(Image(url=self.__current_attrs["src"]))
             elif tag == "div" and self.__current_attrs.get("class") == "pic_body_wrap":
                 self.__entry.content.append(HorizontalRule())
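+            # Emoji arrive as <img class="emoji" alt="...">; fold the alt text
+            # back into the surrounding text run via _on_text.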
+            elif tag == "img" and self.__current_attrs.get("class") == "emoji":
+                data = self.__current_attrs.get("alt")
+                if data:
+                    self._on_text(data)
             elif tag == "div" and self.__current_attrs.get("class", "").startswith("LinkSnippetPrimaryAttachmentReactBlockMVK__root"):
                 data_exec = self.__current_attrs.get("data-exec")
                 if data_exec:
@@ -168,10 +220,8 @@ class _HTMLParser(HTMLParser):
                             self.__entry.content.append(LineBreak())
                             self.__entry.content.append(Link.from_(self.__base_url, {"href": url}, url))
                             break
+            _append()
 
-            self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
-            # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
-
     def handle_endtag(self, tag):
         if self.__stack:
             while self.__stack:
@@ -179,42 +229,112 @@ class _HTMLParser(HTMLParser):
                 if stack_tag == tag:
                     break
             if not self.__stack:
-                assert self.__entry.updated, self.__entry
-                assert self.__entry.url, self.__entry
                 self.__harvested.append(self.__entry)
         self.__current_attrs = {}
 
     def handle_data(self, data):
         if self.__stack and data.strip():
             if self.__stack[-1:] == [("div", "pi_text")] or self.__stack[-2:] == [("div", "pi_text"), ("span", "")]:
-                if self.__entry.content and isinstance(self.__entry.content[-1], Text):
-                    self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data
-                else:
-                    self.__entry.content.append(Text(data=data))
-            elif self.__stack[-1][0] == "a":
+                self._on_text(data)
+            elif self.__stack[-1:] == [("img", "emoji")] or self.__stack[-2:] == [("img", "emoji"), ("span", "")]:
+                self._on_text(data)
+            elif self.__stack[-1][0] in {"a", "img"}:
                 if self.__stack[-2:-1] == [("div", "pi_text")] or self.__stack[-3:-1] == [("div", "pi_text"), ("span", "")]:
                     title = self.__current_attrs.get("title", "")
                     self.__entry.content.append(Link.from_(self.__base_url, self.__current_attrs, data))
-            # print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}")
+            if self.__debug_print:
+                print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}")
 
     def feed(self, *args, **kwargs):
         super().feed(*args, **kwargs)
+        if not self.__harvested:
+            raise RuntimeError("No entries found in feed")
         return self.__harvested
 
+    def _on_text(self, data):
+        if self.__entry.content and isinstance(self.__entry.content[-1], Text):
+            self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data
+        else:
+            self.__entry.content.append(Text(data=data))
 
-for base_url in (
-    "https://m.vk.com/2sort",
-    "https://m.vk.com/poreziknasobachke",
-    "https://m.vk.com/andrushaewa",
-    "https://m.vk.com/sagolik",
-    "https://m.vk.com/ryabovpetrvladimirovich",
-    "https://m.vk.com/id71430449",
-    "https://m.vk.com/doodletimeru",
-    "https://m.vk.com/shantynatty",
-):
-    # print("*" * 80)
+
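+# Minimal HTTP front-end: GET /?name=<page> returns an Atom envelope for the
+# corresponding m.vk.com wall; /robots.txt asks crawlers to stay away.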
+class _HTTPRequestHandler(BaseHTTPRequestHandler):
+    def do_GET(self):
+        try:
+            _response_fn = self._do_GET()
+        except Exception as error:
+            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(error))
+            return
+
+        _response_fn()
+
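+    # Resolve the request in two phases: _do_GET builds the full response
+    # closure first, so a fetch/parse failure becomes a clean 500 instead of
+    # a half-written 200.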
+    def _do_GET(self):
+        _, _, path, _, qs, _ = urlparse(self.path)
+        query = parse_qs(qs) if qs else {}
+        if path == "/robots.txt":
+            return self._return_robots_txt
+
+        if path == "/":
+            name = query.get("name", [None])[0]
+            if name:
+                base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
+                feed = ET.Element("feed")
+                feed.attrib.update(xmlns="http://www.w3.org/2005/Atom")
+                ET.SubElement(feed, "title").text = base_url
+                ET.SubElement(feed, "id").text = base_url
+                ET.SubElement(feed, "link").attrib.update(rel="self", href=self.path)
+                harvested = _HTMLParser(base_url).feed(_get_html(base_url))
+                ET.SubElement(feed, "updated").text = max(entry.updated for entry in harvested).isoformat()
+                return lambda: self._return_xml(ET.ElementTree(feed))
+
+        return self._return_not_found
+
+    def _return_xml(self, tree):
+        self.send_response(HTTPStatus.OK)
+        self.send_header("Content-type", "text/xml")
+        self.end_headers()
+        ET.indent(tree)
+        tree.write(self.wfile, xml_declaration=True, encoding="utf-8")
+
+    def _return_robots_txt(self):
+        self.send_response(HTTPStatus.OK)
+        self.send_header("Content-type", "text/plain")
+        self.end_headers()
+        self.wfile.write(b"User-agent: *\nDisallow: /")
+
+    def _return_not_found(self):
+        self.send_error(HTTPStatus.NOT_FOUND, "Path not found")
+
+
+def _run_http_server(address: str, port: int):
+    with ThreadingHTTPServer((address, port), _HTTPRequestHandler) as http_server:
+        sock_host, sock_port = http_server.socket.getsockname()[:2]
+        print(f"HTTP server started ({sock_host}:{sock_port})...")
+        try:
+            http_server.serve_forever()
+        except KeyboardInterrupt:
+            print("\nKeyboard interrupt received, exiting.")
+
+
+def _run_dump(name: str):
+    base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
+    print("=" * 80)
     print(base_url)
-    harvested = _HTMLParser(base_url).feed(_get_html(base_url))
-    print("\n".join(str(entry.updated) for entry in harvested))
-    # print("\n".join(str(entry) for entry in harvested))
-    # print(base_url)
+    harvested = _HTMLParser(base_url, debug_print=True).feed(_get_html(base_url))
+    print("=" * 80)
+    print("\n".join(str(entry) for entry in harvested))
+    print(base_url)
+
+
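+# Usage (a sketch; defaults shown):
+#   python3 avk.py --address 127.0.0.1 --port 8000
+#   curl 'http://127.0.0.1:8000/?name=doodletimeru'   # Atom feed for m.vk.com/doodletimeru
+#   python3 avk.py --dump doodletimeru                # debug dump of parsed entries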
+if __name__ == "__main__":
+    from argparse import ArgumentParser
+
+    parser = ArgumentParser()
+    parser.add_argument("--address", default="127.0.0.1", help="bind to this address (default: %(default)s)")
+    parser.add_argument("--port", default=8000, type=int, help="bind to this port (default: %(default)s)")
+    parser.add_argument("--dump", help="[special mode]: dumping parsed VK page specified by name")
+    args = parser.parse_args()
+    if args.dump:
+        _run_dump(args.dump)
+    else:
+        _run_http_server(args.address, args.port)