feat: Bilibili comment done

This commit is contained in:
Relakkes
2023-12-09 21:10:01 +08:00
parent a136ea2739
commit 97d7a0c38b
10 changed files with 199 additions and 58 deletions

View File

@@ -16,11 +16,12 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import bilibili
from proxy.proxy_ip_pool import create_ip_pool, IpInfoModel
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from tools import utils
from var import comment_tasks_var, crawler_type_var
from .client import BilibiliClient
from .exception import DataFetchError
from .field import SearchOrderType
from .login import BilibiliLogin
@@ -107,17 +108,88 @@ class BilibiliCrawler(AbstractCrawler):
order=SearchOrderType.DEFAULT,
)
video_list: List[Dict] = videos_res.get("result")
for video_item in video_list:
video_id_list.append(video_item.get("id"))
await bilibili.update_bilibili_video(video_item)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_video_info_task(video_item.get("aid"), semaphore)
for video_item in video_list
]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili.update_bilibili_video(video_item)
page += 1
await self.batch_get_video_comments(video_id_list)
async def batch_get_video_comments(self, video_id_list: List[str]):
"""
batch get video comments
:param video_id_list:
:return:
"""
utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = []
for video_id in video_id_list:
task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
task_list.append(task)
await asyncio.gather(*task_list)
async def get_comments(self, video_id: str, semaphore: asyncio.Semaphore):
"""
get comment for video id
:param video_id:
:param semaphore:
:return:
"""
async with semaphore:
try:
utils.logger.info(f"[get_comments] bengin get video_id: {video_id} comments ...")
await self.bili_client.get_video_all_comments(
video_id=video_id,
crawl_interval=random.random(),
callback=bilibili.batch_update_bilibili_video_comments
)
except DataFetchError as ex:
utils.logger.error(f"[get_comments] get video_id: {video_id} comment error: {ex}")
except Exception as e:
utils.logger.error(f"[get_comments] may be been blocked, err:", e)
async def get_specified_videos(self):
"""
get specified videos info
:return:
"""
pass
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_video_info_task(video_id=video_id, semaphore=semaphore) for video_id in config.BILI_SPECIFIED_ID_LIST
]
video_details = await asyncio.gather(*task_list)
for video_detail in video_details:
if video_detail is not None:
await bilibili.update_bilibili_video(video_detail)
await self.batch_get_video_comments(config.BILI_SPECIFIED_ID_LIST)
async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
"""
Get video detail task
:param video_id:
:param semaphore:
:return:
"""
async with semaphore:
try:
result = await self.bili_client.get_video_info(video_id)
return result
except DataFetchError as ex:
utils.logger.error(f"Get video detail error: {ex}")
return None
except KeyError as ex:
utils.logger.error(f"have not fund note detail video_id:{video_id}, err: {ex}")
return None
async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:
"""Create xhs client"""