Bladeren bron

新增热点事件

luojunhui 1 week geleden
bovenliggende
commit
c51ca91849

+ 1 - 1
app_config.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:6060"
-workers = 8
+workers = 2
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
 loglevel = "warning"  # 日志级别

+ 1 - 0
applications/tasks/dev/__init__.py

@@ -0,0 +1 @@
+from .data_analysis import DataAnalysis

+ 54 - 0
applications/tasks/dev/data_analysis.py

@@ -0,0 +1,54 @@
+from tqdm.asyncio import tqdm
+from applications.utils import run_tasks_with_asyncio_task_group
+
+
class DataAnalysis:
    """One-off maintenance task: back-fill the ``category`` column of
    ``publish_account_category_detail`` using confirmed rows from
    ``article_category``.
    """

    def __init__(self, pool):
        # pool: async DB client exposing async_fetch / async_save
        # (injected by TaskHandler — presumably a connection pool; see caller)
        self.pool = pool

    async def get_articles(self):
        """Return all detail rows whose category has not been assigned yet."""
        query = """
            select id, title
            from publish_account_category_detail
            where category is null;
        """
        return await self.pool.async_fetch(query=query)

    async def get_title_category(self, title: str):
        """Look up the newest confirmed (status=2, version=2) category for *title*.

        Returns whatever ``async_fetch`` yields: an empty result when no match,
        otherwise rows with a ``category`` field.
        """
        # Plain string literal: the original used an f-string with no
        # placeholders (the title is passed safely via params).
        query = """
            select category from article_category
            where title = %s and status = 2 and version = 2
            order by id desc limit 1;
        """
        return await self.pool.async_fetch(query=query, params=(title,))

    async def set_each_record(self, article):
        """Resolve one article's category and persist it.

        Writes the literal ``"empty"`` when no confirmed category exists, so
        the row no longer matches the ``category is null`` back-fill query.
        """
        id_ = article['id']
        title = article['title']
        result = await self.get_title_category(title)
        if not result:
            category = "empty"
        else:
            category = result[0]['category']
        update_query = """
            update publish_account_category_detail
            set category = %s
            where id = %s;
        """
        await self.pool.async_save(query=update_query, params=(category, id_))

    async def deal(self):
        """Entry point: categorise every pending row with bounded concurrency.

        NOTE: the original version had an unreachable sequential tqdm loop
        after this return statement; it has been removed (dead code).
        """
        articles = await self.get_articles()
        return await run_tasks_with_asyncio_task_group(
            task_list=articles,
            handler=self.set_each_record,
            max_concurrency=20,
            fail_fast=False,
            description="更新文章分类",
            unit="id",
        )
+
+
+

+ 7 - 0
applications/tasks/task_handler.py

@@ -29,6 +29,8 @@ from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
 from applications.tasks.monitor_tasks import TaskProcessingMonitor
 
+from applications.tasks.dev import DataAnalysis
+
 from applications.tasks.task_mapper import TaskMapper
 
 
@@ -206,5 +208,10 @@ class TaskHandler(TaskMapper):
         await task.deal(params=self.data)
         return self.TASK_SUCCESS_STATUS
 
+    async def _data_analysis_handler(self) -> int:
+        task = DataAnalysis(self.db_client)
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -195,6 +195,8 @@ class TaskScheduler(TaskHandler):
             "crawler_detail_analysis": self._crawler_article_analysis_handler,
             # 小程序裂变信息处理
             "mini_program_detail_process": self._mini_program_detail_handler,
+            # 数据分析
+            "data_analysis": self._data_analysis_handler,
         }
 
         if task_name not in handlers: