Add trace_id recording

luojunhui committed 4 weeks ago
commit 5aadf0d7e7

+ 1 - 2
applications/pipeline/crawler_pipeline.py

@@ -60,6 +60,7 @@ class CrawlerPipeline(AsyncApolloApi):
 
     async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
         """deal function"""
+        item['trace_id'] = trace_id
         match media_type:
             case "video":
                 await self.save_single_record(media_type, item)
@@ -109,7 +110,6 @@ class CrawlerPipeline(AsyncApolloApi):
                         "message": "save article successfully",
                     }
                 )
-
             case "account":
                 if await self.whether_account_exist(
                     item["account_id"], item["media_type"]
@@ -125,6 +125,5 @@ class CrawlerPipeline(AsyncApolloApi):
                         "message": "save account successfully",
                     }
                 )
-
             case _:
                 raise Exception("Unknown media type")
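
Taken together with the other two files in this commit, this hunk threads a per-task trace_id from the crawler task into every record the pipeline persists. A minimal usage sketch of that flow; generate_trace_id, run_crawl_task, and the item shape are hypothetical names for illustration:

    import uuid

    def generate_trace_id() -> str:
        # one ID per task execution; the real project may derive this differently
        return uuid.uuid4().hex

    async def run_crawl_task(pipeline):
        trace_id = generate_trace_id()
        item = {"title": "some article", "media_type": "article"}
        # save_item_to_database now copies trace_id into the item before it is
        # persisted, so records can later be grouped by the task that created them
        await pipeline.save_item_to_database("article", item, trace_id)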

+ 14 - 2
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import asyncio
-import json
 import time
 import traceback
 from datetime import datetime, date, timedelta
@@ -179,7 +178,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
             case _:
                 raise Exception(f"unknown strategy: {strategy}")
         date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
-            "%Y-%m-%d"
+            "%Y%m%d"
         )
         return await get_hot_titles(
             self.pool,
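
The format-string change in this hunk alters only the shape of the date key handed to get_hot_titles, not the date arithmetic. A quick illustration (the dates are examples; that the hot-titles data keys on the compact format is inferred from the fix):

    from datetime import datetime, timedelta

    d = datetime(2025, 6, 1) - timedelta(days=3)
    print(d.strftime("%Y-%m-%d"))  # 2025-05-29  (old format)
    print(d.strftime("%Y%m%d"))    # 20250529    (new format)
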
@@ -327,6 +326,11 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
             # update page
             current_page = search_response.get("data", {}).get("next_cursor")
 
+    async def get_task_execute_result(self):
+        """get task execute result"""
+        query = """select count(*) as total_search_articles from crawler_meta_article where trace_id = %s;"""
+        return await self.pool.async_fetch(query=query, params=(self.trace_id,))
+
     async def deal(self, strategy: str = "V1"):
         hot_titles = await self.get_hot_titles_with_strategy(strategy)
         for hot_title in hot_titles:
@@ -335,3 +339,11 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
                 await self.search_each_title(hot_title)
             except Exception as e:
                 print(f"crawler_gzh_articles error:{e}")
+
+        await feishu_robot.bot(
+            title="公众号搜索任务执行完成",
+            detail={
+                "strategy": strategy,
+                "execute_detail": await self.get_task_execute_result()
+            }
+        )
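
A sketch of what the new summary step does, assuming pool.async_fetch is an aiomysql-style helper that returns a list of dict rows (the actual pool wrapper is project-specific). Counting rows by trace_id only works because the pipeline change above stamps trace_id onto each record before insert:

    async def get_task_execute_result(pool, trace_id: str):
        # count how many meta articles this particular task run inserted
        query = (
            "select count(*) as total_search_articles "
            "from crawler_meta_article where trace_id = %s;"
        )
        return await pool.async_fetch(query=query, params=(trace_id,))

The count is then posted to the Feishu bot; the notification title "公众号搜索任务执行完成" reads "official-account search task finished", so each run reports how many articles it collected.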

+ 1 - 0
applications/utils/item.py

@@ -39,6 +39,7 @@ class CrawlerMetaArticle(BaseModel):
         default=0,
         description="文章内嵌套视频状态 0: init; 1: processing; 2: successfully; 3:article link bad ;99: fail",
     )
+    trace_id: str = Field(default=None, description="创建该条记录的任务ID")
 
 
 class CrawlerMetaAccount(BaseModel):
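
One caveat with the new field: it is annotated str but defaults to None, which static type checkers (and pydantic configurations that validate defaults) will flag. A hedged corrected sketch, with the Chinese description translated ("ID of the task that created this record"):

    from typing import Optional
    from pydantic import BaseModel, Field

    class CrawlerMetaArticle(BaseModel):
        # Optional[str] makes the None default type-correct
        trace_id: Optional[str] = Field(
            default=None, description="ID of the task that created this record"
        )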