"""Article-pool cold start: fetch crawled candidate articles, run them through
the filtering funnel, create crawler plans, and bind them to AIGC generate plans."""

from __future__ import annotations

import time
import datetime
import traceback
from typing import List, Dict

from pandas import DataFrame

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.utils import get_titles_from_produce_plan


class ArticlePoolColdStartConst:
    # article pool thresholds and status codes
    DAILY_ARTICLE_NUM = 1000
    SIMILARITY_SCORE_THRESHOLD = 0.5

    TITLE_NOT_SENSITIVE = 0
    TITLE_SENSITIVE = 1

    PUBLISHED_STATUS = 2
    INIT_STATUS = 1
    BAD_STATUS = 0

    READ_TIMES_THRESHOLD = 1.3
    READ_THRESHOLD = 5000

    TITLE_LENGTH_LIMIT = 15
    TITLE_LENGTH_MAX = 50

    DEFAULT_CRAWLER_METHODS = ["1030-手动挑号", "account_association"]


class ArticlePoolColdStart(ArticlePoolColdStartConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str
    ) -> DataFrame:
        """
        @param platform: crawl platform of the article
        @param crawl_method: crawl method of the article
        @param strategy: supply strategy
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy
                )
            case _:
                raise ValueError(f"Invalid platform: {platform}")

        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )

    async def get_weixin_cold_start_articles(
        self, crawl_method: str, strategy: str
    ) -> List[Dict]:
        """Fetch candidate WeChat articles; `crawl_method` is matched against the `category` column."""
        match strategy:
            case "strategy_v1":
                query = """
                    select article_id, title, link, llm_sensitivity, score, category_by_ai
                    from crawler_meta_article t1
                    join crawler_meta_article_accounts_read_avg t2
                        on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
                    where category = %s
                        and platform = %s
                        and title_sensitivity = %s
                        and t1.status = %s
                        and t1.read_cnt / t2.read_avg >= %s
                        and t1.read_cnt >= %s
                        and t2.status = %s
                    order by score desc;
                """
                article_list = await self.pool.async_fetch(
                    query=query,
                    params=(
                        crawl_method,
                        "weixin",
                        self.TITLE_NOT_SENSITIVE,
                        self.INIT_STATUS,
                        self.READ_TIMES_THRESHOLD,
                        self.READ_THRESHOLD,
                        self.INIT_STATUS,
                    ),
                )
                return article_list
            case _:
                raise ValueError(f"Invalid strategy: {strategy}")

    async def get_toutiao_cold_start_articles(
        self, crawl_method: str, strategy: str
    ) -> List[Dict]:
        """Fetch candidate Toutiao articles; `crawl_method` is matched against the `category` column."""
        match strategy:
            case "strategy_v1":
                query = """
                    select article_id, title, link, llm_sensitivity, score, category_by_ai
                    from crawler_meta_article
                    where category = %s and platform = %s and status = %s;
                """
                article_list = await self.pool.async_fetch(
                    query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
                )
                return article_list
            case _:
                raise ValueError(f"Invalid strategy: {strategy}")

    async def filter_published_titles(self, plan_id):
        """
        Filter out titles that have already been added to the aigc produce plan.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        # An empty tuple would render an invalid `in ()` clause, so skip the update.
        if not published_title_tuple:
            return 0

        update_query = """
            update crawler_meta_article
            set status = %s
            where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows

    async def filter_weixin_articles(self, dataframe, crawl_method):
        """WeChat filtering funnel"""
        total_length: int = dataframe.shape[0]

        # Level 1: filter by title length
        filter_df = dataframe[
            (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
            & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
        ]
        length_level1 = filter_df.shape[0]

        # Level 2: filter by sensitive keywords
        sensitive_keywords = [
            "农历",
            "太极",
            "节",
            "早上好",
            "赖清德",
            "普京",
            "俄",
            "南海",
            "台海",
            "解放军",
            "蔡英文",
            "中国",
        ]
        # Build a regex pattern; "|" joins the keywords as alternatives
        pattern = "|".join(sensitive_keywords)
        filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
        length_level2 = filter_df.shape[0]

        # Level 3: filter by LLM sensitivity
        filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
        length_level3 = filter_df.shape[0]

        # Level 4: filter by similarity score
        filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
        length_level4 = filter_df.shape[0]

        await feishu_robot.bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - length_level1, length_level1
                ),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level1 - length_level2, length_level2
                ),
                "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level2 - length_level3, length_level3
                ),
                "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level3 - length_level4, length_level4
                ),
                "渠道": crawl_method,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.TITLE_LENGTH_LIMIT,
            },
            mention=False,
        )
        return filter_df[: self.DAILY_ARTICLE_NUM]

    async def filter_toutiao_articles(self, dataframe, crawl_method):
        """Toutiao filtering funnel: similarity score only"""
        total_length = dataframe.shape[0]
        filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
        await feishu_robot.bot(
            title="冷启动创建抓取计划",
            detail={
                "渠道": crawl_method,
                "总文章数量": total_length,
                "相关性分数过滤剩余": filter_df.shape[0],
            },
            mention=False,
        )
        return filter_df[: self.DAILY_ARTICLE_NUM]

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        """Record the created crawler plan; alert via Feishu on failure."""
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s);
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )

    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique article ids
        :return: number of affected rows
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows

    async def create_cold_start_plan(
        self, platform, crawl_method, plan_id, strategy="strategy_v1"
    ):
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )

        match platform:
            case "weixin":
                input_source_channel = 5
                filter_article_df = await self.filter_weixin_articles(
                    article_dataframe, crawl_method
                )
            case "toutiao":
                input_source_channel = 6
                filter_article_df = await self.filter_toutiao_articles(
                    article_dataframe, crawl_method
                )
            case _:
                raise ValueError(f"Invalid platform: {platform}")

        # split articles into per-category batches and create one crawler plan each
        category_list = await task_apollo.get_config_value(key="category_list")
        for ai_category in category_list:
            filter_category_df = filter_article_df[
                filter_article_df["category_by_ai"] == ai_category
            ]
            url_list = filter_category_df["link"].values.tolist()
            if url_list:
                # create crawler plan
                crawler_plan_response = await auto_create_crawler_task(
                    plan_id=None,
                    plan_name="自动绑定-{}-{}-{}--{}".format(
                        crawl_method,
                        ai_category,
                        str(datetime.date.today()),
                        len(url_list),
                    ),
                    plan_tag="品类冷启动",
                    platform=platform,
                    url_list=url_list,
                )

                # save crawler plan to db
                create_timestamp = int(time.time()) * 1000
                crawler_plan_id = crawler_plan_response["data"]["id"]
                crawler_plan_name = crawler_plan_response["data"]["name"]
                await self.insert_crawler_plan_into_database(
                    crawler_plan_id, crawler_plan_name, create_timestamp
                )

                # auto bind to generate plan
                new_crawler_task_list = [
                    {
                        "contentType": 1,
                        "inputSourceType": 2,
                        "inputSourceSubType": None,
                        "fieldName": None,
                        "inputSourceValue": crawler_plan_id,
                        "inputSourceLabel": crawler_plan_name,
                        "inputSourceModal": 3,
                        "inputSourceChannel": input_source_channel,
                    }
                ]
                generate_plan_response = await auto_bind_crawler_task_to_generate_task(
                    crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
                )
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_method": crawl_method,
                        "status": "success",
                        "trace_id": self.trace_id,
                        "message": "绑定至生成计划成功",
                        "data": generate_plan_response,
                    }
                )

                # mark the published articles so they are not picked up again
                article_id_list = filter_category_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )

    async def deal(self, platform: str, crawl_methods: List[str]) -> None:
        if not crawl_methods:
            crawl_methods = self.DEFAULT_CRAWLER_METHODS

        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_methods": crawl_methods,
                "status": "success",
                "trace_id": self.trace_id,
            }
        )
        crawl_methods_map = await task_apollo.get_config_value(
            key="category_cold_start_map"
        )
        for crawl_method in crawl_methods:
            try:
                plan_id = crawl_methods_map[crawl_method]
                affected_rows = await self.filter_published_titles(plan_id)
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_method": crawl_method,
                        "status": "success",
                        "trace_id": self.trace_id,
                        "message": "通过已抓取标题修改文章状态",
                        "data": {"affected_rows": affected_rows},
                    }
                )
                await self.create_cold_start_plan(platform, crawl_method, plan_id)
            except Exception as e:
                await feishu_robot.bot(
                    title="文章冷启动异常",
                    detail={
                        "crawl_method": crawl_method,
                        "error": str(e),
                        "function": "deal",
                        "traceback": traceback.format_exc(),
                    },
                )