From 8a0fd49b960ef4dbe8df76fc0a4c28160d51fc0a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E5=91=98=E9=98=BF=E6=B1=9F=28Relakkes?= =?UTF-8?q?=29?=
Date: Mon, 15 Dec 2025 18:06:57 +0800
Subject: [PATCH] refactor: extract an application runner and improve exit cleanup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add tools/app_runner.py, unifying the signal / cancellation / cleanup-timeout logic
- Reduce main.py to the business entry point plus the resource-cleanup implementation
- CDPBrowserManager no longer overrides already-installed SIGINT/SIGTERM handlers

---
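Note for reviewers (this section is ignored by git am): a minimal sketch of how
another entry point could adopt the new runner. Only the run() signature added
in this patch is assumed; my_app and my_cleanup are hypothetical stand-ins.

    import asyncio

    from tools.app_runner import run


    async def my_app() -> None:
        # Hypothetical long-running entry coroutine; the runner cancels it
        # on the first SIGINT/SIGTERM.
        await asyncio.sleep(3600)


    async def my_cleanup() -> None:
        # Hypothetical cleanup coroutine; it gets at most
        # cleanup_timeout_seconds before the runner stops waiting.
        print("releasing resources...")


    if __name__ == "__main__":
        # First Ctrl+C: cancel my_app, then await my_cleanup (capped at 5s).
        # Second Ctrl+C: force exit via os._exit (exit code 130 by default).
        run(my_app, my_cleanup, cleanup_timeout_seconds=5.0)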
 main.py              | 133 ++++++++++++++++++-------------------
 tools/app_runner.py  | 109 +++++++++++++++++++++++++++++++++++
 tools/cdp_browser.py |  25 ++++++++++++++++++++-----
 3 files changed, 186 insertions(+), 81 deletions(-)
 create mode 100644 tools/app_runner.py

diff --git a/main.py b/main.py
index 4bf2268..1eb52a9 100644
--- a/main.py
+++ b/main.py
@@ -19,9 +19,7 @@
 
 import asyncio
-import sys
-import signal
-from typing import Optional
+from typing import Optional, Type
 
 import cmd_arg
 import config
@@ -39,7 +37,7 @@ from var import crawler_type_var
 
 
 class CrawlerFactory:
-    CRAWLERS = {
+    CRAWLERS: dict[str, Type[AbstractCrawler]] = {
         "xhs": XiaoHongShuCrawler,
         "dy": DouYinCrawler,
         "ks": KuaishouCrawler,
@@ -53,115 +51,96 @@ class CrawlerFactory:
     def create_crawler(platform: str) -> AbstractCrawler:
         crawler_class = CrawlerFactory.CRAWLERS.get(platform)
         if not crawler_class:
-            raise ValueError(
-                "Invalid Media Platform Currently only supported xhs or dy or ks or bili ..."
-            )
+            supported = ", ".join(sorted(CrawlerFactory.CRAWLERS))
+            raise ValueError(f"Invalid media platform: {platform!r}. Supported: {supported}")
         return crawler_class()
 
 
 crawler: Optional[AbstractCrawler] = None
 
 
-# persist-1
-# 原因:增加 --init_db 功能,用于数据库初始化。
-# 副作用:无
-# 回滚策略:还原此文件。
-async def main():
-    # Init crawler
+def _flush_excel_if_needed() -> None:
+    if config.SAVE_DATA_OPTION != "excel":
+        return
+
+    try:
+        from store.excel_store_base import ExcelStoreBase
+
+        ExcelStoreBase.flush_all()
+        print("[Main] Excel files saved successfully")
+    except Exception as e:
+        print(f"[Main] Error flushing Excel data: {e}")
+
+
+async def _generate_wordcloud_if_needed() -> None:
+    if config.SAVE_DATA_OPTION != "json" or not config.ENABLE_GET_WORDCLOUD:
+        return
+
+    try:
+        file_writer = AsyncFileWriter(
+            platform=config.PLATFORM,
+            crawler_type=crawler_type_var.get(),
+        )
+        await file_writer.generate_wordcloud_from_comments()
+    except Exception as e:
+        print(f"[Main] Error generating wordcloud: {e}")
+
+
+async def main() -> None:
     global crawler
 
-    # parse cmd
     args = await cmd_arg.parse_cmd()
-
-    # init db
     if args.init_db:
         await db.init_db(args.init_db)
         print(f"Database {args.init_db} initialized successfully.")
-        return  # Exit the main function cleanly
-
-
+        return
 
     crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
     await crawler.start()
 
-    # Flush Excel data if using Excel export
-    if config.SAVE_DATA_OPTION == "excel":
-        try:
-            from store.excel_store_base import ExcelStoreBase
-            ExcelStoreBase.flush_all()
-            print("[Main] Excel files saved successfully")
-        except Exception as e:
-            print(f"[Main] Error flushing Excel data: {e}")
+    _flush_excel_if_needed()
 
     # Generate wordcloud after crawling is complete
     # Only for JSON save mode
-    if config.SAVE_DATA_OPTION == "json" and config.ENABLE_GET_WORDCLOUD:
-        try:
-            file_writer = AsyncFileWriter(
-                platform=config.PLATFORM,
-                crawler_type=crawler_type_var.get()
-            )
-            await file_writer.generate_wordcloud_from_comments()
-        except Exception as e:
-            print(f"Error generating wordcloud: {e}")
+    await _generate_wordcloud_if_needed()
 
 
-async def async_cleanup():
-    """异步清理函数,用于处理CDP浏览器等异步资源"""
+async def async_cleanup() -> None:
     global crawler
     if crawler:
-        # 检查并清理CDP浏览器
-        if hasattr(crawler, 'cdp_manager') and crawler.cdp_manager:
+        if getattr(crawler, "cdp_manager", None):
             try:
-                await crawler.cdp_manager.cleanup(force=True)  # 强制清理浏览器进程
+                await crawler.cdp_manager.cleanup(force=True)
             except Exception as e:
-                # 只在非预期错误时打印
                 error_msg = str(e).lower()
                 if "closed" not in error_msg and "disconnected" not in error_msg:
                     print(f"[Main] 清理CDP浏览器时出错: {e}")
-        # 检查并清理标准浏览器上下文(仅在非CDP模式下)
-        elif hasattr(crawler, 'browser_context') and crawler.browser_context:
+        elif getattr(crawler, "browser_context", None):
             try:
-                # 检查上下文是否仍然打开
-                if hasattr(crawler.browser_context, 'pages'):
-                    await crawler.browser_context.close()
+                await crawler.browser_context.close()
             except Exception as e:
-                # 只在非预期错误时打印
                 error_msg = str(e).lower()
                 if "closed" not in error_msg and "disconnected" not in error_msg:
                     print(f"[Main] 关闭浏览器上下文时出错: {e}")
 
-    # 关闭数据库连接
-    if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
+    if config.SAVE_DATA_OPTION in ("db", "sqlite"):
         await db.close()
 
 
-def cleanup():
-    """同步清理函数"""
-    try:
-        # 创建新的事件循环来执行异步清理
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        loop.run_until_complete(async_cleanup())
-        loop.close()
-    except Exception as e:
-        print(f"[Main] 清理时出错: {e}")
-
-
-def signal_handler(signum, _frame):
-    """信号处理器,处理Ctrl+C等中断信号"""
-    print(f"\n[Main] 收到中断信号 {signum},正在清理资源...")
-    cleanup()
-    sys.exit(0)
-
 
 if __name__ == "__main__":
-    # 注册信号处理器
-    signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
-    signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
+    from tools.app_runner import run
 
-    try:
-        asyncio.get_event_loop().run_until_complete(main())
-    except KeyboardInterrupt:
-        print("\n[Main] 收到键盘中断,正在清理资源...")
-    finally:
-        cleanup()
+    def _force_stop() -> None:
+        c = crawler
+        if not c:
+            return
+        cdp_manager = getattr(c, "cdp_manager", None)
+        launcher = getattr(cdp_manager, "launcher", None)
+        if not launcher:
+            return
+        try:
+            launcher.cleanup()
+        except Exception:
+            pass
+
+    run(main, async_cleanup, cleanup_timeout_seconds=15.0, on_first_interrupt=_force_stop)
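Note for reviewers: _force_stop above runs synchronously inside the signal
callback, so it cannot await async_cleanup; it only kills the browser process
best-effort. A self-contained illustration of its defensive getattr chain,
using hypothetical Crawler/CdpManager/Launcher stubs rather than the project's
real classes:

    class Launcher:
        def cleanup(self) -> None:
            print("killing browser process")


    class CdpManager:
        launcher = Launcher()


    class Crawler:
        cdp_manager = CdpManager()


    for candidate in (None, object(), Crawler()):
        # Each hop tolerates a missing attribute or a None value, so the
        # whole lookup is a no-op unless a launcher actually exists.
        launcher = getattr(getattr(candidate, "cdp_manager", None), "launcher", None)
        if launcher:
            launcher.cleanup()  # reached only for the fully populated Crawler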
diff --git a/tools/app_runner.py b/tools/app_runner.py
new file mode 100644
index 0000000..4a38c71
--- /dev/null
+++ b/tools/app_runner.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2025 relakkes@gmail.com
+#
+# This file is part of MediaCrawler project.
+# Repository: https://github.com/NanmiCoder/MediaCrawler
+# GitHub: https://github.com/NanmiCoder
+# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
+#
+
+# Disclaimer: this code is for learning and research purposes only. Users must:
+# 1. Not use it for any commercial purpose.
+# 2. Comply with each target platform's terms of service and robots.txt rules.
+# 3. Not perform large-scale crawling or disrupt the platforms' operation.
+# 4. Throttle request rates reasonably to avoid burdening the target platforms.
+# 5. Not use it for any illegal or improper purpose.
+#
+# See the LICENSE file in the project root for the full license terms.
+# By using this code you agree to the principles above and all terms of the LICENSE.
+
+
+from __future__ import annotations
+
+import asyncio
+import os
+import signal
+from collections.abc import Awaitable, Callable
+from typing import Optional
+
+AsyncFn = Callable[[], Awaitable[None]]
+
+
+def run(
+    app_main: AsyncFn,
+    app_cleanup: AsyncFn,
+    *,
+    cleanup_timeout_seconds: float = 15.0,
+    on_first_interrupt: Optional[Callable[[], None]] = None,
+    force_exit_code: int = 130,
+) -> None:
+    async def _cleanup_with_timeout() -> None:
+        try:
+            await asyncio.wait_for(asyncio.shield(app_cleanup()), timeout=cleanup_timeout_seconds)
+        except asyncio.TimeoutError:
+            print(f"[Main] Cleanup timed out ({cleanup_timeout_seconds}s); skipping the rest.")
+
+    async def _cancel_remaining_tasks(timeout_seconds: float = 2.0) -> None:
+        current = asyncio.current_task()
+        tasks = [t for t in asyncio.all_tasks() if t is not current and not t.done()]
+        if not tasks:
+            return
+        for t in tasks:
+            t.cancel()
+        try:
+            await asyncio.wait_for(
+                asyncio.gather(*tasks, return_exceptions=True),
+                timeout=timeout_seconds,
+            )
+        except asyncio.TimeoutError:
+            pass
+
+    async def _runner() -> None:
+        loop = asyncio.get_running_loop()
+        runner_task = asyncio.current_task()
+        if runner_task is None:
+            raise RuntimeError("Runner task not found")
+
+        shutdown_requested = False
+
+        def _on_signal(signum: int) -> None:
+            nonlocal shutdown_requested
+
+            if shutdown_requested:
+                print("[Main] Received another interrupt; forcing exit.")
+                os._exit(force_exit_code)
+
+            shutdown_requested = True
+            print(f"\n[Main] Received interrupt {signum}; exiting (cleanup capped at {cleanup_timeout_seconds}s)...")
+
+            if on_first_interrupt is not None:
+                try:
+                    on_first_interrupt()
+                except Exception:
+                    pass
+
+            runner_task.cancel()
+
+        try:
+            loop.add_signal_handler(signal.SIGINT, _on_signal, signal.SIGINT)
+            loop.add_signal_handler(signal.SIGTERM, _on_signal, signal.SIGTERM)
+        except NotImplementedError:
+            signal.signal(signal.SIGINT, lambda signum, _frame: _on_signal(signum))
+            signal.signal(signal.SIGTERM, lambda signum, _frame: _on_signal(signum))
+
+        cancelled = False
+        try:
+            await app_main()
+        except asyncio.CancelledError:
+            cancelled = True
+        finally:
+            try:
+                await _cleanup_with_timeout()
+            except Exception as e:
+                print(f"[Main] Error during cleanup: {e}")
+            await _cancel_remaining_tasks()
+
+        if cancelled:
+            return
+
+    asyncio.run(_runner())
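Note for reviewers: the key pattern in _cleanup_with_timeout is
asyncio.wait_for(asyncio.shield(...)). On timeout, wait_for cancels only the
shield wrapper, so the cleanup coroutine is not killed mid-operation; the
subsequent _cancel_remaining_tasks sweep is what finally cancels it. A
standalone demo of that behavior (slow_cleanup and the 1s/5s timings are made
up for illustration):

    import asyncio


    async def slow_cleanup() -> None:
        try:
            await asyncio.sleep(5)
            print("cleanup finished")
        except asyncio.CancelledError:
            print("cleanup cancelled by the later task sweep")
            raise


    async def demo() -> None:
        task = asyncio.create_task(slow_cleanup())
        try:
            # Cancels the shield wrapper after 1s, not the inner task.
            await asyncio.wait_for(asyncio.shield(task), timeout=1.0)
        except asyncio.TimeoutError:
            print("stopped waiting; cleanup is still running")
        task.cancel()  # mirrors _cancel_remaining_tasks
        await asyncio.gather(task, return_exceptions=True)


    asyncio.run(demo())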
diff --git a/tools/cdp_browser.py b/tools/cdp_browser.py
index d8a454d..2375733 100644
--- a/tools/cdp_browser.py
+++ b/tools/cdp_browser.py
@@ -60,19 +60,36 @@ class CDPBrowserManager:
 
         # 注册atexit清理
         atexit.register(sync_cleanup)
 
-        # 注册信号处理器
+        # Register signal handlers only when no custom handler is installed,
+        # so the main entry point's signal handling is not overridden
+        prev_sigint = signal.getsignal(signal.SIGINT)
+        prev_sigterm = signal.getsignal(signal.SIGTERM)
+
         def signal_handler(signum, frame):
             """信号处理器"""
             utils.logger.info(f"[CDPBrowserManager] 收到信号 {signum},清理浏览器进程")
             if self.launcher and self.launcher.browser_process:
                 self.launcher.cleanup()
-            # 重新引发KeyboardInterrupt以便正常退出流程
-            raise KeyboardInterrupt
+            if signum == signal.SIGINT:
+                if prev_sigint == signal.default_int_handler:
+                    return prev_sigint(signum, frame)
+                raise KeyboardInterrupt
+            raise SystemExit(0)
+
+        install_sigint = prev_sigint in (signal.default_int_handler, signal.SIG_DFL)
+        install_sigterm = prev_sigterm == signal.SIG_DFL
 
         # 注册SIGINT (Ctrl+C) 和 SIGTERM
-        signal.signal(signal.SIGINT, signal_handler)
-        signal.signal(signal.SIGTERM, signal_handler)
+        if install_sigint:
+            signal.signal(signal.SIGINT, signal_handler)
+        else:
+            utils.logger.info("[CDPBrowserManager] Existing SIGINT handler found; skip registering to avoid overriding it")
+
+        if install_sigterm:
+            signal.signal(signal.SIGTERM, signal_handler)
+        else:
+            utils.logger.info("[CDPBrowserManager] Existing SIGTERM handler found; skip registering to avoid overriding it")
 
         self._cleanup_registered = True
         utils.logger.info("[CDPBrowserManager] 清理处理器已注册")
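Note for reviewers: the skip logic relies on signal.getsignal() reporting the
currently installed handler. In CPython on Unix, the runner's
loop.add_signal_handler also leaves a non-default C-level handler in place, so
the CDP manager detects it and stands down. A minimal demonstration
(custom_handler is a hypothetical stand-in for app_runner's handler):

    import signal


    def custom_handler(signum, frame):
        pass


    # At interpreter startup SIGINT is bound to default_int_handler, so the
    # CDP manager would install its own handler.
    print(signal.getsignal(signal.SIGINT) is signal.default_int_handler)  # True

    signal.signal(signal.SIGINT, custom_handler)

    # Once something else has registered first, getsignal() reports it and
    # the CDP manager now skips registration instead of overriding it.
    print(signal.getsignal(signal.SIGINT) is custom_handler)  # True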