""" @author: luojunhui 品类文章发布到aigc系统的冷启层 """ import datetime import json import time import traceback from pandas import DataFrame from applications import log, bot from applications.api import aidit_api from config import apolloConfig apollo = apolloConfig() class CategoryColdStartTask(object): """ 品类冷启动发布任务 """ PUBLISHED_STATUS = 2 INIT_STATUS = 1 BAD_STATUS = 0 def __init__(self, db_client): """ :param db_client: """ self.db_client = db_client self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map")) self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold")) self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000) self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3) self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15) log( task="category_publish_task", function="__init__", message="数据库初始化连接完成,apollo配置获取完成", data={ "category": self.category_map, "threshold": self.category_cold_start_threshold } ) def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp): """ 插入抓取计划到数据库中 :param create_timestamp: :param crawler_plan_id: :param crawler_plan_name: :return: """ insert_sql = f""" INSERT INTO article_crawler_plan (crawler_plan_id, name, create_timestamp) values (%s, %s, %s) """ try: self.db_client.update( sql=insert_sql, params=(crawler_plan_id, crawler_plan_name, create_timestamp) ) except Exception as e: bot( title="品类冷启任务,记录抓取计划id失败", detail={ "error": str(e), "error_msg": traceback.format_exc(), "crawler_plan_id": crawler_plan_id, "crawler_plan_name": crawler_plan_name } ) def get_articles_from_meta_table(self, category): """ 从长文 meta 库中获取冷启文章 :return: """ sql = f""" SELECT article_id, out_account_id, article_index, title, link, read_cnt, status FROM crawler_meta_article WHERE category = "{category}"; """ article_list = self.db_client.select(sql) log( task="category_publish_task", function="get_articles_from_meta_table", message="获取品类文章总数", data={ "total_articles": len(article_list), "category": category } ) article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status']) return article_df def change_article_status(self, category): """ 已经发布到生成计划中的 id, :return: """ plan_id = self.category_map.get(category) if plan_id: article_list = aidit_api.get_generated_article_list(plan_id) title_list = [i[1] for i in article_list] if title_list: # update update_sql = f""" UPDATE crawler_meta_article SET status = %s WHERE title in %s and status = %s; """ self.db_client.update( sql=update_sql, params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS) ) else: return def change_article_status_while_publishing(self, article_id_list): """ :param: article_id_list: 文章的唯一 id :return: """ update_sql = f""" UPDATE crawler_meta_article SET status = %s WHERE article_id in %s and status = %s; """ affect_rows = self.db_client.update( sql=update_sql, params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS) ) if affect_rows != len(article_id_list): bot( title="品类冷启任务中,出现更新状文章状态失败异常", detail={ "affected_rows": affect_rows, "task_rows": len(article_id_list) } ) def publish_filter_articles(self, category, articles_df): """ 过滤文章 :param category: :param articles_df: :return: """ articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean') articles_df['read_times'] = articles_df['read_cnt'] / 
        total_length = articles_df.shape[0]
        # level 0: drop articles that have already been published
        zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        zero_level_funnel_length = zero_level_funnel_df.shape[0]
        # level 1: keep articles that beat the account's average read count by the configured multiple
        first_level_funnel_df = zero_level_funnel_df[
            zero_level_funnel_df['read_times'] >= self.READ_TIMES_THRESHOLD
        ]
        first_level_funnel_length = first_level_funnel_df.shape[0]
        # level 2: keep articles above the absolute read-count threshold
        second_level_funnel_df = first_level_funnel_df[
            first_level_funnel_df['read_cnt'] >= self.READ_THRESHOLD
        ]
        second_level_funnel_length = second_level_funnel_df.shape[0]
        # level 3: keep articles whose titles are long enough
        third_level_funnel_df = second_level_funnel_df[
            second_level_funnel_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH
        ]
        third_level_funnel_length = third_level_funnel_df.shape[0]
        # final level: drop titles containing any sensitive word
        sensitive_words = [
            '农历', '太极', '节', '早上好', '赖清德', '普京',
            '俄', '南海', '台海', '解放军', '蔡英文', '中国'
        ]
        filter_df = third_level_funnel_df[
            ~third_level_funnel_df['title'].str.contains('|'.join(sensitive_words))
        ]
        final_length = filter_df.shape[0]
        url_list = filter_df['link'].values.tolist()
        log(
            task="category_publish_task",
            function="publish_filter_articles",
            message="过滤后文章总数",
            data={
                "total_articles": final_length,
                "category": category
            }
        )
        bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - zero_level_funnel_length, zero_level_funnel_length),
                "通过阅读均值倍数过滤": "过滤数量: {} 剩余数量: {}".format(
                    zero_level_funnel_length - first_level_funnel_length, first_level_funnel_length),
                "通过阅读量过滤": "过滤数量: {} 剩余数量: {}".format(
                    first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    third_level_funnel_length - final_length, final_length),
                "品类": category,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.LIMIT_TITLE_LENGTH
            },
            mention=False
        )
        if url_list:
            # create a crawler plan for the surviving urls
            crawler_plan_response = aidit_api.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-{}--{}--{}".format(category, str(datetime.date.today()), len(url_list)),
                plan_tag="品类冷启动",
                url_list=url_list
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="成功创建抓取计划",
                data=crawler_plan_response
            )
            # save the crawler plan to the database (timestamp in milliseconds)
            create_timestamp = int(time.time()) * 1000
            crawler_plan_id = crawler_plan_response['data']['id']
            crawler_plan_name = crawler_plan_response['data']['name']
            self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
            # auto bind the crawler plan to this category's generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5
                }
            ]
            generate_plan_response = aidit_api.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id=self.category_map[category]
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="成功绑定到生成计划",
                data=generate_plan_response
            )
            # mark the published articles so they are not picked up again
            article_id_list = filter_df['article_id'].values.tolist()
            self.change_article_status_while_publishing(article_id_list=article_id_list)

    def do_job(self, category_list=None):
        """
        Run the cold-start pipeline for each category
        :param category_list: categories to process; defaults to every configured category
        :return:
        """
        if not category_list:
            category_list = self.category_map.keys()
        log(
            task="category_publish_task",
            function="do_job",
            message="开始自动创建品类文章抓取计划",
            data={
                "category_list": list(category_list)
            }
        )
        for category in category_list:
            try:
                category_df = self.get_articles_from_meta_table(category=category)
                self.publish_filter_articles(
                    category=category,
                    articles_df=category_df
                )
            except Exception as e:
                bot(
                    title="品类冷启任务报错",
                    detail={
                        "category": category,
                        "error": str(e),
                        "function": "do_job"
                    }
                )
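
if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: `MySQLClient`
    # is a hypothetical stand-in for whatever db_client implementation in
    # this repo provides the .select() / .update() methods used above.
    from applications.db import MySQLClient  # hypothetical import

    task = CategoryColdStartTask(db_client=MySQLClient())
    # first sync statuses for titles already pulled into each generate plan,
    # then run the full filter-and-publish pipeline for all configured categories
    for category in task.category_map.keys():
        task.change_article_status(category=category)
    task.do_job()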