Merge pull request #658 from persist-1/feature/sqlite-support

增加对本地Sqlite数据库的支持(在不便于使用Mysql服务时也能使用数据库进行相关操作)
This commit is contained in:
程序员阿江(Relakkes)
2025-07-16 20:35:58 +08:00
committed by GitHub
35 changed files with 2559 additions and 1205 deletions

View File

@@ -199,11 +199,23 @@ python main.py --help
支持多种数据存储方式:
- **SQLite 数据库**:轻量级数据库,无需服务器,适合个人使用(推荐)
- 参数:`--save_data_option sqlite`
- 自动创建数据库文件
- **MySQL 数据库**:支持关系型数据库 MySQL 中保存(需要提前创建数据库)
- 执行 `python db.py` 初始化数据库表结构(只在首次执行)
- **CSV 文件**:支持保存到 CSV 中(`data/` 目录下)
- **JSON 文件**:支持保存到 JSON 中(`data/` 目录下)
### 使用示例:
```shell
# 使用 SQLite(推荐个人用户使用)
uv run main.py --platform xhs --lt qrcode --type search --save_data_option sqlite
# 使用 MySQL
uv run main.py --platform xhs --lt qrcode --type search --save_data_option db
```
---
[🚀 MediaCrawlerPro 重磅发布 🚀!更多的功能,更好的架构设计!](https://github.com/MediaCrawlerPro)

View File

@@ -195,11 +195,23 @@ python main.py --help
Supports multiple data storage methods:
- **SQLite Database**: Lightweight database without server, ideal for personal use (recommended)
- Parameter: `--save_data_option sqlite`
- Database file created automatically
- **MySQL Database**: Supports saving to relational database MySQL (need to create database in advance)
- Execute `python db.py` to initialize database table structure (only execute on first run)
- **CSV Files**: Supports saving to CSV (under `data/` directory)
- **JSON Files**: Supports saving to JSON (under `data/` directory)
### Usage Examples:
```shell
# Use SQLite (recommended for personal users)
uv run main.py --platform xhs --lt qrcode --type search --save_data_option sqlite
# Use MySQL
uv run main.py --platform xhs --lt qrcode --type search --save_data_option db
```
---
[🚀 MediaCrawlerPro Major Release 🚀! More features, better architectural design!](https://github.com/MediaCrawlerPro)

View File

@@ -195,11 +195,23 @@ python main.py --help
Soporta múltiples métodos de almacenamiento de datos:
- **Base de Datos SQLite**: Base de datos ligera sin servidor, ideal para uso personal (recomendado)
- Parámetro: `--save_data_option sqlite`
- Se crea automáticamente el archivo de base de datos
- **Base de Datos MySQL**: Soporta guardar en base de datos relacional MySQL (necesita crear base de datos con anticipación)
- Ejecute `python db.py` para inicializar la estructura de tablas de la base de datos (solo ejecutar en la primera ejecución)
- **Archivos CSV**: Soporta guardar en CSV (bajo el directorio `data/`)
- **Archivos JSON**: Soporta guardar en JSON (bajo el directorio `data/`)
### Ejemplos de Uso:
```shell
# Usar SQLite (recomendado para usuarios personales)
uv run main.py --platform xhs --lt qrcode --type search --save_data_option sqlite
# Usar MySQL
uv run main.py --platform xhs --lt qrcode --type search --save_data_option db
```
---
[🚀 ¡Lanzamiento Mayor de MediaCrawlerPro 🚀! ¡Más características, mejor diseño arquitectónico!](https://github.com/MediaCrawlerPro)

111
async_sqlite_db.py Normal file
View File

@@ -0,0 +1,111 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/6 14:21
# @Desc : 异步SQLite的增删改查封装
from typing import Any, Dict, List, Union
import aiosqlite
class AsyncSqliteDB:
    """Async CRUD wrapper over a local SQLite database file (via aiosqlite).

    Every operation opens a short-lived connection; this fits SQLite's
    single-writer model and keeps the object free of connection state.
    """

    def __init__(self, db_path: str) -> None:
        """
        Args:
            db_path: filesystem path of the SQLite database file.
        """
        self.__db_path = db_path

    async def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]:
        """Run a SELECT statement and return all matching rows.

        Args:
            sql: SQL text with '?' placeholders.
            args: values bound to the placeholders.

        Returns:
            A list of row dicts; an empty list when nothing matches.
        """
        async with aiosqlite.connect(self.__db_path) as conn:
            conn.row_factory = aiosqlite.Row  # make rows addressable by column name
            async with conn.execute(sql, args) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]:
        """Run a SELECT statement and return only the first matching row.

        Args:
            sql: SQL text with '?' placeholders.
            args: values bound to the placeholders.

        Returns:
            The first row as a dict, or None when there is no match.
        """
        async with aiosqlite.connect(self.__db_path) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(sql, args) as cursor:
                row = await cursor.fetchone()
                return dict(row) if row else None

    async def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int:
        """Insert one record into a table.

        Args:
            table_name: target table name (trusted internal value, not escaped).
            item: column-name -> value mapping for the new row.

        Returns:
            rowid of the inserted record.
        """
        fields = list(item.keys())
        values = list(item.values())
        # Double-quote column names so schema columns colliding with SQL
        # keywords (e.g. "desc") can never break the generated statement.
        columns = ','.join(f'"{field}"' for field in fields)
        placeholders = ','.join(['?'] * len(values))
        sql = f"INSERT INTO {table_name} ({columns}) VALUES({placeholders})"
        async with aiosqlite.connect(self.__db_path) as conn:
            async with conn.execute(sql, values) as cursor:
                await conn.commit()
                return cursor.lastrowid

    async def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str,
                           value_where: Union[str, int, float]) -> int:
        """Update records of a table matching ``field_where = value_where``.

        Args:
            table_name: target table name (trusted internal value, not escaped).
            updates: column-name -> new-value mapping.
            field_where: column name used in the WHERE clause.
            value_where: value compared against ``field_where``.

        Returns:
            Number of rows affected.
        """
        assignments = []
        values = []
        for field, value in updates.items():
            assignments.append(f'"{field}"=?')  # quoted for keyword-named columns
            values.append(value)
        values.append(value_where)
        sql = f'UPDATE {table_name} SET {",".join(assignments)} WHERE "{field_where}"=?'
        async with aiosqlite.connect(self.__db_path) as conn:
            async with conn.execute(sql, values) as cursor:
                await conn.commit()
                return cursor.rowcount

    async def execute(self, sql: str, *args: Union[str, int]) -> int:
        """Execute a single write statement (INSERT/UPDATE/DELETE/DDL).

        Args:
            sql: SQL text with '?' placeholders.
            args: values bound to the placeholders.

        Returns:
            Number of rows affected.
        """
        async with aiosqlite.connect(self.__db_path) as conn:
            async with conn.execute(sql, args) as cursor:
                await conn.commit()
                return cursor.rowcount

    async def executescript(self, sql_script: str) -> None:
        """Execute a multi-statement SQL script (used to initialize the schema).

        Args:
            sql_script: full SQL script text.
        """
        async with aiosqlite.connect(self.__db_path) as conn:
            await conn.executescript(sql_script)
            await conn.commit()

View File

@@ -33,7 +33,7 @@ async def parse_cmd():
parser.add_argument('--get_sub_comment', type=str2bool,
help=''''whether to crawl level two comment, supported values case insensitive ('yes', 'true', 't', 'y', '1', 'no', 'false', 'f', 'n', '0')''', default=config.ENABLE_GET_SUB_COMMENTS)
parser.add_argument('--save_data_option', type=str,
help='where to save the data (csv or db or json)', choices=['csv', 'db', 'json'], default=config.SAVE_DATA_OPTION)
help='where to save the data (csv or db or json or sqlite)', choices=['csv', 'db', 'json', 'sqlite'], default=config.SAVE_DATA_OPTION)
parser.add_argument('--cookies', type=str,
help='cookies used for cookie login type', default=config.COOKIES)

