from __future__ import annotations

import time
import asyncio
import datetime
import traceback
from typing import List

from pandas import DataFrame
from tqdm.asyncio import tqdm

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.config import cold_start_category_map, input_source_map
from applications.utils import get_titles_from_produce_plan
from applications.tasks.cold_start_tasks.article_pool import (
    ArticlePoolColdStartStrategy,
    ArticlePoolFilterStrategy,
)


class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
    """Cold-start publisher for the article pool: pulls candidate articles from the
    crawler meta table, filters them, creates crawler plans, and binds those plans
    to AIGC generate plans."""

    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str, category: str | None
    ) -> DataFrame:
        """
        @param platform: platform the articles were crawled from
        @param crawl_method: crawl method of the articles
        @param strategy: supply (cold start) strategy
        @param category: article category, optional
        @return: DataFrame of candidate articles
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy, category
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy, category
                )
            case _:
                raise ValueError("Invalid platform")

        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )

    async def filter_published_titles(self, plan_id):
        """Filter out titles that have already been added to AIGC."""
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        if not published_title_tuple:
            return 0

        update_query = """
            update crawler_meta_article set status = %s where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )

    async def change_article_status_while_publishing(self, article_id_list):
        """
        @param article_id_list: unique ids of the articles being published
        @return: number of affected rows
        """
        query = """
            update crawler_meta_article set status = %s where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows

    async def create_crawler_plan_and_bind_to_produce_plan(
        self,
        strategy: str,
        crawl_method: str,
        category: str,
        platform: str,
        url_list: List[str],
        plan_id: str,
    ):
        # create crawler plan
        crawler_plan_response = await auto_create_crawler_task(
            plan_id=None,
            plan_name=f"冷启动-{strategy}-{category}-{datetime.date.today()}-{len(url_list)}",
            plan_tag="品类冷启动",
            platform=platform,
            url_list=url_list,
        )

        # save crawler plan to db
        create_timestamp = int(time.time()) * 1000
        crawler_plan_id = crawler_plan_response["data"]["id"]
        crawler_plan_name = crawler_plan_response["data"]["name"]
        await self.insert_crawler_plan_into_database(
            crawler_plan_id, crawler_plan_name, create_timestamp
        )

        # auto bind to generate plan
        new_crawler_task_list = [
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": crawler_plan_name,
                "inputSourceModal": 3,
                "inputSourceChannel": input_source_map[platform],
            }
        ]
        generate_plan_response = await auto_bind_crawler_task_to_generate_task(
            crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "绑定至生成计划成功",
                "data": generate_plan_response,
            }
        )

    async def create_cold_start_plan(
        self,
        platform,
        plan_id,
        strategy="strategy_v1",
        category=None,
        crawl_method=None,
    ):
        # get article dataframe from the meta article table
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy, category
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        filter_article_df = await self.article_pool_filter(
            strategy, platform, article_dataframe, crawl_method, category
        )

        match strategy:
            case "strategy_v1":
                # split articles into each AI-labelled category
                category_list = await task_apollo.get_config_value(key="category_list")
                for ai_category in category_list:
                    filter_category_df = filter_article_df[
                        filter_article_df["category_by_ai"] == ai_category
                    ]
                    url_list = filter_category_df["link"].values.tolist()
                    if url_list:
                        await self.create_crawler_plan_and_bind_to_produce_plan(
                            strategy,
                            crawl_method,
                            ai_category,
                            platform,
                            url_list,
                            plan_id,
                        )
                        # change article status
                        article_id_list = filter_category_df["article_id"].values.tolist()
                        await self.change_article_status_while_publishing(
                            article_id_list=article_id_list
                        )
            case "strategy_v2":
                url_list = filter_article_df["link"].values.tolist()
                await self.create_crawler_plan_and_bind_to_produce_plan(
                    strategy, crawl_method, category, platform, url_list, plan_id
                )
                # change article status
                article_id_list = filter_article_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )

    async def deal(
        self,
        platform: str,
        strategy="strategy_v1",
        crawl_methods=None,
        category_list=None,
    ) -> None:
        """Execute the cold start task under the given strategy."""
        match strategy:
            case "strategy_v1":
                if not crawl_methods:
                    crawl_methods = self.DEFAULT_CRAWLER_METHODS

                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_methods": crawl_methods,
                        "status": "success",
                        "trace_id": self.trace_id,
                    }
                )
                crawl_methods_map = await task_apollo.get_config_value(
                    key="category_cold_start_map"
                )
                for crawl_method in crawl_methods:
                    try:
                        plan_id = crawl_methods_map[crawl_method]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "crawl_method": crawl_method,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "通过已抓取标题修改文章状态",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            plan_id=plan_id,
                            crawl_method=crawl_method,
                        )
                    except Exception as e:
                        await feishu_robot.bot(
                            title="文章冷启动异常",
                            detail={
                                "crawl_method": crawl_method,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )

            case "strategy_v2":
                if not category_list:
                    category_list = list(cold_start_category_map.keys())

                for category in tqdm(category_list):
                    try:
                        plan_id = cold_start_category_map[category]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "category": category,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "通过已抓取标题修改文章状态",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            strategy=strategy,
                            plan_id=plan_id,
                            category=category,
                        )
                        await asyncio.sleep(120)
                    except Exception as e:
                        await feishu_robot.bot(
                            title="文章冷启动异常",
                            detail={
                                "category": category,
                                "strategy": strategy,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )

                if self.cold_start_records:
                    columns = [
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="plain_text",
                            sheet_name="category",
                            display_name="文章品类",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="cold_start_num",
                            display_name="本次冷启数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="total_length",
                            display_name="总文章剩余数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_title_length",
                            display_name="标题长度过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_sensitivity",
                            display_name="敏感词过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_llm_sensitity",
                            display_name="经过大模型判断敏感过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_score",
                            display_name="经过相关性分过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_avg_threshold",
                            display_name="阅读均值倍数阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_threshold",
                            display_name="阅读量阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="title_length_threshold",
                            display_name="标题长度阈值",
                        ),
                    ]
                    await feishu_robot.bot(
                        title="长文文章路冷启动发布",
                        detail={
                            "columns": columns,
                            "rows": self.cold_start_records,
                        },
                        table=True,
                        mention=False,
                    )

            case _:
                raise Exception(f"error strategy {strategy}")
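

# Minimal usage sketch (illustrative only; `db_pool`, `log_client`, and `trace_id`
# are assumed to be supplied by the surrounding task runner, and the names below
# are hypothetical rather than this repo's actual wiring):
#
#     task = ArticlePoolColdStart(db_pool, log_client, trace_id)
#     # strategy_v1 iterates the crawl methods configured in Apollo
#     await task.deal(platform="weixin", strategy="strategy_v1")
#     # strategy_v2 iterates the categories in cold_start_category_map
#     await task.deal(platform="toutiao", strategy="strategy_v2")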