
Optimize the mini-program update task

luojunhui, 2 months ago
Commit d0d74c7e1d

+ 1 - 1
applications/api/__init__.py

@@ -43,5 +43,5 @@ __all__ = [
     "auto_create_crawler_task",
     "auto_bind_crawler_task_to_generate_task",
     "AsyncElasticSearchClient",
-    "insert_crawler_relation_to_aigc_system"
+    "insert_crawler_relation_to_aigc_system",
 ]

+ 9 - 6
applications/api/async_aigc_system_api.py

@@ -24,6 +24,7 @@ PERSON_COOKIE = {
     "uid": 1,
 }
 
+
 class RelationDict(TypedDict):
     videoPoolTraceId: str
     channelContentId: str
@@ -174,10 +175,12 @@ async def get_generate_task_detail(generate_task_id):
         return {}
 
 
-async def insert_crawler_relation_to_aigc_system(relation_list: List[RelationDict]) -> Optional[Dict]:
-        url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
-        payload = json.dumps({"params": {"relations": relation_list}})
-        async with AsyncHttpClient(timeout=60) as client:
-            res = await client.post(url=url, headers=HEADERS, data=payload)
+async def insert_crawler_relation_to_aigc_system(
+    relation_list: List[RelationDict],
+) -> Optional[Dict]:
+    url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
+    payload = json.dumps({"params": {"relations": relation_list}})
+    async with AsyncHttpClient(timeout=60) as client:
+        res = await client.post(url=url, headers=HEADERS, data=payload)
 
-        return res
+    return res
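
For reference, a minimal sketch of how the reformatted helper could be called; the relation values below are made up and only follow the RelationDict shape declared above:

import asyncio

from applications.api import insert_crawler_relation_to_aigc_system

async def demo():
    # Hypothetical relation entries; real trace IDs come from the video pool.
    relations = [
        {
            "videoPoolTraceId": "trace-demo-001",
            "channelContentId": "123456",
            "platform": "toutiao",
        }
    ]
    res = await insert_crawler_relation_to_aigc_system(relation_list=relations)
    print(res)

asyncio.run(demo())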

+ 1 - 1
applications/config/__init__.py

@@ -38,5 +38,5 @@ __all__ = [
     "input_source_map",
     "name_map",
     "CATEGORY_FEATURES",
-    "CATEGORY_MAP"
+    "CATEGORY_MAP",
 ]

+ 2 - 5
applications/config/cold_start_config.py

@@ -13,10 +13,7 @@ cold_start_category_map = {
     "家长里短": "20250813034159621236902",
     "军事历史": "20250813034227997109122",
     "财经科技": "20250813034253336624837",
-    "政治新闻": "20250813034320561348119"
+    "政治新闻": "20250813034320561348119",
 }
 
-input_source_map = {
-    "weixin": 5,
-    "toutiao": 6
-}
+input_source_map = {"weixin": 5, "toutiao": 6}

+ 2 - 2
applications/config/task_chinese_name.py

@@ -14,5 +14,5 @@ name_map = {
     "get_off_videos": "自动下架视频",
     "check_publish_video_audit_status": "校验发布视频状态",
     "check_kimi_balance": "检验kimi余额",
-    "account_category_analysis": "账号品类分析"
-}
+    "account_category_analysis": "账号品类分析",
+}

+ 9 - 1
applications/pipeline/__init__.py

@@ -1,2 +1,10 @@
-from .data_recycle_pipeline import insert_article_into_recycle_pool
 from .crawler_pipeline import CrawlerPipeline
+
+from .data_recycle_pipeline import insert_article_into_recycle_pool
+from .data_recycle_pipeline import insert_into_mini_program_detail_pool
+
+__all__ = [
+    "CrawlerPipeline",
+    "insert_article_into_recycle_pool",
+    "insert_into_mini_program_detail_pool",
+]
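
With the package-level exports above, downstream modules can import both recycle helpers from the package root, e.g.:

from applications.pipeline import (
    CrawlerPipeline,
    insert_article_into_recycle_pool,
    insert_into_mini_program_detail_pool,
)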

+ 24 - 0
applications/pipeline/data_recycle_pipeline.py

@@ -126,3 +126,27 @@ async def insert_article_into_recycle_pool(
                     "data": {"account_name": account_info["name"]},
                 }
             )
+
+
+async def insert_into_mini_program_detail_pool(pool, raw: Dict) -> int:
+    query = """
+        INSERT IGNORE INTO long_articles_detail_info
+            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+        VALUES
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+    """
+    return await pool.async_save(
+        query=query,
+        db_name="piaoquan_crawler",
+        params=(
+            raw["wx_sn"],
+            raw["mini_title"],
+            raw["mini_name"],
+            raw["cover_url"],
+            raw["video_index"],
+            raw["root_source_id"],
+            raw["video_id"],
+            raw["publish_dt"],
+            raw["recall_dt"],
+        ),
+    )
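
A minimal sketch of a call site for the new helper, assuming `pool` is the same async MySQL client used by the other pipeline functions; every field value below is illustrative:

row = {
    "wx_sn": "example_wx_sn",
    "mini_title": "示例标题",
    "mini_name": "示例小程序",
    "cover_url": "https://example.com/cover.jpg",
    "video_index": 1,
    "root_source_id": "example_root_source_id",
    "video_id": 123456,
    "publish_dt": "2025-08-13",
    "recall_dt": "2025-08-14",
}
affected_rows = await insert_into_mini_program_detail_pool(pool, raw=row)  # inside a coroutine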

+ 5 - 8
applications/service/task_manager_service.py

@@ -106,17 +106,14 @@ class TaskManagerService(TaskConst):
         items = [
             {
                 **r,
-                "status_text": self.STATUS_TEXT.get(r["task_status"], str(r["task_status"])),
-                "task_name": get_task_chinese_name(_safe_json(r["data"]))
+                "status_text": self.STATUS_TEXT.get(
+                    r["task_status"], str(r["task_status"])
+                ),
+                "task_name": get_task_chinese_name(_safe_json(r["data"])),
             }
             for r in rows
         ]
-        return {
-            "total": total,
-            "page": page,
-            "page_size": page_size,
-            "items": items
-        }
+        return {"total": total, "page": page, "page_size": page_size, "items": items}
 
     async def get_task(self, task_id: int):
         pass
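
The condensed return above yields a flat pagination envelope; an illustrative, made-up payload (each item also carries the original row fields spread in via **r):

{
    "total": 42,
    "page": 1,
    "page_size": 20,
    "items": [{"task_status": 2, "status_text": "成功", "task_name": "账号品类分析"}],
}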

+ 1 - 1
applications/tasks/algorithm_tasks/__init__.py

@@ -1,3 +1,3 @@
 from .account_category_analysis import AccountCategoryAnalysis
 
-__all__ = ["AccountCategoryAnalysis"]
+__all__ = ["AccountCategoryAnalysis"]

+ 11 - 2
applications/tasks/algorithm_tasks/account_category_analysis.py

@@ -139,7 +139,12 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         params, t_stats, p_values = self.run_ols_linear_regression(
             sub_df, self.view_only, self.P_VALUE_THRESHOLD
         )
-        current_record = {"dt": end_dt, "gh_id": account_id, "category_map": {}, "name": account_name}
+        current_record = {
+            "dt": end_dt,
+            "gh_id": account_id,
+            "category_map": {},
+            "name": account_name,
+        }
         params_names = self.get_param_names()
         for name, param, p_value in zip(params_names, params, p_values):
             category_name = param_to_category_map.get(name, None)
@@ -197,5 +202,9 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
 
         for account_id in tqdm(account_ids, desc="analysis each account"):
             await self.predict_each_account(
-                pre_processed_dataframe, account_id, account_id_map, end_dt, param_to_category_map
+                pre_processed_dataframe,
+                account_id,
+                account_id_map,
+                end_dt,
+                param_to_category_map,
             )

+ 1 - 1
applications/tasks/cold_start_tasks/article_pool/article_pool_filter_strategy.py

@@ -121,7 +121,7 @@ class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
                 self.cold_start_records.append(
                     {
                         "category": category,
-                        "cold_start_num": min(daily_article_num, len(filter_df))
+                        "cold_start_num": min(daily_article_num, len(filter_df)),
                     }
                 )
 

+ 18 - 12
applications/tasks/cold_start_tasks/article_pool_cold_start.py

@@ -16,7 +16,7 @@ from applications.config import cold_start_category_map, input_source_map
 from applications.utils import get_titles_from_produce_plan
 from applications.tasks.cold_start_tasks.article_pool import (
     ArticlePoolColdStartStrategy,
-    ArticlePoolFilterStrategy
+    ArticlePoolFilterStrategy,
 )
 
 
@@ -111,13 +111,13 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         return affect_rows
 
     async def create_crawler_plan_and_bind_to_produce_plan(
-            self,
-            strategy: str,
-            crawl_method: str,
-            category: str,
-            platform: str,
-            url_list: List[str],
-            plan_id: str,
+        self,
+        strategy: str,
+        crawl_method: str,
+        category: str,
+        platform: str,
+        url_list: List[str],
+        plan_id: str,
     ):
         # create_crawler_plan
         crawler_plan_response = await auto_create_crawler_task(
@@ -202,16 +202,23 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                     url_list = filter_category_df["link"].values.tolist()
                     if url_list:
                         await self.create_crawler_plan_and_bind_to_produce_plan(
-                            strategy, crawl_method, ai_category, platform, url_list, plan_id
+                            strategy,
+                            crawl_method,
+                            ai_category,
+                            platform,
+                            url_list,
+                            plan_id,
                         )
                         # change article status
-                        article_id_list = filter_category_df["article_id"].values.tolist()
+                        article_id_list = filter_category_df[
+                            "article_id"
+                        ].values.tolist()
                         await self.change_article_status_while_publishing(
                             article_id_list=article_id_list
                         )
 
             case "strategy_v2":
-                url_list =  filter_article_df["link"].values.tolist()
+                url_list = filter_article_df["link"].values.tolist()
                 await self.create_crawler_plan_and_bind_to_produce_plan(
                     strategy, crawl_method, category, platform, url_list, plan_id
                 )
@@ -221,7 +228,6 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                     article_id_list=article_id_list
                 )
 
-
     async def deal(
         self,
         platform: str,

+ 11 - 9
applications/tasks/cold_start_tasks/video_pool/video_pool_audit_strategy.py

@@ -18,7 +18,9 @@ class VideoPoolAuditStrategy(VideoPoolConst):
             SET audit_status = %s 
             WHERE audit_video_id = %s AND audit_status = %s;
         """
-        return await self.pool.async_save(query=query, params=(new_status, video_id, ori_status))
+        return await self.pool.async_save(
+            query=query, params=(new_status, video_id, ori_status)
+        )
 
     async def get_auditing_video_list(self):
         """get auditing video list"""
@@ -27,13 +29,13 @@ class VideoPoolAuditStrategy(VideoPoolConst):
             from publish_single_video_source
             where audit_status = %s
         """
-        return await self.pool.async_fetch(query=query, params=(-1, ))
+        return await self.pool.async_fetch(query=query, params=(-1,))
 
     async def get_video_audit_info(self, video_obj):
         """
         get audit video info from piaoquan
         """
-        video_id = video_obj['audit_video_id']
+        video_id = video_obj["audit_video_id"]
         response = await fetch_piaoquan_video_list_detail([video_id])
         response_data = response.get("data")
         if not response_data:
@@ -50,7 +52,7 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                     affected_rows = await self.update_video_audit_status(
                         video_id=video_id,
                         ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
-                        new_status=self.VIDEO_AUDIT_SUCCESS_STATUS
+                        new_status=self.VIDEO_AUDIT_SUCCESS_STATUS,
                     )
                     # 将视频存储到任务队列
                     self.insert_into_task_queue(video_obj)
@@ -59,9 +61,9 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                     await insert_crawler_relation_to_aigc_system(
                         relation_list=[
                             {
-                                "videoPoolTraceId": video_obj['content_trace_id'],
+                                "videoPoolTraceId": video_obj["content_trace_id"],
                                 "channelContentId": str(video_id),
-                                "platform": video_obj['platform'],
+                                "platform": video_obj["platform"],
                             }
                         ]
                     )
@@ -70,7 +72,7 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                     affected_rows = await self.update_video_audit_status(
                         video_id=video_id,
                         ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
-                        new_status=self.VIDEO_TITLE_GENERATE_FAIL_STATUS
+                        new_status=self.VIDEO_TITLE_GENERATE_FAIL_STATUS,
                     )
 
             case self.PQ_AUDIT_SELF_VISIBLE_STATUS, self.PQ_AUDIT_FAIL_STATUS:
@@ -78,7 +80,7 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                 affected_rows = await self.update_video_audit_status(
                     video_id=video_id,
                     ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
-                    new_status=self.VIDEO_AUDIT_FAIL_STATUS
+                    new_status=self.VIDEO_AUDIT_FAIL_STATUS,
                 )
 
             case self.PQ_AUDIT_PROCESSING_STATUS:
@@ -90,5 +92,5 @@ class VideoPoolAuditStrategy(VideoPoolConst):
         return {
             "affected_rows": affected_rows,
             "video_id": video_id,
-            "audit_status": audit_status
+            "audit_status": audit_status,
         }

+ 1 - 1
applications/tasks/cold_start_tasks/video_pool/video_pool_const.py

@@ -72,4 +72,4 @@ class VideoPoolConst:
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
     SUCCESS_STATUS = 2
-    FAIL_STATUS = 99
+    FAIL_STATUS = 99

+ 1 - 3
applications/tasks/cold_start_tasks/video_pool_cold_start.py

@@ -1,4 +1,2 @@
-
-
 class VideoPoolColdStart:
-    pass
+    pass

+ 22 - 6
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -52,7 +52,8 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     where account_category = %s and is_using = %s and daily_scrape = %s;
                 """
                 return await self.pool.async_fetch(
-                    query=query, params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSTIVE)
+                    query=query,
+                    params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSTIVE),
                 )
 
             case "V2":
@@ -63,7 +64,8 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     order by recent_score_ci_lower desc limit %s; 
                 """
                 return await self.pool.async_fetch(
-                    query=query, params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL)
+                    query=query,
+                    params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL),
                 )
 
             case _:
@@ -161,7 +163,15 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                 """
                 insert_rows = await self.pool.async_save(
                     query=insert_query,
-                    params=(gh_id, account_name, position, read_avg, today_dt, self.USING_STATUS, remark),
+                    params=(
+                        gh_id,
+                        account_name,
+                        position,
+                        read_avg,
+                        today_dt,
+                        self.USING_STATUS,
+                        remark,
+                    ),
                 )
                 if insert_rows:
                     update_query = """
@@ -284,14 +294,18 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
-    async def crawl_search_articles_detail(self, article_list: List[Dict], source_title: str):
+    async def crawl_search_articles_detail(
+        self, article_list: List[Dict], source_title: str
+    ):
         """
         @description: 对于搜索到的文章list,获取文章详情, 并且存储到meta表中
         """
         for article in tqdm(article_list, desc="获取搜索结果详情"):
             print(f"{datetime.now()}: start crawling article: {article['title']}")
             url = article["url"]
-            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
+            detail_response = await get_article_detail(
+                url, is_count=True, is_cache=False
+            )
             if not detail_response:
                 continue
 
@@ -351,7 +365,9 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
             try:
                 await self.search_each_title(hot_title)
             except Exception as e:
-                print(f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}")
+                print(
+                    f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}"
+                )
 
             print(f"{datetime.now()}: finish searched hot title: {hot_title}")
         await feishu_robot.bot(

+ 4 - 2
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -306,7 +306,9 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
                 mini_program = data.get("mini_program", [])
                 if mini_program:
                     root_source_id_list = [
-                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get("rootSourceId", [""])[0]
+                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
+                            "rootSourceId", [""]
+                        )[0]
                         for i in mini_program
                     ]
                 else:
@@ -345,7 +347,7 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
             ),
         )
         if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
-            article['wx_sn'] = wx_sn
+            article["wx_sn"] = wx_sn
             return article
         else:
             return None
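
The nested expression above pulls rootSourceId out of the URL-encoded mini-program path; a standalone sketch with a made-up path whose query parameters are encoded directly:

import urllib.parse

encoded_path = "rootSourceId%3Dtouliu_abc123%26source%3Darticle"
params = urllib.parse.parse_qs(urllib.parse.unquote(encoded_path))
print(params.get("rootSourceId", [""])[0])  # -> "touliu_abc123"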

+ 40 - 24
applications/tasks/data_recycle_tasks/recycle_root_source_id_detail.py

@@ -5,7 +5,14 @@ from datetime import datetime, timedelta
 from tqdm.asyncio import tqdm
 
 from applications.crawler.wechat import get_article_detail
-from applications.utils import get_beijing_date, handle_spider_exception, transform_to_beijing_date, extract_root_source_id
+from applications.pipeline import insert_into_mini_program_detail_pool
+from applications.utils import (
+    get_beijing_date,
+    handle_spider_exception,
+    transform_to_beijing_date,
+    extract_root_source_id,
+)
+
 
 class Const:
     ARTICLE_SUCCESS_CODE = 0
@@ -52,7 +59,9 @@ class RecycleRootSourceIdDetail(Const):
             from long_articles_root_source_id
             where root_source_id in %s;
         """
-        return await self.pool.async_fetch(query=query, params=(tuple(root_source_id_list),))
+        return await self.pool.async_fetch(
+            query=query, params=(tuple(root_source_id_list),)
+        )
 
     async def get_article_mini_program_detail(self, url, root_source_id_list):
         if not root_source_id_list:
@@ -70,11 +79,13 @@ class RecycleRootSourceIdDetail(Const):
                     error=e,
                     traceback=traceback.format_exc(),
                     trace_id=self.trace_id,
-                    task_name=self.__class__.__name__
+                    task_name=self.__class__.__name__,
                 )
                 return []
         else:
-            mini_program_info = await self.get_mini_program_info_by_root_source_id(root_source_id_list)
+            mini_program_info = await self.get_mini_program_info_by_root_source_id(
+                root_source_id_list
+            )
             if mini_program_info:
                 return [
                     {
@@ -87,7 +98,8 @@ class RecycleRootSourceIdDetail(Const):
                         "service_type": "0",
                         "title": "",
                         "type": "card",
-                    } for item in mini_program_info
+                    }
+                    for item in mini_program_info
                 ]
             else:
                 return []
@@ -96,10 +108,16 @@ class RecycleRootSourceIdDetail(Const):
         url = article["ContentUrl"]
         wx_sn = article["wx_sn"].decode("utf-8")
         publish_timestamp = article["publish_timestamp"]
-        root_source_id_list = json.loads(article["root_source_id_list"]) if article["root_source_id_list"] else []
+        root_source_id_list = (
+            json.loads(article["root_source_id_list"])
+            if article["root_source_id_list"]
+            else []
+        )
 
         # get article mini program info
-        article_mini_program_detail = await self.get_article_mini_program_detail(url, root_source_id_list)
+        article_mini_program_detail = await self.get_article_mini_program_detail(
+            url, root_source_id_list
+        )
         if not article_mini_program_detail:
             return {}
         else:
@@ -110,7 +128,6 @@ class RecycleRootSourceIdDetail(Const):
                     (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                     for i in range(3)
                 ]
-
                 for date_str in recall_dt_str_list:
                     for video_index, mini_item in enumerate(
                         article_mini_program_detail, 1
@@ -128,24 +145,27 @@ class RecycleRootSourceIdDetail(Const):
                             root_source_id = id_info["root_source_id"]
                             video_id = id_info["video_id"]
                         kimi_title = mini_item["title"]
-                        # self.insert_each_root_source_id(
-                        #     wx_sn=wx_sn,
-                        #     mini_title=kimi_title,
-                        #     mini_name=nick_name,
-                        #     cover_url=image_url,
-                        #     video_index=video_index,
-                        #     root_source_id=root_source_id,
-                        #     video_id=video_id,
-                        #     publish_dt=publish_date.strftime("%Y-%m-%d"),
-                        #     recall_dt=date_str,
-                        # )
+                        await insert_into_mini_program_detail_pool(
+                            self.pool,
+                            raw={
+                                "wx_sn": wx_sn,
+                                "mini_title": kimi_title,
+                                "root_source_id": root_source_id,
+                                "video_id": video_id,
+                                "mini_name": nick_name,
+                                "cover_url": image_url,
+                                "publish_dt": publish_date.strftime("%Y-%m-%d"),
+                                "recall_dt": date_str,
+                                "video_index": video_index,
+                            },
+                        )
+
                 return {}
             except Exception as e:
                 print(e)
                 error_msg = traceback.format_exc()
                 return article
 
-
     async def deal(self):
         """deal function"""
         # step 1, record articles to detail table
@@ -154,7 +174,3 @@ class RecycleRootSourceIdDetail(Const):
             await self.record_single_article(article)
 
         # step2, update root_source_id detail info
-
-
-
-
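
For reference, the recall-date window assembled above covers the publish date plus the following two days; a standalone sketch:

from datetime import datetime, timedelta

publish_date = datetime(2025, 8, 13)
recall_dt_str_list = [
    (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
]
print(recall_dt_str_list)  # ['2025-08-13', '2025-08-14', '2025-08-15']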

+ 11 - 3
applications/tasks/task_handler.py

@@ -109,7 +109,12 @@ class TaskHandler(TaskMapper):
         crawler_methods = self.data.get("crawler_methods", [])
         category_list = self.data.get("category_list", [])
         strategy = self.data.get("strategy", "strategy_v1")
-        await cold_start.deal(platform=platform, crawl_methods=crawler_methods, category_list=category_list, strategy=strategy)
+        await cold_start.deal(
+            platform=platform,
+            crawl_methods=crawler_methods,
+            category_list=category_list,
+            strategy=strategy,
+        )
         return self.TASK_SUCCESS_STATUS
 
     async def _candidate_account_quality_score_handler(self) -> int:
@@ -168,8 +173,11 @@ class TaskHandler(TaskMapper):
 
     async def _account_category_analysis_handler(self) -> int:
         task = AccountCategoryAnalysis(
-            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id, data=self.data, date_string=None
+            pool=self.db_client,
+            log_client=self.log_client,
+            trace_id=self.trace_id,
+            data=self.data,
+            date_string=None,
         )
         await task.deal()
         return self.TASK_SUCCESS_STATUS
-

+ 3 - 1
applications/tasks/task_scheduler.py

@@ -151,7 +151,9 @@ class TaskScheduler(TaskHandler):
                 "4003", "task_name must be input"
             )
 
-        date_str = self.data.get("date_string") or (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d")
+        date_str = self.data.get("date_string") or (
+            datetime.utcnow() + timedelta(hours=8)
+        ).strftime("%Y-%m-%d")
 
         # === 所有任务在此注册:映射到一个返回 int 状态码的异步函数 ===
         handlers: Dict[str, Callable[[], Awaitable[int]]] = {
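
The date fallback above computes the Beijing (UTC+8) calendar date; a standalone sketch of the same expression:

from datetime import datetime, timedelta

# Shift UTC by 8 hours, then format as YYYY-MM-DD.
date_str = (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d")
print(date_str)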

+ 15 - 14
applications/utils/common.py

@@ -225,25 +225,27 @@ def get_task_chinese_name(data):
     """
     通过输入任务详情信息获取任务名称
     """
-    task_name = data['task_name']
+    task_name = data["task_name"]
     task_name_chinese = name_map.get(task_name, task_name)
 
     # account_method
-    if task_name == 'crawler_gzh_articles':
-        account_method = data.get('account_method', '')
-        account_method = account_method.replace("account_association", "账号联想").replace("search", "")
-        crawl_mode = data.get('crawl_mode', '')
+    if task_name == "crawler_gzh_articles":
+        account_method = data.get("account_method", "")
+        account_method = account_method.replace(
+            "account_association", "账号联想"
+        ).replace("search", "")
+        crawl_mode = data.get("crawl_mode", "")
         crawl_mode = crawl_mode.replace("search", "搜索").replace("account", "抓账号")
-        strategy = data.get('strategy', '')
+        strategy = data.get("strategy", "")
         return f"{task_name_chinese}\t{crawl_mode}\t{account_method}\t{strategy}"
-    elif task_name == 'article_pool_cold_start':
-        platform = data.get('platform', '')
-        platform = platform.replace('toutiao', '今日头条').replace("weixin", "微信")
-        strategy = data.get('strategy', '')
+    elif task_name == "article_pool_cold_start":
+        platform = data.get("platform", "")
+        platform = platform.replace("toutiao", "今日头条").replace("weixin", "微信")
+        strategy = data.get("strategy", "")
         strategy = strategy.replace("strategy", "策略")
-        category_list = data.get('category_list', [])
+        category_list = data.get("category_list", [])
         category_list = "、".join(category_list)
-        crawler_methods = data.get('crawler_methods', [])
+        crawler_methods = data.get("crawler_methods", [])
         crawler_methods = "、".join(crawler_methods)
         return f"{task_name_chinese}\t{platform}\t{crawler_methods}\t{category_list}\t{strategy}"
     else:
@@ -253,10 +255,9 @@ def get_task_chinese_name(data):
 def get_beijing_date():
     return (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d")
 
+
 def transform_to_beijing_date(publish_timestamp):
     utc_time = datetime.utcfromtimestamp(publish_timestamp)
     # 添加8小时偏移
     utc_plus_8_time = utc_time + timedelta(hours=8)
     return utc_plus_8_time.strftime("%Y-%m-%d")
-
-
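
An illustrative call to get_task_chinese_name for the article_pool_cold_start branch above; the payload values are made up, and the first field of the result comes from name_map:

data = {
    "task_name": "article_pool_cold_start",
    "platform": "toutiao",
    "strategy": "strategy_v1",
    "category_list": ["军事历史", "财经科技"],
    "crawler_methods": ["weixin"],
}
summary = get_task_chinese_name(data)
# Tab-separated: <Chinese task name>\t今日头条\tweixin\t军事历史、财经科技\t策略_v1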

+ 2 - 5
applications/utils/exceptions.py

@@ -2,6 +2,7 @@
 Exception Handle Functions
 """
 
+
 async def handle_spider_exception(log_client, error, traceback, trace_id, task_name):
     await log_client.log(
         contexts={
@@ -11,10 +12,6 @@ async def handle_spider_exception(log_client, error, traceback, trace_id, task_n
             },
             "trace_id": trace_id,
             "message": "爬虫接口请求失败",
-            "task": task_name
+            "task": task_name,
         }
     )
-
-
-
-
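
A typical call pattern, mirroring the keyword arguments used by the recycle task above; log_client, trace_id, and the task name are placeholders here:

import traceback

async def run_with_spider_logging(log_client, trace_id, risky_coroutine):
    try:
        return await risky_coroutine()
    except Exception as e:
        await handle_spider_exception(
            log_client=log_client,
            error=e,
            traceback=traceback.format_exc(),
            trace_id=trace_id,
            task_name="RecycleRootSourceIdDetail",
        )
        return None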