View File

@@ -74,8 +74,8 @@ BROWSER_LAUNCH_TIMEOUT = 30
# 设置为False可以保持浏览器运行便于调试
AUTO_CLOSE_BROWSER = True
# 数据保存类型选项配置,支持种类型csv、db、json, 最好保存到DB有排重的功能。
SAVE_DATA_OPTION = "json" # csv or db or json
# 数据保存类型选项配置,支持四种类型:csv、db、json、sqlite, 最好保存到DB有排重的功能。
SAVE_DATA_OPTION = "json" # csv or db or json or sqlite
# 用户浏览器缓存的浏览器文件配置
USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name

View File

@@ -12,11 +12,11 @@
import os
# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")
RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root")
RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost")
RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306)
RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler")
MYSQL_DB_PWD = os.getenv("MYSQL_DB_PWD", "123456")
MYSQL_DB_USER = os.getenv("MYSQL_DB_USER", "root")
MYSQL_DB_HOST = os.getenv("MYSQL_DB_HOST", "localhost")
MYSQL_DB_PORT = os.getenv("MYSQL_DB_PORT", 3306)
MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME", "media_crawler")
# redis config
@@ -27,4 +27,7 @@ REDIS_DB_NUM = os.getenv("REDIS_DB_NUM", 0) # your redis db num
# cache type
CACHE_TYPE_REDIS = "redis"
CACHE_TYPE_MEMORY = "memory"
CACHE_TYPE_MEMORY = "memory"
# sqlite config
SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schema", "sqlite_tables.db")

163
db.py
View File

