commit - 9aafee460406220455032974c519ef5ac37016d5
commit + 53cff30f688fa4c6de0b531106aa693125ebbfa3
blob - ffff8f49abe14f79cbc81439814591e53f28cd6a
blob + 4a503df4bd8722ca47fe51f3a075e73d1f8e7d7a
--- avk.py
+++ avk.py
from http.cookiejar import CookieJar, Cookie
from urllib.request import build_opener, HTTPCookieProcessor
from html.parser import HTMLParser
+from dataclasses import dataclass, field
+from urllib.parse import urljoin
+from datetime import datetime
+from json import loads as json_loads
-cookiejar = CookieJar()
-opener = build_opener(HTTPCookieProcessor(cookiejar))
-with opener.open("https://m.vk.com/rtpbooks") as response:
+def _get_html(base_url):
+    """Fetch *base_url* twice and return the second response as text.
+
+    The first request is made only to populate the cookie jar.  One of the
+    cookies it set is then used as a template for an extra cookie named
+    ``remixmdevice`` (value ``1920/1080/1/!``), and the page is requested
+    again with that cookie present.
+    NOTE(review): the value presumably describes a screen size and changes
+    the markup the server returns -- confirm.
+
+    Args:
+        base_url: URL of the page to download.
+
+    Returns:
+        The body of the second response, decoded as UTF-8.
+
+    Raises:
+        RuntimeError: if the first response set no cookies, so there is no
+            template to build the ``remixmdevice`` cookie from.
+    """
+    cookiejar = CookieJar()
+    opener = build_opener(HTTPCookieProcessor(cookiejar))
+    with opener.open(base_url, timeout=15):
+        # The body is not needed; this request only fills the cookie jar.
+        pass
+
+    # An empty jar used to surface as a bare StopIteration; report it clearly.
+    cookie_tmpl = next(iter(cookiejar), None)
+    if cookie_tmpl is None:
+        raise RuntimeError(f"no cookies received from {base_url}")
+
+    # Clone every property of the template cookie except name and value.
+    cookiejar.set_cookie(
+        Cookie(
+            cookie_tmpl.version,
+            "remixmdevice",
+            "1920/1080/1/!",
+            cookie_tmpl.port,
+            cookie_tmpl.port_specified,
+            cookie_tmpl.domain,
+            cookie_tmpl.domain_specified,
+            cookie_tmpl.domain_initial_dot,
+            cookie_tmpl.path,
+            cookie_tmpl.path_specified,
+            cookie_tmpl.secure,
+            cookie_tmpl.expires,
+            cookie_tmpl.discard,
+            cookie_tmpl.comment,
+            cookie_tmpl.comment_url,
+            cookie_tmpl._rest,
+            rfc2109=cookie_tmpl.rfc2109,
+        )
+    )
+
+    with opener.open(base_url, timeout=15) as response:
+        return response.read().decode("utf8")
+
+
+@dataclass
+class Text:
+    """Content item holding a run of plain text."""
+
+    data: str
+
+@dataclass()
+class Link:
+    """Content item for a hyperlink: visible text plus target URL."""
+
+    data: str
+    url: str
+
+    @classmethod
+    def from_(cls, base_url, attrs, data):
+        """Build a Link from an anchor's attributes and its visible text.
+
+        Args:
+            base_url: URL that a relative ``href`` is resolved against.
+            attrs: mapping of the anchor's attributes; ``title`` and
+                ``href`` are read.  Missing or ``None`` values are treated
+                as empty strings.
+            data: the visible link text.
+
+        Returns:
+            A Link whose target is chosen as follows:
+            * ``title`` starting with ``https://`` is the target; it also
+              replaces the text when the text itself starts with
+              ``https://``.
+            * otherwise, when the resolved ``href`` contains
+              ``/away.php?`` and the text starts with ``https://``, the
+              text is used as both text and target.
+              NOTE(review): ``/away.php`` is presumably an outbound-link
+              redirect -- confirm.
+            * otherwise the resolved ``href`` is the target.
+        """
+        # HTMLParser reports a valueless attribute (``<a title>``) as None,
+        # so ``attrs.get("title", "")`` alone is not enough.
+        title = attrs.get("title") or ""
+        if title.startswith("https://"):
+            return cls(data=title if data.startswith("https://") else data, url=title)
+
+        # An anchor without href resolves to base_url instead of raising.
+        href = urljoin(base_url, attrs.get("href") or "")
+        if "/away.php?" in href and data.startswith("https://"):
+            return cls(data=data, url=data)
+
+        return cls(data=data, url=href)
+
+@dataclass
+class LineBreak:
+    """Field-less content marker for a line break."""
pass
+@dataclass
+class HorizontalRule:
+    """Field-less content marker for a horizontal rule."""
-cookie_tmpl = next(iter(cookiejar))
-cookiejar.set_cookie(
- Cookie(
- cookie_tmpl.version,
- "remixmdevice",
- "1920/1080/1/!",
- cookie_tmpl.port,
- cookie_tmpl.port_specified,
- cookie_tmpl.domain,
- cookie_tmpl.domain_specified,
- cookie_tmpl.domain_initial_dot,
- cookie_tmpl.path,
- cookie_tmpl.path_specified,
- cookie_tmpl.secure,
- cookie_tmpl.expires,
- cookie_tmpl.discard,
- cookie_tmpl.comment,
- cookie_tmpl.comment_url,
- cookie_tmpl._rest,
- rfc2109=cookie_tmpl.rfc2109,
- )
-)
+@dataclass
+class Image:
+    """Content item for an image, identified by its URL."""
+
+    url: str
-with opener.open("https://m.vk.com/rtpbooks") as response:
- html_page = response.read().decode("utf8")
+@dataclass()
+class Entry:
+    """One harvested post: timestamp, permalink and ordered content items."""
+
+    # MyHTMLParser stores the raw ``data-date`` attribute string here, so the
+    # annotation admits ``str`` as well as a parsed ``datetime``.
+    updated: datetime | str | None = None
+    # Absolute URL of the post; empty until the parser has seen it.
+    url: str = ""
+    # Content items in document order; a fresh list per instance.
+    content: list = field(default_factory=list)
+
class MyHTMLParser(HTMLParser):
+    """Collect posts from a page as a list of ``Entry`` objects.
+
+    A post starts at a ``<div>`` whose class begins with ``"wall_item "``
+    and ends when that element is closed.  Inside a post the parser keeps a
+    stack of ``(tag, class)`` pairs for the open elements and records:
+
+    * timestamp and permalink from the ``PostHeaderTime`` anchor;
+    * text inside ``div.pi_text`` (directly, or inside a class-less span);
+    * anchors inside that text, ``MediaGrid__interactive`` anchors and
+      anchors directly inside a ``thumb_map*`` div, as ``Link`` items;
+    * ``<br>`` as ``LineBreak``, selected ``<img>`` classes as ``Image``,
+      ``div.pic_body_wrap`` as ``HorizontalRule``;
+    * the first snippet URL found in the ``data-exec`` JSON of a
+      ``LinkSnippetPrimaryAttachmentReactBlockMVK__root*`` div.
+
+    Args:
+        base_url: URL that relative links are resolved against.
+        *args, **kwargs: forwarded to ``HTMLParser``.
+    """
+
+    # Elements that have no end tag.  They must not be pushed on the stack:
+    # written as ``<br>`` (no slash) nothing would pop them, and the text
+    # that follows would no longer look like it sits in ``div.pi_text``.
+    _VOID_TAGS = frozenset(
+        {
+            "area", "base", "br", "col", "embed", "hr", "img",
+            "input", "link", "meta", "source", "track", "wbr",
+        }
+    )
+
+    def __init__(self, base_url, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.__base_url = base_url
+        self.__stack = []
+        self.__harvested = []
+        self.__entry = Entry()
+        self.__current_attrs = {}
+
def handle_starttag(self, tag, attrs):
- print(f"> {tag} {attrs=}")
+        """Track the open element and record any content it contributes."""
+        # HTMLParser reports valueless attributes as None; normalise them to
+        # "" so the string checks below cannot raise AttributeError.
+        self.__current_attrs = {name: value or "" for name, value in attrs}
+        css_class = self.__current_attrs.get("class", "")
+
+        if not self.__stack:
+            # Outside any post: only a wall_item div opens a new entry.
+            if tag == "div" and css_class.startswith("wall_item "):
+                self.__entry = Entry()
+                self.__stack.append((tag, css_class.strip()))
+            return
+
+        if tag == "a":
+            if css_class.startswith("PostHeaderTime"):
+                assert not self.__entry.updated, self.__entry
+                self.__entry.updated = self.__current_attrs["data-date"]
+                assert not self.__entry.url, self.__entry
+                self.__entry.url = urljoin(self.__base_url, self.__current_attrs["href"])
+            elif css_class == "MediaGrid__interactive" or (
+                self.__stack[-1][0] == "div" and self.__stack[-1][1].startswith("thumb_map")
+            ):
+                # Attachment links carry no text; the resolved href is used
+                # as the link text.
+                target = urljoin(self.__base_url, self.__current_attrs["href"])
+                self.__entry.content.append(LineBreak())
+                self.__entry.content.append(
+                    Link.from_(self.__base_url, self.__current_attrs, target)
+                )
+        elif tag == "br":
+            self.__entry.content.append(LineBreak())
+        elif tag == "img" and css_class in {
+            "PhotoPrimaryAttachment__imageElement",
+            "MediaGrid__imageElement",
+        }:
+            self.__entry.content.append(LineBreak())
+            self.__entry.content.append(Image(url=self.__current_attrs["src"]))
+        elif tag == "div" and css_class == "pic_body_wrap":
+            self.__entry.content.append(HorizontalRule())
+        elif tag == "div" and css_class.startswith("LinkSnippetPrimaryAttachmentReactBlockMVK__root"):
+            data_exec = self.__current_attrs.get("data-exec")
+            if data_exec:
+                # Take the first value that carries a snippet URL.
+                for data in json_loads(data_exec).values():
+                    url = data.get("snippet", {}).get("url")
+                    if url:
+                        self.__entry.content.append(LineBreak())
+                        self.__entry.content.append(Link.from_(self.__base_url, {"href": url}, url))
+                        break
+
+        if tag not in self._VOID_TAGS:
+            self.__stack.append((tag, css_class.strip()))
+
def handle_endtag(self, tag):
- print(f"< {tag}")
+        """Close *tag*; when the post's root closes, store the entry."""
+        self.__current_attrs = {}
+        if not any(name == tag for name, _ in self.__stack):
+            # Not open inside the current post (void element written as
+            # ``<br/>``, stray end tag, or markup outside any post).
+            # Unwinding here would finalise the entry too early.
+            return
+
+        # Pop up to and including the matching start tag; elements left
+        # unclosed in between are dropped with it.
+        while self.__stack.pop()[0] != tag:
+            pass
+
+        if not self.__stack:
+            # The wall_item div itself was closed: the entry is complete.
+            assert self.__entry.updated, self.__entry
+            assert self.__entry.url, self.__entry
+            self.__harvested.append(self.__entry)
+
def handle_data(self, data):
- print(f"{data=}")
+        """Record non-blank text of ``div.pi_text`` and of anchors inside it."""
+        if not self.__stack or not data.strip():
+            return
+
+        text_div = ("div", "pi_text")
+        bare_span = ("span", "")
+        if self.__stack[-1:] == [text_div] or self.__stack[-2:] == [text_div, bare_span]:
+            content = self.__entry.content
+            if content and isinstance(content[-1], Text):
+                # Consecutive text runs are merged, separated by a space.
+                content[-1].data += " " + data
+            else:
+                content.append(Text(data=data))
+        elif self.__stack[-1][0] == "a":
+            if self.__stack[-2:-1] == [text_div] or self.__stack[-3:-1] == [text_div, bare_span]:
+                # The anchor is the most recently opened element, so the
+                # attributes remembered by handle_starttag are its own.
+                self.__entry.content.append(
+                    Link.from_(self.__base_url, self.__current_attrs, data)
+                )
+
+    def feed(self, *args, **kwargs):
+        """Feed markup to the parser and return the entries harvested so far."""
+        super().feed(*args, **kwargs)
+        return self.__harvested
-parser = MyHTMLParser()
-parser.feed(html_page)
+
+# Download and parse each page, then dump its entries between two copies of
+# the page URL so the output is easy to locate from either end.
+for base_url in (
+    "https://m.vk.com/2sort",
+    "https://m.vk.com/poreziknasobachke",
+    "https://m.vk.com/andrushaewa",
+    "https://m.vk.com/sagolik",
+    "https://m.vk.com/ryabovpetrvladimirovich",
+    "https://m.vk.com/id71430449",
+    "https://m.vk.com/doodletimeru",
+    "https://m.vk.com/shantynatty",
+):
+    harvested = MyHTMLParser(base_url).feed(_get_html(base_url))
+    # One print call; sep="\n" yields the same bytes as four separate prints.
+    print(
+        "*" * 80,
+        base_url,
+        "\n".join(map(str, harvested)),
+        base_url,
+        sep="\n",
+    )