123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import time
- from typing import Any, Dict, Tuple, Callable
- from pydantic import BaseModel
- from applications.api import AsyncApolloApi
- from applications.utils import CrawlerMetaArticle
- from applications.utils import CrawlerMetaAccount
- class CrawlerPipeline(AsyncApolloApi):
- MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = {
- "article": (CrawlerMetaArticle, "crawler_meta_article"),
- "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"),
- # 如后续有新类型,直接在这里加即可
- }
- def __init__(self, pool, log_client):
- super().__init__()
- self.pool = pool
- self.log_client = log_client
- async def whether_title_sensitive(self, title: str) -> bool:
- sensitive_word_list = await self.get_config_value("sensitive_word_list")
- for word in sensitive_word_list:
- if word in title:
- return True
- return False
- async def whether_article_title_duplicate(self, title: str) -> bool:
- query = f"""select article_id from crawler_meta_article where title = %s;"""
- duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
- return True if duplicated_id else False
- async def whether_account_exist(self, account_id: str, media_type: str) -> bool:
- query = f"select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;"
- duplicated_id = await self.pool.async_fetch(
- query=query, params=(account_id, media_type)
- )
- return True if duplicated_id else False
- async def save_single_record(self, media_type: str, item: dict) -> None:
- try:
- model_cls, table_name = self.MODEL_TABLE_MAP[media_type]
- except KeyError:
- raise ValueError(f"Unknown media type: {media_type!r}")
- record = model_cls(**item).model_dump(mode="python")
- insert_data = {k: v for k, v in record.items() if v is not None}
- if not insert_data:
- raise ValueError("All fields are None, nothing to insert")
- columns = ", ".join(f"`{col}`" for col in insert_data)
- placeholders = ", ".join(["%s"] * len(insert_data))
- sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
- await self.pool.async_save(sql, tuple(insert_data.values()))
- async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
- """deal function"""
- item["trace_id"] = trace_id
- match media_type:
- case "video":
- await self.save_single_record(media_type, item)
- case "article":
- log_data = {
- "title": item["title"],
- "platform": item["platform"],
- "mode": item["mode"],
- "source": item["category"],
- }
- await self.log_client.log(
- contents={
- "task": "save_article",
- "data": log_data,
- "message": "start crawler article",
- "code": 1001,
- }
- )
- if await self.whether_article_title_duplicate(log_data["title"]):
- await self.log_client.log(
- contents={
- "task": "save_article",
- "data": log_data,
- "message": "duplicate article title",
- "code": 1002,
- }
- )
- return
- if await self.whether_title_sensitive(item["title"]):
- await self.log_client.log(
- contents={
- "task": "save_article",
- "data": log_data,
- "message": "title_sensitive",
- "code": 1003,
- }
- )
- item["title_sensitive"] = 1
- await self.save_single_record(media_type, item)
- await self.log_client.log(
- contents={
- "trace_id": trace_id,
- "function": "save_article",
- "data": item,
- "message": "save article successfully",
- }
- )
- case "account":
- if await self.whether_account_exist(
- item["account_id"], item["media_type"]
- ):
- return
- await self.save_single_record(media_type, item)
- await self.log_client.log(
- contents={
- "trace_id": trace_id,
- "function": "save_account",
- "data": item,
- "message": "save account successfully",
- }
- )
- case _:
- raise Exception("Unknown media type")
|