commit 53cff30f688fa4c6de0b531106aa693125ebbfa3 from: Aleksey Ryndin date: Wed Jan 15 16:45:13 2025 UTC Add HTML parser (incomplete, work-in-progress, proof-of-concept) commit - 9aafee460406220455032974c519ef5ac37016d5 commit + 53cff30f688fa4c6de0b531106aa693125ebbfa3 blob - ffff8f49abe14f79cbc81439814591e53f28cd6a blob + 4a503df4bd8722ca47fe51f3a075e73d1f8e7d7a --- avk.py +++ avk.py @@ -1,51 +1,191 @@ from http.cookiejar import CookieJar, Cookie from urllib.request import build_opener, HTTPCookieProcessor from html.parser import HTMLParser +from dataclasses import dataclass, field +from urllib.parse import urljoin +from datetime import datetime +from json import loads as json_loads -cookiejar = CookieJar() -opener = build_opener(HTTPCookieProcessor(cookiejar)) -with opener.open("https://m.vk.com/rtpbooks") as response: +def _get_html(base_url): + cookiejar = CookieJar() + opener = build_opener(HTTPCookieProcessor(cookiejar)) + with opener.open(base_url, timeout=15) as response: + # fill cookies + pass + + cookie_tmpl = next(iter(cookiejar)) + cookiejar.set_cookie( + Cookie( + cookie_tmpl.version, + "remixmdevice", + "1920/1080/1/!", + cookie_tmpl.port, + cookie_tmpl.port_specified, + cookie_tmpl.domain, + cookie_tmpl.domain_specified, + cookie_tmpl.domain_initial_dot, + cookie_tmpl.path, + cookie_tmpl.path_specified, + cookie_tmpl.secure, + cookie_tmpl.expires, + cookie_tmpl.discard, + cookie_tmpl.comment, + cookie_tmpl.comment_url, + cookie_tmpl._rest, + rfc2109=cookie_tmpl.rfc2109, + ) + ) + + with opener.open(base_url, timeout=15) as response: + return response.read().decode("utf8") + + +@dataclass() +class Text: + data: str + +@dataclass() +class Link: + data: str + url: str + + @classmethod + def from_(cls, base_url, attrs, data): + title = attrs.get("title", "") + if title.startswith("https://"): + return cls(data=title if data.startswith("https://") else data, url=title) + + href = urljoin(base_url, attrs["href"]) + if "/away.php?" in href and data.startswith("https://"): + return cls(data=data, url=data) + + return cls(data=data, url=href) + +@dataclass() +class LineBreak: pass +@dataclass() +class HorizontalRule: + pass -cookie_tmpl = next(iter(cookiejar)) -cookiejar.set_cookie( - Cookie( - cookie_tmpl.version, - "remixmdevice", - "1920/1080/1/!", - cookie_tmpl.port, - cookie_tmpl.port_specified, - cookie_tmpl.domain, - cookie_tmpl.domain_specified, - cookie_tmpl.domain_initial_dot, - cookie_tmpl.path, - cookie_tmpl.path_specified, - cookie_tmpl.secure, - cookie_tmpl.expires, - cookie_tmpl.discard, - cookie_tmpl.comment, - cookie_tmpl.comment_url, - cookie_tmpl._rest, - rfc2109=cookie_tmpl.rfc2109, - ) -) +@dataclass() +class Image: + url: str -with opener.open("https://m.vk.com/rtpbooks") as response: - html_page = response.read().decode("utf8") +@dataclass() +class Entry: + updated: datetime | None = None + url: str = "" + content: list = field(default_factory=list) + class MyHTMLParser(HTMLParser): + def __init__(self, base_url, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__base_url = base_url + self.__stack = [] + self.__harvested = [] + self.__entry = Entry() + self.__current_attrs = {} + def handle_starttag(self, tag, attrs): - print(f"> {tag} {attrs=}") + self.__current_attrs = {pair[0]: pair[1] for pair in attrs} + if not self.__stack: + if tag == "div" and self.__current_attrs.get("class", "").startswith("wall_item "): + self.__entry = Entry() + self.__stack.append((tag, self.__current_attrs.get("class", "").strip())) + # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}") + else: + if tag == "a": + if self.__current_attrs.get("class", "").startswith("PostHeaderTime"): + assert not self.__entry.updated, self.__entry + self.__entry.updated = self.__current_attrs["data-date"] + assert not self.__entry.url, self.__entry + self.__entry.url = urljoin(self.__base_url, self.__current_attrs["href"]) + elif self.__current_attrs.get("class", "") == "MediaGrid__interactive": + self.__entry.content.append(LineBreak()) + self.__entry.content.append( + Link.from_( + self.__base_url, + self.__current_attrs, + urljoin(self.__base_url, self.__current_attrs["href"]) + ) + ) + elif self.__stack[-1][0] == "div" and self.__stack[-1][1].startswith("thumb_map"): + self.__entry.content.append(LineBreak()) + self.__entry.content.append( + Link.from_( + self.__base_url, + self.__current_attrs, + urljoin(self.__base_url, self.__current_attrs["href"]) + ) + ) + elif tag == "br": + self.__entry.content.append(LineBreak()) + elif tag == "img" and self.__current_attrs.get("class") in {"PhotoPrimaryAttachment__imageElement", "MediaGrid__imageElement"}: + self.__entry.content.append(LineBreak()) + self.__entry.content.append(Image(url=self.__current_attrs["src"])) + elif tag == "div" and self.__current_attrs.get("class") == "pic_body_wrap": + self.__entry.content.append(HorizontalRule()) + elif tag == "div" and self.__current_attrs.get("class", "").startswith("LinkSnippetPrimaryAttachmentReactBlockMVK__root"): + data_exec = self.__current_attrs.get("data-exec") + if data_exec: + for data in json_loads(data_exec).values(): + url = data.get("snippet", {}).get("url") + if url: + self.__entry.content.append(LineBreak()) + self.__entry.content.append(Link.from_(self.__base_url, {"href": url}, url)) + break + + self.__stack.append((tag, self.__current_attrs.get("class", "").strip())) + # print(f"> {'.'.join(pair[0] for pair in self.__stack)} {attrs=}") + def handle_endtag(self, tag): - print(f"< {tag}") + if self.__stack: + while self.__stack: + stack_tag, _ = self.__stack.pop() + if stack_tag == tag: + break + if not self.__stack: + assert self.__entry.updated, self.__entry + assert self.__entry.url, self.__entry + self.__harvested.append(self.__entry) + self.__current_attrs = {} def handle_data(self, data): - print(f"{data=}") + if self.__stack and data.strip(): + if self.__stack[-1:] == [("div", "pi_text")] or self.__stack[-2:] == [("div", "pi_text"), ("span", "")]: + if self.__entry.content and isinstance(self.__entry.content[-1], Text): + self.__entry.content[-1].data = self.__entry.content[-1].data + " " + data + else: + self.__entry.content.append(Text(data=data)) + elif self.__stack[-1][0] == "a": + if self.__stack[-2:-1] == [("div", "pi_text")] or self.__stack[-3:-1] == [("div", "pi_text"), ("span", "")]: + title = self.__current_attrs.get("title", "") + self.__entry.content.append(Link.from_(self.__base_url, self.__current_attrs, data)) + # print(f"{'|'.join(pair[0] + '.' + pair[1] for pair in self.__stack)} {data=}") + def feed(self, *args, **kwargs): + super().feed(*args, **kwargs) + return self.__harvested -parser = MyHTMLParser() -parser.feed(html_page) + +for base_url in ( + "https://m.vk.com/2sort", + "https://m.vk.com/poreziknasobachke", + "https://m.vk.com/andrushaewa", + "https://m.vk.com/sagolik", + "https://m.vk.com/ryabovpetrvladimirovich", + "https://m.vk.com/id71430449", + "https://m.vk.com/doodletimeru", + "https://m.vk.com/shantynatty", +): + harvested = MyHTMLParser(base_url).feed(_get_html(base_url)) + print("*" * 80) + print(base_url) + print("\n".join(str(entry) for entry in harvested)) + print(base_url)