
Add cold-start task
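The change replaces the synchronous requests call in the recommend-feed crawl with aiohttp, routes it through the dynamic proxy, and adds structured logging plus a RECOMMEND_TIMES constant. A minimal sketch of the proxied aiohttp pattern the diff adopts, assuming async_proxy() returns a dict with "url", "username" and "password" keys as the new code implies (fetch_json_via_proxy is an illustrative name, not part of the commit):

import aiohttp

from applications.utils import async_proxy


async def fetch_json_via_proxy(method: str, url: str, headers: dict) -> dict:
    # async_proxy() is assumed to return {"url": ..., "username": ..., "password": ...}
    proxy_conf = async_proxy()
    proxy_auth = aiohttp.BasicAuth(proxy_conf["username"], proxy_conf["password"])
    async with aiohttp.ClientSession() as session:
        async with session.request(
            method=method,
            url=url,
            headers=headers,
            proxy=proxy_conf["url"],
            proxy_auth=proxy_auth,
        ) as response:
            # surface HTTP errors so the caller's retry loop can log the failure and move on
            response.raise_for_status()
            return await response.json()

In crawler_recommend_articles below, this pattern sits inside a try/except so a single failed pull is logged and skipped instead of aborting the remaining RECOMMEND_TIMES attempts.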

luojunhui committed 1 month ago · commit 636b3cb964
1 changed file with 74 additions and 14 deletions

+ 74 - 14
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -2,7 +2,7 @@ from __future__ import annotations
 
 import json
 import time
-import requests
+import aiohttp
 import traceback
 from datetime import datetime
 from typing import List, Dict
@@ -12,7 +12,7 @@ from tqdm import tqdm
 from applications.api import feishu_robot
 from applications.crawler.toutiao import get_toutiao_account_info_list
 from applications.pipeline import CrawlerPipeline
-from applications.utils import proxy
+from applications.utils import async_proxy
 
 
 class CrawlerToutiaoConst:
@@ -38,6 +38,8 @@ class CrawlerToutiaoConst:
     # sleep second
     SLEEP_SECOND = 3
 
+    RECOMMEND_TIMES = 10  # how many recommend-feed requests to make per run
+
 
 class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
     def __init__(self, pool, log_client, trace_id):
@@ -83,6 +85,16 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
         """
         response = await self.pool.async_fetch(query)
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "crawler_toutiao",
+                "function": "get_account_list",
+                "message": f"get toutiao account list, media_type: {media_type}",
+                "status": "success",
+                "data": response,
+            }
+        )
         if not response:
             await feishu_robot.bot(
                 title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
@@ -144,7 +156,6 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                     raise Exception(f"unknown media type: {media_type}")
 
             crawler_info_list_bar = tqdm(info_list, desc=bar_description)
-            print(json.dumps(info_list, ensure_ascii=False, indent=4))
             for info in crawler_info_list_bar:
                 try:
                     crawler_info_list_bar.set_postfix({"id": info["id"]})
@@ -201,7 +212,16 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 }
             case _:
                 raise Exception(f"unknown method: {method}")
-
+        await self.log_client.log(
+            contents={
+                "task": "crawler_toutiao",
+                "function": "crawler_each_article",
+                "trace_id": self.trace_id,
+                "message": "抓取文章成功",
+                "status": "success",
+                "data": new_article_item,
+            }
+        )
         await self.save_item_to_database(media_type="article", item=new_article_item)
 
     async def crawler_each_video(self, video_raw_data):
@@ -261,13 +281,24 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 await self.update_account_max_cursor(
                     media_type=media_type, account_id=account_id
                 )
+                await self.log_client.log(
+                    contents={
+                        "trace_id": self.trace_id,
+                        "task": "crawler_toutiao_account_info",
+                        "function": "crawler_task",
+                        "message": f"crawler account: {account_id} successfully, media type: {media_type}",
+                        "status": "success",
+                    }
+                )
 
             except Exception as e:
                 await self.log_client.log(
                     contents={
+                        "trace_id": self.trace_id,
                         "task": "crawler_toutiao_account_info",
                         "function": "crawler_task",
-                        "message": account_id,
+                        "message": f"crawler_account: {account_id} fail",
+                        "status": "fail",
                         "data": {
                             "media_type": media_type,
                             "error": str(e),
@@ -281,16 +312,45 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
         if not cookie:
             return
 
-        for crawler_time in range(10):
-            response = requests.request(
-                method=cookie["request_method"],
-                url=cookie["request_url"],
-                headers=json.loads(cookie["request_headers"]),
-                proxies=proxy(),
-            )
-            if response.text is None:
+        for crawler_time in range(self.RECOMMEND_TIMES):
+            try:
+                proxy_conf = async_proxy()
+                proxy_auth = aiohttp.BasicAuth(proxy_conf["username"], proxy_conf["password"])
+                async with aiohttp.ClientSession() as session:
+                    async with session.request(
+                        method=cookie["request_method"],
+                        url=cookie["request_url"],
+                        headers=json.loads(cookie["request_headers"]),
+                        proxy=proxy_conf["url"],
+                        proxy_auth=proxy_auth,
+                    ) as response:
+                        response.raise_for_status()
+                        response_json = await response.json()
+
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao",
+                        "function": "crawler_recommend_articles",
+                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
+                        "trace_id": self.trace_id,
+                        "status": "success",
+                        "data": response_json,
+                    }
+                )
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao",
+                        "function": "crawler_recommend_articles",
+                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
+                        "status": "fail",
+                        "trace_id": self.trace_id,
+                        "data": {"error": str(e), "traceback": traceback.format_exc()},
+                    }
+                )
                 continue
-            article_list = response.json()["data"]
+
+            article_list = response_json["data"]
             for article in article_list:
                 if article.get("article_url"):
                     video_flag = article.get("has_video")
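The log calls added throughout this commit share a single payload shape. A small helper along these lines (illustrative only, not part of the commit) captures that contract; the field names mirror the contents dicts above:

from typing import Any, Dict


def build_log_contents(
    trace_id: str,
    task: str,
    function: str,
    message: str,
    status: str,
    data: Any = None,
) -> Dict[str, Any]:
    # Same keys as the log_client.log(contents=...) calls in the diff.
    contents = {
        "trace_id": trace_id,
        "task": task,
        "function": function,
        "message": message,
        "status": status,  # "success" or "fail"
    }
    if data is not None:
        contents["data"] = data
    return contents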