# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/api/services/crawler_manager.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# Disclaimer: This code is for learning and research purposes only. Users must
# observe the following principles:
# 1. It must not be used for any commercial purpose.
# 2. Usage must comply with the target platform's terms of service and robots.txt rules.
# 3. Large-scale crawling and operational disruption of the platform are prohibited.
# 4. Request frequency must be kept reasonable to avoid placing unnecessary load on the target platform.
# 5. It must not be used for any illegal or improper purpose.
#
# For detailed license terms, see the LICENSE file in the project root.
# By using this code, you agree to the above principles and all terms in the LICENSE.

import asyncio
import subprocess
import signal
import os
from typing import Optional, List
from datetime import datetime
from pathlib import Path

from ..schemas import CrawlerStartRequest, LogEntry


class CrawlerManager:
    """Crawler process manager"""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.process: Optional[subprocess.Popen] = None
        self.status = "idle"
        self.started_at: Optional[datetime] = None
        self.current_config: Optional[CrawlerStartRequest] = None
        self._log_id = 0
        self._logs: List[LogEntry] = []
        self._read_task: Optional[asyncio.Task] = None
        # Project root directory
        self._project_root = Path(__file__).parent.parent.parent
        # Log queue - used to push entries to the WebSocket broadcaster
        self._log_queue: Optional[asyncio.Queue] = None

    @property
    def logs(self) -> List[LogEntry]:
        return self._logs

    def get_log_queue(self) -> asyncio.Queue:
        """Get or create the log queue"""
        if self._log_queue is None:
            self._log_queue = asyncio.Queue()
        return self._log_queue

    def _create_log_entry(self, message: str, level: str = "info") -> LogEntry:
        """Create a log entry and append it to the in-memory buffer"""
        self._log_id += 1
        entry = LogEntry(
            id=self._log_id,
            timestamp=datetime.now().strftime("%H:%M:%S"),
            level=level,
            message=message
        )
        self._logs.append(entry)
        # Keep only the last 500 logs
        if len(self._logs) > 500:
            self._logs = self._logs[-500:]
        return entry

    async def _push_log(self, entry: LogEntry):
        """Push a log entry to the queue"""
        if self._log_queue is not None:
            try:
                self._log_queue.put_nowait(entry)
            except asyncio.QueueFull:
                pass

    def _parse_log_level(self, line: str) -> str:
        """Infer the log level from a raw output line"""
        line_upper = line.upper()
        if "ERROR" in line_upper or "FAILED" in line_upper:
            return "error"
        elif "WARNING" in line_upper or "WARN" in line_upper:
            return "warning"
        # "完成" / "成功" match the "done" / "success" markers in Chinese crawler output
        elif "SUCCESS" in line_upper or "完成" in line or "成功" in line:
            return "success"
        elif "DEBUG" in line_upper:
            return "debug"
        return "info"

    async def start(self, config: CrawlerStartRequest) -> bool:
        """Start the crawler process"""
        async with self._lock:
            if self.process and self.process.poll() is None:
                return False

            # Clear old logs
            self._logs = []
            self._log_id = 0

            # Drain any pending entries instead of replacing the queue object,
            # so a WebSocket broadcast coroutine holding the old reference keeps working
            if self._log_queue is None:
                self._log_queue = asyncio.Queue()
            else:
                try:
                    while True:
                        self._log_queue.get_nowait()
                except asyncio.QueueEmpty:
                    pass

            # Build command line arguments
            cmd = self._build_command(config)

            # Log start information
            entry = self._create_log_entry(f"Starting crawler: {' '.join(cmd)}", "info")
            await self._push_log(entry)

            try:
                # Start subprocess
                self.process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    encoding='utf-8',
                    bufsize=1,
                    cwd=str(self._project_root),
                    env={**os.environ, "PYTHONUNBUFFERED": "1"}
                )

                self.status = "running"
                self.started_at = datetime.now()
                self.current_config = config

                entry = self._create_log_entry(
                    f"Crawler started on platform: {config.platform.value}, type: {config.crawler_type.value}",
                    "success"
                )
                await self._push_log(entry)

                # Start the log reading task
                self._read_task = asyncio.create_task(self._read_output())
                return True

            except Exception as e:
                self.status = "error"
                entry = self._create_log_entry(f"Failed to start crawler: {str(e)}", "error")
                await self._push_log(entry)
                return False

    async def stop(self) -> bool:
        """Stop the crawler process"""
        async with self._lock:
            if not self.process or self.process.poll() is not None:
                return False

            self.status = "stopping"
            entry = self._create_log_entry("Sending SIGTERM to crawler process...", "warning")
            await self._push_log(entry)

            try:
                self.process.send_signal(signal.SIGTERM)

                # Wait for graceful exit (up to 15 seconds)
                for _ in range(30):
                    if self.process.poll() is not None:
                        break
                    await asyncio.sleep(0.5)

                # If it still has not exited, force kill
                if self.process.poll() is None:
                    entry = self._create_log_entry("Process not responding, sending SIGKILL...", "warning")
                    await self._push_log(entry)
                    self.process.kill()

                entry = self._create_log_entry("Crawler process terminated", "info")
                await self._push_log(entry)

            except Exception as e:
                entry = self._create_log_entry(f"Error stopping crawler: {str(e)}", "error")
                await self._push_log(entry)

            self.status = "idle"
            self.current_config = None

            # Cancel the log reading task
            if self._read_task:
                self._read_task.cancel()
                self._read_task = None

            return True

    def get_status(self) -> dict:
        """Get the current status"""
        return {
            "status": self.status,
            "platform": self.current_config.platform.value if self.current_config else None,
            "crawler_type": self.current_config.crawler_type.value if self.current_config else None,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "error_message": None
        }

    def _build_command(self, config: CrawlerStartRequest) -> list:
        """Build the main.py command line arguments"""
        cmd = ["uv", "run", "python", "main.py"]

        cmd.extend(["--platform", config.platform.value])
        cmd.extend(["--lt", config.login_type.value])
        cmd.extend(["--type", config.crawler_type.value])
        cmd.extend(["--save_data_option", config.save_option.value])

        # Pass different arguments based on the crawler type
        if config.crawler_type.value == "search" and config.keywords:
            cmd.extend(["--keywords", config.keywords])
        elif config.crawler_type.value == "detail" and config.specified_ids:
            cmd.extend(["--specified_id", config.specified_ids])
        elif config.crawler_type.value == "creator" and config.creator_ids:
            cmd.extend(["--creator_id", config.creator_ids])

        if config.start_page != 1:
            cmd.extend(["--start", str(config.start_page)])

        cmd.extend(["--get_comment", "true" if config.enable_comments else "false"])
        cmd.extend(["--get_sub_comment", "true" if config.enable_sub_comments else "false"])

        if config.cookies:
            cmd.extend(["--cookies", config.cookies])

        cmd.extend(["--headless", "true" if config.headless else "false"])

        return cmd

    async def _read_output(self):
        """Asynchronously read the process output"""
        loop = asyncio.get_running_loop()
        try:
            while self.process and self.process.poll() is None:
                # Read one line in the default thread pool so the event loop is not blocked
                line = await loop.run_in_executor(
                    None, self.process.stdout.readline
                )
                if line:
                    line = line.strip()
                    if line:
                        level = self._parse_log_level(line)
                        entry = self._create_log_entry(line, level)
                        await self._push_log(entry)

            # Read any remaining output
            if self.process and self.process.stdout:
                remaining = await loop.run_in_executor(
                    None, self.process.stdout.read
                )
                if remaining:
                    for line in remaining.strip().split('\n'):
                        if line.strip():
                            level = self._parse_log_level(line)
                            entry = self._create_log_entry(line.strip(), level)
                            await self._push_log(entry)

            # Process ended
            if self.status == "running":
                exit_code = self.process.returncode if self.process else -1
                if exit_code == 0:
                    entry = self._create_log_entry("Crawler completed successfully", "success")
                else:
                    entry = self._create_log_entry(f"Crawler exited with code: {exit_code}", "warning")
                await self._push_log(entry)
                self.status = "idle"

        except asyncio.CancelledError:
            pass
        except Exception as e:
            entry = self._create_log_entry(f"Error reading output: {str(e)}", "error")
            await self._push_log(entry)


# Global singleton
crawler_manager = CrawlerManager()
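

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the API server. It exercises only
    # the in-process log plumbing (no crawler subprocess is spawned). Assumes
    # it is run as a module from the project root, e.g.
    # `python -m api.services.crawler_manager`, so the relative import of
    # `..schemas` resolves; the sample log lines below are made up.
    async def _demo() -> None:
        manager = CrawlerManager()
        queue = manager.get_log_queue()

        # Feed a few raw lines through the same classification path that
        # _read_output() applies to subprocess output
        for raw in ("Task started", "WARN: rate limited", "抓取成功"):
            entry = manager._create_log_entry(raw, manager._parse_log_level(raw))
            await manager._push_log(entry)

        # Drain the queue the way a WebSocket broadcaster would
        while not queue.empty():
            entry = await queue.get()
            print(f"[{entry.level}] {entry.message}")

        print(manager.get_status())

    asyncio.run(_demo())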