- """
- @author: luojunhui
- 品类文章发布到aigc系统的冷启层
- """
- import datetime
- import json
- import time
- import traceback
- from pandas import DataFrame
- from applications import aiditApi, log, bot, llm_sensitivity
- from config import apolloConfig
- apollo = apolloConfig()
- DAILY_CRAWLER_MAX_NUM = 1000
- SIMILARITY_MIN_SCORE = 0.4
- TITLE_NOT_SENSITIVE = 0


class CategoryColdStartTask(object):
    """
    Category cold-start publish task.
    """
    PUBLISHED_STATUS = 2  # article already pushed into a generate plan
    INIT_STATUS = 1       # fresh cold-start candidate
    BAD_STATUS = 0        # article marked as unusable

    def __init__(self, db_client):
        """
        :param db_client: database client exposing select(sql) and update(sql, params)
        """
        self.db_client = db_client
        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
        self.category_cold_start_threshold = json.loads(
            apollo.getConfigValue("category_cold_start_threshold")
        )
        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
        self.TITLE_LENGTH_MAX = self.category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
        log(
            task="category_publish_task",
            function="__init__",
            message="database connection initialized, apollo config fetched",
            data={
                "category": self.category_map,
                "threshold": self.category_cold_start_threshold
            }
        )
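
    # The two Apollo values consumed above are JSON strings. A sketch of their
    # assumed shapes, inferred from how they are used in this file:
    #
    #   category_cold_start_map:
    #       {"<category>": "<generate_plan_id>", ...}
    #   category_cold_start_threshold:
    #       {"READ_THRESHOLD": 5000, "READ_TIMES_THRESHOLD": 1.3,
    #        "LIMIT_TITLE_LENGTH": 15, "TITLE_LENGTH_MAX": 50}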

    def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp):
        """
        Record a crawler plan in the database.
        :param crawler_plan_id:
        :param crawler_plan_name:
        :param create_timestamp:
        :return:
        """
        insert_sql = """
            INSERT INTO article_crawler_plan
                (crawler_plan_id, name, create_timestamp)
            VALUES
                (%s, %s, %s)
        """
        try:
            self.db_client.update(
                sql=insert_sql,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp)
            )
        except Exception as e:
            bot(
                title="category cold-start task: failed to record crawler plan id",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name
                }
            )

    def get_articles_from_meta_table(self, category, article_source):
        """
        Fetch cold-start candidate articles from the long-article meta table.
        :param category: article category
        :param article_source: source platform, e.g. "weixin" or "toutiao"
        :return: DataFrame of candidate articles
        """
        sql = f"""
            SELECT
                article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
            FROM
                crawler_meta_article
            WHERE
                category = "{category}" AND platform = "{article_source}" AND title_sensitivity = {TITLE_NOT_SENSITIVE}
            ORDER BY score DESC;
        """
        article_list = self.db_client.select(sql)
        log(
            task="category_publish_task",
            function="get_articles_from_meta_table",
            message="total number of articles fetched for category",
            data={
                "total_articles": len(article_list),
                "category": category
            }
        )
        # the SELECT column order maps out_account_id -> gh_id and article_index -> position
        article_df = DataFrame(
            article_list,
            columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
                     'llm_sensitivity', 'score']
        )
        return article_df

    def filter_each_category(self, category):
        """
        Mark articles already pulled into this category's generate plan as published.
        """
        plan_id = self.category_map.get(category)
        if plan_id:
            article_list = aiditApi.get_generated_article_list(plan_id)
            title_list = [i[1] for i in article_list]
            if title_list:
                # flip status for titles that already appear in the generate plan
                update_sql = """
                    UPDATE
                        crawler_meta_article
                    SET
                        status = %s
                    WHERE
                        title IN %s AND status = %s;
                """
                affected_rows = self.db_client.update(
                    sql=update_sql,
                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
                )
                print(affected_rows)
        else:
            print("no generate plan id configured for category: {}".format(category))
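
    # NOTE: `title IN %s` with a tuple parameter relies on the SQL driver
    # expanding the tuple into a parenthesized value list. pymysql, for
    # instance, renders it like this (a minimal sketch of that behavior):
    #
    #   cursor.execute("SELECT 1 FROM t WHERE title IN %s", (("a", "b"),))
    #   # sent to MySQL as: SELECT 1 FROM t WHERE title IN ('a', 'b')
    #
    # If db_client wraps a driver without this expansion, the IN clause must
    # be built with one %s placeholder per element instead.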

    def published_articles_title_filter(self):
        """
        Filter out articles whose titles have already been published to a generate plan.
        :return:
        """
        category_list = list(self.category_map.keys())
        for category in category_list:
            try:
                self.filter_each_category(category)
            except Exception as e:
                log(
                    task="category_publish_task",
                    function="published_articles_title_filter",
                    message="failed to filter published articles",
                    data={
                        "error": str(e),
                        "error_msg": traceback.format_exc(),
                        "category": category
                    }
                )

    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark the given articles as published.
        :param article_id_list: unique article ids
        :return:
        """
        update_sql = """
            UPDATE
                crawler_meta_article
            SET
                status = %s
            WHERE
                article_id IN %s AND status = %s;
        """
        affected_rows = self.db_client.update(
            sql=update_sql,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
        )
        if affected_rows != len(article_id_list):
            bot(
                title="category cold-start task: failed to update article status",
                detail={
                    "affected_rows": affected_rows,
                    "task_rows": len(article_id_list)
                }
            )

    def filter_weixin_articles(self, articles_df, category):
        """
        Filter funnel for articles crawled from WeChat.
        """
        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
        total_length = articles_df.shape[0]

        # level 0: drop articles that have already been published
        filter_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        length_level0 = filter_df.shape[0]

        # level 1: filter by read count relative to the account/position average
        filter_df = filter_df[filter_df['read_times'] >= self.READ_TIMES_THRESHOLD]
        length_level1 = filter_df.shape[0]

        # level 2: filter by absolute read count
        filter_df = filter_df[filter_df['read_cnt'] >= self.READ_THRESHOLD]
        length_level2 = filter_df.shape[0]

        # level 3: filter by title length
        filter_df = filter_df[
            (filter_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
            & (filter_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
        ]
        length_level3 = filter_df.shape[0]

        # level 4: filter by sensitive keywords in the title
        sensitive_keywords = [
            '农历', '太极', '节', '早上好', '赖清德', '普京',
            '俄', '南海', '台海', '解放军', '蔡英文', '中国'
        ]
        filter_df = filter_df[~filter_df['title'].str.contains('|'.join(sensitive_keywords))]
        length_level4 = filter_df.shape[0]

        # level 5: filter by LLM sensitivity
        filter_df = filter_df[~(filter_df['llm_sensitivity'] > 0)]
        length_level5 = filter_df.shape[0]

        # level 6: filter by similarity score
        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
        length_level6 = filter_df.shape[0]

        log(
            task="category_publish_task",
            function="publish_filter_articles",
            message="total number of articles after filtering",
            data={
                "total_articles": length_level6,
                "category": category
            }
        )
        bot(
            title="cold-start publish notification",
            detail={
                "total articles": total_length,
                "published-status filter": "removed: {} remaining: {}".format(
                    total_length - length_level0, length_level0),
                "read-times filter": "removed: {} remaining: {}".format(
                    length_level0 - length_level1, length_level1),
                "read-count filter": "removed: {} remaining: {}".format(
                    length_level1 - length_level2, length_level2),
                "title-length filter": "removed: {} remaining: {}".format(
                    length_level2 - length_level3, length_level3),
                "sensitive-keyword filter": "removed: {} remaining: {}".format(
                    length_level3 - length_level4, length_level4),
                "LLM-sensitivity filter": "removed: {} remaining: {}".format(
                    length_level4 - length_level5, length_level5),
                "similarity-score filter": "removed: {} remaining: {}".format(
                    length_level5 - length_level6, length_level6),
                "category": category,
                "read-times threshold": self.READ_TIMES_THRESHOLD,
                "read-count threshold": self.READ_THRESHOLD,
                "title-length threshold": self.LIMIT_TITLE_LENGTH
            },
            mention=False
        )
        return filter_df[:DAILY_CRAWLER_MAX_NUM]
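
    # A toy walk-through of the funnel above (illustrative only, values assumed):
    #
    #   df = DataFrame({
    #       'article_id': [1, 2], 'gh_id': ['a', 'a'], 'position': [1, 1],
    #       'title': ['某个足够长的标题' * 3, '短标题'],
    #       'link': ['http://example.com/1', 'http://example.com/2'],
    #       'read_cnt': [12000, 100], 'status': [1, 1],
    #       'llm_sensitivity': [0, 0], 'score': [0.9, 0.9],
    #   })
    #   task.filter_weixin_articles(df, category='history')
    #
    # Row 2 falls out at level 1: its read_cnt (100) is far below the
    # account/position average of 6050, so read_times < READ_TIMES_THRESHOLD;
    # row 1 passes every level and is returned.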

    def filter_toutiao_articles(self, articles_df, category):
        """
        Filter funnel for Toutiao articles.
        """
        total_length = articles_df.shape[0]
        # level 0: drop articles that have already been published
        zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        zero_level_funnel_length = zero_level_funnel_df.shape[0]
        bot(
            title="account cold start -- Toutiao feed publish",
            detail={
                "category": category,
                "total articles": total_length,
                "published-status filter": "removed: {} remaining: {}".format(
                    total_length - zero_level_funnel_length, zero_level_funnel_length),
            },
            mention=False
        )
        return zero_level_funnel_df

    def update_article_sensitive_status(self, article_id, status):
        """
        Update an article's LLM sensitivity status.
        :return:
        """
        update_sql = """
            UPDATE crawler_meta_article
            SET llm_sensitivity = %s
            WHERE article_id = %s;
        """
        self.db_client.update(sql=update_sql, params=(status, article_id))

    def publish_filter_articles(self, category, articles_df, article_source):
        """
        Filter candidate articles and publish them as a crawler plan.
        :param category: article category
        :param articles_df: DataFrame of articles in this category
        :param article_source: article source platform
        :return:
        """
        match article_source:
            case "weixin":
                filtered_articles_df = self.filter_weixin_articles(articles_df, category)
                input_source_channel = 5
            case "toutiao":
                filtered_articles_df = self.filter_toutiao_articles(articles_df, category)
                input_source_channel = 6
            case _:
                return

        success_titles = filtered_articles_df['title'].values.tolist()
        article_id_list = filtered_articles_df['article_id'].values.tolist()
        if success_titles:
            try:
                sensitive_results = llm_sensitivity.check_titles(success_titles)
                for article_id, sensitive_result in zip(article_id_list, sensitive_results):
                    self.update_article_sensitive_status(
                        article_id=article_id,
                        status=sensitive_result['hit_rule']
                    )
                    if sensitive_result['hit_rule'] > TITLE_NOT_SENSITIVE:
                        # drop titles that hit an LLM sensitivity rule
                        filtered_articles_df = filtered_articles_df[
                            filtered_articles_df['article_id'] != article_id
                        ]
            except Exception as e:
                print("failed to update sensitive status: {}".format(e))

        url_list = filtered_articles_df['link'].values.tolist()
        if url_list:
            # create a crawler plan for the filtered urls
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-{}--{}--{}".format(category, str(datetime.date.today()), len(url_list)),
                plan_tag="品类冷启动",
                article_source=article_source,
                url_list=url_list
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="crawler plan created successfully",
                data=crawler_plan_response
            )

            # save the crawler plan to the database (millisecond timestamp)
            create_timestamp = int(time.time() * 1000)
            crawler_plan_id = crawler_plan_response['data']['id']
            crawler_plan_name = crawler_plan_response['data']['name']
            self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)

            # auto-bind the crawler plan to this category's generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": input_source_channel
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id=self.category_map[category]
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="bound to generate plan successfully",
                data=generate_plan_response
            )

            # mark as published; re-read ids since sensitive titles were dropped above
            article_id_list = filtered_articles_df['article_id'].values.tolist()
            self.change_article_status_while_publishing(article_id_list=article_id_list)
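
    # Shape assumed for crawler_plan_response, inferred from the field accesses
    # above (not a documented aiditApi contract):
    #
    #   {"data": {"id": "<crawler_plan_id>", "name": "<crawler_plan_name>", ...}}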

    def do_job(self, article_source, category_list=None):
        """
        Run the task.
        :param article_source: article source platform
        :param category_list: categories to process; defaults to all configured categories
        :return:
        """
        if not category_list:
            category_list = self.category_map.keys()
        log(
            task="category_publish_task",
            function="do_job",
            message="start auto-creating crawler plans for category articles",
            data={
                "category_list": list(category_list)
            }
        )
        for category in category_list:
            try:
                # de-duplicate against titles already published to generate plans
                self.published_articles_title_filter()
                category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)
                self.publish_filter_articles(
                    category=category,
                    articles_df=category_df,
                    article_source=article_source
                )
            except Exception as e:
                bot(
                    title="category cold-start task error",
                    detail={
                        "category": category,
                        "error": str(e),
                        "function": "do_job",
                        "traceback": traceback.format_exc()
                    }
                )