feat: 知乎支持(关键词、评论)

This commit is contained in:
Relakkes
2024-09-08 00:00:04 +08:00
parent 131b5697d5
commit b7e57da0d2
20 changed files with 1776 additions and 15 deletions

View File

@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from .core import ZhihuCrawler

View File

@@ -0,0 +1,319 @@
# -*- coding: utf-8 -*-
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
import httpx
from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed
import config
from base.base_crawler import AbstractApiClient
from constant import zhihu as zhihu_constant
from model.m_zhihu import ZhihuComment, ZhihuContent
from tools import utils
from .exception import DataFetchError, ForbiddenError
from .field import SearchSort, SearchTime, SearchType
from .help import ZhiHuJsonExtractor, sign
class ZhiHuClient(AbstractApiClient):
def __init__(
self,
timeout=10,
proxies=None,
*,
headers: Dict[str, str],
playwright_page: Page,
cookie_dict: Dict[str, str],
):
self.proxies = proxies
self.timeout = timeout
self.default_headers = headers
self.cookie_dict = cookie_dict
self._extractor = ZhiHuJsonExtractor()
async def _pre_headers(self, url: str) -> Dict:
"""
请求头参数签名
Args:
url: 请求的URL需要包含请求的参数
Returns:
"""
d_c0 = self.cookie_dict.get("d_c0")
if not d_c0:
raise Exception("d_c0 not found in cookies")
sign_res = sign(url, self.default_headers["cookie"])
headers = self.default_headers.copy()
headers['x-zst-81'] = sign_res["x-zst-81"]
headers['x-zse-96'] = sign_res["x-zse-96"]
return headers
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request(self, method, url, **kwargs) -> Union[str, Any]:
"""
封装httpx的公共请求方法对请求响应做一些处理
Args:
method: 请求方法
url: 请求的URL
**kwargs: 其他请求参数,例如请求头、请求体等
Returns:
"""
# return response.text
return_response = kwargs.pop('return_response', False)
async with httpx.AsyncClient(proxies=self.proxies, ) as client:
response = await client.request(
method, url, timeout=self.timeout,
**kwargs
)
if response.status_code != 200:
utils.logger.error(f"[ZhiHuClient.request] Requset Url: {url}, Request error: {response.text}")
if response.status_code == 403:
raise ForbiddenError(response.text)
elif response.status_code == 404: # 如果一个content没有评论也是404
return {}
raise DataFetchError(response.text)
if return_response:
return response.text
try:
data: Dict = response.json()
if data.get("error"):
utils.logger.error(f"[ZhiHuClient.request] Request error: {data}")
raise DataFetchError(data.get("error", {}).get("message"))
return data
except json.JSONDecodeError:
utils.logger.error(f"[ZhiHuClient.request] Request error: {response.text}")
raise DataFetchError(response.text)
async def get(self, uri: str, params=None) -> Dict:
"""
GET请求对请求头签名
Args:
uri: 请求路由
params: 请求参数
Returns:
"""
final_uri = uri
if isinstance(params, dict):
final_uri += '?' + urlencode(params)
headers = await self._pre_headers(final_uri)
return await self.request(method="GET", url=zhihu_constant.ZHIHU_URL + final_uri, headers=headers)
async def pong(self) -> bool:
"""
用于检查登录态是否失效了
Returns:
"""
utils.logger.info("[ZhiHuClient.pong] Begin to pong zhihu...")
ping_flag = False
try:
res = await self.get_current_user_info()
if res.get("uid") and res.get("name"):
ping_flag = True
utils.logger.info("[ZhiHuClient.pong] Ping zhihu successfully")
else:
utils.logger.error(f"[ZhiHuClient.pong] Ping zhihu failed, response data: {res}")
except Exception as e:
utils.logger.error(f"[ZhiHuClient.pong] Ping zhihu failed: {e}, and try to login again...")
ping_flag = False
return ping_flag
async def update_cookies(self, browser_context: BrowserContext):
"""
API客户端提供的更新cookies方法一般情况下登录成功后会调用此方法
Args:
browser_context: 浏览器上下文对象
Returns:
"""
cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
self.default_headers["cookie"] = cookie_str
self.cookie_dict = cookie_dict
async def get_current_user_info(self) -> Dict:
"""
获取当前登录用户信息
Returns:
"""
params = {
"include": "email,is_active,is_bind_phone"
}
return await self.get("/api/v4/me", params)
async def get_note_by_keyword(
self, keyword: str,
page: int = 1,
page_size: int = 20,
sort: SearchSort = SearchSort.DEFAULT,
note_type: SearchType = SearchType.DEFAULT,
search_time: SearchTime = SearchTime.DEFAULT
) -> List[ZhihuContent]:
"""
根据关键词搜索
Args:
keyword: 关键词
page: 第几页
page_size: 分页size
sort: 排序
note_type: 搜索结果类型
search_time: 搜索多久时间的结果
Returns:
"""
uri = "/api/v4/search_v3"
params = {
"gk_version": "gz-gaokao",
"t": "general",
"q": keyword,
"correction": 1,
"offset": (page - 1) * page_size,
"limit": page_size,
"filter_fields": "",
"lc_idx": (page - 1) * page_size,
"show_all_topics": 0,
"search_source": "Filter",
"time_interval": search_time.value,
"sort": sort.value,
"vertical": note_type.value,
}
search_res = await self.get(uri, params)
utils.logger.info(f"[ZhiHuClient.get_note_by_keyword] Search result: {search_res}")
return self._extractor.extract_contents(search_res)
async def get_root_comments(self, content_id: str, content_type: str, offset: str = "", limit: int = 10,
order_by: str = "sort") -> Dict:
"""
获取内容的一级评论
Args:
content_id: 内容ID
content_type: 内容类型(answer, article, zvideo)
offset:
limit:
order_by:
Returns:
"""
uri = f"/api/v4/{content_type}s/{content_id}/root_comments"
params = {
"order": order_by,
"offset": offset,
"limit": limit
}
return await self.get(uri, params)
async def get_child_comments(self, root_comment_id: str, offset: str = "", limit: int = 10,
order_by: str = "sort") -> Dict:
"""
获取一级评论下的子评论
Args:
root_comment_id:
offset:
limit:
order_by:
Returns:
"""
uri = f"/api/v4/comment_v5/comment/{root_comment_id}/child_comment"
params = {
"order": order_by,
"offset": offset,
"limit": limit
}
return await self.get(uri, params)
async def get_note_all_comments(self, content: ZhihuContent, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuComment]:
"""
获取指定帖子下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息
Args:
content: 内容详情对象(问题|文章|视频)
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
Returns:
"""
result: List[ZhihuComment] = []
is_end: bool = False
offset: str = ""
limit: int = 10
while not is_end:
root_comment_res = await self.get_root_comments(content.content_id, content.content_type, offset, limit)
if not root_comment_res:
break
paging_info = root_comment_res.get("paging", {})
is_end = paging_info.get("is_end")
offset = self._extractor.extract_offset(paging_info)
comments = self._extractor.extract_comments(content, root_comment_res.get("data"))
if not comments:
break
if callback:
await callback(comments)
result.extend(comments)
await self.get_comments_all_sub_comments(content, comments, crawl_interval=crawl_interval, callback=callback)
await asyncio.sleep(crawl_interval)
return result
async def get_comments_all_sub_comments(self, content: ZhihuContent, comments: List[ZhihuComment], crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuComment]:
"""
获取指定评论下的所有子评论
Args:
content: 内容详情对象(问题|文章|视频)
comments: 评论列表
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
Returns:
"""
if not config.ENABLE_GET_SUB_COMMENTS:
return []
all_sub_comments: List[ZhihuComment] = []
for parment_comment in comments:
if parment_comment.sub_comment_count == 0:
continue
is_end: bool = False
offset: str = ""
limit: int = 10
while not is_end:
child_comment_res = await self.get_child_comments(parment_comment.comment_id, offset, limit)
if not child_comment_res:
break
paging_info = child_comment_res.get("paging", {})
is_end = paging_info.get("is_end")
offset = self._extractor.extract_offset(paging_info)
sub_comments = self._extractor.extract_comments(content, child_comment_res.get("data"))
if not sub_comments:
break
if callback:
await callback(sub_comments)
all_sub_comments.extend(sub_comments)
await asyncio.sleep(crawl_interval)
return all_sub_comments

