import time

from applications.api import AsyncApolloApi
from applications.utils import CrawlerMetaArticle


class CrawlerPipeline(AsyncApolloApi):
    def __init__(self, pool, log_client):
        super().__init__()
        self.pool = pool
        self.log_client = log_client

    async def whether_title_sensitive(self, title: str) -> bool:
        """Return True if the title contains any configured sensitive word."""
        sensitive_word_list = await self.get_config_value("sensitive_word_list")
        return any(word in title for word in sensitive_word_list)

    async def whether_article_title_duplicate(self, title: str) -> bool:
        """Return True if an article with the same title already exists."""
        query = "select article_id from crawler_meta_article where title = %s;"
        duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
        return bool(duplicated_id)

    async def save_article(self, article_item: dict) -> None:
        """Save an article into the database (fixed column list)."""
        query = """
            insert into crawler_meta_article
                (platform, mode, category, out_account_id, article_index, title,
                 link, read_cnt, like_cnt, description, publish_time, crawler_time,
                 score, status, unique_index, source_article_title, source_account,
                 title_sensitivity)
            values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        await self.pool.async_save(
            query=query,
            params=(
                article_item.get("platform", "weixin"),
                article_item.get("mode"),
                article_item.get("category"),
                article_item.get("out_account_id"),
                article_item.get("article_index"),
                article_item.get("title"),
                article_item.get("link"),
                article_item.get("read_cnt", 0),
                article_item.get("like_cnt", 0),
                article_item.get("description"),
                article_item.get("publish_time"),
                article_item.get("crawler_time", int(time.time())),
                article_item.get("score"),
                article_item.get("status", 1),
                article_item.get("unique_index"),
                article_item.get("source_article_title"),
                article_item.get("source_account"),
                article_item.get("title_sensitivity", 0),
            ),
        )

    async def save_article_v2(self, article_item: dict) -> None:
        """Save an article, building the insert from the model's non-null fields."""
        new_article = CrawlerMetaArticle(**article_item)
        new_article_dict = new_article.model_dump()
        # Insert only the columns that carry a value; omitted columns fall
        # back to the table defaults.
        insert_data = {k: v for k, v in new_article_dict.items() if v is not None}
        columns = ", ".join(insert_data.keys())
        values = ", ".join(["%s"] * len(insert_data))
        query = f"insert into crawler_meta_article ({columns}) values ({values});"
        await self.pool.async_save(query=query, params=tuple(insert_data.values()))

    async def save_video(self, video_item: dict) -> str:
        """Save a video into the database (not yet implemented)."""
        pass

    async def save_item_to_database(self, media_type: str, item: dict):
        """Dispatch a crawled item to the saver for its media type."""
        match media_type:
            case "video":
                await self.save_video(item)

            case "article":
                log_data = {
                    "title": item["title"],
                    "platform": item["platform"],
                    "mode": item["mode"],
                    "source": item["category"],
                }
                await self.log_client.log(
                    contents={
                        "task": "save_article",
                        "data": log_data,
                        "message": "start saving article",
                        "code": 1001,
                    }
                )
                # Skip articles whose title already exists in the table
                if await self.whether_article_title_duplicate(log_data["title"]):
                    await self.log_client.log(
                        contents={
                            "task": "save_article",
                            "data": log_data,
                            "message": "duplicate article title",
                            "code": 1002,
                        }
                    )
                    return
                # Flag titles that contain a configured sensitive word; the key
                # matches the title_sensitivity column used by the inserts above
                if await self.whether_title_sensitive(item["title"]):
                    await self.log_client.log(
                        contents={
                            "task": "save_article",
                            "data": log_data,
                            "message": "title_sensitive",
                            "code": 1003,
                        }
                    )
                    item["title_sensitivity"] = 1

                # save article
                await self.save_article_v2(item)

            case _:
                raise ValueError(f"unknown media type: {media_type}")
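

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the pipeline): how a crawler
# task might hand items to CrawlerPipeline. The `pool` and `log_client`
# arguments stand in for whatever this repo wires in elsewhere — an async DB
# pool exposing async_fetch/async_save and an async log client exposing
# log(contents=...). The function name and every field value below are made
# up for demonstration.


async def run_crawler_task_example(pool, log_client) -> None:
    pipeline = CrawlerPipeline(pool=pool, log_client=log_client)
    article_item = {
        "platform": "weixin",
        "mode": "search",  # hypothetical value
        "category": "tech",  # hypothetical value
        "title": "Example crawled title",
        "link": "https://example.com/article",
        "read_cnt": 1024,
        "like_cnt": 32,
        "publish_time": int(time.time()),
        "unique_index": "example-unique-hash",
    }
    # Dedup and sensitivity checks happen inside save_item_to_database; the
    # item is persisted via save_article_v2 only if the title is new.
    await pipeline.save_item_to_database(media_type="article", item=article_item)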