import asyncio
import csv
import json
import os
import pathlib
from typing import Dict, List

import aiofiles

from tools.utils import utils


class AsyncFileWriter:
    """Append crawled items to per-platform CSV or JSON files.

    Files are placed under ``data/<platform>/<file_type>/``. All writes issued
    through one instance are serialised by a single ``asyncio.Lock``.
    """

    def __init__(self, platform: str, crawler_type: str):
        """Store the naming components and create the write lock.

        Args:
            platform: Name used as the directory segment under ``data/``.
            crawler_type: Prefix used in generated file names.
        """
        self.lock = asyncio.Lock()
        self.platform = platform
        self.crawler_type = crawler_type

    def _get_file_path(self, file_type: str, item_type: str) -> str:
        """Build the output path for an item type, creating its directory.

        Args:
            file_type: Used both as the sub-directory name and the file
                extension (e.g. ``'csv'`` or ``'json'``).
            item_type: Middle segment of the file name.

        Returns:
            The ``/``-joined path of the target file.
        """
        base_path = f"data/{self.platform}/{file_type}"
        pathlib.Path(base_path).mkdir(parents=True, exist_ok=True)
        # NOTE(review): utils.get_current_date() is defined elsewhere;
        # presumably it returns a date string safe for file names - confirm.
        file_name = f"{self.crawler_type}_{item_type}_{utils.get_current_date()}.{file_type}"
        return f"{base_path}/{file_name}"

    async def write_to_csv(self, item: Dict, item_type: str):
        """Append one row to the CSV file for ``item_type``.

        A header row built from ``item``'s keys is written first when the file
        is new or empty.

        Args:
            item: Mapping of column name to value for the row.
            item_type: Selects the target file (see ``_get_file_path``).
        """
        file_path = self._get_file_path('csv', item_type)
        async with self.lock:
            file_exists = os.path.exists(file_path)
            async with aiofiles.open(file_path, 'a', newline='', encoding='utf-8-sig') as f:
                writer = csv.DictWriter(f, fieldnames=list(item))
                # csv writers return whatever the underlying file's write()
                # returns; for an aiofiles handle that is an awaitable, so the
                # row is only actually written once it is awaited.
                if not file_exists or await f.tell() == 0:
                    await writer.writeheader()
                await writer.writerow(item)

    async def write_single_item_to_json(self, item: Dict, item_type: str):
        """Append one item to the JSON array stored in the file for ``item_type``.

        The whole file is read, the item appended, and the file rewritten. A
        non-list JSON document already in the file is wrapped into a list.
        Content that cannot be parsed as JSON is discarded (best effort).

        Args:
            item: JSON-serialisable mapping to append.
            item_type: Selects the target file (see ``_get_file_path``).

        Raises:
            TypeError, ValueError: If ``item`` cannot be serialised by
                ``json.dumps``. The existing file is left untouched.
        """
        file_path = self._get_file_path('json', item_type)
        async with self.lock:
            existing_data = []
            if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    content = await f.read()
                if content:
                    try:
                        existing_data = json.loads(content)
                    except json.JSONDecodeError:
                        # Best effort: an unparsable file is started afresh.
                        existing_data = []
                if not isinstance(existing_data, list):
                    existing_data = [existing_data]

            existing_data.append(item)

            # Serialise BEFORE opening in 'w' mode: opening truncates the file,
            # so a serialisation error raised afterwards would wipe out every
            # previously stored record.
            serialized = json.dumps(existing_data, ensure_ascii=False, indent=4)
            async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
                await f.write(serialized)