refactor: 数据存储重构,分离不同类型的存储实现

This commit is contained in:
Relakkes
2024-01-14 22:06:31 +08:00
parent e31aebbdfb
commit 894dabcf63
37 changed files with 1427 additions and 864 deletions

View File

@@ -6,19 +6,18 @@
import asyncio
import os
import random
import time
from asyncio import Task
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)
import config
from base.base_crawler import AbstractCrawler
from models import bilibili
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import bilibili as bilibili_store
from tools import utils
from var import comment_tasks_var, crawler_type_var
from var import crawler_type_var
from .client import BilibiliClient
from .exception import DataFetchError
@@ -88,7 +87,6 @@ class BilibiliCrawler(AbstractCrawler):
pass
utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
async def search(self):
"""
search bilibili video with keywords
@@ -118,7 +116,7 @@ class BilibiliCrawler(AbstractCrawler):
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili.update_bilibili_video(video_item)
await bilibili_store.update_bilibili_video(video_item)
page += 1
await self.batch_get_video_comments(video_id_list)
@@ -150,7 +148,7 @@ class BilibiliCrawler(AbstractCrawler):
await self.bili_client.get_video_all_comments(
video_id=video_id,
crawl_interval=random.random(),
callback=bilibili.batch_update_bilibili_video_comments
callback=bilibili_store.batch_update_bilibili_video_comments
)
except DataFetchError as ex:
@@ -176,7 +174,7 @@ class BilibiliCrawler(AbstractCrawler):
video_aid: str = video_item_view.get("aid")
if video_aid:
video_aids_list.append(video_aid)
await bilibili.update_bilibili_video(video_detail)
await bilibili_store.update_bilibili_video(video_detail)
await self.batch_get_video_comments(video_aids_list)
async def get_video_info_task(self, aid: int, bvid: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -195,7 +193,8 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.error(f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}")
return None
except KeyError as ex:
utils.logger.error(f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}")
utils.logger.error(
f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}")
return None
async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 18:44
# @Desc : bilibli登录实现
# @Desc : bilibli登录实现
import asyncio
import functools

View File

@@ -8,8 +8,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import douyin
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import douyin as douyin_store
from tools import utils
from var import crawler_type_var
@@ -99,7 +99,7 @@ class DouYinCrawler(AbstractCrawler):
except TypeError:
continue
aweme_list.append(aweme_info.get("aweme_id", ""))
await douyin.update_douyin_aweme(aweme_item=aweme_info)
await douyin_store.update_douyin_aweme(aweme_item=aweme_info)
utils.logger.info(f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}")
await self.batch_get_note_comments(aweme_list)
@@ -112,7 +112,7 @@ class DouYinCrawler(AbstractCrawler):
aweme_details = await asyncio.gather(*task_list)
for aweme_detail in aweme_details:
if aweme_detail is not None:
await douyin.update_douyin_aweme(aweme_detail)
await douyin_store.update_douyin_aweme(aweme_detail)
await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any:
@@ -146,7 +146,7 @@ class DouYinCrawler(AbstractCrawler):
keywords=config.COMMENT_KEYWORDS # 关键词列表
)
# 现在返回的 comments 已经是经过关键词筛选的
await douyin.batch_update_dy_aweme_comments(aweme_id, comments)
await douyin_store.batch_update_dy_aweme_comments(aweme_id, comments)
utils.logger.info(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ...")
except DataFetchError as e:
utils.logger.error(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}")

View File

@@ -10,7 +10,7 @@ from playwright.async_api import BrowserContext, Page
import config
from tools import utils
from .exception import DataFetchError, IPBlockError
from .exception import DataFetchError
from .graphql import KuaiShouGraphQL
@@ -56,13 +56,21 @@ class KuaiShouClient:
return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=self.headers)
@staticmethod
async def pong() -> bool:
async def pong(self) -> bool:
"""get a note to check if login state is ok"""
utils.logger.info("[KuaiShouClient.pong] Begin pong kuaishou...")
ping_flag = False
try:
pass
post_data = {
"operationName": "visionProfileUserList",
"variables": {
"ftype": 1,
},
"query": self.graphql.get("vision_profile")
}
res = await self.post("", post_data)
if res.get("visionProfileUserList", {}).get("result") == 1:
ping_flag = True
except Exception as e:
utils.logger.error(f"[KuaiShouClient.pong] Pong kuaishou failed: {e}, and try to login again...")
ping_flag = False

View File

@@ -10,8 +10,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import kuaishou
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import kuaishou as kuaishou_store
from tools import utils
from var import comment_tasks_var, crawler_type_var
@@ -106,7 +106,7 @@ class KuaishouCrawler(AbstractCrawler):
for video_detail in vision_search_photo.get("feeds"):
video_id_list.append(video_detail.get("photo", {}).get("id"))
await kuaishou.update_kuaishou_video(video_item=video_detail)
await kuaishou_store.update_kuaishou_video(video_item=video_detail)
# batch fetch video comments
page += 1
@@ -121,7 +121,7 @@ class KuaishouCrawler(AbstractCrawler):
video_details = await asyncio.gather(*task_list)
for video_detail in video_details:
if video_detail is not None:
await kuaishou.update_kuaishou_video(video_detail)
await kuaishou_store.update_kuaishou_video(video_detail)
await self.batch_get_video_comments(config.KS_SPECIFIED_ID_LIST)
async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -167,7 +167,7 @@ class KuaishouCrawler(AbstractCrawler):
await self.ks_client.get_video_all_comments(
photo_id=video_id,
crawl_interval=random.random(),
callback=kuaishou.batch_update_ks_video_comments
callback=kuaishou_store.batch_update_ks_video_comments
)
except DataFetchError as ex:
utils.logger.error(f"[KuaishouCrawler.get_comments] get video_id: {video_id} comment error: {ex}")

View File

@@ -11,7 +11,7 @@ class KuaiShouGraphQL:
self.load_graphql_queries()
def load_graphql_queries(self):
graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql"]
graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql", "vision_profile.graphql"]
for file in graphql_files:
with open(self.graphql_dir + file, mode="r") as f:

View File

@@ -0,0 +1,16 @@
query visionProfileUserList($pcursor: String, $ftype: Int) {
visionProfileUserList(pcursor: $pcursor, ftype: $ftype) {
result
fols {
user_name
headurl
user_text
isFollowing
user_id
__typename
}
hostName
pcursor
__typename
}
}

View File

@@ -15,8 +15,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import weibo
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import weibo as weibo_store
from tools import utils
from var import crawler_type_var
@@ -120,7 +120,7 @@ class WeiboCrawler(AbstractCrawler):
if note_item:
mblog: Dict = note_item.get("mblog")
note_id_list.append(mblog.get("id"))
await weibo.update_weibo_note(note_item)
await weibo_store.update_weibo_note(note_item)
page += 1
await self.batch_get_notes_comments(note_id_list)
@@ -138,7 +138,7 @@ class WeiboCrawler(AbstractCrawler):
video_details = await asyncio.gather(*task_list)
for note_item in video_details:
if note_item:
await weibo.update_weibo_note(note_item)
await weibo_store.update_weibo_note(note_item)
await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST)
async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -184,33 +184,11 @@ class WeiboCrawler(AbstractCrawler):
async with semaphore:
try:
utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...")
# Read keyword and quantity from config
keywords = config.COMMENT_KEYWORDS
max_comments = config.MAX_COMMENTS_PER_POST
# Download comments
all_comments = await self.wb_client.get_note_all_comments(
await self.wb_client.get_note_all_comments(
note_id=note_id,
crawl_interval=random.randint(1,10), # 微博对API的限流比较严重所以延时提高一些
callback=weibo_store.batch_update_weibo_note_comments
)
# Filter comments by keyword
if keywords:
filtered_comments = [
comment for comment in all_comments if
any(keyword in comment["content"]["message"] for keyword in keywords)
]
else:
filtered_comments = all_comments
# Limit the number of comments
if max_comments > 0:
filtered_comments = filtered_comments[:max_comments]
# Update weibo note comments
await weibo.batch_update_weibo_note_comments(note_id, filtered_comments)
except DataFetchError as ex:
utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}")
except Exception as e:

View File

@@ -9,8 +9,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import xiaohongshu as xhs_model
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import xhs as xhs_store
from tools import utils
from var import crawler_type_var
@@ -112,7 +112,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_model.update_xhs_note(note_detail)
await xhs_store.update_xhs_note(note_detail)
note_id_list.append(note_detail.get("note_id"))
page += 1
utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
@@ -127,7 +127,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_model.update_xhs_note(note_detail)
await xhs_store.update_xhs_note(note_detail)
await self.batch_get_note_comments(config.XHS_SPECIFIED_ID_LIST)
async def get_note_detail(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -174,7 +174,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
# 更新或保存过滤后的评论
for comment in filtered_comments:
await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment)
await xhs_store.update_xhs_note_comment(note_id=note_id, comment_item=comment)
@staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: