| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 | """@author: luojunhui品类文章发布到aigc系统的冷启层"""import datetimeimport jsonimport timeimport tracebackfrom pandas import DataFramefrom applications import aiditApi, log, botfrom config import apolloConfigapollo = apolloConfig()DAILY_CRAWLER_MAX_NUM = 1000SIMILARITY_MIN_SCORE = 0.4TITLE_NOT_SENSITIVE = 0class CategoryColdStartTask(object):    """    品类冷启动发布任务    """    PUBLISHED_STATUS = 2    INIT_STATUS = 1    BAD_STATUS = 0    def __init__(self, db_client):        """        :param db_client:        """        self.db_client = db_client        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))        self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)        self.TITLE_LENGTH_MAX = self.category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)        log(            task="category_publish_task",            function="__init__",            message="数据库初始化连接完成,apollo配置获取完成",            data={                "category": self.category_map,                "threshold": self.category_cold_start_threshold            }        )    def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp):        """        插入抓取计划到数据库中        :param create_timestamp:        :param crawler_plan_id:        :param crawler_plan_name:        :return:        """        insert_sql = f"""            INSERT INTO article_crawler_plan            (crawler_plan_id, name, create_timestamp)            values             (%s, %s, %s)        """        try:            self.db_client.update(                sql=insert_sql,                params=(crawler_plan_id, crawler_plan_name, create_timestamp)            )        except Exception as e:            bot(                title="品类冷启任务,记录抓取计划id失败",                detail={                    "error": str(e),                    "error_msg": traceback.format_exc(),                    "crawler_plan_id": crawler_plan_id,                    "crawler_plan_name": crawler_plan_name                }            )    def get_articles_from_meta_table(self, category, article_source):        """        从长文 meta 库中获取冷启文章        :return:        """        sql = f"""        SELECT             article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score        FROM            crawler_meta_article        WHERE             category = "{category}" and platform = "{article_source}" and title_sensitivity = {TITLE_NOT_SENSITIVE}        ORDER BY score DESC;        """        article_list = self.db_client.select(sql)        log(            task="category_publish_task",            function="get_articles_from_meta_table",            message="获取品类文章总数",            data={                "total_articles": len(article_list),                "category": category            }        )        article_df = DataFrame(article_list,                               columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',                                        'llm_sensitivity', 'score'])        return article_df    def filter_each_category(self, category):        """        过滤单个生成计划类别的文章        """        plan_id = self.category_map.get(category)        if plan_id:            article_list = aiditApi.get_generated_article_list(plan_id)            title_list = [i[1] for i in article_list]            if title_list:                # update                update_sql = f"""                UPDATE                     crawler_meta_article                SET                    status = %s                WHERE                    title in %s and status = %s;                """                affected_rows = self.db_client.update(                    sql=update_sql,                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)                )                print(affected_rows)        else:            print("未获取到计划id")            return    def published_articles_title_filter(self):        """        已经发布到生成计划中的 id,        :return:        """        category_list = list(self.category_map.keys())        for category in category_list:            try:                self.filter_each_category(category)            except Exception as e:                log(                    task="category_publish_task",                    function="published_articles_title_filter",                    message="过滤已发布文章失败",                    data={                        "error": str(e),                        "error_msg": traceback.format_exc(),                        "category": category                    }                )    def change_article_status_while_publishing(self, article_id_list):        """        :param: article_id_list: 文章的唯一 id        :return:        """        update_sql = f"""                    UPDATE                         crawler_meta_article                    SET                        status = %s                    WHERE                        article_id in %s and status = %s;                    """        affect_rows = self.db_client.update(            sql=update_sql,            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)        )        if affect_rows != len(article_id_list):            bot(                title="品类冷启任务中,出现更新状文章状态失败异常",                detail={                    "affected_rows": affect_rows,                    "task_rows": len(article_id_list)                }            )    def filter_weixin_articles(self, articles_df, category):        """        微信抓取文章过滤漏斗        """        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']        total_length = articles_df.shape[0]        # 第0层过滤已经发布的文章        filter_df = articles_df[articles_df['status'] == self.INIT_STATUS]        length_level0 = filter_df.shape[0]        # 第一层漏斗通过阅读均值倍数过滤        filter_df = filter_df[filter_df['read_times'] >= self.READ_TIMES_THRESHOLD]        length_level1 = filter_df.shape[0]        # 第二层漏斗通过阅读量过滤        filter_df = filter_df[            filter_df['read_cnt'] >= self.READ_THRESHOLD            ]        length_level2 = filter_df.shape[0]        # 第三层漏斗通过标题长度过滤        filter_df = filter_df[            (filter_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)            & (filter_df['title'].str.len() <= self.TITLE_LENGTH_MAX)            ]        length_level3 = filter_df.shape[0]        # 第四层通过敏感词过滤        filter_df = filter_df[            (~filter_df['title'].str.contains('农历'))            & (~filter_df['title'].str.contains('太极'))            & (~filter_df['title'].str.contains('节'))            & (~filter_df['title'].str.contains('早上好'))            & (~filter_df['title'].str.contains('赖清德'))            & (~filter_df['title'].str.contains('普京'))            & (~filter_df['title'].str.contains('俄'))            & (~filter_df['title'].str.contains('南海'))            & (~filter_df['title'].str.contains('台海'))            & (~filter_df['title'].str.contains('解放军'))            & (~filter_df['title'].str.contains('蔡英文'))            & (~filter_df['title'].str.contains('中国'))            ]        length_level4 = filter_df.shape[0]        # 第五层通过LLM敏感度过滤        filter_df = filter_df[            ~(filter_df['llm_sensitivity'] > 0)        ]        length_level5 = filter_df.shape[0]        # 第六层通过相关性分数过滤        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]        length_level6 = filter_df.shape[0]        log(            task="category_publish_task",            function="publish_filter_articles",            message="过滤后文章总数",            data={                "total_articles": length_level5,                "category": category            }        )        bot(            title="冷启任务发布通知",            detail={                "总文章数量": total_length,                "通过已经发布状态过滤": "过滤数量: {}    剩余数量: {}".format(                    total_length - length_level0, length_level0),                "通过阅读均值倍数过滤": "过滤数量: {}    剩余数量: {}".format(                    length_level0 - length_level1, length_level1),                "通过阅读量过滤": "过滤数量: {}    剩余数量: {}".format(                    length_level1 - length_level2, length_level2),                "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(                    length_level2 - length_level3, length_level3),                "通过敏感词过滤": "过滤数量: {}    剩余数量: {}".format(                    length_level3 - length_level4, length_level4),                "通过LLM敏感度过滤": "过滤数量: {}    剩余数量: {}".format(                    length_level4 - length_level5, length_level5                ),                "通过相关性分数过滤": "过滤数量: {}    剩余数量: {}".format(                    length_level5 - length_level6, length_level6                ),                "品类": category,                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,                "阅读量阈值": self.READ_THRESHOLD,                "标题长度阈值": self.LIMIT_TITLE_LENGTH            },            mention=False        )        return filter_df[:DAILY_CRAWLER_MAX_NUM]    def filter_toutiao_articles(self, articles_df, category):        """        头条文章过滤漏斗        """        total_length = articles_df.shape[0]        # 第一层漏斗通过状态过滤        zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]        zero_level_funnel_length = zero_level_funnel_df.shape[0]        bot(            title="账号冷启动---头条推荐流发布",            detail={                "category": category,                "总文章数量": total_length,                "通过已经发布状态过滤": "过滤数量: {}    剩余数量: {}".format(total_length - zero_level_funnel_length,                                                                              zero_level_funnel_length),            },            mention=False        )        return zero_level_funnel_df    def publish_filter_articles(self, category, articles_df, article_source):        """        过滤文章        :param category: 文章品类        :param articles_df: 该品类下的文章data_frame        :param article_source: 文章来源        :return:        """        match article_source:            case "weixin":                filtered_articles_df = self.filter_weixin_articles(articles_df, category)                input_source_channel = 5            case "toutiao":                filtered_articles_df = self.filter_toutiao_articles(articles_df, category)                input_source_channel = 6            case _:                return        url_list = filtered_articles_df['link'].values.tolist()        if url_list:            # create_crawler_plan            crawler_plan_response = aiditApi.auto_create_crawler_task(                plan_id=None,                plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),                plan_tag="品类冷启动",                article_source=article_source,                url_list=url_list            )            log(                task="category_publish_task",                function="publish_filter_articles",                message="成功创建抓取计划",                data=crawler_plan_response            )            # save to db            create_timestamp = int(time.time()) * 1000            crawler_plan_id = crawler_plan_response['data']['id']            crawler_plan_name = crawler_plan_response['data']['name']            self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)            # auto bind to generate plan            new_crawler_task_list = [                {                    "contentType": 1,                    "inputSourceType": 2,                    "inputSourceSubType": None,                    "fieldName": None,                    "inputSourceValue": crawler_plan_id,                    "inputSourceLabel": crawler_plan_name,                    "inputSourceModal": 3,                    "inputSourceChannel": input_source_channel                }            ]            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(                crawler_task_list=new_crawler_task_list,                generate_task_id=self.category_map[category]            )            log(                task="category_publish_task",                function="publish_filter_articles",                message="成功绑定到生成计划",                data=generate_plan_response            )            # change article status            article_id_list = filtered_articles_df['article_id'].values.tolist()            self.change_article_status_while_publishing(article_id_list=article_id_list)    def do_job(self, article_source, category_list=None):        """        执行任务        :return:        """        if not category_list:            category_list = self.category_map.keys()        log(            task="category_publish_task",            function="do_job",            message="开始自动创建品类文章抓取计划",            data={                "category_list": list(category_list)            }        )        for category in category_list:            try:                # 已发布标题去重                self.published_articles_title_filter()                category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)                self.publish_filter_articles(                    category=category,                    articles_df=category_df,                    article_source=article_source                )            except Exception as e:                bot(                    title="品类冷启任务报错",                    detail={                        "category": category,                        "error": str(e),                        "function": "do_job",                        "traceback": traceback.format_exc()                    }                )
 |