# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/api/routers/data.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# Disclaimer: This code is for learning and research purposes only. Users must follow these principles:
# 1. It must not be used for any commercial purpose.
# 2. Usage must comply with the target platform's terms of service and robots.txt rules.
# 3. Large-scale crawling and anything that disrupts the platform's operation are prohibited.
# 4. Request rates must be kept reasonable to avoid placing unnecessary load on the target platform.
# 5. It must not be used for any illegal or improper purpose.
#
# See the LICENSE file in the project root for the full license terms.
# By using this code you agree to the principles above and to all terms in the LICENSE.

import os
import json
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse

router = APIRouter(prefix="/data", tags=["data"])

# Data directory (project root / "data")
DATA_DIR = Path(__file__).parent.parent.parent / "data"


def get_file_info(file_path: Path) -> dict:
    """Collect metadata for a single data file."""
    stat = file_path.stat()
    record_count = None

    # Try to determine the number of records
    try:
        if file_path.suffix == ".json":
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
                if isinstance(data, list):
                    record_count = len(data)
        elif file_path.suffix == ".csv":
            with open(file_path, "r", encoding="utf-8") as f:
                record_count = sum(1 for _ in f) - 1  # subtract the header row
    except Exception:
        pass

    return {
        "name": file_path.name,
        "path": str(file_path.relative_to(DATA_DIR)),
        "size": stat.st_size,
        "modified_at": stat.st_mtime,
        "record_count": record_count,
        "type": file_path.suffix[1:] if file_path.suffix else "unknown"
    }


@router.get("/files")
async def list_data_files(platform: Optional[str] = None, file_type: Optional[str] = None):
    """List data files, optionally filtered by platform and file type."""
    if not DATA_DIR.exists():
        return {"files": []}

    files = []
    supported_extensions = {".json", ".csv", ".xlsx", ".xls"}

    for root, dirs, filenames in os.walk(DATA_DIR):
        root_path = Path(root)
        for filename in filenames:
            file_path = root_path / filename
            if file_path.suffix.lower() not in supported_extensions:
                continue

            # Platform filter (matched against the path relative to DATA_DIR)
            if platform:
                rel_path = str(file_path.relative_to(DATA_DIR))
                if platform.lower() not in rel_path.lower():
                    continue

            # File-type filter
            if file_type and file_path.suffix[1:].lower() != file_type.lower():
                continue

            try:
                files.append(get_file_info(file_path))
            except Exception:
                continue

    # Sort by modification time, newest first
    files.sort(key=lambda x: x["modified_at"], reverse=True)

    return {"files": files}


@router.get("/files/{file_path:path}")
async def get_file_content(file_path: str, preview: bool = True, limit: int = 100):
    """Return a preview of a file's content, or the file itself as a download."""
    full_path = DATA_DIR / file_path

    # Security check: make sure the resolved path stays inside DATA_DIR
    try:
        full_path.resolve().relative_to(DATA_DIR.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")

    if not full_path.exists():
        raise HTTPException(status_code=404, detail="File not found")

    if not full_path.is_file():
        raise HTTPException(status_code=400, detail="Not a file")

    if preview:
        # Return preview data
        try:
            if full_path.suffix == ".json":
                with open(full_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if isinstance(data, list):
                    return {"data": data[:limit], "total": len(data)}
                return {"data": data, "total": 1}

            elif full_path.suffix == ".csv":
                import csv

                with open(full_path, "r", encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    rows = []
                    for i, row in enumerate(reader):
                        if i >= limit:
                            break
                        rows.append(row)
                    # Re-read the file to get the total row count
                    f.seek(0)
                    total = sum(1 for _ in f) - 1  # subtract the header row
                return {"data": rows, "total": total}

            elif full_path.suffix.lower() in (".xlsx", ".xls"):
                import pandas as pd

                # Read only the first `limit` rows
                df = pd.read_excel(full_path, nrows=limit)
                # Total row count (read only the first column to save memory)
                df_count = pd.read_excel(full_path, usecols=[0])
                total = len(df_count)
                # Convert to a list of dicts, replacing NaN with None so the
                # result is JSON-serializable
                rows = df.where(pd.notnull(df), None).to_dict(orient="records")
                return {
                    "data": rows,
                    "total": total,
                    "columns": list(df.columns)
                }
            else:
                raise HTTPException(status_code=400, detail="Unsupported file type for preview")
        except HTTPException:
            # Re-raise HTTPExceptions unchanged so the intended status code
            # (e.g. 400 for unsupported types) is not turned into a 500
            raise
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid JSON file")
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))
    else:
        # Return the file as a download
        return FileResponse(
            path=full_path,
            filename=full_path.name,
            media_type="application/octet-stream"
        )


@router.get("/download/{file_path:path}")
async def download_file(file_path: str):
    """Download a file from the data directory."""
    full_path = DATA_DIR / file_path

    # Security check: make sure the resolved path stays inside DATA_DIR
    try:
        full_path.resolve().relative_to(DATA_DIR.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")

    if not full_path.exists():
        raise HTTPException(status_code=404, detail="File not found")

    if not full_path.is_file():
        raise HTTPException(status_code=400, detail="Not a file")

    return FileResponse(
        path=full_path,
        filename=full_path.name,
        media_type="application/octet-stream"
    )


@router.get("/stats")
async def get_data_stats():
    """Aggregate statistics over the crawled data files."""
    if not DATA_DIR.exists():
        return {"total_files": 0, "total_size": 0, "by_platform": {}, "by_type": {}}

    stats = {
        "total_files": 0,
        "total_size": 0,
        "by_platform": {},
        "by_type": {}
    }

    supported_extensions = {".json", ".csv", ".xlsx", ".xls"}

    for root, dirs, filenames in os.walk(DATA_DIR):
        root_path = Path(root)
        for filename in filenames:
            file_path = root_path / filename
            if file_path.suffix.lower() not in supported_extensions:
                continue

            try:
                stat = file_path.stat()
                stats["total_files"] += 1
                stats["total_size"] += stat.st_size

                # Count by file type
                file_type = file_path.suffix[1:].lower()
                stats["by_type"][file_type] = stats["by_type"].get(file_type, 0) + 1

                # Count by platform (inferred from the relative path)
                rel_path = str(file_path.relative_to(DATA_DIR))
                for platform in ["xhs", "dy", "ks", "bili", "wb", "tieba", "zhihu"]:
                    if platform in rel_path.lower():
                        stats["by_platform"][platform] = stats["by_platform"].get(platform, 0) + 1
                        break
            except Exception:
                continue

    return stats
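

# ---------------------------------------------------------------------------
# Usage sketch (illustrative assumption, not part of the upstream module):
# this router is intended to be included in a FastAPI application elsewhere
# in the project. The minimal wiring below shows one way the endpoints could
# be served locally; the app object, host/port and uvicorn invocation are
# hypothetical, and the real MediaCrawler entrypoint may differ.
if __name__ == "__main__":  # pragma: no cover - illustrative only
    import uvicorn
    from fastapi import FastAPI

    app = FastAPI(title="MediaCrawler data API (sketch)")
    app.include_router(router)

    # Example requests once the server is running:
    #   GET /data/files?platform=xhs&file_type=json   -> filtered file listing
    #   GET /data/files/<relative_path>?limit=50      -> preview of the first 50 records
    #   GET /data/download/<relative_path>            -> raw file download
    #   GET /data/stats                               -> per-platform / per-type totals
    uvicorn.run(app, host="127.0.0.1", port=8000)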