""" @author: luojunhui 品类文章发布到aigc系统的冷启层 """ import datetime import json import time import traceback from pandas import DataFrame from applications import aiditApi, log, bot, llm_sensitivity from config import apolloConfig apollo = apolloConfig() DAILY_CRAWLER_MAX_NUM = 1000 SIMILARITY_MIN_SCORE = 0.4 TITLE_NOT_SENSITIVE = 0 class CategoryColdStartTask(object): """ 品类冷启动发布任务 """ PUBLISHED_STATUS = 2 INIT_STATUS = 1 BAD_STATUS = 0 def __init__(self, db_client): """ :param db_client: """ self.db_client = db_client self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map")) self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold")) self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000) self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3) self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15) self.TITLE_LENGTH_MAX = self.category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50) log( task="category_publish_task", function="__init__", message="数据库初始化连接完成,apollo配置获取完成", data={ "category": self.category_map, "threshold": self.category_cold_start_threshold } ) def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp): """ 插入抓取计划到数据库中 :param create_timestamp: :param crawler_plan_id: :param crawler_plan_name: :return: """ insert_sql = f""" INSERT INTO article_crawler_plan (crawler_plan_id, name, create_timestamp) values (%s, %s, %s) """ try: self.db_client.update( sql=insert_sql, params=(crawler_plan_id, crawler_plan_name, create_timestamp) ) except Exception as e: bot( title="品类冷启任务,记录抓取计划id失败", detail={ "error": str(e), "error_msg": traceback.format_exc(), "crawler_plan_id": crawler_plan_id, "crawler_plan_name": crawler_plan_name } ) def get_articles_from_meta_table(self, category, article_source): """ 从长文 meta 库中获取冷启文章 :return: """ sql = f""" SELECT article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score FROM crawler_meta_article WHERE category = "{category}" and platform = "{article_source}" and title_sensitivity = {TITLE_NOT_SENSITIVE} ORDER BY score DESC; """ article_list = self.db_client.select(sql) log( task="category_publish_task", function="get_articles_from_meta_table", message="获取品类文章总数", data={ "total_articles": len(article_list), "category": category } ) article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status', 'llm_sensitivity', 'score']) return article_df def filter_each_category(self, category): """ 过滤单个生成计划类别的文章 """ plan_id = self.category_map.get(category) if plan_id: article_list = aiditApi.get_generated_article_list(plan_id) title_list = [i[1] for i in article_list] if title_list: # update update_sql = f""" UPDATE crawler_meta_article SET status = %s WHERE title in %s and status = %s; """ affected_rows = self.db_client.update( sql=update_sql, params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS) ) print(affected_rows) else: print("未获取到计划id") return def published_articles_title_filter(self): """ 已经发布到生成计划中的 id, :return: """ category_list = list(self.category_map.keys()) for category in category_list: try: self.filter_each_category(category) except Exception as e: log( task="category_publish_task", function="published_articles_title_filter", message="过滤已发布文章失败", data={ "error": str(e), "error_msg": traceback.format_exc(), "category": category } ) def 

    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark the given articles as published.
        :param article_id_list: unique ids of the articles
        """
        update_sql = f"""
            UPDATE crawler_meta_article
            SET status = %s
            WHERE article_id IN %s AND status = %s;
        """
        affect_rows = self.db_client.update(
            sql=update_sql,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
        )
        if affect_rows != len(article_id_list):
            bot(
                title="品类冷启任务中,出现更新文章状态失败异常",
                detail={
                    "affected_rows": affect_rows,
                    "task_rows": len(article_id_list)
                }
            )

    def filter_weixin_articles(self, articles_df, category):
        """
        Filter funnel for crawled WeChat articles.
        """
        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
        total_length = articles_df.shape[0]

        # Level 0: drop articles that have already been published
        filter_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        length_level0 = filter_df.shape[0]

        # Level 1: filter by read-count multiple of the account/position average
        filter_df = filter_df[filter_df['read_times'] >= self.READ_TIMES_THRESHOLD]
        length_level1 = filter_df.shape[0]

        # Level 2: filter by absolute read count
        filter_df = filter_df[filter_df['read_cnt'] >= self.READ_THRESHOLD]
        length_level2 = filter_df.shape[0]

        # Level 3: filter by title length
        filter_df = filter_df[
            (filter_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
            & (filter_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
        ]
        length_level3 = filter_df.shape[0]

        # Level 4: filter by sensitive keywords in the title
        sensitive_keywords = [
            '农历', '太极', '节', '早上好', '赖清德', '普京', '俄',
            '南海', '台海', '解放军', '蔡英文', '中国'
        ]
        filter_df = filter_df[~filter_df['title'].str.contains('|'.join(sensitive_keywords))]
        length_level4 = filter_df.shape[0]

        # Level 5: filter by LLM sensitivity label
        filter_df = filter_df[~(filter_df['llm_sensitivity'] > 0)]
        length_level5 = filter_df.shape[0]

        # Level 6: filter by similarity score
        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
        length_level6 = filter_df.shape[0]

        log(
            task="category_publish_task",
            function="publish_filter_articles",
            message="过滤后文章总数",
            data={
                "total_articles": length_level6,
                "category": category
            }
        )
        bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - length_level0, length_level0),
                "通过阅读均值倍数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level0 - length_level1, length_level1),
                "通过阅读量过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level1 - length_level2, length_level2),
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level2 - length_level3, length_level3),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level3 - length_level4, length_level4),
                "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level4 - length_level5, length_level5),
                "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level5 - length_level6, length_level6),
                "品类": category,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.LIMIT_TITLE_LENGTH
            },
            mention=False
        )
        return filter_df[:DAILY_CRAWLER_MAX_NUM]

    def filter_toutiao_articles(self, articles_df, category):
        """
        Filter funnel for Toutiao articles.
        """
        total_length = articles_df.shape[0]
        # Level 0: drop articles that have already been published
        zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        zero_level_funnel_length = zero_level_funnel_df.shape[0]
        bot(
            title="账号冷启动---头条推荐流发布",
            detail={
                "category": category,
                "总文章数量": total_length,
                "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - zero_level_funnel_length, zero_level_funnel_length),
            },
            mention=False
        )
        return zero_level_funnel_df
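
    # A toy, doctest-style illustration of the read_times signal used by
    # filter_weixin_articles (made-up numbers; gh_id/position values are hypothetical):
    #
    #   >>> import pandas as pd
    #   >>> df = pd.DataFrame({'gh_id': ['a', 'a', 'b'], 'position': [1, 1, 1],
    #   ...                    'read_cnt': [9000, 3000, 5000]})
    #   >>> df['average_read'] = df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
    #   >>> (df['read_cnt'] / df['average_read']).tolist()
    #   [1.5, 0.5, 1.0]
    #
    # With READ_TIMES_THRESHOLD = 1.3, only the first row would survive level 1.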
{}".format(total_length - zero_level_funnel_length, zero_level_funnel_length), }, mention=False ) return zero_level_funnel_df def update_article_sensitive_status(self, article_id, status): """ 更新文章敏感状态 :return: """ update_sql = f""" update crawler_meta_article set llm_sensitivity = %s where article_id = %s; """ self.db_client.update(sql=update_sql, params=(status, article_id)) def publish_filter_articles(self, category, articles_df, article_source): """ 过滤文章 :param category: 文章品类 :param articles_df: 该品类下的文章data_frame :param article_source: 文章来源 :return: """ match article_source: case "weixin": filtered_articles_df = self.filter_weixin_articles(articles_df, category) input_source_channel = 5 case "toutiao": filtered_articles_df = self.filter_toutiao_articles(articles_df, category) input_source_channel = 6 case _: return success_titles = filtered_articles_df['title'].values.tolist() article_id_list = filtered_articles_df['article_id'].values.tolist() if success_titles: try: sensitive_results = llm_sensitivity.check_titles(success_titles) for article_id, sensitive_result in zip(article_id_list, sensitive_results): self.update_article_sensitive_status( article_id=article_id, status=sensitive_result['hit_rule'] ) if sensitive_result['hit_rule'] > TITLE_NOT_SENSITIVE: filtered_articles_df = filtered_articles_df[filtered_articles_df['article_id'] != article_id] except Exception as e: print("failed to update sensitive status: {}".format(e)) url_list = filtered_articles_df['link'].values.tolist() if url_list: # create_crawler_plan crawler_plan_response = aiditApi.auto_create_crawler_task( plan_id=None, plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)), plan_tag="品类冷启动", article_source=article_source, url_list=url_list ) log( task="category_publish_task", function="publish_filter_articles", message="成功创建抓取计划", data=crawler_plan_response ) # save to db create_timestamp = int(time.time()) * 1000 crawler_plan_id = crawler_plan_response['data']['id'] crawler_plan_name = crawler_plan_response['data']['name'] self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp) # auto bind to generate plan new_crawler_task_list = [ { "contentType": 1, "inputSourceType": 2, "inputSourceSubType": None, "fieldName": None, "inputSourceValue": crawler_plan_id, "inputSourceLabel": crawler_plan_name, "inputSourceModal": 3, "inputSourceChannel": input_source_channel } ] generate_plan_response = aiditApi.bind_crawler_task_to_generate_task( crawler_task_list=new_crawler_task_list, generate_task_id=self.category_map[category] ) log( task="category_publish_task", function="publish_filter_articles", message="成功绑定到生成计划", data=generate_plan_response ) # change article status article_id_list = filtered_articles_df['article_id'].values.tolist() self.change_article_status_while_publishing(article_id_list=article_id_list) def do_job(self, article_source, category_list=None): """ 执行任务 :return: """ if not category_list: category_list = self.category_map.keys() log( task="category_publish_task", function="do_job", message="开始自动创建品类文章抓取计划", data={ "category_list": list(category_list) } ) for category in category_list: try: # 已发布标题去重 self.published_articles_title_filter() category_df = self.get_articles_from_meta_table(category=category, article_source=article_source) self.publish_filter_articles( category=category, articles_df=category_df, article_source=article_source ) except Exception as e: bot( title="品类冷启任务报错", detail={ "category": category, "error": str(e), "function": 
"do_job", "traceback": traceback.format_exc() } )