@@ -22,6 +22,7 @@ import aiomysql
import config
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from tools import utils
from var import db_conn_pool_var, media_crawler_db_var
@@ -33,11 +34,11 @@ async def init_mediacrawler_db():
"""
pool = await aiomysql.create_pool(
host=config.RELATION_DB_HOST,
port=config.RELATION_DB_PORT,
user=config.RELATION_DB_USER,
password=config.RELATION_DB_PWD,
db=config.RELATION_DB_NAME,
host=config.MYSQL_DB_HOST,
port=config.MYSQL_DB_PORT,
user=config.MYSQL_DB_USER,
password=config.MYSQL_DB_PWD,
db=config.MYSQL_DB_NAME,
autocommit=True,
)
async_db_obj = AsyncMysqlDB(pool)
@@ -47,6 +48,18 @@ async def init_mediacrawler_db():
media_crawler_db_var.set(async_db_obj)
async def init_sqlite_db():
    """Build the AsyncSqliteDB handle and publish it through the
    ``media_crawler_db_var`` context variable.

    Returns:
        None
    """
    # SQLite needs no connection pool — wrapping the db file path is enough.
    media_crawler_db_var.set(AsyncSqliteDB(config.SQLITE_DB_PATH))
async def init_db():
"""
初始化db连接池
@@ -54,37 +67,143 @@ async def init_db():
"""
utils.logger.info("[init_db] start init mediacrawler db connect object")
await init_mediacrawler_db()
utils.logger.info("[init_db] end init mediacrawler db connect object")
if config.SAVE_DATA_OPTION == "sqlite":
await init_sqlite_db()
utils.logger.info("[init_db] end init sqlite db connect object")
else:
await init_mediacrawler_db()
utils.logger.info("[init_db] end init mysql db connect object")
async def close():
"""
关闭连接
关闭数据库连接
Returns:
"""
utils.logger.info("[close] close mediacrawler db pool")
db_pool: aiomysql.Pool = db_conn_pool_var.get()
if db_pool is not None:
db_pool.close()
utils.logger.info("[close] close mediacrawler db connection")
if config.SAVE_DATA_OPTION == "sqlite":
# SQLite数据库连接会在AsyncSqliteDB对象销毁时自动关闭
utils.logger.info("[close] sqlite db connection will be closed automatically")
else:
# MySQL连接池关闭
db_pool: aiomysql.Pool = db_conn_pool_var.get()
if db_pool is not None:
db_pool.close()
utils.logger.info("[close] mysql db pool closed")
async def init_table_schema():
async def init_table_schema(db_type: str = None):
"""
用来初始化数据库表结构,请在第一次需要创建表结构的时候使用,多次执行该函数会将已有的表以及数据全部删除
Args:
db_type: 数据库类型,可选值为 'sqlite' 或 'mysql',如果不指定则使用配置文件中的设置
Returns:
"""
utils.logger.info("[init_table_schema] begin init mysql table schema ...")
await init_mediacrawler_db()
async_db_obj: AsyncMysqlDB = media_crawler_db_var.get()
async with aiofiles.open("schema/tables.sql", mode="r", encoding="utf-8") as f:
schema_sql = await f.read()
await async_db_obj.execute(schema_sql)
utils.logger.info("[init_table_schema] mediacrawler table schema init successful")
await close()
# 如果没有指定数据库类型,则使用配置文件中的设置
if db_type is None:
db_type = config.SAVE_DATA_OPTION
if db_type == "sqlite":
utils.logger.info("[init_table_schema] begin init sqlite table schema ...")
# 检查并删除可能存在的损坏数据库文件
import os
if os.path.exists(config.SQLITE_DB_PATH):
try:
# 尝试删除现有的数据库文件
os.remove(config.SQLITE_DB_PATH)
utils.logger.info(f"[init_table_schema] removed existing sqlite db file: {config.SQLITE_DB_PATH}")
except Exception as e:
utils.logger.warning(f"[init_table_schema] failed to remove existing sqlite db file: {e}")
# 如果删除失败,尝试重命名文件
try:
backup_path = f"{config.SQLITE_DB_PATH}.backup_{utils.get_current_timestamp()}"
os.rename(config.SQLITE_DB_PATH, backup_path)
utils.logger.info(f"[init_table_schema] renamed existing sqlite db file to: {backup_path}")
except Exception as rename_e:
utils.logger.error(f"[init_table_schema] failed to rename existing sqlite db file: {rename_e}")
raise rename_e
await init_sqlite_db()
async_db_obj: AsyncSqliteDB = media_crawler_db_var.get()
async with aiofiles.open("schema/sqlite_tables.sql", mode="r", encoding="utf-8") as f:
schema_sql = await f.read()
await async_db_obj.executescript(schema_sql)
utils.logger.info("[init_table_schema] sqlite table schema init successful")
elif db_type == "mysql":
utils.logger.info("[init_table_schema] begin init mysql table schema ...")
await init_mediacrawler_db()
async_db_obj: AsyncMysqlDB = media_crawler_db_var.get()
async with aiofiles.open("schema/tables.sql", mode="r", encoding="utf-8") as f:
schema_sql = await f.read()
await async_db_obj.execute(schema_sql)
utils.logger.info("[init_table_schema] mysql table schema init successful")
await close()
else:
utils.logger.error(f"[init_table_schema] 不支持的数据库类型: {db_type}")
raise ValueError(f"不支持的数据库类型: {db_type},支持的类型: sqlite, mysql")
def show_database_options():
    """Print the menu of supported database options for the init tool."""
    menu = (
        "\n=== MediaCrawler 数据库初始化工具 ===",
        "支持的数据库类型:",
        "1. sqlite - SQLite 数据库 (轻量级,无需额外配置)",
        "2. mysql - MySQL 数据库 (需要配置数据库连接信息)",
        "3. config - 使用配置文件中的设置",
        "4. exit - 退出程序",
        "=" * 50,
    )
    for line in menu:
        print(line)
def get_user_choice():
    """Prompt repeatedly until the user enters a valid database type.

    Returns:
        str: one of 'sqlite', 'mysql', 'config', 'exit' (lower-cased).
    """
    valid_choices = {'sqlite', 'mysql', 'config', 'exit'}
    while True:
        answer = input("请输入数据库类型 (sqlite/mysql/config/exit): ").strip().lower()
        if answer in valid_choices:
            return answer
        print("❌ 无效的选择,请输入: sqlite, mysql, config 或 exit")
async def main():
    """Interactive entry point: ask for a database type, then initialize
    the corresponding table schema via ``init_table_schema``."""
    try:
        show_database_options()
        while True:
            choice = get_user_choice()
            if choice == 'exit':
                print("👋 程序已退出")
                break
            if choice == 'config':
                # Fall back to whatever the config file selects.
                print(f"📋 使用配置文件中的设置: {config.SAVE_DATA_OPTION}")
                await init_table_schema()
            else:
                print(f"🚀 开始初始化 {choice.upper()} 数据库...")
                await init_table_schema(choice)
            print("✅ 数据库表结构初始化完成!")
            break
    except KeyboardInterrupt:
        print("\n\n⚠️ 用户中断操作")
    except Exception as e:
        print(f"\n❌ 初始化失败: {str(e)}")
        utils.logger.error(f"[main] 数据库初始化失败: {str(e)}")
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(init_table_schema())
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -41,6 +41,12 @@
# 从配置文件中读取指定的帖子ID列表获取指定帖子的信息与评论信息
python main.py --platform xhs --lt qrcode --type detail
# 使用SQLite数据库存储数据推荐个人用户使用
python main.py --platform xhs --lt qrcode --type search --save_data_option sqlite
# 使用MySQL数据库存储数据
python main.py --platform xhs --lt qrcode --type search --save_data_option db
# 打开对应APP扫二维码登录
@@ -51,6 +57,10 @@
## 数据保存
- 支持关系型数据库Mysql中保存需要提前创建数据库
- 执行 `python db.py` 初始化数据库表结构(只在首次执行)
- 支持轻量级SQLite数据库保存无需额外安装数据库服务器
- 本地文件数据库,适合个人使用和小规模数据存储
- 使用参数 `--save_data_option sqlite` 启用SQLite存储
- 数据库文件自动创建在项目目录下schema/sqlite_tables.db
- 支持保存到csv中data/目录下)
- 支持保存到json中data/目录下)

View File

@@ -50,13 +50,13 @@ async def main():
await cmd_arg.parse_cmd()
# init db
if config.SAVE_DATA_OPTION == "db":
if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
await db.init_db()
crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
await crawler.start()
if config.SAVE_DATA_OPTION == "db":
if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
await db.close()

View File

@@ -8,6 +8,7 @@ requires-python = ">=3.9"
dependencies = [
"aiofiles~=23.2.1",
"aiomysql==0.2.0",
"aiosqlite>=0.21.0",
"fastapi==0.110.2",
"httpx==0.24.0",
"jieba==0.42.1",
@@ -28,5 +29,5 @@ dependencies = [
]
[[tool.uv.index]]
url = "https://mirrors.aliyun.com/pypi/simple"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
default = true

BIN
schema/sqlite_tables.db Normal file
View File

Binary file not shown.

567
schema/sqlite_tables.sql Normal file
View File

@@ -0,0 +1,567 @@
-- SQLite version of the MediaCrawler database schema.
-- Converted from the MySQL tables.sql and adapted to SQLite syntax.
-- Counts/metrics are mostly stored as TEXT to mirror the MySQL schema.
-- NOTE(review): several tables use "desc" as a column name; SQLite accepts
-- it as an identifier via keyword fallback, and the Python layer quotes
-- column names when generating statements.
-- ----------------------------
-- Table structure for bilibili_video
-- ----------------------------
DROP TABLE IF EXISTS bilibili_video;
CREATE TABLE bilibili_video (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    video_id TEXT NOT NULL,
    video_type TEXT NOT NULL,
    title TEXT DEFAULT NULL,
    desc TEXT,
    create_time INTEGER NOT NULL,
    liked_count TEXT DEFAULT NULL,
    disliked_count TEXT DEFAULT NULL,
    video_play_count TEXT DEFAULT NULL,
    video_favorite_count TEXT DEFAULT NULL,
    video_share_count TEXT DEFAULT NULL,
    video_coin_count TEXT DEFAULT NULL,
    video_danmaku TEXT DEFAULT NULL,
    video_comment TEXT DEFAULT NULL,
    video_url TEXT DEFAULT NULL,
    video_cover_url TEXT DEFAULT NULL,
    source_keyword TEXT DEFAULT ''
);
CREATE INDEX idx_bilibili_vi_video_i_31c36e ON bilibili_video(video_id);
CREATE INDEX idx_bilibili_vi_create__73e0ec ON bilibili_video(create_time);
-- ----------------------------
-- Table structure for bilibili_video_comment
-- ----------------------------
DROP TABLE IF EXISTS bilibili_video_comment;
CREATE TABLE bilibili_video_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    sex TEXT DEFAULT NULL,
    sign TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    comment_id TEXT NOT NULL,
    video_id TEXT NOT NULL,
    content TEXT,
    create_time INTEGER NOT NULL,
    sub_comment_count TEXT NOT NULL,
    parent_comment_id TEXT DEFAULT NULL,
    like_count TEXT NOT NULL DEFAULT '0'
);
CREATE INDEX idx_bilibili_vi_comment_41c34e ON bilibili_video_comment(comment_id);
CREATE INDEX idx_bilibili_vi_video_i_f22873 ON bilibili_video_comment(video_id);
-- ----------------------------
-- Table structure for bilibili_up_info
-- ----------------------------
DROP TABLE IF EXISTS bilibili_up_info;
CREATE TABLE bilibili_up_info (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    sex TEXT DEFAULT NULL,
    sign TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    total_fans INTEGER DEFAULT NULL,
    total_liked INTEGER DEFAULT NULL,
    user_rank INTEGER DEFAULT NULL,
    is_official INTEGER DEFAULT NULL
);
CREATE INDEX idx_bilibili_vi_user_123456 ON bilibili_up_info(user_id);
-- ----------------------------
-- Table structure for bilibili_contact_info
-- ----------------------------
DROP TABLE IF EXISTS bilibili_contact_info;
CREATE TABLE bilibili_contact_info (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    up_id TEXT DEFAULT NULL,
    fan_id TEXT DEFAULT NULL,
    up_name TEXT DEFAULT NULL,
    fan_name TEXT DEFAULT NULL,
    up_sign TEXT DEFAULT NULL,
    fan_sign TEXT DEFAULT NULL,
    up_avatar TEXT DEFAULT NULL,
    fan_avatar TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL
);
CREATE INDEX idx_bilibili_contact_info_up_id ON bilibili_contact_info(up_id);
CREATE INDEX idx_bilibili_contact_info_fan_id ON bilibili_contact_info(fan_id);
-- ----------------------------
-- Table structure for bilibili_up_dynamic
-- ----------------------------
DROP TABLE IF EXISTS bilibili_up_dynamic;
CREATE TABLE bilibili_up_dynamic (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    dynamic_id TEXT DEFAULT NULL,
    user_id TEXT DEFAULT NULL,
    user_name TEXT DEFAULT NULL,
    text TEXT DEFAULT NULL,
    type TEXT DEFAULT NULL,
    pub_ts INTEGER DEFAULT NULL,
    total_comments INTEGER DEFAULT NULL,
    total_forwards INTEGER DEFAULT NULL,
    total_liked INTEGER DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL
);
CREATE INDEX idx_bilibili_up_dynamic_dynamic_id ON bilibili_up_dynamic(dynamic_id);
-- ----------------------------
-- Table structure for douyin_aweme
-- ----------------------------
DROP TABLE IF EXISTS douyin_aweme;
CREATE TABLE douyin_aweme (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    sec_uid TEXT DEFAULT NULL,
    short_user_id TEXT DEFAULT NULL,
    user_unique_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    user_signature TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    aweme_id TEXT NOT NULL,
    aweme_type TEXT NOT NULL,
    title TEXT DEFAULT NULL,
    desc TEXT,
    create_time INTEGER NOT NULL,
    liked_count TEXT DEFAULT NULL,
    comment_count TEXT DEFAULT NULL,
    share_count TEXT DEFAULT NULL,
    collected_count TEXT DEFAULT NULL,
    aweme_url TEXT DEFAULT NULL,
    cover_url TEXT DEFAULT NULL,
    video_download_url TEXT DEFAULT NULL,
    source_keyword TEXT DEFAULT ''
);
CREATE INDEX idx_douyin_awem_aweme_i_6f7bc6 ON douyin_aweme(aweme_id);
CREATE INDEX idx_douyin_awem_create__299dfe ON douyin_aweme(create_time);
-- ----------------------------
-- Table structure for douyin_aweme_comment
-- ----------------------------
DROP TABLE IF EXISTS douyin_aweme_comment;
CREATE TABLE douyin_aweme_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    sec_uid TEXT DEFAULT NULL,
    short_user_id TEXT DEFAULT NULL,
    user_unique_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    user_signature TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    comment_id TEXT NOT NULL,
    aweme_id TEXT NOT NULL,
    content TEXT,
    create_time INTEGER NOT NULL,
    sub_comment_count TEXT NOT NULL,
    parent_comment_id TEXT DEFAULT NULL,
    like_count TEXT NOT NULL DEFAULT '0',
    pictures TEXT NOT NULL DEFAULT ''
);
CREATE INDEX idx_douyin_awem_comment_fcd7e4 ON douyin_aweme_comment(comment_id);
CREATE INDEX idx_douyin_awem_aweme_i_c50049 ON douyin_aweme_comment(aweme_id);
-- ----------------------------
-- Table structure for dy_creator
-- ----------------------------
DROP TABLE IF EXISTS dy_creator;
CREATE TABLE dy_creator (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    desc TEXT,
    gender TEXT DEFAULT NULL,
    follows TEXT DEFAULT NULL,
    fans TEXT DEFAULT NULL,
    interaction TEXT DEFAULT NULL,
    videos_count TEXT DEFAULT NULL
);
-- ----------------------------
-- Table structure for kuaishou_video
-- ----------------------------
DROP TABLE IF EXISTS kuaishou_video;
CREATE TABLE kuaishou_video (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    video_id TEXT NOT NULL,
    video_type TEXT NOT NULL,
    title TEXT DEFAULT NULL,
    desc TEXT,
    create_time INTEGER NOT NULL,
    liked_count TEXT DEFAULT NULL,
    viewd_count TEXT DEFAULT NULL, -- "viewd" [sic]: kept to match the MySQL schema and store code
    video_url TEXT DEFAULT NULL,
    video_cover_url TEXT DEFAULT NULL,
    video_play_url TEXT DEFAULT NULL,
    source_keyword TEXT DEFAULT ''
);
CREATE INDEX idx_kuaishou_vi_video_i_c5c6a6 ON kuaishou_video(video_id);
CREATE INDEX idx_kuaishou_vi_create__a10dee ON kuaishou_video(create_time);
-- ----------------------------
-- Table structure for kuaishou_video_comment
-- ----------------------------
DROP TABLE IF EXISTS kuaishou_video_comment;
CREATE TABLE kuaishou_video_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    comment_id TEXT NOT NULL,
    video_id TEXT NOT NULL,
    content TEXT,
    create_time INTEGER NOT NULL,
    sub_comment_count TEXT NOT NULL
);
CREATE INDEX idx_kuaishou_vi_comment_ed48fa ON kuaishou_video_comment(comment_id);
CREATE INDEX idx_kuaishou_vi_video_i_e50914 ON kuaishou_video_comment(video_id);
-- ----------------------------
-- Table structure for weibo_note
-- ----------------------------
DROP TABLE IF EXISTS weibo_note;
CREATE TABLE weibo_note (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    gender TEXT DEFAULT NULL,
    profile_url TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT '',
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    note_id TEXT NOT NULL,
    content TEXT,
    create_time INTEGER NOT NULL,
    create_date_time TEXT NOT NULL,
    liked_count TEXT DEFAULT NULL,
    comments_count TEXT DEFAULT NULL,
    shared_count TEXT DEFAULT NULL,
    note_url TEXT DEFAULT NULL,
    source_keyword TEXT DEFAULT ''
);
CREATE INDEX idx_weibo_note_note_id_f95b1a ON weibo_note(note_id);
CREATE INDEX idx_weibo_note_create__692709 ON weibo_note(create_time);
CREATE INDEX idx_weibo_note_create__d05ed2 ON weibo_note(create_date_time);
-- ----------------------------
-- Table structure for weibo_note_comment
-- ----------------------------
DROP TABLE IF EXISTS weibo_note_comment;
CREATE TABLE weibo_note_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT DEFAULT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    gender TEXT DEFAULT NULL,
    profile_url TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT '',
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    comment_id TEXT NOT NULL,
    note_id TEXT NOT NULL,
    content TEXT,
    create_time INTEGER NOT NULL,
    create_date_time TEXT NOT NULL,
    comment_like_count TEXT NOT NULL,
    sub_comment_count TEXT NOT NULL,
    parent_comment_id TEXT DEFAULT NULL
);
CREATE INDEX idx_weibo_note__comment_c7611c ON weibo_note_comment(comment_id);
CREATE INDEX idx_weibo_note__note_id_24f108 ON weibo_note_comment(note_id);
CREATE INDEX idx_weibo_note__create__667fe3 ON weibo_note_comment(create_date_time);
-- ----------------------------
-- Table structure for weibo_creator
-- ----------------------------
DROP TABLE IF EXISTS weibo_creator;
CREATE TABLE weibo_creator (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    desc TEXT,
    gender TEXT DEFAULT NULL,
    follows TEXT DEFAULT NULL,
    fans TEXT DEFAULT NULL,
    tag_list TEXT
);
-- ----------------------------
-- Table structure for xhs_creator
-- ----------------------------
DROP TABLE IF EXISTS xhs_creator;
CREATE TABLE xhs_creator (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    desc TEXT,
    gender TEXT DEFAULT NULL,
    follows TEXT DEFAULT NULL,
    fans TEXT DEFAULT NULL,
    interaction TEXT DEFAULT NULL,
    tag_list TEXT
);
-- ----------------------------
-- Table structure for xhs_note
-- ----------------------------
DROP TABLE IF EXISTS xhs_note;
CREATE TABLE xhs_note (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    note_id TEXT NOT NULL,
    type TEXT DEFAULT NULL,
    title TEXT DEFAULT NULL,
    desc TEXT,
    video_url TEXT,
    time INTEGER NOT NULL,
    last_update_time INTEGER NOT NULL,
    liked_count TEXT DEFAULT NULL,
    collected_count TEXT DEFAULT NULL,
    comment_count TEXT DEFAULT NULL,
    share_count TEXT DEFAULT NULL,
    image_list TEXT,
    tag_list TEXT,
    note_url TEXT DEFAULT NULL,
    source_keyword TEXT DEFAULT '',
    xsec_token TEXT DEFAULT NULL
);
CREATE INDEX idx_xhs_note_note_id_209457 ON xhs_note(note_id);
CREATE INDEX idx_xhs_note_time_eaa910 ON xhs_note(time);
-- ----------------------------
-- Table structure for xhs_note_comment
-- ----------------------------
DROP TABLE IF EXISTS xhs_note_comment;
CREATE TABLE xhs_note_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    comment_id TEXT NOT NULL,
    create_time INTEGER NOT NULL,
    note_id TEXT NOT NULL,
    content TEXT NOT NULL,
    sub_comment_count INTEGER NOT NULL,
    pictures TEXT DEFAULT NULL,
    parent_comment_id TEXT DEFAULT NULL,
    like_count TEXT DEFAULT NULL
);
CREATE INDEX idx_xhs_note_co_comment_8e8349 ON xhs_note_comment(comment_id);
CREATE INDEX idx_xhs_note_co_create__204f8d ON xhs_note_comment(create_time);
-- ----------------------------
-- Table structure for tieba_note
-- ----------------------------
DROP TABLE IF EXISTS tieba_note;
CREATE TABLE tieba_note (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    note_id TEXT NOT NULL,
    title TEXT NOT NULL,
    desc TEXT,
    note_url TEXT NOT NULL,
    publish_time TEXT NOT NULL,
    user_link TEXT DEFAULT '',
    user_nickname TEXT DEFAULT '',
    user_avatar TEXT DEFAULT '',
    tieba_id TEXT DEFAULT '',
    tieba_name TEXT NOT NULL,
    tieba_link TEXT NOT NULL,
    total_replay_num INTEGER DEFAULT 0, -- "replay" [sic]: kept to match the MySQL schema and store code
    total_replay_page INTEGER DEFAULT 0,
    ip_location TEXT DEFAULT '',
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    source_keyword TEXT DEFAULT ''
);
CREATE INDEX idx_tieba_note_note_id ON tieba_note(note_id);
CREATE INDEX idx_tieba_note_publish_time ON tieba_note(publish_time);
-- ----------------------------
-- Table structure for tieba_comment
-- ----------------------------
DROP TABLE IF EXISTS tieba_comment;
CREATE TABLE tieba_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    comment_id TEXT NOT NULL,
    parent_comment_id TEXT DEFAULT '',
    content TEXT NOT NULL,
    user_link TEXT DEFAULT '',
    user_nickname TEXT DEFAULT '',
    user_avatar TEXT DEFAULT '',
    tieba_id TEXT DEFAULT '',
    tieba_name TEXT NOT NULL,
    tieba_link TEXT NOT NULL,
    publish_time TEXT DEFAULT '',
    ip_location TEXT DEFAULT '',
    sub_comment_count INTEGER DEFAULT 0,
    note_id TEXT NOT NULL,
    note_url TEXT NOT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL
);
CREATE INDEX idx_tieba_comment_comment_id ON tieba_comment(comment_id);
CREATE INDEX idx_tieba_comment_note_id ON tieba_comment(note_id);
CREATE INDEX idx_tieba_comment_publish_time ON tieba_comment(publish_time);
-- ----------------------------
-- Table structure for tieba_creator
-- ----------------------------
DROP TABLE IF EXISTS tieba_creator;
CREATE TABLE tieba_creator (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    user_name TEXT NOT NULL,
    nickname TEXT DEFAULT NULL,
    avatar TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL,
    gender TEXT DEFAULT NULL,
    follows TEXT DEFAULT NULL,
    fans TEXT DEFAULT NULL,
    registration_duration TEXT DEFAULT NULL
);
-- ----------------------------
-- Table structure for zhihu_content
-- ----------------------------
DROP TABLE IF EXISTS zhihu_content;
CREATE TABLE zhihu_content (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    content_id TEXT NOT NULL,
    content_type TEXT NOT NULL,
    content_text TEXT,
    content_url TEXT NOT NULL,
    question_id TEXT DEFAULT NULL,
    title TEXT NOT NULL,
    desc TEXT,
    created_time TEXT NOT NULL,
    updated_time TEXT NOT NULL,
    voteup_count INTEGER NOT NULL DEFAULT 0,
    comment_count INTEGER NOT NULL DEFAULT 0,
    source_keyword TEXT DEFAULT NULL,
    user_id TEXT NOT NULL,
    user_link TEXT NOT NULL,
    user_nickname TEXT NOT NULL,
    user_avatar TEXT NOT NULL,
    user_url_token TEXT NOT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL
);
CREATE INDEX idx_zhihu_content_content_id ON zhihu_content(content_id);
CREATE INDEX idx_zhihu_content_created_time ON zhihu_content(created_time);
-- ----------------------------
-- Table structure for zhihu_comment
-- ----------------------------
DROP TABLE IF EXISTS zhihu_comment;
CREATE TABLE zhihu_comment (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    comment_id TEXT NOT NULL,
    parent_comment_id TEXT DEFAULT NULL,
    content TEXT NOT NULL,
    publish_time TEXT NOT NULL,
    ip_location TEXT DEFAULT NULL,
    sub_comment_count INTEGER NOT NULL DEFAULT 0,
    like_count INTEGER NOT NULL DEFAULT 0,
    dislike_count INTEGER NOT NULL DEFAULT 0,
    content_id TEXT NOT NULL,
    content_type TEXT NOT NULL,
    user_id TEXT NOT NULL,
    user_link TEXT NOT NULL,
    user_nickname TEXT NOT NULL,
    user_avatar TEXT NOT NULL,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL
);
CREATE INDEX idx_zhihu_comment_comment_id ON zhihu_comment(comment_id);
CREATE INDEX idx_zhihu_comment_content_id ON zhihu_comment(content_id);
CREATE INDEX idx_zhihu_comment_publish_time ON zhihu_comment(publish_time);
-- ----------------------------
-- Table structure for zhihu_creator
-- ----------------------------
DROP TABLE IF EXISTS zhihu_creator;
CREATE TABLE zhihu_creator (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL UNIQUE,
    user_link TEXT NOT NULL,
    user_nickname TEXT NOT NULL,
    user_avatar TEXT NOT NULL,
    url_token TEXT NOT NULL,
    gender TEXT DEFAULT NULL,
    ip_location TEXT DEFAULT NULL,
    follows INTEGER NOT NULL DEFAULT 0,
    fans INTEGER NOT NULL DEFAULT 0,
    anwser_count INTEGER NOT NULL DEFAULT 0, -- "anwser" [sic]: kept to match the MySQL schema and store code
    video_count INTEGER NOT NULL DEFAULT 0,
    question_count INTEGER NOT NULL DEFAULT 0,
    article_count INTEGER NOT NULL DEFAULT 0,
    column_count INTEGER NOT NULL DEFAULT 0,
    get_voteup_count INTEGER NOT NULL DEFAULT 0,
    add_ts INTEGER NOT NULL,
    last_modify_ts INTEGER NOT NULL
);
CREATE UNIQUE INDEX idx_zhihu_creator_user_id ON zhihu_creator(user_id);

View File

@@ -28,6 +28,7 @@ class BiliStoreFactory:
"csv": BiliCsvStoreImplement,
"db": BiliDbStoreImplement,
"json": BiliJsonStoreImplement,
"sqlite": BiliSqliteStoreImplement,
}
@staticmethod
@@ -35,7 +36,7 @@ class BiliStoreFactory:
store_class = BiliStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json ..."
"[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ..."
)
return store_class()

View File

@@ -352,3 +352,114 @@ class BiliJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(save_item=dynamic_item, store_type="dynamics")
class BiliSqliteStoreImplement(AbstractStore):
    """SQLite-backed persistence for Bilibili crawl results.

    Every store_* method performs an upsert: look the record up by its
    natural key, insert it (stamped with add_ts) when absent, otherwise
    update the existing row in place. The SQL helpers are imported lazily
    inside each method, mirroring the DB implementation in this module.
    """

    async def store_content(self, content_item: Dict):
        """Upsert one Bilibili video row, keyed by video_id.

        Args:
            content_item: video fields destined for the bilibili_video table
        """
        from .bilibili_store_sql import (add_new_content,
                                         query_content_by_content_id,
                                         update_content_by_content_id)

        video_id = content_item.get("video_id")
        existing: Dict = await query_content_by_content_id(content_id=video_id)
        if existing:
            await update_content_by_content_id(video_id, content_item=content_item)
        else:
            content_item["add_ts"] = utils.get_current_timestamp()
            await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Upsert one Bilibili comment row, keyed by comment_id.

        Args:
            comment_item: comment fields destined for the comment table
        """
        from .bilibili_store_sql import (add_new_comment,
                                         query_comment_by_comment_id,
                                         update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Upsert one Bilibili creator (UP) row, keyed by user_id.

        Args:
            creator: creator profile fields
        """
        from .bilibili_store_sql import (add_new_creator,
                                         query_creator_by_creator_id,
                                         update_creator_by_creator_id)

        creator_id = creator.get("user_id")
        existing: Dict = await query_creator_by_creator_id(creator_id=creator_id)
        if existing:
            await update_creator_by_creator_id(creator_id, creator_item=creator)
        else:
            creator["add_ts"] = utils.get_current_timestamp()
            await add_new_creator(creator)

    async def store_contact(self, contact_item: Dict):
        """Upsert one UP-to-fan contact row, keyed by the (up_id, fan_id) pair.

        Args:
            contact_item: contact fields including up_id and fan_id
        """
        from .bilibili_store_sql import (add_new_contact,
                                         query_contact_by_up_and_fan,
                                         update_contact_by_id)

        up_id = contact_item.get("up_id")
        fan_id = contact_item.get("fan_id")
        existing: Dict = await query_contact_by_up_and_fan(up_id=up_id, fan_id=fan_id)
        if existing:
            # updates go through the surrogate primary key of the found row
            await update_contact_by_id(id=existing.get("id"), contact_item=contact_item)
        else:
            contact_item["add_ts"] = utils.get_current_timestamp()
            await add_new_contact(contact_item)

    async def store_dynamic(self, dynamic_item):
        """Upsert one UP dynamic (feed post) row, keyed by dynamic_id.

        Args:
            dynamic_item: dynamic fields destined for the dynamic table
        """
        from .bilibili_store_sql import (add_new_dynamic,
                                         query_dynamic_by_dynamic_id,
                                         update_dynamic_by_dynamic_id)

        dynamic_id = dynamic_item.get("dynamic_id")
        existing = await query_dynamic_by_dynamic_id(dynamic_id=dynamic_id)
        if existing:
            await update_dynamic_by_dynamic_id(dynamic_id, dynamic_item=dynamic_item)
        else:
            dynamic_item["add_ts"] = utils.get_current_timestamp()
            await add_new_dynamic(dynamic_item)

View File

@@ -14,9 +14,10 @@
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -29,7 +30,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from bilibili_video where video_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -46,7 +47,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_video", content_item)
return last_row_id
@@ -61,7 +62,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_video", content_item, "video_id", content_id)
return effect_row
@@ -76,7 +77,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from bilibili_video_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -93,7 +94,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_video_comment", comment_item)
return last_row_id
@@ -108,7 +109,7 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_video_comment", comment_item, "comment_id", comment_id)
return effect_row
@@ -122,7 +123,7 @@ async def query_creator_by_creator_id(creator_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from bilibili_up_info where user_id = '{creator_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -139,7 +140,7 @@ async def add_new_creator(creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_up_info", creator_item)
return last_row_id
@@ -154,7 +155,7 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id)
return effect_row
@@ -169,7 +170,7 @@ async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from bilibili_contact_info where up_id = '{up_id}' and fan_id = '{fan_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -186,7 +187,7 @@ async def add_new_contact(contact_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_contact_info", contact_item)
return last_row_id
@@ -201,7 +202,7 @@ async def update_contact_by_id(id: str, contact_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_contact_info", contact_item, "id", id)
return effect_row
@@ -215,7 +216,7 @@ async def query_dynamic_by_dynamic_id(dynamic_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from bilibili_up_dynamic where dynamic_id = '{dynamic_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -232,7 +233,7 @@ async def add_new_dynamic(dynamic_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_up_dynamic", dynamic_item)
return last_row_id
@@ -247,6 +248,6 @@ async def update_dynamic_by_dynamic_id(dynamic_id: str, dynamic_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_up_dynamic", dynamic_item, "dynamic_id", dynamic_id)
return effect_row

View File

@@ -26,6 +26,7 @@ class DouyinStoreFactory:
"csv": DouyinCsvStoreImplement,
"db": DouyinDbStoreImplement,
"json": DouyinJsonStoreImplement,
"sqlite": DouyinSqliteStoreImplement
}
@staticmethod
@@ -33,7 +34,7 @@ class DouyinStoreFactory:
store_class = DouyinStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json ..."
"[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ..."
)
return store_class()

View File

@@ -257,4 +257,68 @@ class DouyinJsonStoreImplement(AbstractStore):
Returns:
"""
await self.save_data_to_json(save_item=creator, store_type="creator")
await self.save_data_to_json(save_item=creator, store_type="creator")
class DouyinSqliteStoreImplement(AbstractStore):
    """SQLite-backed persistence for Douyin crawl results.

    Each store_* method upserts by the item's natural key, stamping add_ts
    on first insert. SQL helpers are imported lazily inside each method.
    """

    async def store_content(self, content_item: Dict):
        """Upsert one Douyin aweme row, keyed by aweme_id.

        A brand-new item is only inserted when it carries a title; this
        mirrors the DB implementation and filters out empty placeholder
        records the crawler can produce.

        Args:
            content_item: aweme fields destined for the douyin_aweme table
        """
        from .douyin_store_sql import (add_new_content,
                                       query_content_by_content_id,
                                       update_content_by_content_id)

        aweme_id = content_item.get("aweme_id")
        existing: Dict = await query_content_by_content_id(content_id=aweme_id)
        if existing:
            await update_content_by_content_id(aweme_id, content_item=content_item)
        else:
            content_item["add_ts"] = utils.get_current_timestamp()
            if content_item.get("title"):
                await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Upsert one Douyin comment row, keyed by comment_id.

        Args:
            comment_item: comment fields destined for the comment table
        """
        from .douyin_store_sql import (add_new_comment,
                                       query_comment_by_comment_id,
                                       update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Upsert one Douyin creator row, keyed by user_id.

        Args:
            creator: creator profile fields
        """
        from .douyin_store_sql import (add_new_creator,
                                       query_creator_by_user_id,
                                       update_creator_by_user_id)

        user_id = creator.get("user_id")
        existing: Dict = await query_creator_by_user_id(user_id)
        if existing:
            await update_creator_by_user_id(user_id, creator)
        else:
            creator["add_ts"] = utils.get_current_timestamp()
            await add_new_creator(creator)

View File

@@ -14,9 +14,10 @@
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -29,7 +30,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from douyin_aweme where aweme_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -46,7 +47,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("douyin_aweme", content_item)
return last_row_id
@@ -61,7 +62,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("douyin_aweme", content_item, "aweme_id", content_id)
return effect_row
@@ -76,7 +77,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from douyin_aweme_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -93,7 +94,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("douyin_aweme_comment", comment_item)
return last_row_id
@@ -108,7 +109,7 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("douyin_aweme_comment", comment_item, "comment_id", comment_id)
return effect_row
@@ -122,7 +123,7 @@ async def query_creator_by_user_id(user_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from dy_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -139,7 +140,7 @@ async def add_new_creator(creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("dy_creator", creator_item)
return last_row_id
@@ -154,6 +155,6 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("dy_creator", creator_item, "user_id", user_id)
return effect_row

View File

@@ -25,7 +25,8 @@ class KuaishouStoreFactory:
STORES = {
"csv": KuaishouCsvStoreImplement,
"db": KuaishouDbStoreImplement,
"json": KuaishouJsonStoreImplement
"json": KuaishouJsonStoreImplement,
"sqlite": KuaishouSqliteStoreImplement
}
@staticmethod
@@ -33,7 +34,7 @@ class KuaishouStoreFactory:
store_class = KuaishouStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
return store_class()

View File

@@ -233,4 +233,58 @@ class KuaishouJsonStoreImplement(AbstractStore):
Returns:
"""
await self.save_data_to_json(creator, "creator")
await self.save_data_to_json(creator, "creator")
class KuaishouSqliteStoreImplement(AbstractStore):
    """SQLite-backed persistence for Kuaishou crawl results.

    Videos and comments are upserted by their natural keys; creator
    storage is intentionally a no-op, matching the other Kuaishou store
    implementations in this module.
    """

    async def store_content(self, content_item: Dict):
        """Upsert one Kuaishou video row, keyed by video_id.

        Args:
            content_item: video fields destined for the kuaishou_video table
        """
        from .kuaishou_store_sql import (add_new_content,
                                         query_content_by_content_id,
                                         update_content_by_content_id)

        video_id = content_item.get("video_id")
        existing: Dict = await query_content_by_content_id(content_id=video_id)
        if existing:
            await update_content_by_content_id(video_id, content_item=content_item)
        else:
            content_item["add_ts"] = utils.get_current_timestamp()
            await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Upsert one Kuaishou comment row, keyed by comment_id.

        Args:
            comment_item: comment fields destined for the comment table
        """
        from .kuaishou_store_sql import (add_new_comment,
                                         query_comment_by_comment_id,
                                         update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Creator persistence is not implemented for Kuaishou (no-op).

        Args:
            creator: creator profile fields (ignored)
        """
        pass

View File

@@ -14,9 +14,10 @@
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -29,7 +30,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from kuaishou_video where video_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -46,7 +47,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("kuaishou_video", content_item)
return last_row_id
@@ -61,7 +62,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("kuaishou_video", content_item, "video_id", content_id)
return effect_row
@@ -76,7 +77,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from kuaishou_video_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -93,7 +94,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("kuaishou_video_comment", comment_item)
return last_row_id
@@ -108,6 +109,6 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("kuaishou_video_comment", comment_item, "comment_id", comment_id)
return effect_row

View File

@@ -23,7 +23,8 @@ class TieBaStoreFactory:
STORES = {
"csv": TieBaCsvStoreImplement,
"db": TieBaDbStoreImplement,
"json": TieBaJsonStoreImplement
"json": TieBaJsonStoreImplement,
"sqlite": TieBaSqliteStoreImplement
}
@staticmethod

View File

@@ -254,3 +254,65 @@ class TieBaJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(creator, "creator")
class TieBaSqliteStoreImplement(AbstractStore):
    """SQLite-backed persistence for Baidu Tieba crawl results.

    Each store_* method upserts by the item's natural key, stamping add_ts
    on first insert. SQL helpers are imported lazily inside each method.
    """

    async def store_content(self, content_item: Dict):
        """Upsert one Tieba note row, keyed by note_id.

        Args:
            content_item: note fields destined for the tieba_note table
        """
        from .tieba_store_sql import (add_new_content,
                                      query_content_by_content_id,
                                      update_content_by_content_id)

        note_id = content_item.get("note_id")
        existing: Dict = await query_content_by_content_id(content_id=note_id)
        if existing:
            await update_content_by_content_id(note_id, content_item=content_item)
        else:
            content_item["add_ts"] = utils.get_current_timestamp()
            await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Upsert one Tieba comment row, keyed by comment_id.

        Args:
            comment_item: comment fields destined for the tieba_comment table
        """
        from .tieba_store_sql import (add_new_comment,
                                      query_comment_by_comment_id,
                                      update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Upsert one Tieba creator row, keyed by user_id.

        Args:
            creator: creator profile fields
        """
        from .tieba_store_sql import (add_new_creator,
                                      query_creator_by_user_id,
                                      update_creator_by_user_id)

        user_id = creator.get("user_id")
        existing: Dict = await query_creator_by_user_id(user_id)
        if existing:
            await update_creator_by_user_id(user_id, creator)
        else:
            creator["add_ts"] = utils.get_current_timestamp()
            await add_new_creator(creator)

View File

@@ -10,9 +10,10 @@
# -*- coding: utf-8 -*-
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -25,7 +26,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from tieba_note where note_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -42,7 +43,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("tieba_note", content_item)
return last_row_id
@@ -57,7 +58,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("tieba_note", content_item, "note_id", content_id)
return effect_row
@@ -72,7 +73,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from tieba_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -89,7 +90,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("tieba_comment", comment_item)
return last_row_id
@@ -104,7 +105,7 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("tieba_comment", comment_item, "comment_id", comment_id)
return effect_row
@@ -118,7 +119,7 @@ async def query_creator_by_user_id(user_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from tieba_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -135,7 +136,7 @@ async def add_new_creator(creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("tieba_creator", creator_item)
return last_row_id
@@ -150,6 +151,6 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("tieba_creator", creator_item, "user_id", user_id)
return effect_row

View File

@@ -28,6 +28,7 @@ class WeibostoreFactory:
"csv": WeiboCsvStoreImplement,
"db": WeiboDbStoreImplement,
"json": WeiboJsonStoreImplement,
"sqlite": WeiboSqliteStoreImplement,
}
@staticmethod
@@ -35,7 +36,7 @@ class WeibostoreFactory:
store_class = WeibostoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json ...")
"[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
return store_class()

View File

@@ -260,3 +260,67 @@ class WeiboJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(creator, "creators")
class WeiboSqliteStoreImplement(AbstractStore):
    """SQLite-backed persistence for Weibo crawl results.

    Each store_* method upserts by the item's natural key, stamping add_ts
    on first insert. SQL helpers are imported lazily inside each method.
    """

    async def store_content(self, content_item: Dict):
        """Upsert one Weibo note row, keyed by note_id.

        Args:
            content_item: note fields destined for the weibo_note table
        """
        from .weibo_store_sql import (add_new_content,
                                      query_content_by_content_id,
                                      update_content_by_content_id)

        note_id = content_item.get("note_id")
        existing: Dict = await query_content_by_content_id(content_id=note_id)
        if existing:
            await update_content_by_content_id(note_id, content_item=content_item)
        else:
            content_item["add_ts"] = utils.get_current_timestamp()
            await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Upsert one Weibo comment row, keyed by comment_id.

        Args:
            comment_item: comment fields destined for the comment table
        """
        from .weibo_store_sql import (add_new_comment,
                                      query_comment_by_comment_id,
                                      update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Upsert one Weibo creator row, keyed by user_id.

        Args:
            creator: creator profile fields
        """
        from .weibo_store_sql import (add_new_creator,
                                      query_creator_by_user_id,
                                      update_creator_by_user_id)

        user_id = creator.get("user_id")
        existing: Dict = await query_creator_by_user_id(user_id)
        if existing:
            await update_creator_by_user_id(user_id, creator)
        else:
            creator["add_ts"] = utils.get_current_timestamp()
            await add_new_creator(creator)

View File

@@ -14,9 +14,10 @@
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -29,7 +30,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from weibo_note where note_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -46,7 +47,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("weibo_note", content_item)
return last_row_id
@@ -61,7 +62,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("weibo_note", content_item, "note_id", content_id)
return effect_row
@@ -76,7 +77,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from weibo_note_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -93,7 +94,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("weibo_note_comment", comment_item)
return last_row_id
@@ -108,7 +109,7 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("weibo_note_comment", comment_item, "comment_id", comment_id)
return effect_row
@@ -122,7 +123,7 @@ async def query_creator_by_user_id(user_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from weibo_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -139,7 +140,7 @@ async def add_new_creator(creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("weibo_creator", creator_item)
return last_row_id
@@ -154,6 +155,6 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("weibo_creator", creator_item, "user_id", user_id)
return effect_row

View File

@@ -27,14 +27,15 @@ class XhsStoreFactory:
STORES = {
"csv": XhsCsvStoreImplement,
"db": XhsDbStoreImplement,
"json": XhsJsonStoreImplement
"json": XhsJsonStoreImplement,
"sqlite": XhsSqliteStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = XhsStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
return store_class()

View File

@@ -255,3 +255,64 @@ class XhsJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(creator, "creator")
class XhsSqliteStoreImplement(AbstractStore):
    """SQLite-backed store for Xiaohongshu (XHS) data.

    Every ``store_*`` method performs an upsert keyed on the item's
    platform identifier: a row is inserted when it does not yet exist,
    otherwise the existing row is updated in place. The SQL helpers are
    imported lazily inside each method so the module can be loaded even
    when the SQLite backend is not selected.
    """

    async def store_content(self, content_item: Dict):
        """Insert or update one XHS note record in SQLite.

        Args:
            content_item: note fields keyed by column name; must carry "note_id"
        """
        from .xhs_store_sql import (add_new_content,
                                    query_content_by_content_id,
                                    update_content_by_content_id)

        note_id = content_item.get("note_id")
        existing_note: Dict = await query_content_by_content_id(content_id=note_id)
        if existing_note:
            await update_content_by_content_id(note_id, content_item=content_item)
        else:
            # First sighting of this note: stamp the insertion time before writing.
            content_item["add_ts"] = utils.get_current_timestamp()
            await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Insert or update one XHS comment record in SQLite.

        Args:
            comment_item: comment fields keyed by column name; must carry "comment_id"
        """
        from .xhs_store_sql import (add_new_comment,
                                    query_comment_by_comment_id,
                                    update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing_comment: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing_comment:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            # First sighting of this comment: stamp the insertion time before writing.
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Insert or update one XHS creator record in SQLite.

        Args:
            creator: creator fields keyed by column name; must carry "user_id"
        """
        from .xhs_store_sql import (add_new_creator, query_creator_by_user_id,
                                    update_creator_by_user_id)

        user_id = creator.get("user_id")
        existing_creator: Dict = await query_creator_by_user_id(user_id)
        if existing_creator:
            await update_creator_by_user_id(user_id, creator)
        else:
            # First sighting of this creator: stamp the insertion time before writing.
            creator["add_ts"] = utils.get_current_timestamp()
            await add_new_creator(creator)

View File

@@ -14,9 +14,10 @@
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -29,7 +30,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from xhs_note where note_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -46,7 +47,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("xhs_note", content_item)
return last_row_id
@@ -61,7 +62,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("xhs_note", content_item, "note_id", content_id)
return effect_row
@@ -76,7 +77,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from xhs_note_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -93,7 +94,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("xhs_note_comment", comment_item)
return last_row_id
@@ -108,7 +109,7 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("xhs_note_comment", comment_item, "comment_id", comment_id)
return effect_row
@@ -122,7 +123,7 @@ async def query_creator_by_user_id(user_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from xhs_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -139,7 +140,7 @@ async def add_new_creator(creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("xhs_creator", creator_item)
return last_row_id
@@ -154,6 +155,6 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("xhs_creator", creator_item, "user_id", user_id)
return effect_row

View File

@@ -17,7 +17,8 @@ from base.base_crawler import AbstractStore
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from store.zhihu.zhihu_store_impl import (ZhihuCsvStoreImplement,
ZhihuDbStoreImplement,
ZhihuJsonStoreImplement)
ZhihuJsonStoreImplement,
ZhihuSqliteStoreImplement)
from tools import utils
from var import source_keyword_var
@@ -26,14 +27,15 @@ class ZhihuStoreFactory:
STORES = {
"csv": ZhihuCsvStoreImplement,
"db": ZhihuDbStoreImplement,
"json": ZhihuJsonStoreImplement
"json": ZhihuJsonStoreImplement,
"sqlite": ZhihuSqliteStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
return store_class()
async def batch_update_zhihu_contents(contents: List[ZhihuContent]):

View File

@@ -254,3 +254,65 @@ class ZhihuJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(creator, "creator")
class ZhihuSqliteStoreImplement(AbstractStore):
    """SQLite-backed store for Zhihu data.

    Every ``store_*`` method performs an upsert keyed on the item's
    platform identifier: a row is inserted when it does not yet exist,
    otherwise the existing row is updated in place. The SQL helpers are
    imported lazily inside each method so the module can be loaded even
    when the SQLite backend is not selected.
    """

    async def store_content(self, content_item: Dict):
        """Insert or update one Zhihu content record in SQLite.

        Args:
            content_item: content fields keyed by column name; must carry "note_id"
        """
        from .zhihu_store_sql import (add_new_content,
                                      query_content_by_content_id,
                                      update_content_by_content_id)

        note_id = content_item.get("note_id")
        existing_content: Dict = await query_content_by_content_id(content_id=note_id)
        if existing_content:
            await update_content_by_content_id(note_id, content_item=content_item)
        else:
            # First sighting of this content item: stamp the insertion time before writing.
            content_item["add_ts"] = utils.get_current_timestamp()
            await add_new_content(content_item)

    async def store_comment(self, comment_item: Dict):
        """Insert or update one Zhihu comment record in SQLite.

        Args:
            comment_item: comment fields keyed by column name; must carry "comment_id"
        """
        from .zhihu_store_sql import (add_new_comment,
                                      query_comment_by_comment_id,
                                      update_comment_by_comment_id)

        comment_id = comment_item.get("comment_id")
        existing_comment: Dict = await query_comment_by_comment_id(comment_id=comment_id)
        if existing_comment:
            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
        else:
            # First sighting of this comment: stamp the insertion time before writing.
            comment_item["add_ts"] = utils.get_current_timestamp()
            await add_new_comment(comment_item)

    async def store_creator(self, creator: Dict):
        """Insert or update one Zhihu creator record in SQLite.

        Args:
            creator: creator fields keyed by column name; must carry "user_id"
        """
        from .zhihu_store_sql import (add_new_creator,
                                      query_creator_by_user_id,
                                      update_creator_by_user_id)

        user_id = creator.get("user_id")
        existing_creator: Dict = await query_creator_by_user_id(user_id)
        if existing_creator:
            await update_creator_by_user_id(user_id, creator)
        else:
            # First sighting of this creator: stamp the insertion time before writing.
            creator["add_ts"] = utils.get_current_timestamp()
            await add_new_creator(creator)

View File

@@ -10,9 +10,10 @@
# -*- coding: utf-8 -*-
from typing import Dict, List
from typing import Dict, List, Union
from db import AsyncMysqlDB
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
@@ -25,7 +26,7 @@ async def query_content_by_content_id(content_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from zhihu_content where content_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -42,7 +43,7 @@ async def add_new_content(content_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("zhihu_content", content_item)
return last_row_id
@@ -57,7 +58,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("zhihu_content", content_item, "content_id", content_id)
return effect_row
@@ -72,7 +73,7 @@ async def query_comment_by_comment_id(comment_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from zhihu_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -89,7 +90,7 @@ async def add_new_comment(comment_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("zhihu_comment", comment_item)
return last_row_id
@@ -104,7 +105,7 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("zhihu_comment", comment_item, "comment_id", comment_id)
return effect_row
@@ -118,7 +119,7 @@ async def query_creator_by_user_id(user_id: str) -> Dict:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from zhihu_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
@@ -135,7 +136,7 @@ async def add_new_creator(creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("zhihu_creator", creator_item)
return last_row_id
@@ -150,6 +151,6 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("zhihu_creator", creator_item, "user_id", user_id)
return effect_row

2170
uv.lock generated
View File

File diff suppressed because it is too large Load Diff