Add hot-point events

luojunhui · 1 week ago
parent commit f75b41bda8

+ 1 - 1
app_config.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:6060"
-workers = 8
+workers = 2
 keep_alive_timeout = 120  # maximum seconds to keep a connection alive; adjust as needed
 graceful_timeout = 30    # time to wait for in-flight work to finish before restarting or stopping
 loglevel = "warning"  # log level
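The keys above resemble a Hypercorn/Gunicorn-style server configuration, so the change simply reduces the worker-process count from 8 to 2 (less parallelism, smaller footprint). A minimal sketch of reading the value back with the standard-library TOML parser; the actual server and its loading path are not shown in this diff:

import tomllib  # Python 3.11+

with open("app_config.toml", "rb") as f:
    cfg = tomllib.load(f)

assert cfg["workers"] == 2           # was 8 before this commit
assert cfg["keep_alive_timeout"] == 120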

+ 29 - 0
applications/tasks/cold_start_tasks/article_pool/article_pool_cold_start_strategy.py

@@ -4,6 +4,7 @@ from typing import List, Dict
 from applications.tasks.cold_start_tasks.article_pool.article_pool_cold_start_const import (
     ArticlePoolColdStartConst,
 )
+from applications.tasks.llm_tasks import HotKeysGenerate
 
 
 class ArticlePoolColdStartStrategy(ArticlePoolColdStartConst):
@@ -75,6 +76,34 @@ class ArticlePoolColdStartStrategy(ArticlePoolColdStartConst):
                 )
                 return article_list
 
+            case "strategy_v3":
+                # find hot-point articles: HotKeysGenerate.deal() returns the ids of
+                # articles matched by the LLM-generated hot search keys
+                hot_key_generate = HotKeysGenerate(self.pool, self.log_client, self.trace_id)
+                article_ids = await hot_key_generate.deal()
+                if not article_ids:
+                    return []
+
+                query = """
+                    select article_id, title, link, llm_sensitivity, score, category_by_ai
+                    from crawler_meta_article
+                    where platform = %s
+                      and status = %s
+                      and article_id in %s
+                    order by score desc;
+                """
+                return await self.pool.async_fetch(
+                    query=query,
+                    params=("weixin", self.INIT_STATUS, tuple(article_ids)),
+                )
+
             case _:
                 raise ValueError("Invalid strategy")
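The new strategy_v3 branch relies on the database driver expanding a Python tuple bound to `in %s` into a parenthesised value list. A minimal sketch of that behaviour with an aiomysql cursor, assuming `self.pool.async_fetch` is a thin wrapper over such a cursor (the pool implementation is not part of this diff; status value 1 is a placeholder for INIT_STATUS):

import aiomysql

async def fetch_by_ids(conn, article_ids):
    # pymysql/aiomysql escape a tuple parameter into "(1, 2, 3)",
    # which is why "article_id in %s" works with tuple(article_ids);
    # callers must guard against an empty id list, as the strategy does
    query = (
        "select article_id, title, link from crawler_meta_article "
        "where platform = %s and status = %s and article_id in %s "
        "order by score desc"
    )
    async with conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute(query, ("weixin", 1, tuple(article_ids)))
        return await cur.fetchall()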
 

+ 10 - 0
applications/tasks/cold_start_tasks/article_pool/article_pool_filter_strategy.py

@@ -15,6 +15,9 @@ class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
         self.cold_start_records = []
 
     async def filter_weixin_articles(self, strategy, dataframe, crawl_method, category):
         """WeChat filter funnel"""
+        # strategy_v3 articles are pre-selected by hot keys, so the funnel is skipped
+        if strategy == "strategy_v3":
+            return dataframe
+
         total_length: int = dataframe.shape[0]
 
@@ -92,6 +95,13 @@ class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
                         "title_length_threshold": self.TITLE_LENGTH_LIMIT,
                     }
                 )
+            case "strategy_v3":
+                self.cold_start_records.append(
+                    {
+                        "category": category,
+                        "cold_start_num": min(daily_article_num, len(filter_df)),
+                    }
+                )
 
             case _:
                 raise ValueError("Invalid strategy")

+ 24 - 0
applications/tasks/cold_start_tasks/article_pool_cold_start.py

@@ -175,6 +175,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         article_dataframe = await self.get_article_from_meta_table(
             platform, crawl_method, strategy, category
         )
+        print(article_dataframe)
 
         await self.log_client.log(
             contents={
@@ -228,6 +229,21 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                     article_id_list=article_id_list
                 )
 
+            case "strategy_v3":
+                # create the crawler plan for hot-point articles and bind it to the produce plan
+                url_list = filter_article_df["link"].values.tolist()
+                await self.create_crawler_plan_and_bind_to_produce_plan(
+                    strategy, crawl_method, category, platform, url_list, plan_id
+                )
+                print("crawler plan created successfully")
+                # update article status for publishing
+                article_id_list = filter_article_df["article_id"].values.tolist()
+                await self.change_article_status_while_publishing(
+                    article_id_list=article_id_list
+                )
+                print("article status updated successfully")
+
     async def deal(
         self,
         platform: str,
@@ -388,5 +404,13 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                         mention=False,
                     )
 
+            case "strategy_v3":
+                plan_id = "20250703081329508785665"
+                await self.create_cold_start_plan(
+                    platform=platform,
+                    strategy=strategy,
+                    plan_id=plan_id,
+                )
+
             case _:
                 raise Exception(f"error strategy {strategy}")
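The strategy_v3 publish branch earlier in this file only needs two flat lists from the filtered pandas DataFrame: the article links for the crawler plan and the article ids for the status update. A small, self-contained illustration of that extraction (column names mirror the SELECT in the strategy; the helper methods themselves are omitted):

import pandas as pd

filter_article_df = pd.DataFrame(
    [
        {"article_id": 101, "link": "https://example.com/a", "score": 0.91},
        {"article_id": 102, "link": "https://example.com/b", "score": 0.87},
    ]
)

url_list = filter_article_df["link"].values.tolist()                # ["https://example.com/a", ...]
article_id_list = filter_article_df["article_id"].values.tolist()   # [101, 102]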

+ 3 - 0
applications/tasks/llm_tasks/__init__.py

@@ -1,9 +1,12 @@
 from .process_title import TitleRewrite
 from .process_title import ArticlePoolCategoryGeneration
 from .candidate_account_process import CandidateAccountQualityScoreRecognizer
+from .hot_keys_generate import HotKeysGenerate
+
 
 __all__ = [
     "TitleRewrite",
     "CandidateAccountQualityScoreRecognizer",
     "ArticlePoolCategoryGeneration",
+    "HotKeysGenerate",
 ]

+ 57 - 0
applications/tasks/llm_tasks/hot_keys_generate.py

@@ -0,0 +1,57 @@
+from applications.api import AsyncElasticSearchClient
+from applications.api import fetch_deepseek_completion
+
+
+class HotKeysGenerate:
+    """Generate hot search key phrases from recent hot-point titles and
+    return the ids of Elasticsearch-matched articles."""
+
+    def __init__(self, pool, log_client, trace_id):
+        self.elastic_client = AsyncElasticSearchClient()
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    @staticmethod
+    async def generate_prompt(formatted_titles):
+        """
+        Build the prompt used to generate hot search key phrases
+        """
+        prompt = f"""
+请你根据以下的热点时事,帮我生成一些热搜词组,我需要用这些词组来搜索相关的文章。
+生成的每个词组不能太短,需要能够覆盖热点时事的主要内容。如果遇到相似的热点时事,需要合并为一个词组。
+如果遇到敏感话题,直接过滤掉即可
+## 输入
+{formatted_titles}
+## 输出
+输出要求是 JSON 格式,返回一个数组,数组中的每个元素都是一个字符串,字符串是一个热搜词组。
+参考结构:{{
+    "hot_keys": ["key1", "key2", "key3", ...]
+}}
+        """
+        return prompt
+
+    async def get_hot_titles(self):
+        """Fetch hot-point titles (useful = 2) from the last three days."""
+        query = """
+            SELECT title FROM hot_point_titles
+            WHERE useful = 2
+              AND create_time > DATE_SUB(CURDATE(), INTERVAL 3 DAY);
+        """
+        response = await self.pool.async_fetch(query=query)
+        # skip titles that mention the top leader's name to avoid sensitive topics
+        hot_titles = [item["title"] for item in response if "习近平" not in item["title"]]
+        return "\n".join(hot_titles)
+
+    async def deal(self):
+        """Return the ids of articles that match the generated hot search keys."""
+        article_id_set = set()
+        hot_titles = await self.get_hot_titles()
+        prompt = await self.generate_prompt(hot_titles)
+        completion = fetch_deepseek_completion(
+            model="default", prompt=prompt, output_type="json"
+        )
+        hot_keys = completion.get("hot_keys", [])
+        if not hot_keys:
+            return list(article_id_set)
+
+        for key in hot_keys:
+            response = await self.elastic_client.search(search_keys=key)
+            # keep at most the top 5 hits per hot search key
+            for item in response[:5]:
+                article_id_set.add(item["article_id"])
+
+        return list(article_id_set)
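deal() assumes the completion comes back as a dict containing a "hot_keys" list, matching the JSON shape requested in the prompt. A small defensive-parsing sketch, assuming only that fetch_deepseek_completion with output_type="json" returns the parsed object (if it returned a raw string, an explicit json.loads step would be needed first):

import json

def extract_hot_keys(completion) -> list[str]:
    # tolerate either a parsed dict or a raw JSON string
    if isinstance(completion, str):
        completion = json.loads(completion)
    hot_keys = completion.get("hot_keys", []) if isinstance(completion, dict) else []
    # keep only non-empty strings so each Elasticsearch query gets a usable phrase
    return [k.strip() for k in hot_keys if isinstance(k, str) and k.strip()]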

+ 6 - 0
applications/tasks/task_handler.py

@@ -19,6 +19,7 @@ from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticles
 from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
 
 from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import HotKeysGenerate
 from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
 from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
 
@@ -219,5 +220,10 @@ class TaskHandler(TaskMapper):
         await task.classify_articles_by_llm()
         return self.TASK_SUCCESS_STATUS
 
+    async def _hot_keys_generate_handler(self) -> int:
+        task = HotKeysGenerate(self.db_client, self.log_client, self.trace_id)
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 3 - 1
applications/tasks/task_scheduler.py

@@ -198,7 +198,9 @@ class TaskScheduler(TaskHandler):
             # hot-point event crawling
             "crawler_hot_point": self._crawl_hot_point_handler,
             # hot-point event analysis
-            "analysis_hot_point": self._analysis_hot_point_handler
+            "analysis_hot_point": self._analysis_hot_point_handler,
+            # hot search key generation
+            "hot_keys_generate": self._hot_keys_generate_handler,
         }
 
         if task_name not in handlers:
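With this mapping entry in place, the scheduler resolves the new task purely by its name, just like the existing handlers. A minimal, self-contained sketch of the dispatch pattern used here (handler signatures and status codes are assumed from the surrounding code):

from typing import Awaitable, Callable, Dict

async def dispatch(task_name: str, handlers: Dict[str, Callable[[], Awaitable[int]]]) -> int:
    # unknown task names are rejected; known ones run and return a status code,
    # e.g. "hot_keys_generate" -> _hot_keys_generate_handler -> HotKeysGenerate(...).deal()
    if task_name not in handlers:
        raise ValueError(f"unknown task: {task_name}")
    return await handlers[task_name]()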