
Merge branch '2025-02-07-account-assocaition-improve' of luojunhui/LongArticlesJob into master

luojunhui, 2 months ago
parent commit c70fc5f40d

+ 27 - 7
account_cold_start_daily.py

@@ -4,11 +4,14 @@
 import datetime
 import traceback
 
+from argparse import ArgumentParser
+
 from applications import longArticlesMySQL, bot
 from coldStartTasks.crawler.weixinCategoryCrawler import weixinCategory
 from coldStartTasks.publish.publishCategoryArticles import CategoryColdStartTask
+from coldStartTasks.filter.title_similarity_task import ColdStartTitleSimilarityTask
 
-DEFAULT_CATEGORY_LIST = ['1030-手动挑号']
+DEFAULT_CATEGORY_LIST = ['1030-手动挑号', 'account_association']
 
 
 class AccountColdStartDailyTask(object):
@@ -39,14 +42,20 @@ class AccountColdStartDailyTask(object):
             )
             return False
 
-    def crawler_task(self, category_list):
+    def crawler_task(self, category_list, date_str):
         """
         :return:
         """
         # initialize the category crawler
         try:
             weixin_category_crawler = weixinCategory(db_client=self.db_client)
-            weixin_category_crawler.deal(category_list=category_list)
+            weixin_category_crawler.deal(category_list=category_list, date_str=date_str)
+
+            # after crawling, run similarity scoring on the newly crawled titles
+            cold_start_title_similarity_task = ColdStartTitleSimilarityTask()
+            cold_start_title_similarity_task.init_database()
+            cold_start_title_similarity_task.run()
+
             bot(
                 title="账号冷启动任务,抓取完成",
                 detail={
@@ -95,10 +104,9 @@ class AccountColdStartDailyTask(object):
             )
 
 
-def main(category_list=None, article_source=None):
+def main(date_str, category_list=None, article_source=None):
     """
     main job, use crontab to do job daily
-    todo: 1. 开放一个输入可以输入指定品类  2. 增加对指定账号的抓取&&发布
     :return:
     """
     if not category_list:
@@ -108,17 +116,29 @@ def main(category_list=None, article_source=None):
     task = AccountColdStartDailyTask()
     if task.init_db():
         if article_source == 'weixin':
-            task.crawler_task(category_list=category_list)
+            task.crawler_task(category_list=category_list, date_str=date_str)
 
         task.publish_task(category_list=category_list, article_source=article_source)
 
 
 if __name__ == '__main__':
+    parser = ArgumentParser()
+    parser.add_argument("--run_date", help="run date, format: YYYY-MM-DD")
+    args = parser.parse_args()
+
+    if args.run_date:
+        run_date = args.run_date
+    else:
+        run_date = datetime.date.today().isoformat()
+
     # run the WeChat crawl & publish
-    main()
+    main(date_str=run_date)
 
     # run the Toutiao publish
     main(
+        date_str=run_date,
         category_list=['history', 'tech', 'finance', 'entertainment'],
         article_source='toutiao'
     )
+
+

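Note: the entry point now takes an optional run date and falls back to today's date when the flag is omitted. A minimal sketch of the equivalent programmatic calls (the date value is made up; omitted arguments keep their defaults):

    # equivalent of: python account_cold_start_daily.py --run_date 2025-02-07
    from account_cold_start_daily import main

    main(date_str="2025-02-07")  # WeChat crawl + publish for DEFAULT_CATEGORY_LIST
    main(date_str="2025-02-07",
         category_list=['history', 'tech', 'finance', 'entertainment'],
         article_source='toutiao')  # Toutiao is publish-only, no crawl
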
+ 2 - 1
applications/api/__init__.py

@@ -1,4 +1,5 @@
 """
 @author: luojunhui
 """
-from .moon_shot_api import generate_mini_program_title
+from .moon_shot_api import generate_mini_program_title
+from .nlp_api import similarity_between_title_list

+ 26 - 0
applications/api/nlp_api.py

@@ -0,0 +1,26 @@
+"""
+@author: luojunhui
+"""
+import requests
+
+
+def similarity_between_title_list(target_title_list: list[str], base_title_list: list[str]) -> list[list[float]]:
+    """
+    Compute the pairwise similarity between two lists of titles.
+    :param target_title_list: target titles
+    :param base_title_list: base titles
+    :return: similarity matrix, one row per target title
+    """
+    url = 'http://61.48.133.26:6060/nlp'
+    body = {
+        "data": {
+            "text_list_a": target_title_list,
+            "text_list_b": base_title_list
+        },
+        "function": "similarities_cross",
+        "use_cache": False
+    }
+    response_json = requests.post(url, json=body, timeout=120).json()
+    score_array = response_json['score_list_list']
+    return score_array
+

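A hedged usage sketch of the new similarity endpoint: the response is expected to be a matrix with one row per target title and one column per base title (all titles and scores below are made up):

    from applications.api import similarity_between_title_list

    targets = ["标题A", "标题B"]
    bases = ["晋级标题1", "晋级标题2", "晋级标题3"]
    scores = similarity_between_title_list(targets, bases)
    # scores is assumed to look like:
    # [[0.12, 0.85, 0.30],   # 标题A against each base title
    #  [0.05, 0.10, 0.44]]   # 标题B against each base title
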
+ 96 - 40
coldStartTasks/crawler/weixinCategoryCrawler.py

@@ -2,19 +2,46 @@
 @author: luojunhui
 Crawl category articles across all accounts
 """
-
+import json
 import time
 
 from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
 from applications import WeixinSpider, Functions, llm_sensitivity, log
 from coldStartTasks.filter import article_crawler_duplicate_filter
+from config import apolloConfig
 
 # 常量
 ACCOUNT_GOOD_STATUS = 1
+
+# whether an account is crawled daily
+ACCOUNT_DAILY_SCRAPE = 1
+ACCOUNT_NOT_DAILY_SCRAPE = 0
+
+# default values
 DEFAULT_VIEW_COUNT = 0
 DEFAULT_LIKE_COUNT = 0
 DEFAULT_ARTICLE_STATUS = 1
-DEFAULT_TIMESTAMP = 1704038400
+DEFAULT_TIMESTAMP = 1717171200
+
+# title sensitivity flags
+TITLE_SENSITIVE = 1
+TITLE_NOT_SENSITIVE = 0
+
+config = apolloConfig()
+sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list"))
+
+
+def whether_title_sensitive(title: str) -> bool:
+    """
+    Check whether the title contains a sensitive word.
+    :param title: article title to check
+    """
+    for word in sensitive_word_list:
+        if word in title:
+            return True
+    return False
 
 
 class weixinCategory(object):
@@ -36,7 +63,7 @@ class weixinCategory(object):
         sql = f"""
             select gh_id, account_source, account_name, account_category, latest_update_time
             from long_articles_accounts 
-            where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS};
+            where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
             """
         account_tuple = self.db_client_lam.select(sql)
         result = [
@@ -51,10 +78,25 @@ class weixinCategory(object):
         ]
         return result
 
+    def get_association_account_list(self, date_str):
+        """
+        Get today's polling group of account-association accounts.
+        """
+        group_id = date_str[-1]
+        sql = f"""
+            select account_id, gh_id, account_name, latest_update_time
+            from long_articles_accounts
+            where account_category = 'account_association' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
+        """
+        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
+        today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
+        return today_crawler_account_list
+
     def insert_data_into_db(self, gh_id, category, article_list):
         """
         Insert the crawled articles into the database.
         :return:
+
         """
         success_records = []
         for article_obj in article_list:
@@ -63,7 +105,7 @@ class weixinCategory(object):
                 try:
                     # skip if an article with the same title already exists
                     if article_crawler_duplicate_filter(
-                        new_article_title=obj["Title"], db_client=self.db_client_lam
+                            new_article_title=obj["Title"], db_client=self.db_client_lam
                     ):
                         log(
                             function="weixinCategory",
@@ -72,6 +114,9 @@ class weixinCategory(object):
                             data={"title": obj["Title"]}
                         )
                         continue
+
+                    # check whether the title contains a sensitive word
+                    title_sensitivity = TITLE_SENSITIVE if whether_title_sensitive(obj["Title"]) else TITLE_NOT_SENSITIVE
                     show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
                     show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
                     show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
@@ -80,10 +125,10 @@ class weixinCategory(object):
                         insert into crawler_meta_article
                         (
                          platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
-                         description, publish_time, crawler_time, status, unique_index, llm_sensitivity
+                         description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
                         )
                         VALUES 
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                     """
                     self.db_client_lam.update(
                         sql=insert_sql,
@@ -102,7 +147,8 @@ class weixinCategory(object):
                             int(time.time()),
                             DEFAULT_ARTICLE_STATUS,
                             unique_idx,
-                            obj.get("llm_sensitivity", -1)
+                            obj.get("llm_sensitivity", -1),
+                            title_sensitivity
                         ),
                     )
                     success_records.append({
@@ -175,48 +221,59 @@ class weixinCategory(object):
             print("No more data")
             return []
 
-    def deal(self, category_list):
+    def crawler_each_category(self, account_list, category):
         """
+        Crawl all accounts in one category.
+        :return:
+        """
+        success_records = []
+        for account in tqdm(account_list, desc="crawler_each_category"):
+            try:
+                gh_id = account['gh_id']
+                try:
+                    timestamp = int(account['latest_timestamp'].timestamp())
+                except Exception as e:
+                    timestamp = DEFAULT_TIMESTAMP
+                success_records += self.update_each_account(
+                    gh_id=gh_id,
+                    category=category,
+                    latest_time_stamp=timestamp
+                )
+                print("success")
+            except Exception as e:
+                print("fail because of {}".format(e))
+        success_titles = [x['title'] for x in success_records]
+        if success_titles:
+            try:
+                sensitive_results = llm_sensitivity.check_titles(success_titles)
+                for record, sensitive_result in zip(success_records, sensitive_results):
+                    self.update_article_sensitive_status(
+                        category=category,
+                        unique_index=record['unique_index'],
+                        status=sensitive_result['hit_rule']
+                    )
+            except Exception as e:
+                print("failed to update sensitive status: {}".format(e))
 
+    def deal(self, category_list, date_str):
+        """
         :param category_list:
+        :param date_str: YYYY-MM-DD
         :return:
         """
+        # daily crawl of category accounts
         for category in category_list:
-            success_records = []
             account_list = self.get_account_list(category)
-            for account in tqdm(account_list):
-                try:
-                    gh_id = account['gh_id']
-                    category = account['category']
-                    try:
-                        timestamp = int(account['latest_timestamp'].timestamp())
-                    except Exception as e:
-                        timestamp = DEFAULT_TIMESTAMP
-                    success_records += self.update_each_account(
-                        gh_id=gh_id,
-                        category=category,
-                        latest_time_stamp=timestamp
-                    )
-                    print("success")
-                except Exception as e:
-                    print("fail because of {}".format(e))
-            success_titles = [x['title'] for x in success_records]
-            if success_titles:
-                try:
-                    sensitive_results = llm_sensitivity.check_titles(success_titles)
-                    for record, sensitive_result in zip(success_records, sensitive_results):
-                        self.update_article_sensitive_status(
-                            category=category,
-                            unique_index=record['unique_index'],
-                            status=sensitive_result['hit_rule']
-                        )
-                except Exception as e:
-                    print("failed to update sensitive status: {}".format(e))
+            self.crawler_each_category(account_list=account_list, category=category)
+
+        # polling crawl of account-association accounts
+        association_account_list = self.get_association_account_list(date_str)
+        self.crawler_each_category(account_list=association_account_list, category="association")
 
     def deal_accounts(self, account_list):
         """
         input account list
-        :param account_list:
+        :param account_list: specific accounts to crawl; only one page is fetched
         :return:
         """
         account_tuple = tuple(account_list)
@@ -233,6 +290,7 @@ class weixinCategory(object):
                 try:
                     latest_timestamp = account[3].timestamp()
                 except Exception as e:
+                    print(e)
                     latest_timestamp = DEFAULT_TIMESTAMP
                 self.update_each_account(
                     gh_id=gh_id,
@@ -241,5 +299,3 @@ class weixinCategory(object):
                 )
             except Exception as e:
                 print(e)
-
-

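Association accounts are rotated rather than crawled daily: the last character of the run date selects a group, and only accounts whose account_id ends in that digit are crawled. A minimal sketch of the rotation with hypothetical ids:

    date_str = "2025-02-07"
    group_id = date_str[-1]                       # "7"
    account_list = [{"account_id": 10007}, {"account_id": 10018}]
    today = [a for a in account_list if str(a["account_id"])[-1] == group_id]
    # -> only account 10007 is crawled on dates ending in 7 (07, 17, 27)
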
+ 14 - 2
coldStartTasks/filter/__init__.py

@@ -12,7 +12,13 @@ def article_crawler_duplicate_filter(new_article_title, db_client) -> bool:
     select_sql = f"""
         select article_id from crawler_meta_article where title = '{new_article_title}';
     """
-    response = db_client.select(select_sql)
+    if hasattr(db_client, "fetch"):
+        response = db_client.fetch(select_sql)
+    elif hasattr(db_client, "select"):
+        response = db_client.select(select_sql)
+    else:
+        raise AttributeError("db_client must have a fetch or select method")
+
     if response:
         return True
     else:
@@ -28,7 +34,13 @@ def video_crawler_duplicate_filter(new_video_title, db_client) -> bool:
     select_sql = f"""
         select article_title from publish_single_video_source where article_title = '{new_video_title}';
     """
-    response = db_client.select(select_sql)
+    if hasattr(db_client, "fetch"):
+        response = db_client.fetch(select_sql)
+    elif hasattr(db_client, "select"):
+        response = db_client.select(select_sql)
+    else:
+        raise AttributeError("db_client must have a fetch or select method")
+
     if response:
         return True
     else:

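The duplicate filters now dispatch on whichever query method the client exposes, so both the legacy client (select) and the newer DatabaseConnector (fetch) keep working. An illustrative sketch with hypothetical stand-in clients:

    from coldStartTasks.filter import article_crawler_duplicate_filter

    class LegacyClient:                       # hypothetical stand-in
        def select(self, sql): return ()      # no matching rows
    class NewClient:                          # hypothetical stand-in
        def fetch(self, sql): return ()

    article_crawler_duplicate_filter("some title", LegacyClient())  # -> False
    article_crawler_duplicate_filter("some title", NewClient())     # -> False
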
+ 115 - 0
coldStartTasks/filter/title_similarity_task.py

@@ -0,0 +1,115 @@
+"""
+@author: luojunhui
+"""
+import numpy as np
+
+from pymysql.cursors import DictCursor
+
+from applications.api import similarity_between_title_list
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+
+TIMESTAMP_MS_THRESHOLD = 1732982400000
+ARTICLE_BATCH = 1000
+PERCENT_THRESHOLD = 95
+
+
+def chunks(total_list, batch_size):
+    """
+    yield batch tasks
+    """
+    for i in range(0, len(total_list), batch_size):
+        yield total_list[i:i + batch_size]
+
+
+class ColdStartTitleSimilarityTask(object):
+    """
+    Cold-start article title similarity task
+    """
+
+    def __init__(self):
+        self.db_client = None
+
+    def init_database(self):
+        """
+        init database
+        """
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+
+    def get_level_up_title_list(self):
+        """
+        Get the titles of promoted (level-up) articles.
+        status: 1 means source tracing for the article is complete
+        deleted: 0 means the article is active
+        level = 'autoArticlePoolLevel1' means the top-tier (头条) pool
+        """
+        sql = f"""
+        select distinct title from article_pool_promotion_source where level = 'autoArticlePoolLevel1' and status = 1 and deleted = 0;
+        """
+        mysql_response = self.db_client.fetch(query=sql)
+        title_list = [i[0] for i in mysql_response]
+        return title_list
+
+    def get_title_from_meta_base(self):
+        """
+        Get article titles from the crawler meta table.
+        status: 1 means the article is in its initial state
+        """
+        sql = f"""
+            select article_id, title from crawler_meta_article where status = 1 and score is null;
+        """
+        mysql_response = self.db_client.fetch(query=sql, cursor_type=DictCursor)
+        return mysql_response
+
+    def update_meta_article_batch(self, update_data_list: list[tuple]) -> int:
+        """
+        Batch-update scores in crawler_meta_article.
+        """
+        sql = """
+            update crawler_meta_article
+            set score = case article_id
+                {}
+            end
+            where article_id in %s;
+        """
+        case_statement = []
+        article_id_list = []
+        params = []
+        for score, article_id in update_data_list:
+            case_statement.append("when %s then %s")
+            article_id_list.append(article_id)
+            params.extend([article_id, score])
+
+        params.append(tuple(article_id_list))
+        case_statements = "\n".join(case_statement)
+        formatted_sql = sql.format(case_statements)
+        affected_rows = self.db_client.save(formatted_sql, params)
+        return affected_rows
+
+    def run(self):
+        """
+        Run the task.
+        """
+        target_article_list = self.get_title_from_meta_base()
+        if not target_article_list:
+            print("No more articles to process.")
+            return
+
+        base_title_list = self.get_level_up_title_list()
+
+        batch_task_list = chunks(target_article_list, ARTICLE_BATCH)
+
+        for batch_task in batch_task_list:
+            batch_target_title_list = [i['title'] for i in batch_task]
+            similarity_array = similarity_between_title_list(batch_target_title_list, base_title_list)
+
+            update_data_list = []
+            for index, score_list in enumerate(similarity_array):
+                sorted_score_list = sorted(score_list)
+                percent_threshold_score = np.percentile(sorted_score_list, PERCENT_THRESHOLD)
+                update_data_list.append((percent_threshold_score, batch_task[index]['article_id']))
+
+            affected_rows = self.update_meta_article_batch(update_data_list)
+            print(affected_rows)

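Each candidate article's score is the 95th percentile of its similarities against all promoted titles, and the scores are written back in a single statement through a generated CASE expression. A sketch of both steps, with made-up numbers and ids:

    import numpy as np

    score_list = [0.10, 0.20, 0.85, 0.30]
    score = np.percentile(score_list, 95)   # high only when the title resembles strong performers

    # update_meta_article_batch([(0.71, 101), (0.42, 102)]) is expected to render roughly:
    #   update crawler_meta_article
    #   set score = case article_id
    #       when 101 then 0.71
    #       when 102 then 0.42
    #   end
    #   where article_id in (101, 102);
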
+ 44 - 8
coldStartTasks/publish/publishCategoryArticles.py

@@ -13,6 +13,9 @@ from applications import aiditApi, log, bot
 from config import apolloConfig
 
 apollo = apolloConfig()
+DAILY_CRAWLER_MAX_NUM = 1000
+SIMILARITY_MIN_SCORE = 0.4
+TITLE_NOT_SENSITIVE = 0
 
 
 class CategoryColdStartTask(object):
@@ -79,14 +82,16 @@ class CategoryColdStartTask(object):
         """
         Fetch cold-start articles from the long-article meta table.
         :return:
+
         """
         sql = f"""
         SELECT 
-            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity
+            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
         FROM
             crawler_meta_article
         WHERE 
-            category = "{category}" and platform = "{article_source}";
+            category = "{category}" and platform = "{article_source}" and title_sensitivity = {TITLE_NOT_SENSITIVE}
+        ORDER BY score DESC;
         """
         article_list = self.db_client.select(sql)
         log(
@@ -99,13 +104,13 @@ class CategoryColdStartTask(object):
             }
         )
         article_df = DataFrame(article_list,
-                               columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status', 'llm_sensitivity'])
+                               columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
+                                        'llm_sensitivity', 'score'])
         return article_df
 
-    def change_article_status(self, category):
+    def filter_each_category(self, category):
         """
-        已经发布到生成计划中的 id,
-        :return:
+        Filter out articles already published in this category's generation plan.
         """
         plan_id = self.category_map.get(category)
         if plan_id:
@@ -130,6 +135,27 @@ class CategoryColdStartTask(object):
             print("未获取到计划id")
             return
 
+    def published_articles_title_filter(self):
+        """
+        Filter out titles that have already been published to generation plans.
+        :return:
+        """
+        category_list = list(self.category_map.keys())
+        for category in category_list:
+            try:
+                self.filter_each_category(category)
+            except Exception as e:
+                log(
+                    task="category_publish_task",
+                    function="published_articles_title_filter",
+                    message="过滤已发布文章失败",
+                    data={
+                        "error": str(e),
+                        "error_msg": traceback.format_exc(),
+                        "category": category
+                    }
+                )
+
     def change_article_status_while_publishing(self, article_id_list):
         """
 
@@ -206,6 +232,11 @@ class CategoryColdStartTask(object):
             ~(filter_df['llm_sensitivity'] > 0)
         ]
         length_level5 = filter_df.shape[0]
+
+        # level 6: filter by similarity score
+        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
+        length_level6 = filter_df.shape[0]
+
         log(
             task="category_publish_task",
             function="publish_filter_articles",
@@ -232,6 +263,9 @@ class CategoryColdStartTask(object):
                 "通过LLM敏感度过滤": "过滤数量: {}    剩余数量: {}".format(
                     length_level4 - length_level5, length_level5
                 ),
+                "通过相关性分数过滤": "过滤数量: {}    剩余数量: {}".format(
+                    length_level5 - length_level6, length_level6
+                ),
                 "品类": category,
                 "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                 "阅读量阈值": self.READ_THRESHOLD,
@@ -239,7 +273,7 @@ class CategoryColdStartTask(object):
             },
             mention=False
         )
-        return filter_df
+        return filter_df[:DAILY_CRAWLER_MAX_NUM]
 
     def filter_toutiao_articles(self, articles_df, category):
         """
@@ -347,6 +381,9 @@ class CategoryColdStartTask(object):
         )
         for category in category_list:
             try:
+                # de-duplicate against already-published titles
+                self.published_articles_title_filter()
+
                 category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)
                 self.publish_filter_articles(
                     category=category,
@@ -363,4 +400,3 @@ class CategoryColdStartTask(object):
                         "traceback": traceback.format_exc()
                     }
                 )
-

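Because get_articles_from_meta_table already orders by score descending, the new similarity floor and the daily cap compose into "the highest-scoring articles above the threshold, at most 1000 per run". A minimal pandas sketch with made-up rows:

    import pandas as pd

    filter_df = pd.DataFrame({"article_id": [1, 2, 3], "score": [0.9, 0.5, 0.1]})
    filter_df = filter_df[filter_df["score"] > 0.4]   # SIMILARITY_MIN_SCORE
    daily_batch = filter_df[:1000]                    # DAILY_CRAWLER_MAX_NUM keeps the top rows
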
+ 10 - 0
title_similarity_score_task.py

@@ -0,0 +1,10 @@
+"""
+@author: luojunhui
+"""
+from coldStartTasks.filter.title_similarity_task import ColdStartTitleSimilarityTask
+
+
+if __name__ == '__main__':
+    task = ColdStartTitleSimilarityTask()
+    task.init_database()
+    task.run()