luojunhui 2 hónapja
szülő
commit
96f6d09a17

+ 1 - 1
app_config.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:6060"
-workers = 8
+workers = 4
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
 loglevel = "warning"  # 日志级别

+ 1 - 0
applications/tasks/other/__init__.py

@@ -0,0 +1 @@
+from .cooperate_account_analysis import CooperateAccountAnalysisTask

+ 71 - 0
applications/tasks/other/cooperate_account_analysis.py

@@ -0,0 +1,71 @@
+from tqdm.asyncio import tqdm
+import json
+
+from applications.crawler.wechat import get_article_list_from_account
+
+class CooperateAccountAnalysisTask:
+
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    async def insert_account(self, account: dict):
+        insert_query = """
+            INSERT IGNORE INTO cooperate_accounts_temp
+            (partner_name, partner_id, gh_id, account_name)
+            VALUES (%s, %s, %s, %s);
+        """
+        await self.pool.async_save(
+            query=insert_query,
+            params=(
+                account.get('partner_name'),
+                account.get('partner_id'),
+                account.get('gh_id'),
+                account.get('account_name')
+            )
+        )
+
+    async def update_each_account(self, gh_id: str, response: dict):
+        if response['code'] == 25013:
+            status = 0
+            result = response['msg']
+        else:
+            status = 1
+            result = json.dumps(response['data'], ensure_ascii=False)
+        update_query = """
+            UPDATE cooperate_accounts_temp
+            SET status = %s, recent_articles= %s
+            WHERE gh_id = %s;
+        """
+        await self.pool.async_save(
+            query=update_query,
+            params=(
+                status,
+                json.dumps(result, ensure_ascii=False),
+                gh_id
+            )
+        )
+
+    async def get_account_list(self):
+        select_query = """
+            SELECT gh_id FROM cooperate_accounts_temp
+            WHERE status = %s and id > 11;
+        """
+        return await self.pool.async_fetch(
+            query=select_query,
+            params=(1, )
+        )
+
+    async def init_account_list(self, account_list):
+        for account in tqdm(account_list, desc="Dealing Accounts"):
+            await self.insert_account(account)
+
+    async def deal(self):
+        account_list = await self.get_account_list()
+        for account in tqdm(account_list, desc="Dealing Accounts"):
+            gh_id = account['gh_id']
+            response = await get_article_list_from_account(gh_id)
+            await self.update_each_account(gh_id, response)
+
+
+

+ 10 - 0
applications/tasks/task_handler.py

@@ -41,6 +41,8 @@ from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
 
 from applications.tasks.task_mapper import TaskMapper
 
+from applications.tasks.other import CooperateAccountAnalysisTask
+
 
 class TaskHandler(TaskMapper):
     def __init__(self, data, log_service, db_client, trace_id):
@@ -275,5 +277,13 @@ class TaskHandler(TaskMapper):
         await task.deal(date_string=self.data.get("date_string"))
         return self.TASK_SUCCESS_STATUS
 
+     # 处理合作账号分析任务
+    async def _cooperate_account_analysis_handler(self) -> int:
+        task = CooperateAccountAnalysisTask(
+            pool=self.db_client, log_client=self.log_client
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -209,6 +209,8 @@ class TaskScheduler(TaskHandler):
             "update_account_read_avg": self._update_account_read_avg_handler,
             # 更新账号打开率均值
             "update_account_open_rate_avg": self._update_account_open_rate_avg_handler,
+            # 合作账号分析
+            "cooperate_account_analysis": self._cooperate_account_analysis_handler,
         }
 
         if task_name not in handlers: