import time from typing import Any, Dict, Tuple, Callable from pydantic import BaseModel from applications.api import AsyncApolloApi from applications.utils import CrawlerMetaArticle from applications.utils import CrawlerMetaAccount class CrawlerPipeline(AsyncApolloApi): MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = { "article": (CrawlerMetaArticle, "crawler_meta_article"), "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"), # 如后续有新类型,直接在这里加即可 } def __init__(self, pool, log_client): super().__init__() self.pool = pool self.log_client = log_client async def whether_title_sensitive(self, title: str) -> bool: sensitive_word_list = await self.get_config_value("sensitive_word_list") for word in sensitive_word_list: if word in title: return True return False async def whether_article_title_duplicate(self, title: str) -> bool: query = f"""select article_id from crawler_meta_article where title = %s;""" duplicated_id = await self.pool.async_fetch(query=query, params=(title,)) return True if duplicated_id else False async def whether_account_exist(self, account_id: str, media_type: str) -> bool: query = f"select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;" duplicated_id = await self.pool.async_fetch( query=query, params=(account_id, media_type) ) return True if duplicated_id else False async def save_single_record(self, media_type: str, item: dict) -> None: try: model_cls, table_name = self.MODEL_TABLE_MAP[media_type] except KeyError: raise ValueError(f"Unknown media type: {media_type!r}") record = model_cls(**item).model_dump(mode="python") insert_data = {k: v for k, v in record.items() if v is not None} if not insert_data: raise ValueError("All fields are None, nothing to insert") columns = ", ".join(f"`{col}`" for col in insert_data) placeholders = ", ".join(["%s"] * len(insert_data)) sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})" await self.pool.async_save(sql, tuple(insert_data.values())) async def save_item_to_database(self, media_type: str, item: dict, trace_id: str): """deal function""" item["trace_id"] = trace_id match media_type: case "video": await self.save_single_record(media_type, item) case "article": log_data = { "title": item["title"], "platform": item["platform"], "mode": item["mode"], "source": item["category"], } await self.log_client.log( contents={ "task": "save_article", "data": log_data, "message": "start crawler article", "code": 1001, } ) if await self.whether_article_title_duplicate(log_data["title"]): await self.log_client.log( contents={ "task": "save_article", "data": log_data, "message": "duplicate article title", "code": 1002, } ) return if await self.whether_title_sensitive(item["title"]): await self.log_client.log( contents={ "task": "save_article", "data": log_data, "message": "title_sensitive", "code": 1003, } ) item["title_sensitive"] = 1 await self.save_single_record(media_type, item) await self.log_client.log( contents={ "trace_id": trace_id, "function": "save_article", "data": item, "message": "save article successfully", } ) case "account": if await self.whether_account_exist( item["account_id"], item["media_type"] ): return await self.save_single_record(media_type, item) await self.log_client.log( contents={ "trace_id": trace_id, "function": "save_account", "data": item, "message": "save account successfully", } ) case _: raise Exception("Unknown media type")