Commit Diff


commit - 96d6ed3742822eccf1e51cd5a5a4a68fd2757fa8
commit + 5d0d11f445413917dffb40a2a259af37d9ccf8e5
blob - e9250e9d1be065e3774bfb68f294a94a6fb82923
blob + 2d673a073775178ed19a8a9df063e0a016db7f0c
--- avk.py
+++ avk.py
@@ -11,6 +11,9 @@ import xml.etree.ElementTree as ET
 from json import loads as json_loads
 
 
+_AVK_VERSION = "0.1"
+
+
 def _get_html(base_url: str):
     cookiejar = CookieJar()
     opener = build_opener(HTTPCookieProcessor(cookiejar))
@@ -113,11 +116,11 @@ def _parse_date(value: str):
     raise NotImplementedError(value)
 
 
-@dataclass()
+@dataclass(kw_only=True)
 class Text:
     data: str
 
-@dataclass()
+@dataclass(kw_only=True, frozen=True)
 class Link:
     data: str
     url: str
@@ -134,37 +137,60 @@ class Link:
 
         return cls(data=data, url=href)
 
-@dataclass()
+@dataclass(kw_only=True, frozen=True)
 class LineBreak:
     pass
 
-@dataclass()
+@dataclass(kw_only=True, frozen=True)
 class HorizontalRule:
     pass
 
-@dataclass()
+@dataclass(kw_only=True, frozen=True)
 class Image:
     url: str
 
 
-@dataclass()
+@dataclass(kw_only=True)
 class Entry:
     updated: datetime | None = None
     url: str = ""
-    content: list = field(default_factory=list)
+    content: list[Text | Link | LineBreak | HorizontalRule | Image] = field(default_factory=list)
 
 
+@dataclass(kw_only=True)
+class Feed:
+    url: str
+    _title: str = ""
+    entries: list[Entry] = field(default_factory=list)
+
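+    # _title is filled lazily from the page's <title> tag while parsing;
+    # until then, get_title() falls back to the feed URL.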
+    def get_title(self) -> str:
+        return self._title or self.url
+
+    def is_title_filled(self) -> bool:
+        return bool(self._title)
+
+    def set_title(self, title: str):
+        self._title = title
+
+    def get_updated(self) -> datetime:
+        return max(entry.updated for entry in self.entries if entry.updated is not None)
+
+
 class _HTMLParser(HTMLParser):
-    def __init__(self, base_url, *args, debug_print=False, **kwargs):
+    def __init__(self, base_url: str, *args, debug_print: bool = False, **kwargs):
         super().__init__(*args, **kwargs)
         self.__base_url = base_url
         self.__debug_print = debug_print
         self.__stack = []
-        self.__harvested = []
+        self.__feed = Feed(url=base_url)
         self.__entry = Entry()
+        self.__current_tag = None
         self.__current_attrs = {}
 
     def handle_starttag(self, tag, attrs):
+        self.__current_tag = tag  # remembered so handle_data() can pick up the page <title>
         self.__current_attrs = {pair[0]: pair[1] for pair in attrs}
 
         def _append():
@@ -229,10 +253,13 @@ class _HTMLParser(HTMLParser):
                 if stack_tag == tag:
                     break
             if not self.__stack:
-                self.__harvested.append(self.__entry)
+                self.__feed.entries.append(self.__entry)
+        self.__current_tag = None
         self.__current_attrs = {}
 
     def handle_data(self, data):
+        if self.__current_tag == "title" and not self.__feed.is_title_filled():
+            self.__feed.set_title(data)
         if self.__stack and data.strip():
             if self.__stack[-1:] == [("div", "pi_text")] or self.__stack[-2:] == [("div", "pi_text"), ("span", "")]:
                 self._on_text(data)
@@ -247,9 +274,9 @@ class _HTMLParser(HTMLParser):
 
     def feed(self, *args, **kwargs):
         super().feed(*args, **kwargs)
-        if not self.__harvested:
+        if not self.__feed.entries:
             raise RuntimeError("No entries found in feed")
-        return self.__harvested
+        return self.__feed
 
     def _on_text(self, data):
         if self.__entry.content and isinstance(self.__entry.content[-1], Text):
@@ -278,23 +305,31 @@ class _HTTPRequestHandler(BaseHTTPRequestHandler):
             name = query.get("name", [None])[0]
             if name:
                 base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
-                feed = ET.Element("feed")
-                feed.attrib.update(xmlns="http://www.w3.org/2005/Atom")
-                ET.SubElement(feed, "title").text = base_url
-                ET.SubElement(feed, "id").text = base_url
-                ET.SubElement(feed, "link").attrib.update(rel="self", href=self.path)
-                harvested = _HTMLParser(base_url).feed(_get_html(base_url))
-                ET.SubElement(feed, "updated").text = max(entry.updated for entry in harvested).isoformat()
-                return lambda: self._return_xml(ET.ElementTree(feed))
+                feed = _HTMLParser(base_url).feed(_get_html(base_url))
 
+                xml_tree = ET.Element("feed")
+                xml_tree.attrib.update(xmlns="http://www.w3.org/2005/Atom")
+                ET.SubElement(xml_tree, "title").text = feed.get_title()
+                ET.SubElement(xml_tree, "id").text = feed.url
+                ET.SubElement(xml_tree, "link").attrib.update(rel="self", href=self.path)
+                ET.SubElement(xml_tree, "updated").text = feed.get_updated().isoformat()
+                generator = ET.SubElement(xml_tree, "generator")
+                generator.attrib.update(
+                    uri="https://got.any-key.press/?action=summary&path=avk.git",
+                    version=_AVK_VERSION
+                )
+                generator.text = "Anonymous Atom feed generator (by VK)"
+
+                return lambda: self._return_xml(ET.ElementTree(xml_tree))
+
         return self._return_not_found
 
-    def _return_xml(self, tree):
+    def _return_xml(self, xml_tree):
         self.send_response(HTTPStatus.OK)
         self.send_header("Content-type", "text/xml")
         self.end_headers()
-        ET.indent(tree)
-        tree.write(self.wfile, xml_declaration=True, encoding="utf-8")
+        ET.indent(xml_tree)
+        xml_tree.write(self.wfile, xml_declaration=True, encoding="utf-8")
 
     def _return_robots_txt(self):
         self.send_response(HTTPStatus.OK)
@@ -320,9 +355,9 @@ def _run_dump(name: str):
     base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
     print("=" * 80)
     print(base_url)
-    harvested = _HTMLParser(base_url, debug_print=True).feed(_get_html(base_url))
+    feed = _HTMLParser(base_url, debug_print=True).feed(_get_html(base_url))
     print("=" * 80)
-    print("\n".join(str(entry) for entry in harvested))
+    print("\n".join(str(entry) for entry in feed.entries))
     print(base_url)
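
Usage sketch: the diff adds the Feed object and the Atom <generator> element but
does not show the server entry point, so the wiring below is an assumption based
on the _HTTPRequestHandler shown above; the bind address and port are made up.

    from http.server import HTTPServer

    from avk import _HTTPRequestHandler

    # Serve Atom feeds locally; the wall at https://m.vk.com/<name> should then
    # be reachable as http://127.0.0.1:8000/?name=<name>
    HTTPServer(("127.0.0.1", 8000), _HTTPRequestHandler).serve_forever()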