|  | @@ -0,0 +1,256 @@
 | 
	
		
			
				|  |  | +"""
 | 
	
		
			
				|  |  | +@author: luojunhui
 | 
	
		
			
				|  |  | +品类文章发布到aigc系统的冷启层
 | 
	
		
			
				|  |  | +"""
 | 
	
		
			
				|  |  | +import datetime
 | 
	
		
			
				|  |  | +import json
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +from pandas import DataFrame
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +from applications import aiditApi, log, bot, DeNetMysql
 | 
	
		
			
				|  |  | +from config import apolloConfig
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +apollo = apolloConfig()  # module-level Apollo config client; CategoryColdStartTask.__init__ reads its settings through it
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class CategoryColdStartTask(object):
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    品类冷启动发布任务
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    PUBLISHED_STATUS = 2
 | 
	
		
			
				|  |  | +    INIT_STATUS = 1
 | 
	
		
			
				|  |  | +    BAD_STATUS = 0
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def __init__(self, db_client):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        :param db_client:
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        self.db_client = db_client
 | 
	
		
			
				|  |  | +        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
 | 
	
		
			
				|  |  | +        self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
 | 
	
		
			
				|  |  | +        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
 | 
	
		
			
				|  |  | +        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
 | 
	
		
			
				|  |  | +        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
 | 
	
		
			
				|  |  | +        log(
 | 
	
		
			
				|  |  | +            task="category_publish_task",
 | 
	
		
			
				|  |  | +            function="__init__",
 | 
	
		
			
				|  |  | +            message="数据库初始化连接完成,apollo配置获取完成",
 | 
	
		
			
				|  |  | +            status="success",
 | 
	
		
			
				|  |  | +            data={
 | 
	
		
			
				|  |  | +                "category": self.category_map,
 | 
	
		
			
				|  |  | +                "threshold": self.category_cold_start_threshold
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def get_articles_from_meta_table(self, category):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        从长文 meta 库中获取冷启文章
 | 
	
		
			
				|  |  | +        :return:
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        sql = f"""
 | 
	
		
			
				|  |  | +        SELECT 
 | 
	
		
			
				|  |  | +            article_id, out_account_id, article_index, title, link, read_cnt
 | 
	
		
			
				|  |  | +        FROM
 | 
	
		
			
				|  |  | +            crawler_meta_article
 | 
	
		
			
				|  |  | +        WHERE 
 | 
	
		
			
				|  |  | +            category = "{category}" and status = '{self.INIT_STATUS}';
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        article_list = self.db_client.select(sql)
 | 
	
		
			
				|  |  | +        log(
 | 
	
		
			
				|  |  | +            task="category_publish_task",
 | 
	
		
			
				|  |  | +            function="get_articles_from_meta_table",
 | 
	
		
			
				|  |  | +            message="获取品类文章总数",
 | 
	
		
			
				|  |  | +            status="success",
 | 
	
		
			
				|  |  | +            data={
 | 
	
		
			
				|  |  | +                "total_articles": len(article_list),
 | 
	
		
			
				|  |  | +                "category": category
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +        article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt'])
 | 
	
		
			
				|  |  | +        return article_df
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def change_article_status(self, category):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        已经发布到生成计划中的 id,
 | 
	
		
			
				|  |  | +        :return:
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        plan_id = self.category_map.get(category)
 | 
	
		
			
				|  |  | +        if plan_id:
 | 
	
		
			
				|  |  | +            sql = f"""
 | 
	
		
			
				|  |  | +            SELECT 
 | 
	
		
			
				|  |  | +                account.wx_gh,
 | 
	
		
			
				|  |  | +                content.title,
 | 
	
		
			
				|  |  | +                content.content_link,
 | 
	
		
			
				|  |  | +                content.view_count,
 | 
	
		
			
				|  |  | +                content.like_count,
 | 
	
		
			
				|  |  | +                from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
 | 
	
		
			
				|  |  | +                from_unixtime(content.publish_timestamp / 1000) AS 发布时间
 | 
	
		
			
				|  |  | +            FROM crawler_plan_result_rel cprr
 | 
	
		
			
				|  |  | +            JOIN crawler_plan plan ON cprr.plan_id = plan.id
 | 
	
		
			
				|  |  | +            JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
 | 
	
		
			
				|  |  | +            JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
 | 
	
		
			
				|  |  | +            WHERE plan_id IN (
 | 
	
		
			
				|  |  | +                SELECT
 | 
	
		
			
				|  |  | +                    input_source_value
 | 
	
		
			
				|  |  | +                FROM
 | 
	
		
			
				|  |  | +                    produce_plan_input_source
 | 
	
		
			
				|  |  | +                WHERE plan_id = '{plan_id}'
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            """
 | 
	
		
			
				|  |  | +            article_list = self.db_client.select(sql)
 | 
	
		
			
				|  |  | +            title_list = [i[1] for i in article_list]
 | 
	
		
			
				|  |  | +            if title_list:
 | 
	
		
			
				|  |  | +                # update
 | 
	
		
			
				|  |  | +                update_sql = f"""
 | 
	
		
			
				|  |  | +                UPDATE 
 | 
	
		
			
				|  |  | +                    crawler_meta_article
 | 
	
		
			
				|  |  | +                SET
 | 
	
		
			
				|  |  | +                    status = %s
 | 
	
		
			
				|  |  | +                WHERE
 | 
	
		
			
				|  |  | +                    title in %s and status = %s;
 | 
	
		
			
				|  |  | +                """
 | 
	
		
			
				|  |  | +                self.db_client.update(
 | 
	
		
			
				|  |  | +                    sql=update_sql,
 | 
	
		
			
				|  |  | +                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            return
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def change_article_status_while_publishing(self, article_id_list):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        :param: article_id_list: 文章的唯一 id
 | 
	
		
			
				|  |  | +        :return:
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        update_sql = f"""
 | 
	
		
			
				|  |  | +                    UPDATE 
 | 
	
		
			
				|  |  | +                        crawler_meta_article
 | 
	
		
			
				|  |  | +                    SET
 | 
	
		
			
				|  |  | +                        status = %s
 | 
	
		
			
				|  |  | +                    WHERE
 | 
	
		
			
				|  |  | +                        article_id in %s and status = %s;
 | 
	
		
			
				|  |  | +                    """
 | 
	
		
			
				|  |  | +        affect_rows = self.db_client.update(
 | 
	
		
			
				|  |  | +            sql=update_sql,
 | 
	
		
			
				|  |  | +            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +        if affect_rows != len(article_id_list):
 | 
	
		
			
				|  |  | +            bot(
 | 
	
		
			
				|  |  | +                title="品类冷启任务中,出现更新状文章状态失败异常",
 | 
	
		
			
				|  |  | +                detail={
 | 
	
		
			
				|  |  | +                    "affected_rows": affect_rows,
 | 
	
		
			
				|  |  | +                    "task_rows": len(article_id_list)
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def publish_filter_articles(self, category, articles_df):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        过滤文章
 | 
	
		
			
				|  |  | +        :param category:
 | 
	
		
			
				|  |  | +        :param articles_df:
 | 
	
		
			
				|  |  | +        :return:
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
 | 
	
		
			
				|  |  | +        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
 | 
	
		
			
				|  |  | +        filter_df = articles_df[
 | 
	
		
			
				|  |  | +            (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
 | 
	
		
			
				|  |  | +            & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
 | 
	
		
			
				|  |  | +            & (articles_df['title'].str.len() > self.LIMIT_TITLE_LENGTH)
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('农历'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('太极'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('节'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('早上好'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('赖清德'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('普京'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('俄'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('南海'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('台海'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('解放军'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('蔡英文'))
 | 
	
		
			
				|  |  | +            & (~articles_df['title'].str.contains('中国'))
 | 
	
		
			
				|  |  | +            ]
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        url_list = filter_df['link'].values.tolist()
 | 
	
		
			
				|  |  | +        log(
 | 
	
		
			
				|  |  | +            task="category_publish_task",
 | 
	
		
			
				|  |  | +            function="publish_filter_articles",
 | 
	
		
			
				|  |  | +            message="过滤后文章总数",
 | 
	
		
			
				|  |  | +            status="success",
 | 
	
		
			
				|  |  | +            data={
 | 
	
		
			
				|  |  | +                "total_articles": len(url_list),
 | 
	
		
			
				|  |  | +                "category": category
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +        if url_list:
 | 
	
		
			
				|  |  | +            crawler_plan_response = aiditApi.auto_create_crawler_task(
 | 
	
		
			
				|  |  | +                plan_id=None,
 | 
	
		
			
				|  |  | +                plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
 | 
	
		
			
				|  |  | +                plan_tag="品类冷启动",
 | 
	
		
			
				|  |  | +                url_list=url_list
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            log(
 | 
	
		
			
				|  |  | +                task="category_publish_task",
 | 
	
		
			
				|  |  | +                function="publish_filter_articles",
 | 
	
		
			
				|  |  | +                message="成功创建抓取计划",
 | 
	
		
			
				|  |  | +                status="success",
 | 
	
		
			
				|  |  | +                data=crawler_plan_response
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            # auto bind to generate plan
 | 
	
		
			
				|  |  | +            new_crawler_task_list = [
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    "contentType": 1,
 | 
	
		
			
				|  |  | +                    "inputSourceType": 2,
 | 
	
		
			
				|  |  | +                    "inputSourceSubType": None,
 | 
	
		
			
				|  |  | +                    "fieldName": None,
 | 
	
		
			
				|  |  | +                    "inputSourceValue": crawler_plan_response['data']['id'],
 | 
	
		
			
				|  |  | +                    "inputSourceLabel": crawler_plan_response['data']['name'],
 | 
	
		
			
				|  |  | +                    "inputSourceModal": 3,
 | 
	
		
			
				|  |  | +                    "inputSourceChannel": 5
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            ]
 | 
	
		
			
				|  |  | +            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
 | 
	
		
			
				|  |  | +                crawler_task_list=new_crawler_task_list,
 | 
	
		
			
				|  |  | +                generate_task_id=self.category_map[category]
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            log(
 | 
	
		
			
				|  |  | +                task="category_publish_task",
 | 
	
		
			
				|  |  | +                function="publish_filter_articles",
 | 
	
		
			
				|  |  | +                message="成功绑定到生成计划",
 | 
	
		
			
				|  |  | +                status="success",
 | 
	
		
			
				|  |  | +                data=generate_plan_response
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            article_id_list = filter_df['article_id'].values.tolist()
 | 
	
		
			
				|  |  | +            self.change_article_status_while_publishing(article_id_list=article_id_list)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def do_job(self):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        执行任务
 | 
	
		
			
				|  |  | +        :return:
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        category_list = self.category_map.keys()
 | 
	
		
			
				|  |  | +        log(
 | 
	
		
			
				|  |  | +            task="category_publish_task",
 | 
	
		
			
				|  |  | +            function="do_job",
 | 
	
		
			
				|  |  | +            message="开始自动创建品类文章抓取计划",
 | 
	
		
			
				|  |  | +            status="success",
 | 
	
		
			
				|  |  | +            data={
 | 
	
		
			
				|  |  | +                "category_list": list(category_list)
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +        for category in category_list:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                category_df = self.get_articles_from_meta_table(category=category)
 | 
	
		
			
				|  |  | +                self.publish_filter_articles(
 | 
	
		
			
				|  |  | +                    category=category,
 | 
	
		
			
				|  |  | +                    articles_df=category_df
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +            except Exception as e:
 | 
	
		
			
				|  |  | +                bot(
 | 
	
		
			
				|  |  | +                    title="品类冷启任务报错",
 | 
	
		
			
				|  |  | +                    detail={
 | 
	
		
			
				|  |  | +                        "category": category,
 | 
	
		
			
				|  |  | +                        "error": str(e),
 | 
	
		
			
				|  |  | +                        "function": "do_job"
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                )
 |