mirror of
https://github.com/NanmiCoder/MediaCrawler.git
synced 2026-02-17 12:31:07 +08:00
283 lines
10 KiB
Python
283 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2025 relakkes@gmail.com
|
|
#
|
|
# This file is part of MediaCrawler project.
|
|
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/api/services/crawler_manager.py
|
|
# GitHub: https://github.com/NanmiCoder
|
|
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
|
|
#
|
|
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
|
|
# 1. 不得用于任何商业用途。
|
|
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
|
|
# 3. 不得进行大规模爬取或对平台造成运营干扰。
|
|
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
|
|
# 5. 不得用于任何非法或不当的用途。
|
|
#
|
|
# 详细许可条款请参阅项目根目录下的LICENSE文件。
|
|
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
|
|
|
|
import asyncio
|
|
import subprocess
|
|
import signal
|
|
import os
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from ..schemas import CrawlerStartRequest, LogEntry
|
|
|
|
|
|
class CrawlerManager:
|
|
"""Crawler process manager"""
|
|
|
|
def __init__(self):
|
|
self._lock = asyncio.Lock()
|
|
self.process: Optional[subprocess.Popen] = None
|
|
self.status = "idle"
|
|
self.started_at: Optional[datetime] = None
|
|
self.current_config: Optional[CrawlerStartRequest] = None
|
|
self._log_id = 0
|
|
self._logs: List[LogEntry] = []
|
|
self._read_task: Optional[asyncio.Task] = None
|
|
# Project root directory
|
|
self._project_root = Path(__file__).parent.parent.parent
|
|
# Log queue - for pushing to WebSocket
|
|
self._log_queue: Optional[asyncio.Queue] = None
|
|
|
|
@property
|
|
def logs(self) -> List[LogEntry]:
|
|
return self._logs
|
|
|
|
def get_log_queue(self) -> asyncio.Queue:
|
|
"""Get or create log queue"""
|
|
if self._log_queue is None:
|
|
self._log_queue = asyncio.Queue()
|
|
return self._log_queue
|
|
|
|
def _create_log_entry(self, message: str, level: str = "info") -> LogEntry:
|
|
"""Create log entry"""
|
|
self._log_id += 1
|
|
entry = LogEntry(
|
|
id=self._log_id,
|
|
timestamp=datetime.now().strftime("%H:%M:%S"),
|
|
level=level,
|
|
message=message
|
|
)
|
|
self._logs.append(entry)
|
|
# Keep last 500 logs
|
|
if len(self._logs) > 500:
|
|
self._logs = self._logs[-500:]
|
|
return entry
|
|
|
|
async def _push_log(self, entry: LogEntry):
|
|
"""Push log to queue"""
|
|
if self._log_queue is not None:
|
|
try:
|
|
self._log_queue.put_nowait(entry)
|
|
except asyncio.QueueFull:
|
|
pass
|
|
|
|
def _parse_log_level(self, line: str) -> str:
|
|
"""Parse log level"""
|
|
line_upper = line.upper()
|
|
if "ERROR" in line_upper or "FAILED" in line_upper:
|
|
return "error"
|
|
elif "WARNING" in line_upper or "WARN" in line_upper:
|
|
return "warning"
|
|
elif "SUCCESS" in line_upper or "完成" in line or "成功" in line:
|
|
return "success"
|
|
elif "DEBUG" in line_upper:
|
|
return "debug"
|
|
return "info"
|
|
|
|
async def start(self, config: CrawlerStartRequest) -> bool:
|
|
"""Start crawler process"""
|
|
async with self._lock:
|
|
if self.process and self.process.poll() is None:
|
|
return False
|
|
|
|
# Clear old logs
|
|
self._logs = []
|
|
self._log_id = 0
|
|
|
|
# Clear pending queue (don't replace object to avoid WebSocket broadcast coroutine holding old queue reference)
|
|
if self._log_queue is None:
|
|
self._log_queue = asyncio.Queue()
|
|
else:
|
|
try:
|
|
while True:
|
|
self._log_queue.get_nowait()
|
|
except asyncio.QueueEmpty:
|
|
pass
|
|
|
|
# Build command line arguments
|
|
cmd = self._build_command(config)
|
|
|
|
# Log start information
|
|
entry = self._create_log_entry(f"Starting crawler: {' '.join(cmd)}", "info")
|
|
await self._push_log(entry)
|
|
|
|
try:
|
|
# Start subprocess
|
|
self.process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
encoding='utf-8',
|
|
bufsize=1,
|
|
cwd=str(self._project_root),
|
|
env={**os.environ, "PYTHONUNBUFFERED": "1"}
|
|
)
|
|
|
|
self.status = "running"
|
|
self.started_at = datetime.now()
|
|
self.current_config = config
|
|
|
|
entry = self._create_log_entry(
|
|
f"Crawler started on platform: {config.platform.value}, type: {config.crawler_type.value}",
|
|
"success"
|
|
)
|
|
await self._push_log(entry)
|
|
|
|
# Start log reading task
|
|
self._read_task = asyncio.create_task(self._read_output())
|
|
|
|
return True
|
|
except Exception as e:
|
|
self.status = "error"
|
|
entry = self._create_log_entry(f"Failed to start crawler: {str(e)}", "error")
|
|
await self._push_log(entry)
|
|
return False
|
|
|
|
async def stop(self) -> bool:
|
|
"""Stop crawler process"""
|
|
async with self._lock:
|
|
if not self.process or self.process.poll() is not None:
|
|
return False
|
|
|
|
self.status = "stopping"
|
|
entry = self._create_log_entry("Sending SIGTERM to crawler process...", "warning")
|
|
await self._push_log(entry)
|
|
|
|
try:
|
|
self.process.send_signal(signal.SIGTERM)
|
|
|
|
# Wait for graceful exit (up to 15 seconds)
|
|
for _ in range(30):
|
|
if self.process.poll() is not None:
|
|
break
|
|
await asyncio.sleep(0.5)
|
|
|
|
# If still not exited, force kill
|
|
if self.process.poll() is None:
|
|
entry = self._create_log_entry("Process not responding, sending SIGKILL...", "warning")
|
|
await self._push_log(entry)
|
|
self.process.kill()
|
|
|
|
entry = self._create_log_entry("Crawler process terminated", "info")
|
|
await self._push_log(entry)
|
|
|
|
except Exception as e:
|
|
entry = self._create_log_entry(f"Error stopping crawler: {str(e)}", "error")
|
|
await self._push_log(entry)
|
|
|
|
self.status = "idle"
|
|
self.current_config = None
|
|
|
|
# Cancel log reading task
|
|
if self._read_task:
|
|
self._read_task.cancel()
|
|
self._read_task = None
|
|
|
|
return True
|
|
|
|
def get_status(self) -> dict:
|
|
"""Get current status"""
|
|
return {
|
|
"status": self.status,
|
|
"platform": self.current_config.platform.value if self.current_config else None,
|
|
"crawler_type": self.current_config.crawler_type.value if self.current_config else None,
|
|
"started_at": self.started_at.isoformat() if self.started_at else None,
|
|
"error_message": None
|
|
}
|
|
|
|
def _build_command(self, config: CrawlerStartRequest) -> list:
|
|
"""Build main.py command line arguments"""
|
|
cmd = ["uv", "run", "python", "main.py"]
|
|
|
|
cmd.extend(["--platform", config.platform.value])
|
|
cmd.extend(["--lt", config.login_type.value])
|
|
cmd.extend(["--type", config.crawler_type.value])
|
|
cmd.extend(["--save_data_option", config.save_option.value])
|
|
|
|
# Pass different arguments based on crawler type
|
|
if config.crawler_type.value == "search" and config.keywords:
|
|
cmd.extend(["--keywords", config.keywords])
|
|
elif config.crawler_type.value == "detail" and config.specified_ids:
|
|
cmd.extend(["--specified_id", config.specified_ids])
|
|
elif config.crawler_type.value == "creator" and config.creator_ids:
|
|
cmd.extend(["--creator_id", config.creator_ids])
|
|
|
|
if config.start_page != 1:
|
|
cmd.extend(["--start", str(config.start_page)])
|
|
|
|
cmd.extend(["--get_comment", "true" if config.enable_comments else "false"])
|
|
cmd.extend(["--get_sub_comment", "true" if config.enable_sub_comments else "false"])
|
|
|
|
if config.cookies:
|
|
cmd.extend(["--cookies", config.cookies])
|
|
|
|
cmd.extend(["--headless", "true" if config.headless else "false"])
|
|
|
|
return cmd
|
|
|
|
async def _read_output(self):
|
|
"""Asynchronously read process output"""
|
|
loop = asyncio.get_event_loop()
|
|
|
|
try:
|
|
while self.process and self.process.poll() is None:
|
|
# Read a line in thread pool
|
|
line = await loop.run_in_executor(
|
|
None, self.process.stdout.readline
|
|
)
|
|
if line:
|
|
line = line.strip()
|
|
if line:
|
|
level = self._parse_log_level(line)
|
|
entry = self._create_log_entry(line, level)
|
|
await self._push_log(entry)
|
|
|
|
# Read remaining output
|
|
if self.process and self.process.stdout:
|
|
remaining = await loop.run_in_executor(
|
|
None, self.process.stdout.read
|
|
)
|
|
if remaining:
|
|
for line in remaining.strip().split('\n'):
|
|
if line.strip():
|
|
level = self._parse_log_level(line)
|
|
entry = self._create_log_entry(line.strip(), level)
|
|
await self._push_log(entry)
|
|
|
|
# Process ended
|
|
if self.status == "running":
|
|
exit_code = self.process.returncode if self.process else -1
|
|
if exit_code == 0:
|
|
entry = self._create_log_entry("Crawler completed successfully", "success")
|
|
else:
|
|
entry = self._create_log_entry(f"Crawler exited with code: {exit_code}", "warning")
|
|
await self._push_log(entry)
|
|
self.status = "idle"
|
|
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as e:
|
|
entry = self._create_log_entry(f"Error reading output: {str(e)}", "error")
|
|
await self._push_log(entry)
|
|
|
|
|
|
# Global singleton
|
|
crawler_manager = CrawlerManager()
|