View File

@@ -0,0 +1,236 @@
# -*- coding: utf-8 -*-
import asyncio
import os
import random
from asyncio import Task
from typing import Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)
import config
from base.base_crawler import AbstractCrawler
from model.m_zhihu import ZhihuContent
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import zhihu as zhihu_store
from tools import utils
from var import crawler_type_var, source_keyword_var
from .client import ZhiHuClient
from .exception import DataFetchError
from .help import ZhiHuJsonExtractor
from .login import ZhiHuLogin
class ZhihuCrawler(AbstractCrawler):
context_page: Page
zhihu_client: ZhiHuClient
browser_context: BrowserContext
def __init__(self) -> None:
self.index_url = "https://www.zhihu.com"
# self.user_agent = utils.get_user_agent()
self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
self._extractor = ZhiHuJsonExtractor()
async def start(self) -> None:
"""
Start the crawler
Returns:
"""
playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info)
async with async_playwright() as playwright:
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(
chromium,
None,
self.user_agent,
headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
# Create a client to interact with the zhihu website.
self.zhihu_client = await self.create_zhihu_client(httpx_proxy_format)
if not await self.zhihu_client.pong():
login_obj = ZhiHuLogin(
login_type=config.LOGIN_TYPE,
login_phone="", # input your phone number
browser_context=self.browser_context,
context_page=self.context_page,
cookie_str=config.COOKIES
)
await login_obj.begin()
await self.zhihu_client.update_cookies(browser_context=self.browser_context)
crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information.
await self.search()
elif config.CRAWLER_TYPE == "detail":
# Get the information and comments of the specified post
raise NotImplementedError
elif config.CRAWLER_TYPE == "creator":
# Get creator's information and their notes and comments
raise NotImplementedError
else:
pass
utils.logger.info("[ZhihuCrawler.start] Zhihu Crawler finished ...")
async def search(self) -> None:
"""Search for notes and retrieve their comment information."""
utils.logger.info("[ZhihuCrawler.search] Begin search zhihu keywords")
zhihu_limit_count = 20 # zhihu limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < zhihu_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = zhihu_limit_count
start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword)
utils.logger.info(f"[ZhihuCrawler.search] Current search keyword: {keyword}")
page = 1
while (page - start_page + 1) * zhihu_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page:
utils.logger.info(f"[ZhihuCrawler.search] Skip page {page}")
page += 1
continue
try:
utils.logger.info(f"[ZhihuCrawler.search] search zhihu keyword: {keyword}, page: {page}")
content_list: List[ZhihuContent] = await self.zhihu_client.get_note_by_keyword(
keyword=keyword,
page=page,
)
utils.logger.info(f"[ZhihuCrawler.search] Search contents :{content_list}")
if not content_list:
utils.logger.info("No more content!")
break
page += 1
for content in content_list:
await zhihu_store.update_zhihu_content(content)
await self.batch_get_content_comments(content_list)
except DataFetchError:
utils.logger.error("[ZhihuCrawler.search] Search content error")
return
async def batch_get_content_comments(self, content_list: List[ZhihuContent]):
"""
Batch get content comments
Args:
content_list:
Returns:
"""
if not config.ENABLE_GET_COMMENTS:
utils.logger.info(f"[ZhihuCrawler.batch_get_content_comments] Crawling comment mode is not enabled")
return
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = []
for content_item in content_list:
task = asyncio.create_task(self.get_comments(content_item, semaphore), name=content_item.content_id)
task_list.append(task)
await asyncio.gather(*task_list)
async def get_comments(self, content_item: ZhihuContent, semaphore: asyncio.Semaphore):
"""
Get note comments with keyword filtering and quantity limitation
Args:
content_item:
semaphore:
Returns:
"""
async with semaphore:
utils.logger.info(f"[ZhihuCrawler.get_comments] Begin get note id comments {content_item.content_id}")
await self.zhihu_client.get_note_all_comments(
content=content_item,
crawl_interval=random.random(),
callback=zhihu_store.batch_update_zhihu_note_comments
)
@staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:
"""format proxy info for playwright and httpx"""
playwright_proxy = {
"server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
"username": ip_proxy_info.user,
"password": ip_proxy_info.password,
}
httpx_proxy = {
f"{ip_proxy_info.protocol}": f"http://{ip_proxy_info.user}:{ip_proxy_info.password}@{ip_proxy_info.ip}:{ip_proxy_info.port}"
}
return playwright_proxy, httpx_proxy
async def create_zhihu_client(self, httpx_proxy: Optional[str]) -> ZhiHuClient:
"""Create zhihu client"""
utils.logger.info("[ZhihuCrawler.create_zhihu_client] Begin create zhihu API client ...")
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
zhihu_client_obj = ZhiHuClient(
proxies=httpx_proxy,
headers={
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cookie': cookie_str,
'priority': 'u=1, i',
'referer': 'https://www.zhihu.com/search?q=python&time_interval=a_year&type=content',
'user-agent': self.user_agent,
'x-api-version': '3.0.91',
'x-app-za': 'OS=Web',
'x-requested-with': 'fetch',
'x-zse-93': '101_3_3.0',
},
playwright_page=self.context_page,
cookie_dict=cookie_dict,
)
return zhihu_client_obj
async def launch_browser(
self,
chromium: BrowserType,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True
) -> BrowserContext:
"""Launch browser and create browser context"""
utils.logger.info("[ZhihuCrawler.launch_browser] Begin create browser context ...")
if config.SAVE_LOGIN_STATE:
# feat issue #14
# we will save login state to avoid login every time
user_data_dir = os.path.join(os.getcwd(), "browser_data",
config.USER_DATA_DIR % config.PLATFORM) # type: ignore
browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir,
accept_downloads=True,
headless=headless,
proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080},
user_agent=user_agent
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent=user_agent
)
return browser_context
async def close(self):
"""Close browser context"""
await self.browser_context.close()
utils.logger.info("[ZhihuCrawler.close] Browser context closed ...")

