""" @author: luojunhui 品类文章发布到aigc系统的冷启层 """ import datetime import json from pandas import DataFrame from applications import aiditApi, log, bot, DeNetMysql from config import apolloConfig apollo = apolloConfig() class CategoryColdStartTask(object): """ 品类冷启动发布任务 """ PUBLISHED_STATUS = 2 INIT_STATUS = 1 BAD_STATUS = 0 def __init__(self, db_client): """ :param db_client: """ self.db_client = db_client self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map")) self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold")) self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000) self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3) self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15) log( task="category_publish_task", function="__init__", message="数据库初始化连接完成,apollo配置获取完成", status="success", data={ "category": self.category_map, "threshold": self.category_cold_start_threshold } ) def get_articles_from_meta_table(self, category): """ 从长文 meta 库中获取冷启文章 :return: """ sql = f""" SELECT article_id, out_account_id, article_index, title, link, read_cnt FROM crawler_meta_article WHERE category = "{category}" and status = '{self.INIT_STATUS}'; """ article_list = self.db_client.select(sql) log( task="category_publish_task", function="get_articles_from_meta_table", message="获取品类文章总数", status="success", data={ "total_articles": len(article_list), "category": category } ) article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt']) return article_df def change_article_status(self, category): """ 已经发布到生成计划中的 id, :return: """ plan_id = self.category_map.get(category) if plan_id: sql = f""" SELECT account.wx_gh, content.title, content.content_link, content.view_count, content.like_count, from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间, from_unixtime(content.publish_timestamp / 1000) AS 发布时间 FROM crawler_plan_result_rel cprr JOIN crawler_plan plan ON cprr.plan_id = plan.id JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id JOIN crawler_account account ON content.channel_account_id = account.channel_account_id WHERE plan_id IN ( SELECT input_source_value FROM produce_plan_input_source WHERE plan_id = '{plan_id}' ); """ article_list = self.db_client.select(sql) title_list = [i[1] for i in article_list] if title_list: # update update_sql = f""" UPDATE crawler_meta_article SET status = %s WHERE title in %s and status = %s; """ self.db_client.update( sql=update_sql, params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS) ) else: return def change_article_status_while_publishing(self, article_id_list): """ :param: article_id_list: 文章的唯一 id :return: """ update_sql = f""" UPDATE crawler_meta_article SET status = %s WHERE article_id in %s and status = %s; """ affect_rows = self.db_client.update( sql=update_sql, params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS) ) if affect_rows != len(article_id_list): bot( title="品类冷启任务中,出现更新状文章状态失败异常", detail={ "affected_rows": affect_rows, "task_rows": len(article_id_list) } ) def publish_filter_articles(self, category, articles_df): """ 过滤文章 :param category: :param articles_df: :return: """ articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean') articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read'] filter_df = 
        # Drop titles that contain festival / politics / military keywords; the
        # regex alternation is equivalent to chaining one ~str.contains per keyword.
        sensitive_keyword_pattern = '农历|太极|节|早上好|赖清德|普京|俄|南海|台海|解放军|蔡英文|中国'
        filter_df = articles_df[
            (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
            & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
            & (articles_df['title'].str.len() > self.LIMIT_TITLE_LENGTH)
            & (~articles_df['title'].str.contains(sensitive_keyword_pattern, regex=True))
        ]
        url_list = filter_df['link'].values.tolist()
        log(
            task="category_publish_task",
            function="publish_filter_articles",
            message="过滤后文章总数",
            status="success",
            data={
                "total_articles": len(url_list),
                "category": category
            }
        )
        if url_list:
            # Create a crawler plan for the filtered urls.
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-{}--{}--{}".format(category, str(datetime.date.today()), len(url_list)),
                plan_tag="品类冷启动",
                url_list=url_list
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="成功创建抓取计划",
                status="success",
                data=crawler_plan_response
            )
            # Auto bind the crawler plan to the category's generate plan.
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_response['data']['id'],
                    "inputSourceLabel": crawler_plan_response['data']['name'],
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id=self.category_map[category]
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="成功绑定到生成计划",
                status="success",
                data=generate_plan_response
            )
            # Mark the published articles so they are not picked up again.
            article_id_list = filter_df['article_id'].values.tolist()
            self.change_article_status_while_publishing(article_id_list=article_id_list)

    def do_job(self):
        """
        Run the cold-start task for every configured category.
        :return:
        """
        category_list = self.category_map.keys()
        log(
            task="category_publish_task",
            function="do_job",
            message="开始自动创建品类文章抓取计划",
            status="success",
            data={
                "category_list": list(category_list)
            }
        )
        for category in category_list:
            try:
                category_df = self.get_articles_from_meta_table(category=category)
                self.publish_filter_articles(
                    category=category,
                    articles_df=category_df
                )
            except Exception as e:
                bot(
                    title="品类冷启任务报错",
                    detail={
                        "category": category,
                        "error": str(e),
                        "function": "do_job"
                    }
                )
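

# Minimal usage sketch, not part of the original task wiring: it assumes
# DeNetMysql can be constructed without arguments and that the Apollo keys
# "category_cold_start_map" and "category_cold_start_threshold" exist.
if __name__ == "__main__":
    db_client = DeNetMysql()  # assumption: no-arg constructor
    task = CategoryColdStartTask(db_client=db_client)
    task.do_job()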