luojunhui 1 month ago
parent
commit
870ac798f2
1 changed file with 57 additions and 13 deletions
      applications/tasks/crawler_tasks/crawler_gzh.py

+ 57 - 13
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -4,7 +4,7 @@ import asyncio
 import json
 import time
 import traceback
-from datetime import datetime
+from datetime import datetime, date
 from typing import List, Dict
 
 from applications.api import feishu_robot
@@ -20,10 +20,11 @@ class CrawlerGzhConst:
     DEFAULT_VIEW_COUNT = 0
     DEFAULT_LIKE_COUNT = 0
     DEFAULT_ARTICLE_STATUS = 1
+    STAT_DURATION = 30  # days
     DEFAULT_TIMESTAMP = 1735660800
 
 
-class CrawlerGzhStrategy(CrawlerPipeline, CrawlerGzhConst):
+class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client)
         self.trace_id = trace_id
@@ -44,9 +45,7 @@ class CrawlerGzhStrategy(CrawlerPipeline, CrawlerGzhConst):
                     from long_articles_accounts
                     where account_category = %s and is_using = %s order by recent_score_ci_lower desc limit %s; 
                 """
-                return await self.pool.async_fetch(
-                    query=query, params=(method, 1, 100)
-                )
+                return await self.pool.async_fetch(query=query, params=(method, 1, 100))
             case _:
                 raise Exception("strategy not supported")
 
@@ -98,8 +97,53 @@ class CrawlerGzhStrategy(CrawlerPipeline, CrawlerGzhConst):
             media_type="article", item=new_item, trace_id=self.trace_id
         )
 
+    async def update_account_read_avg_info(self, gh_id, account_name):
+        """update account read avg info"""
+        position_list = list(range(1, 9))
+        today_dt = date.today().isoformat()
+        for position in position_list:
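+            # average the read counts of the STAT_DURATION most recent articles at this position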
+            query = """
+                select read_cnt, from_unixtime(publish_time, '%Y-%m-%d') as publish_dt from crawler_meta_article
+                where out_account_id = %s and article_index = %s
+                order by publish_time desc limit %s;
+            """
+            fetch_response = await self.pool.async_fetch(
+                query=query, params=(gh_id, position, self.STAT_DURATION)
+            )
+            if fetch_response:
+                read_cnt_list = [i["read_cnt"] for i in fetch_response]
+                n = len(read_cnt_list)
+                read_avg = sum(read_cnt_list) / n
+                max_publish_dt = fetch_response[0]["publish_dt"]
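+                # remark (translated): "calculated starting from {max_publish_dt}, looking back {n} days"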
+                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
+                insert_query = """
+                    insert ignore into crawler_meta_article_accounts_read_avg
+                    (gh_id, account_name, position, read_avg, dt, status, remark)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s);
+                """
+                insert_rows = await self.pool.async_save(
+                    query=insert_query,
+                    params=(
+                        gh_id,
+                        account_name,
+                        position,
+                        read_avg,
+                        today_dt,
+                        1,
+                        remark,
+                    ),
+                )
+                if insert_rows:
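+                    # today's snapshot was inserted; mark earlier rows for this account/position as inactive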
+                    update_query = """
+                        update crawler_meta_article_accounts_read_avg
+                        set status = %s
+                        where gh_id = %s and position = %s and dt < %s;
+                    """
+                    await self.pool.async_save(
+                        query=update_query, params=(0, gh_id, position, today_dt)
+                    )
 
-class CrawlerGzhAccountArticles(CrawlerGzhStrategy):
+
+class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
@@ -162,27 +206,27 @@ class CrawlerGzhAccountArticles(CrawlerGzhStrategy):
             print(account)
             try:
                 await self.crawler_single_account(method, account)
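+                # refresh the per-position read averages once the account has been crawled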
+                await self.update_account_read_avg_info(
+                    gh_id=account["gh_id"], account_name=account["account_name"]
+                )
             except Exception as e:
                 await self.log_client.log(
                     contents={
                         "task": "crawler_gzh_articles",
-                        "trace_id": account["trace_id"],
+                        "trace_id": self.trace_id,
                         "data": {
                             "account_id": account["account_id"],
                             "account_method": method,
                             "error": str(e),
                             "traceback": traceback.format_exc(),
-                        }
+                        },
                     }
                 )
 
 
-class CrawlerGzhSearchArticles(CrawlerGzhStrategy):
+class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
     async def deal(self):
-        return {
-            "mode": "search",
-            "message": "still developing"
-        }
+        return {"mode": "search", "message": "still developing"}