View File

@@ -0,0 +1,12 @@
from httpx import RequestError
class DataFetchError(RequestError):
"""something error when fetch"""
class IPBlockError(RequestError):
"""fetch so fast that the server block us ip"""
class ForbiddenError(RequestError):
"""Forbidden"""

View File

@@ -0,0 +1,36 @@
from enum import Enum
from typing import NamedTuple
from constant import zhihu as zhihu_constant
class SearchTime(Enum):
"""
搜索时间范围
"""
DEFAULT = "" # 不限时间
ONE_DAY = "a_day" # 一天内
ONE_WEEK = "a_week" # 一周内
ONE_MONTH = "a_month" # 一个月内
THREE_MONTH = "three_months" # 三个月内
HALF_YEAR = "half_a_year" # 半年内
ONE_YEAR = "a_year" # 一年内
class SearchType(Enum):
"""
搜索结果类型
"""
DEFAULT = "" # 不限类型
ANSWER = zhihu_constant.ANSWER_NAME # 只看回答
ARTICLE = zhihu_constant.ARTICLE_NAME # 只看文章
VIDEO = zhihu_constant.VIDEO_NAME # 只看视频
class SearchSort(Enum):
"""
搜索结果排序
"""
DEFAULT = "" # 综合排序
UPVOTED_COUNT = "upvoted_count" # 最多赞同
CREATE_TIME = "created_time" # 最新发布

View File

@@ -0,0 +1,256 @@
# -*- coding: utf-8 -*-
from typing import Dict, List
from urllib.parse import parse_qs, urlparse
import execjs
from constant import zhihu as zhihu_constant
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from tools.crawler_util import extract_text_from_html
ZHIHU_SGIN_JS = None
def sign(url: str, cookies: str) -> Dict:
"""
zhihu sign algorithm
Args:
url: request url with query string
cookies: request cookies with d_c0 key
Returns:
"""
global ZHIHU_SGIN_JS
if not ZHIHU_SGIN_JS:
with open("libs/zhihu.js", "r") as f:
ZHIHU_SGIN_JS = execjs.compile(f.read())
return ZHIHU_SGIN_JS.call("get_sign", url, cookies)
class ZhiHuJsonExtractor:
def __init__(self):
pass
def extract_contents(self, json_data: Dict) -> List[ZhihuContent]:
"""
extract zhihu contents
Args:
json_data: zhihu json data
Returns:
"""
if not json_data:
return []
result: List[ZhihuContent] = []
search_result: List[Dict] = json_data.get("data", [])
search_result = [s_item for s_item in search_result if s_item.get("type") in ['search_result', 'zvideo']]
for sr_item in search_result:
sr_object: Dict = sr_item.get("object", {})
if sr_object.get("type") == zhihu_constant.ANSWER_NAME:
result.append(self._extract_answer_content(sr_object))
elif sr_object.get("type") == zhihu_constant.ARTICLE_NAME:
result.append(self._extract_article_content(sr_object))
elif sr_object.get("type") == zhihu_constant.VIDEO_NAME:
result.append(self._extract_zvideo_content(sr_object))
else:
continue
return result
def _extract_answer_content(self, answer: Dict) -> ZhihuContent:
"""
extract zhihu answer content
Args:
answer: zhihu answer
Returns:
"""
res = ZhihuContent()
res.content_id = answer.get("id")
res.content_type = answer.get("type")
res.content_text = extract_text_from_html(answer.get("content"))
res.question_id = answer.get("question").get("id")
res.content_url = f"{zhihu_constant.ZHIHU_URL}/question/{res.question_id}/answer/{res.content_id}"
res.title = extract_text_from_html(answer.get("title"))
res.desc = extract_text_from_html(answer.get("description"))
res.created_time = answer.get("created_time")
res.updated_time = answer.get("updated_time")
res.voteup_count = answer.get("voteup_count")
res.comment_count = answer.get("comment_count")
# extract author info
author_info = self._extract_author(answer.get("author"))
res.user_id = author_info.user_id
res.user_link = author_info.user_link
res.user_nickname = author_info.user_nickname
res.user_avatar = author_info.user_avatar
return res
def _extract_article_content(self, article: Dict) -> ZhihuContent:
"""
extract zhihu article content
Args:
article: zhihu article
Returns:
"""
res = ZhihuContent()
res.content_id = article.get("id")
res.content_type = article.get("type")
res.content_text = extract_text_from_html(article.get("content"))
res.content_url = f"{zhihu_constant.ZHIHU_URL}/p/{res.content_id}"
res.title = extract_text_from_html(article.get("title"))
res.desc = extract_text_from_html(article.get("excerpt"))
res.created_time = article.get("created_time")
res.updated_time = article.get("updated_time")
res.voteup_count = article.get("voteup_count")
res.comment_count = article.get("comment_count")
# extract author info
author_info = self._extract_author(article.get("author"))
res.user_id = author_info.user_id
res.user_link = author_info.user_link
res.user_nickname = author_info.user_nickname
res.user_avatar = author_info.user_avatar
return res
def _extract_zvideo_content(self, zvideo: Dict) -> ZhihuContent:
"""
extract zhihu zvideo content
Args:
zvideo:
Returns:
"""
res = ZhihuContent()
res.content_id = zvideo.get("zvideo_id")
res.content_type = zvideo.get("type")
res.content_url = zvideo.get("video_url")
res.title = extract_text_from_html(zvideo.get("title"))
res.desc = extract_text_from_html(zvideo.get("description"))
res.created_time = zvideo.get("created_at")
res.voteup_count = zvideo.get("voteup_count")
res.comment_count = zvideo.get("comment_count")
# extract author info
author_info = self._extract_author(zvideo.get("author"))
res.user_id = author_info.user_id
res.user_link = author_info.user_link
res.user_nickname = author_info.user_nickname
res.user_avatar = author_info.user_avatar
return res
@staticmethod
def _extract_author(author: Dict) -> ZhihuCreator:
"""
extract zhihu author
Args:
author:
Returns:
"""
res = ZhihuCreator()
if not author:
return res
if not author.get("id"):
author = author.get("member")
res.user_id = author.get("id")
res.user_link = f"{zhihu_constant.ZHIHU_URL}/people/{author.get('url_token')}"
res.user_nickname = author.get("name")
res.user_avatar = author.get("avatar_url")
return res
def extract_comments(self, page_content: ZhihuContent, comments: List[Dict]) -> List[ZhihuComment]:
"""
extract zhihu comments
Args:
page_content: zhihu content object
comments: zhihu comments
Returns:
"""
if not comments:
return []
res: List[ZhihuComment] = []
for comment in comments:
if comment.get("type") != "comment":
continue
res.append(self._extract_comment(page_content, comment))
return res
def _extract_comment(self, page_content: ZhihuContent, comment: Dict) -> ZhihuComment:
"""
extract zhihu comment
Args:
page_content: comment with content object
comment: zhihu comment
Returns:
"""
res = ZhihuComment()
res.comment_id = str(comment.get("id", ""))
res.parent_comment_id = comment.get("reply_comment_id")
res.content = extract_text_from_html(comment.get("content"))
res.publish_time = comment.get("created_time")
res.ip_location = self._extract_comment_ip_location(comment.get("comment_tag", []))
res.sub_comment_count = comment.get("child_comment_count")
res.like_count = comment.get("like_count") if comment.get("like_count") else 0
res.dislike_count = comment.get("dislike_count") if comment.get("dislike_count") else 0
res.content_id = page_content.content_id
res.content_type = page_content.content_type
# extract author info
author_info = self._extract_author(comment.get("author"))
res.user_id = author_info.user_id
res.user_link = author_info.user_link
res.user_nickname = author_info.user_nickname
res.user_avatar = author_info.user_avatar
return res
@staticmethod
def _extract_comment_ip_location(comment_tags: List[Dict]) -> str:
"""
extract comment ip location
Args:
comment_tags:
Returns:
"""
if not comment_tags:
return ""
for ct in comment_tags:
if ct.get("type") == "ip_info":
return ct.get("text")
return ""
@staticmethod
def extract_offset(paging_info: Dict) -> str:
"""
extract offset
Args:
paging_info:
Returns:
"""
# https://www.zhihu.com/api/v4/comment_v5/zvideos/1424368906836807681/root_comment?limit=10&offset=456770961_10125996085_0&order_by=score
next_url = paging_info.get("next")
if not next_url:
return ""
parsed_url = urlparse(next_url)
query_params = parse_qs(parsed_url.query)
offset = query_params.get('offset', [""])[0]
return offset

