commit - 3ac0752c060d3243f4fabbfb6627e2785a0ebc05
commit + 96d6ed3742822eccf1e51cd5a5a4a68fd2757fa8
blob - c9fff8bed243e98a938761c0d7bfa7e6515938ef
blob + e9250e9d1be065e3774bfb68f294a94a6fb82923
--- avk.py
+++ avk.py
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, timezone
from http.cookiejar import CookieJar, Cookie
from urllib.request import build_opener, HTTPCookieProcessor
+from urllib.parse import urljoin, urlunparse, parse_qs, urlparse
from html.parser import HTMLParser
-from dataclasses import dataclass, field
-from urllib.parse import urljoin
-from datetime import datetime
+from http import HTTPStatus
+from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
+
+import xml.etree.ElementTree as ET
from json import loads as json_loads
)
}
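+# Spelled-out numerals in VK's relative timestamps ("две минуты назад"):
+# minutes take the feminine forms, hours the masculine ones.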
+_NUMERAL_MINUTES = {
+ name: i + 1
+ for i, name in enumerate(
+        ("одна", "две", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять")
+ )
+}
+_NUMERAL_HOURS = {
+ name: i + 1
+ for i, name in enumerate(
+ ("один", "два", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять")
+ )
+}
+
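+# VK renders timestamps in Moscow time (UTC+3 year-round).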
+_TZ = timezone(timedelta(hours=3), name="Moscow")
+
+def _get_now():
+ return datetime.now(tz=_TZ)
+
+
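+# Parse VK's human-readable timestamps (absolute and relative) into aware datetimes.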
def _parse_date(value: str):
splitted = value.split()
- if splitted[0].isdecimal():
+
+ if splitted[0].isdecimal() and splitted[1] in _MONTHS:
day = int(splitted[0])
month = _MONTHS[splitted[1]]
+
if len(splitted) == 3:
year = int(splitted[2])
- return datetime(year=year, month=month, day=day)
- elif len(splitted) == 4 and splitted[2] == "в":
- now = datetime.now()
+ return datetime(year=year, month=month, day=day, tzinfo=_TZ)
+
+ if len(splitted) == 4 and splitted[2] == "в":
+ now = _get_now()
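+        # No explicit year: assume the most recent occurrence of this date,
+        # i.e. within the past twelve months.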
year = now.year if month <= now.month else (now.year - 1)
hour, minute = [int(i) for i in splitted[3].split(":")]
- return datetime(year=year, month=month, day=day, hour=hour, minute=minute)
- else:
- raise NotImplementedError()
- else:
- raise NotImplementedError()
- return value
+ return datetime(year=year, month=month, day=day, hour=hour, minute=minute, tzinfo=_TZ)
+ raise NotImplementedError(value)
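+    # Relative timestamps: "минуту назад", "N минут назад", "час назад", "N часов назад".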
+ if splitted == ["минуту", "назад"]:
+ return _get_now() - timedelta(minutes=1)
+ if splitted[-1] == "назад" and splitted[-2].startswith("минут"):
+ return _get_now() - timedelta(minutes=int(splitted[0]) if splitted[0].isdecimal() else _NUMERAL_MINUTES[splitted[0]])
+ if splitted == ["час", "назад"]:
+ return _get_now() - timedelta(hours=1)
+ if splitted[-1] == "назад" and splitted[-2].startswith("час"):
+ return _get_now() - timedelta(hours=int(splitted[0]) if splitted[0].isdecimal() else _NUMERAL_HOURS[splitted[0]])
+
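+    # "сегодня в HH:MM" (today) and "вчера в HH:MM" (yesterday) resolve against the current Moscow date.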
+ if len(splitted) == 3 and splitted[0:2] == ["сегодня", "в"]:
+        now = _get_now()
+ hour, minute = [int(i) for i in splitted[2].split(":")]
+ return datetime(year=now.year, month=now.month, day=now.day, hour=hour, minute=minute, tzinfo=_TZ)
+
+ if len(splitted) == 3 and splitted[0:2] == ["вчера", "в"]:
+ now = _get_now() - timedelta(days=1)
+ hour, minute = [int(i) for i in splitted[2].split(":")]
+ return datetime(year=now.year, month=now.month, day=now.day, hour=hour, minute=minute, tzinfo=_TZ)
+
+ raise NotImplementedError(value)
+
+
@dataclass()
class Text:
data: str
class _HTMLParser(HTMLParser):
- def __init__(self, base_url, *args, **kwargs):
+ def __init__(self, base_url, *args, debug_print=False, **kwargs):
super().__init__(*args, **kwargs)
self.__base_url = base_url
+ self.__debug_print = debug_print
self.__stack = []
self.__harvested = []
self.__entry = Entry()
def handle_starttag(self, tag, attrs):
self.__current_attrs = {pair[0]: pair[1] for pair in attrs}
+
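+        # Shared helper: push (tag, class) onto the element stack, tracing the
+        # parse when debug_print is set.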
+ def _append():
+ self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
+ if self.__debug_print:
+ print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
+
if not self.__stack:
if tag == "div" and self.__current_attrs.get("class", "").startswith("wall_item "):
self.__entry = Entry()
-
- self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
- # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
+ _append()
else:
if tag == "a":
if self.__current_attrs.get("class", "").startswith("PostHeaderTime"):
- assert not self.__entry.updated, self.__entry
self.__entry.updated = _parse_date(self.__current_attrs["data-date"])
- assert not self.__entry.url, self.__entry
self.__entry.url = urljoin(self.__base_url, self.__current_attrs["href"])
                elif self.__current_attrs.get("class", "") == "MediaGrid__interactive":
                    self.__entry.content.append(LineBreak())
                    self.__entry.content.append(
                        Link.from_(
                            self.__base_url,
                            self.__current_attrs,
                            urljoin(self.__base_url, self.__current_attrs["href"])
                        )
                    )
- elif self.__stack[-1][0] == "div" and self.__stack[-1][1].startswith("thumb_map"):
- self.__entry.content.append(LineBreak())
- self.__entry.content.append(
- Link.from_(
- self.__base_url,
- self.__current_attrs,
- urljoin(self.__base_url, self.__current_attrs["href"])
+ elif self.__stack[-1][0] == "div":
+ if self.__stack[-1][1].startswith("thumb_map") or self.__stack[-1][1] == "AttachmentsList":
+ self.__entry.content.append(LineBreak())
+ self.__entry.content.append(
+ Link.from_(
+ self.__base_url,
+ self.__current_attrs,
+ urljoin(self.__base_url, self.__current_attrs["href"])
+ )
)
- )
elif tag == "br":
self.__entry.content.append(LineBreak())
elif tag == "img" and self.__current_attrs.get("class") in {"PhotoPrimaryAttachment__imageElement", "MediaGrid__imageElement"}:
self.__entry.content.append(Image(url=self.__current_attrs["src"]))
elif tag == "div" and self.__current_attrs.get("class") == "pic_body_wrap":
self.__entry.content.append(HorizontalRule())
+ elif tag == "img" and self.__current_attrs.get("class") == "emoji":
+ data = self.__current_attrs.get("alt")
+ if data:
+ self._on_text(data)
elif tag == "div" and self.__current_attrs.get("class", "").startswith("LinkSnippetPrimaryAttachmentReactBlockMVK__root"):
data_exec = self.__current_attrs.get("data-exec")
if data_exec:
self.__entry.content.append(LineBreak())
self.__entry.content.append(Link.from_(self.__base_url, {"href": url}, url))
break
+ _append()
- self.__stack.append((tag, self.__current_attrs.get("class", "").strip()))
- # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}")
-
def handle_endtag(self, tag):
if self.__stack:
            while self.__stack:
                stack_tag, _ = self.__stack.pop()
                if stack_tag == tag:
break
if not self.__stack:
- assert self.__entry.updated, self.__entry
- assert self.__entry.url, self.__entry
self.__harvested.append(self.__entry)
self.__current_attrs = {}
def handle_data(self, data):
if self.__stack and data.strip():
if self.__stack[-1:] == [("div", "pi_text")] or self.__stack[-2:] == [("div", "pi_text"), ("span", "")]:
- if self.__entry.content and isinstance(self.__entry.content[-1], Text):
- self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data
- else:
- self.__entry.content.append(Text(data=data))
- elif self.__stack[-1][0] == "a":
+ self._on_text(data)
+            elif self.__stack[-1:] == [("img", "emoji")] or self.__stack[-2:] == [("img", "emoji"), ("span", "")]:
+ self._on_text(data)
+ elif self.__stack[-1][0] in {"a", "img"}:
if self.__stack[-2:-1] == [("div", "pi_text")] or self.__stack[-3:-1] == [("div", "pi_text"), ("span", "")]:
title = self.__current_attrs.get("title", "")
self.__entry.content.append(Link.from_(self.__base_url, self.__current_attrs, data))
- # print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}")
+ if self.__debug_print:
+ print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}")
def feed(self, *args, **kwargs):
super().feed(*args, **kwargs)
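+        # Fail loudly instead of serving an empty feed when nothing was parsed.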
+ if not self.__harvested:
+ raise RuntimeError("No entries found in feed")
return self.__harvested
+ def _on_text(self, data):
+ if self.__entry.content and isinstance(self.__entry.content[-1], Text):
+ self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data
+ else:
+ self.__entry.content.append(Text(data=data))
-for base_url in (
- "https://m.vk.com/2sort",
- "https://m.vk.com/poreziknasobachke",
- "https://m.vk.com/andrushaewa",
- "https://m.vk.com/sagolik",
- "https://m.vk.com/ryabovpetrvladimirovich",
- "https://m.vk.com/id71430449",
- "https://m.vk.com/doodletimeru",
- "https://m.vk.com/shantynatty",
-):
- # print("*" * 80)
+
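+# Minimal HTTP front end: GET /?name=<wall> fetches the corresponding m.vk.com
+# page and serves it as an Atom feed; /robots.txt turns crawlers away.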
+class _HTTPRequestHandler(BaseHTTPRequestHandler):
+ def do_GET(self):
+ try:
+ _response_fn = self._do_GET()
+ except Exception as error:
+ self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(error))
+ return
+
+ _response_fn()
+
+ def _do_GET(self):
+ _, _, path, _, qs, _ = urlparse(self.path)
+ query = parse_qs(qs) if qs else {}
+ if path == "/robots.txt":
+ return self._return_robots_txt
+
+ if path == "/":
+ name = query.get("name", [None])[0]
+ if name:
+ base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
+ feed = ET.Element("feed")
+ feed.attrib.update(xmlns="http://www.w3.org/2005/Atom")
+ ET.SubElement(feed, "title").text = base_url
+ ET.SubElement(feed, "id").text = base_url
+ ET.SubElement(feed, "link").attrib.update(rel="self", href=self.path)
+ harvested = _HTMLParser(base_url).feed(_get_html(base_url))
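+                # The feed-level <updated> carries the newest harvested post's timestamp.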
+ ET.SubElement(feed, "updated").text = max(entry.updated for entry in harvested).isoformat()
+ return lambda: self._return_xml(ET.ElementTree(feed))
+
+ return self._return_not_found
+
+ def _return_xml(self, tree):
+ self.send_response(HTTPStatus.OK)
+ self.send_header("Content-type", "text/xml")
+ self.end_headers()
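+        # ET.indent() pretty-prints the tree in place (available since Python 3.9).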
+ ET.indent(tree)
+ tree.write(self.wfile, xml_declaration=True, encoding="utf-8")
+
+ def _return_robots_txt(self):
+ self.send_response(HTTPStatus.OK)
+ self.send_header("Content-type", "text/plain")
+ self.end_headers()
+ self.wfile.write(b"User-agent: *\nDisallow: /")
+
+ def _return_not_found(self):
+ self.send_error(HTTPStatus.NOT_FOUND, "Path not found")
+
+
+def _run_http_server(address: str, port: int):
+ with ThreadingHTTPServer((address, port), _HTTPRequestHandler) as http_server:
+ sock_host, sock_port = http_server.socket.getsockname()[:2]
+ print(f"HTTP server started ({sock_host}:{sock_port})...")
+ try:
+ http_server.serve_forever()
+ except KeyboardInterrupt:
+ print("\nKeyboard interrupt received, exiting.")
+
+
+def _run_dump(name: str):
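+    # Same URL construction as the HTTP handler: https://m.vk.com/<name>.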
+ base_url = urlunparse(("https", "m.vk.com", name, "", "", ""))
+ print("=" * 80)
print(base_url)
- harvested = _HTMLParser(base_url).feed(_get_html(base_url))
- print("\n".join(str(entry.updated) for entry in harvested))
- # print("\n".join(str(entry) for entry in harvested))
- # print(base_url)
+ harvested = _HTMLParser(base_url, debug_print=True).feed(_get_html(base_url))
+ print("=" * 80)
+ print("\n".join(str(entry) for entry in harvested))
+ print(base_url)
+
+
+if __name__ == "__main__":
+ from argparse import ArgumentParser
+
+ parser = ArgumentParser()
+ parser.add_argument("--address", default="127.0.0.1", help="bind to this address (default: %(default)s)")
+ parser.add_argument("--port", default=8000, type=int, help="bind to this port (default: %(default)s)")
+    parser.add_argument("--dump", help="[special mode]: dump the parsed VK page specified by name")
+ args = parser.parse_args()
+ if args.dump:
+ _run_dump(args.dump)
+ else:
+ _run_http_server(args.address, args.port)
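+
+# Example usage:
+#   python avk.py                      # serve Atom feeds on 127.0.0.1:8000
+#   curl 'http://127.0.0.1:8000/?name=2sort'
+#   python avk.py --dump 2sort         # print the parsed wall and exit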