View File

@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
import asyncio
import functools
import sys
from typing import Optional
from playwright.async_api import BrowserContext, Page
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
wait_fixed)
import config
from base.base_crawler import AbstractLogin
from tools import utils
class ZhiHuLogin(AbstractLogin):
def __init__(self,
login_type: str,
browser_context: BrowserContext,
context_page: Page,
login_phone: Optional[str] = "",
cookie_str: str = ""
):
config.LOGIN_TYPE = login_type
self.browser_context = browser_context
self.context_page = context_page
self.login_phone = login_phone
self.cookie_str = cookie_str
@retry(stop=stop_after_attempt(600), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
async def check_login_state(self) -> bool:
"""
Check if the current login status is successful and return True otherwise return False
Returns:
"""
current_cookie = await self.browser_context.cookies()
_, cookie_dict = utils.convert_cookies(current_cookie)
current_web_session = cookie_dict.get("z_c0")
if current_web_session:
return True
return False
async def begin(self):
"""Start login zhihu"""
utils.logger.info("[ZhiHu.begin] Begin login zhihu ...")
if config.LOGIN_TYPE == "qrcode":
await self.login_by_qrcode()
elif config.LOGIN_TYPE == "phone":
await self.login_by_mobile()
elif config.LOGIN_TYPE == "cookie":
await self.login_by_cookies()
else:
raise ValueError("[ZhiHu.begin]I nvalid Login Type Currently only supported qrcode or phone or cookies ...")
async def login_by_mobile(self):
"""Login zhihu by mobile"""
# todo implement login by mobile
async def login_by_qrcode(self):
"""login zhihu website and keep webdriver login state"""
utils.logger.info("[ZhiHu.login_by_qrcode] Begin login zhihu by qrcode ...")
qrcode_img_selector = "canvas.Qrcode-qrcode"
# find login qrcode
base64_qrcode_img = await utils.find_qrcode_img_from_canvas(
self.context_page,
canvas_selector=qrcode_img_selector
)
if not base64_qrcode_img:
utils.logger.info("[ZhiHu.login_by_qrcode] login failed , have not found qrcode please check ....")
if not base64_qrcode_img:
sys.exit()
# show login qrcode
# fix issue #12
# we need to use partial function to call show_qrcode function and run in executor
# then current asyncio event loop will not be blocked
partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)
utils.logger.info(f"[ZhiHu.login_by_qrcode] waiting for scan code login, remaining time is 120s")
try:
await self.check_login_state()
except RetryError:
utils.logger.info("[ZhiHu.login_by_qrcode] Login zhihu failed by qrcode login method ...")
sys.exit()
wait_redirect_seconds = 5
utils.logger.info(
f"[ZhiHu.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
await asyncio.sleep(wait_redirect_seconds)
async def login_by_cookies(self):
"""login zhihu website by cookies"""
utils.logger.info("[ZhiHu.login_by_cookies] Begin login zhihu by cookie ...")
for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items():
await self.browser_context.add_cookies([{
'name': key,
'value': value,
'domain': ".zhihu.com",
'path': "/"
}])