Bladeren bron

中青看点

zhangliang 3 dagen geleden
bovenliggende
commit
a2cd6d2f31

+ 1 - 2
application/functions/zqkd_db_redis.py

@@ -204,7 +204,6 @@ class RedisOperations:
             result = self.client.set("zqkd_last_scanned_id", last_scanned_id)
             if result:
                 self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}")
-            return result
         except Exception as e:
             tb = traceback.format_exc()
             self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}")
@@ -213,5 +212,5 @@ class RedisOperations:
 
 if __name__ == '__main__':
     db = DatabaseOperations("12", "123")
-    user = db.select_user()
+    user = db.select_user(10000)
     print(user)

+ 81 - 58
spider/crawler_author/zhongqingkandian_author.py

@@ -1,13 +1,14 @@
 import os
 import sys
-import asyncio
 import json
 import random
 import uuid
 import time
 import traceback
 from datetime import datetime
-import aiohttp
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
 
 sys.path.append(os.getcwd())
 from application.common.feishu import FsData
@@ -60,31 +61,32 @@ class ZhongQingKanDianAuthor:
         self.last_scanned_id = 0 if result is None else int(result)
         self.zqkd_user_list = self.db_ops.select_user(self.last_scanned_id)
         self.LocalLog.info(f"获取到的用户列表:{self.zqkd_user_list} \n 昨天最后扫描的用户ID{self.last_scanned_id}")
+        self.session = requests.session()
 
-    async def send_request(self, path, data):
+    def send_request(self, path, data):
         """
-        步发送 POST 请求到指定路径,带有重试机制。
+        步发送 POST 请求到指定路径,带有重试机制。
         :param path: 请求的 API 路径
         :param data: 请求的数据
         :return: 响应的 JSON 数据,如果请求失败则返回 None
         """
         full_url = f"{self.API_BASE_URL}{path}"
-        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
-            for retry in range(self.MAX_RETRIES):
-                try:
-                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
-                        response.raise_for_status()
-                        self.LocalLog.info(f"{path}响应数据:{await response.json()}")
-                        return await response.json()
-                except (aiohttp.ClientError, json.JSONDecodeError) as e:
-                    tb_info = traceback.format_exc()
-                    self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
-                    self.aliyun_log.logging(
-                        code="3000",
-                        message=f"请求 {path} 失败,错误信息: {str(e)}",
-                        data={"path": path}
-                    )
-                    await asyncio.sleep(5)
+
+        for retry in range(self.MAX_RETRIES):
+            try:
+                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
+                response.raise_for_status()
+                self.LocalLog.info(f"{path}响应数据:{response.json()}")
+                return response.json()
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"请求 {path} 失败,错误信息: {str(e)}",
+                    data={"path": path}
+                )
+                time.sleep(5)
         return None
 
     def is_response_valid(self, resp, url):
@@ -113,22 +115,21 @@ class ZhongQingKanDianAuthor:
             self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
             return None
 
-    async def req_user_list(self, account_id):
+    def req_user_list(self, account_id):
         """
-        异步请求与指定内容 ID 相关的推荐列表。
+        请求与指定内容 ID 相关的推荐列表。
         :param
         :return: 相关推荐视频列表的有效响应数据,如果请求失败则返回 None
         """
         try:
-
             url = '/crawler/zhong_qing_kan_dian/blogger'
             body = json.dumps({
-                 "account_id": f"{account_id}",
-                 "content_type": "全部",
-                 "cursor": f"{self.curses}"
+                "account_id": f"{account_id}",
+                "content_type": "全部",
+                "cursor": f"{self.curses}"
             })
             self.LocalLog.info(f"开始请求用户视频列表{body}")
-            resp = await self.send_request(url, body)
+            resp = self.send_request(url, body)
             return self.is_response_valid(resp, url)
         except Exception as e:
             tb_info = traceback.format_exc()
@@ -140,9 +141,9 @@ class ZhongQingKanDianAuthor:
             self.LocalLog.info(f"请求相关推荐视频列表 {url} 时发生异常:{e}   \n{tb_info}")
             return None
 
-    async def req_detail(self, content_link, **kwargs):
+    def req_detail(self, content_link, **kwargs):
         """
-        异步请求视频详情。
+        请求视频详情。
         :param content_link: 视频内容链接
         :param kwargs: 额外的视频信息
         :return: 无返回值,处理视频详情信息
@@ -153,7 +154,7 @@ class ZhongQingKanDianAuthor:
             body = json.dumps({
                 "content_link": content_link
             })
-            resp = await self.send_request(url, body)
+            resp = self.send_request(url, body)
             if not self.is_response_valid(resp, url):
                 return
             data = resp.get("data", {}).get("data", {})
@@ -167,8 +168,8 @@ class ZhongQingKanDianAuthor:
                 return
             self.LocalLog.info(f"{content_link} 是视频")
             data.update(kwargs)
-            await self.process_video_obj(data)
-            await asyncio.sleep(10)
+            self.process_video_obj(data)
+            time.sleep(10)
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -177,7 +178,8 @@ class ZhongQingKanDianAuthor:
                 data={"content_link": content_link}
             )
             self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e}  \n{tb_info}")
-    async def control_request_author(self):
+
+    def control_request_author(self):
         """
         控制相关推荐视频列表的请求和处理流程。
         :return: 无返回值,根据下载数量限制控制流程
@@ -185,25 +187,31 @@ class ZhongQingKanDianAuthor:
         while self.limit_flag:
             try:
                 self.LocalLog.info(f"开始用户视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
-
                 if not self.zqkd_user_list:
                     self.LocalLog.info("没有用户数据")
-                    await asyncio.sleep(10)
+                    time.sleep(10)
                     continue
                 for user_info in self.zqkd_user_list:
+                    if not self.limit_flag:
+                        return
                     current_id, user_id = user_info
-                    author_resp = await self.req_user_list(user_id)
+                    author_resp = self.req_user_list(user_id)
                     if current_id > self.last_scanned_id:
                         self.last_scanned_id = current_id
                     if not author_resp:
                         continue
-                    author_data = author_resp.get("data", {}).get("data", [])
+                    author_data = author_resp.get("data", {})
+                    # 判断是否有下一页
                     if not author_data["next_cursor"]:
                         continue
-                    for author_obj in author_data:
-                        author_content_link = author_obj.get("share_url")
-                        if author_content_link:
-                            await self.req_detail(author_content_link, **author_obj)
+                    video_data = author_data.get("data", [])
+                    self.LocalLog.info(f"用户{user_id}第{self.curses}页数据长度{len(video_data)}")
+                    for video_obj in video_data:
+                        if not self.limit_flag:
+                            return
+                        video_content_link = video_obj.get("share_url")
+                        if video_content_link:
+                            self.req_detail(video_content_link, **video_obj)
             except Exception as e:
                 tb_info = traceback.format_exc()
                 self.aliyun_log.logging(
@@ -213,7 +221,7 @@ class ZhongQingKanDianAuthor:
                 )
                 self.LocalLog.info(f"控制相关推荐视频请求和处理时发生异常:\n{tb_info}")
 
-    async def process_video_obj(self, video_obj):
+    def process_video_obj(self, video_obj):
         """
         处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
         :param video_obj: 视频对象,包含视频的各种信息
@@ -234,9 +242,9 @@ class ZhongQingKanDianAuthor:
             trace_id = self.platform + str(uuid.uuid1())
             item = VideoItem()
 
-            account_id = video_obj["channel_account_id"]
-            account_name = video_obj["channel_account_name"]
-            account_avatar = video_obj["avatar"]
+            # account_id = video_obj["channel_account_id"]
+            # account_name = video_obj["channel_account_name"]
+            # account_avatar = video_obj["avatar"]
             # # 检查用户ID是否存在
             # """
             # 需要改为判断redis
@@ -256,7 +264,7 @@ class ZhongQingKanDianAuthor:
 
             if video_duration > self.rule_dict.get("duration", {}).get("max",
                                                                        1200) or video_duration < self.rule_dict.get(
-                    "duration", {}).get("min", 30):
+                "duration", {}).get("min", 30):
                 self.aliyun_log.logging(
                     code="3005",
                     message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
@@ -267,7 +275,7 @@ class ZhongQingKanDianAuthor:
 
             item.add_video_info("video_id", video_obj['channel_content_id'])
             item.add_video_info("video_title", video_obj["title"])
-            item.add_video_info("play_cnt", video_obj["read_num"])
+            item.add_video_info("play_cnt",  self.convert_number(video_obj["read_num"]))
             item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
             item.add_video_info("out_user_id", video_obj["channel_account_id"])
             item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
@@ -319,13 +327,17 @@ class ZhongQingKanDianAuthor:
             self.mq.send_msg(mq_obj)
             # 保存视频ID
             self.redis_ops.save_video_id(video_obj['channel_content_id'])
-            if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 300):
-                # 记录轮训到的用户id
+            if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 300) and self.zqkd_user_list:
+                self.LocalLog.info("视频数量已达到预期")
+                # 判断视频数量达到预期且用户列表没有轮训完
                 self.redis_ops.set_last_scanned_id(self.last_scanned_id)
                 self.limit_flag = False
-            else:
+            elif not self.zqkd_user_list:
+                # 如果数据没达到预期数量,则重新开始扫用户数据,扫所有用户下一页的数据,直到数量达到预期
+                self.LocalLog.info("扫描到最后一个用户")
                 self.redis_ops.set_last_scanned_id(0)
                 self.curses += 1
+
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -335,22 +347,33 @@ class ZhongQingKanDianAuthor:
             )
             self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
 
-    async def run(self):
+    def convert_number(self,s):
+        if '万' in s:
+            try:
+                num = float(s.strip('万')) * 10000
+                return num
+            except ValueError:
+                self.LocalLog.info(f"无法将 '{s}' 转换为有效的数字。")
+        else:
+            try:
+                return int(s)
+            except ValueError:
+                self.LocalLog.info(f"'{s}' 不是有效的数字格式。")
+
+    def run(self):
         """
-        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
+        运行主流程,执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
 
         :return: 无返回值,程序运行的主逻辑
         """
         self.LocalLog.info("开始执行中青看点用户视频抓取...")
-        await asyncio.gather(
-            self.control_request_author()
-        )
+        self.control_request_author()
 
 
 if __name__ == '__main__':
-    asyncio.run(ZhongQingKanDianAuthor(
+    ZhongQingKanDianAuthor(
         platform="zhongqingkandian",
         mode="author",
-        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
+        rule_dict={'videos_cnt': {'min': 2, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
         user_list=[{"uid": 81525568, "link": "中青看点推荐", "nick_name": "芸芸众生"}]
-    ).run())
+    ).run()

+ 20 - 16
spider/crawler_offline/zhongqingkandian_old.py

@@ -477,26 +477,30 @@ class ZhongQingKanDian:
         self.content_recommend_list_request_count = 0
         self.detail_request_count = 0
 
-    def send_request(self, endpoint, data):
-        full_url = f"{self.API_BASE_URL}{endpoint}"
+    def send_request(self, path, data):
+        """
+        同步发送 POST 请求到指定路径,带有重试机制。
+        :param path: 请求的 API 路径
+        :param data: 请求的数据
+        :return: 响应的 JSON 数据,如果请求失败则返回 None
+        """
+        full_url = f"{self.API_BASE_URL}{path}"
+
         for retry in range(self.MAX_RETRIES):
             try:
-                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT)
+                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
                 response.raise_for_status()
-
+                self.LocalLog.info(f"{path}响应数据:{response.json()}")
                 return response.json()
-            except requests.RequestException as e:
-                Local.logger("zhongqingkandian", "recommend").info(
-                    f"请求 {full_url} 失败(第 {retry + 1} 次重试): {e}")
-                if retry < self.MAX_RETRIES - 1:
-                    time.sleep(2)
-            except json.JSONDecodeError as e:
-                Local.logger("zhongqingkandian", "recommend").info(
-                    f"解析 {full_url} 的响应数据失败(第 {retry + 1} 次重试): {e}")
-
-                # print(f"解析 {full_url} 的响应数据失败(第 {retry + 1} 次重试): {e}")
-                if retry < self.MAX_RETRIES - 1:
-                    time.sleep(2)
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"请求 {path} 失败,错误信息: {str(e)}",
+                    data={"path": path}
+                )
+                time.sleep(5)
         return None
 
     def is_response_valid(self, resp):

+ 61 - 61
spider/crawler_online/zhongqingkandian.py

@@ -1,13 +1,14 @@
 import os
 import sys
-import asyncio
 import json
 import random
 import uuid
 import time
 import traceback
 from datetime import datetime
-import aiohttp
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
 
 sys.path.append(os.getcwd())
 from application.common.feishu import FsData
@@ -51,35 +52,36 @@ class ZhongQingKanDian:
         self.expire_flag = False
         self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
         self.db_ops = DatabaseOperations(mode=mode, platform=platform)
-        self.redis_ops = RedisOperations()
+        self.redis_ops = RedisOperations(mode=mode, platform=platform)
         data_rule = FsData()
         self.title_rule = data_rule.get_title_rule()
         self.LocalLog = Local.logger(self.platform, self.mode)
+        self.session = requests.session()
 
-    async def send_request(self, path, data):
+    def send_request(self, path, data):
         """
-        步发送 POST 请求到指定路径,带有重试机制。
+        步发送 POST 请求到指定路径,带有重试机制。
         :param path: 请求的 API 路径
         :param data: 请求的数据
         :return: 响应的 JSON 数据,如果请求失败则返回 None
         """
         full_url = f"{self.API_BASE_URL}{path}"
-        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
-            for retry in range(self.MAX_RETRIES):
-                try:
-                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
-                        response.raise_for_status()
-                        self.LocalLog.info(f"{path}响应数据:{await response.json()}")
-                        return await response.json()
-                except (aiohttp.ClientError, json.JSONDecodeError) as e:
-                    tb_info = traceback.format_exc()
-                    self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
-                    self.aliyun_log.logging(
-                        code="3000",
-                        message=f"请求 {path} 失败,错误信息: {str(e)}",
-                        data={"path": path}
-                    )
-                    await asyncio.sleep(random.randint(5, 10))
+
+        for retry in range(self.MAX_RETRIES):
+            try:
+                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
+                response.raise_for_status()
+                self.LocalLog.info(f"{path}响应数据:{response.json()}")
+                return response.json()
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"请求 {path} 失败,错误信息: {str(e)}",
+                    data={"path": path}
+                )
+                time.sleep(5)
         return None
 
     def is_response_valid(self, resp, url):
@@ -108,17 +110,16 @@ class ZhongQingKanDian:
             self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
             return None
 
-    async def req_recommend_list(self):
+    def req_recommend_list(self):
         """
-        步请求推荐视频列表。
+        步请求推荐视频列表。
         :return: 推荐视频列表的有效响应数据,如果请求失败则返回 None
         """
         try:
-
             url = '/crawler/zhong_qing_kan_dian/recommend'
             body = json.dumps({"cursor": ""})
             self.LocalLog.info(f"开始请求推荐{body}")
-            resp = await self.send_request(url, body)
+            resp = self.send_request(url, body)
             return self.is_response_valid(resp, url)
         except Exception as e:
             tb_info = traceback.format_exc()
@@ -130,11 +131,9 @@ class ZhongQingKanDian:
             self.LocalLog.info(f"请求推荐视频列表 {url} 时发生异常:{str(e)}   \n{tb_info}")
             return None
 
-
-
-    async def req_detail(self, content_link, **kwargs):
+    def req_detail(self, content_link, **kwargs):
         """
-        步请求视频详情。
+        同步请求视频详情。
         :param content_link: 视频内容链接
         :param label: 视频标签(如 "recommend" 或 "related")
         :param kwargs: 额外的视频信息
@@ -146,22 +145,21 @@ class ZhongQingKanDian:
             body = json.dumps({
                 "content_link": content_link
             })
-            resp = await self.send_request(url, body)
+            resp = self.send_request(url, body)
             if not self.is_response_valid(resp, url):
                 return
             data = resp.get("data", {}).get("data", {})
             if data.get("content_type") != "video":
                 self.aliyun_log.logging(
                     code="3003",
-                    message=f"跳过非视频内容",
+                    message=f"跳过非视频内容",
                     data={"content_link": content_link}
                 )
                 self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
                 return
             self.LocalLog.info(f"{content_link} 是视频")
             data.update(kwargs)
-            await self.process_video_obj(data)
-            await asyncio.sleep(10)
+            self.process_video_obj(data)
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -171,7 +169,7 @@ class ZhongQingKanDian:
             )
             self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e}  \n{tb_info}")
 
-    async def control_request_recommend(self):
+    def control_request_recommend(self):
         """
         控制推荐视频列表的请求和处理流程。
         :return: 无返回值,根据下载数量限制控制流程
@@ -179,12 +177,17 @@ class ZhongQingKanDian:
         while self.limit_flag:
             try:
                 self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬推荐 {self.download_cnt} 个视频")
-                recommend_resp = await self.req_recommend_list()
+
+                recommend_resp = self.req_recommend_list()
                 if not recommend_resp:
+                    time.sleep(random.randint(5, 10))
                     continue
                 recommend_list = recommend_resp.get("data", {}).get("data", [])
                 self.LocalLog.info(f"获取的推荐列表长度:{len(recommend_list)}")
                 for video_obj in recommend_list:
+                    # if not self.limit_flag:
+                    #     self.LocalLog.info(f"今日视频数量已达最大量{self.download_cnt}")
+                    #     return
                     content_link = video_obj.get("share_url")
                     content_id = video_obj.get("id")
                     self.LocalLog.info(f"content_link == {content_link} \n content_id == {content_id}")
@@ -192,7 +195,8 @@ class ZhongQingKanDian:
                         continue
                     # 当前内容id保存到redis
                     self.redis_ops.save_recommend_video(content_id)
-                    await self.req_detail(content_link, **video_obj)
+                    time.sleep(random.randint(5, 10))
+                    self.req_detail(content_link, **video_obj)
             except Exception as e:
                 tb_info = traceback.format_exc()
                 self.aliyun_log.logging(
@@ -203,14 +207,13 @@ class ZhongQingKanDian:
                 self.LocalLog.info(f"控制推荐视频请求和处理时发生异常:\n{tb_info}")
         self.LocalLog.info(f"循环结束,当前 limit_flag 值为: {self.limit_flag}")
 
-    async def process_video_obj(self, video_obj):
+    def process_video_obj(self, video_obj):
         """
         处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
         :param video_obj: 视频对象,包含视频的各种信息
         :return: 无返回值,完成视频对象的处理
         """
         try:
-
             video_duration = video_obj["video_url_list"][0]['video_duration']
             video_id = video_obj['channel_content_id']
             # 检查视频ID是否存在
@@ -240,9 +243,7 @@ class ZhongQingKanDian:
                 self.db_ops.insert_user(account_id, account_name, account_avatar)
                 self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
                 self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
-            if video_duration > self.rule_dict.get("duration", {}).get("max",
-                                                                       1200) or video_duration < self.rule_dict.get(
-                    "duration", {}).get("min", 30):
+            if video_duration > self.rule_dict.get("duration", {}).get("max", 1200) or video_duration < self.rule_dict.get("duration", {}).get("min", 30):
                 self.aliyun_log.logging(
                     code="3005",
                     message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
@@ -253,7 +254,7 @@ class ZhongQingKanDian:
 
             item.add_video_info("video_id", video_obj['channel_content_id'])
             item.add_video_info("video_title", video_obj["title"])
-            item.add_video_info("play_cnt", int(video_obj["read_num"]))
+            item.add_video_info("play_cnt",  self.convert_number(video_obj["read_num"]))
             item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
             item.add_video_info("out_user_id", video_obj["channel_account_id"])
             item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
@@ -277,7 +278,7 @@ class ZhongQingKanDian:
                 rule_dict=self.rule_dict,
                 env=self.env,
                 item=mq_obj,
-                trace_id=trace_id
+                trace_id=traceback.format_exc()
             )
             if pipeline.process_item():
                 title_list = self.title_rule.split(",")
@@ -312,6 +313,7 @@ class ZhongQingKanDian:
                 # 保存视频ID
                 self.redis_ops.save_video_id(video_obj['channel_content_id'])
                 if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                    self.LocalLog.info("当日视频已达到最大爬取量")
                     self.limit_flag = False
         except Exception as e:
             tb_info = traceback.format_exc()
@@ -322,31 +324,29 @@ class ZhongQingKanDian:
             )
             self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
 
-    async def run(self):
-        """
-        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
 
+    def convert_number(self, s):
+        if not isinstance(s, str):
+            return s
+        try:
+            return float(s.strip('万')) * 10000 if '万' in s else int(s)
+        except ValueError:
+            self.LocalLog.info(f"无法将 '{s}' 转换为有效的数字。")
+
+
+    def run(self):
+        """
+        运行主流程,执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
         :return: 无返回值,程序运行的主逻辑
         """
         self.LocalLog.info("开始执行中青看点推荐抓取...")
-        await asyncio.gather(
-            self.control_request_recommend()
-        )
+        self.control_request_recommend()
 
 
 if __name__ == '__main__':
-    asyncio.run(ZhongQingKanDian(
+    ZhongQingKanDian(
         platform="zhongqingkandian",
         mode="recommend",
-        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
+        rule_dict={'videos_cnt': {'min': 2, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
         user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
-    ).run())
-    # content_link = "https://vol.youth.cn/4X32ftEV6SsA9Mq9?signature=6y30XlmbkL9oxwAjJd1PXOBX0idx0ZD1gMQE2nZKW8RNpvPrqz"
-    # asyncio.run(ZhongQingKanDian(
-    #     platform="zhongqingkandian",
-    #     mode="recommend",
-    #     rule_dict={
-    #         {"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}
-    #     },
-    #     user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
-    # ).req_detail(content_link,"测试"))
+    ).run()

+ 58 - 75
spider/crawler_online/zhongqingkandian_related_recommend.py

@@ -1,13 +1,14 @@
 import os
 import sys
-import asyncio
 import json
 import random
 import uuid
 import time
 import traceback
 from datetime import datetime
-import aiohttp
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
 
 sys.path.append(os.getcwd())
 from application.common.feishu import FsData
@@ -51,36 +52,36 @@ class ZhongQingKanDianRelated:
         self.expire_flag = False
         self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
         self.db_ops = DatabaseOperations(mode=mode, platform=platform)
-        self.redis_ops = RedisOperations()
+        self.redis_ops = RedisOperations(mode=mode, platform=platform)
         data_rule = FsData()
         self.title_rule = data_rule.get_title_rule()
         self.LocalLog = Local.logger(self.platform, self.mode)
+        self.session = requests.session()
 
-
-    async def send_request(self, path, data):
+    def send_request(self, path, data):
         """
-        步发送 POST 请求到指定路径,带有重试机制。
+        步发送 POST 请求到指定路径,带有重试机制。
         :param path: 请求的 API 路径
         :param data: 请求的数据
         :return: 响应的 JSON 数据,如果请求失败则返回 None
         """
         full_url = f"{self.API_BASE_URL}{path}"
-        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
-            for retry in range(self.MAX_RETRIES):
-                try:
-                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
-                        response.raise_for_status()
-                        self.LocalLog.info(f"{path}响应数据:{await response.json()}")
-                        return await response.json()
-                except (aiohttp.ClientError, json.JSONDecodeError) as e:
-                    tb_info = traceback.format_exc()
-                    self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
-                    self.aliyun_log.logging(
-                        code="3000",
-                        message=f"请求 {path} 失败,错误信息: {str(e)}",
-                        data={"path": path}
-                    )
-                    await asyncio.sleep(5)
+
+        for retry in range(self.MAX_RETRIES):
+            try:
+                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
+                response.raise_for_status()
+                self.LocalLog.info(f"{path}响应数据:{response.json()}")
+                return response.json()
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"请求 {path} 失败,错误信息: {str(e)}",
+                    data={"path": path}
+                )
+                time.sleep(5)
         return None
 
     def is_response_valid(self, resp, url):
@@ -109,21 +110,20 @@ class ZhongQingKanDianRelated:
             self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
             return None
 
-    async def req_related_recommend_list(self, content_id):
+    def req_related_recommend_list(self, content_id):
         """
-        步请求与指定内容 ID 相关的推荐列表。
+        步请求与指定内容 ID 相关的推荐列表。
         :param
         :return: 相关推荐视频列表的有效响应数据,如果请求失败则返回 None
         """
         try:
-
             url = '/crawler/zhong_qing_kan_dian/related'
             body = json.dumps({
                 "content_id": f"{content_id}",
                 "cursor": ""
             })
             self.LocalLog.info(f"开始请求相关推荐{body}")
-            resp = await self.send_request(url, body)
+            resp = self.send_request(url, body)
             return self.is_response_valid(resp, url)
         except Exception as e:
             tb_info = traceback.format_exc()
@@ -135,9 +135,9 @@ class ZhongQingKanDianRelated:
             self.LocalLog.info(f"请求相关推荐视频列表 {url} 时发生异常:{e}   \n{tb_info}")
             return None
 
-    async def req_detail(self, content_link, **kwargs):
+    def req_detail(self, content_link, **kwargs):
         """
-        步请求视频详情。
+        步请求视频详情。
         :param content_link: 视频内容链接
         :param kwargs: 额外的视频信息
         :return: 无返回值,处理视频详情信息
@@ -148,7 +148,7 @@ class ZhongQingKanDianRelated:
             body = json.dumps({
                 "content_link": content_link
             })
-            resp = await self.send_request(url, body)
+            resp = self.send_request(url, body)
             if not self.is_response_valid(resp, url):
                 return
             data = resp.get("data", {}).get("data", {})
@@ -162,8 +162,7 @@ class ZhongQingKanDianRelated:
                 return
             self.LocalLog.info(f"{content_link} 是视频")
             data.update(kwargs)
-            await self.process_video_obj(data)
-            await asyncio.sleep(10)
+            self.process_video_obj(data)
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -173,7 +172,7 @@ class ZhongQingKanDianRelated:
             )
             self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e}  \n{tb_info}")
 
-    async def control_request_related(self):
+    def control_request_related(self):
         """
         控制相关推荐视频列表的请求和处理流程。
         :return: 无返回值,根据下载数量限制控制流程
@@ -181,20 +180,26 @@ class ZhongQingKanDianRelated:
         while self.limit_flag:
             try:
                 self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
+
                 content_id = self.redis_ops.get_recommend_video()
                 if not content_id:
                     self.LocalLog.info("缓存中【task:zqkd_video_id】没有数据")
-                    await asyncio.sleep(10)
                     continue
-                related_resp = await self.req_related_recommend_list(content_id)
-                self.LocalLog.info(f"获取的推荐列表长度:{len(related_resp)}")
+                time.sleep(random.randint(5, 10))
+                related_resp = self.req_related_recommend_list(content_id)
                 if not related_resp:
                     continue
                 related_list = related_resp.get("data", {}).get("data", [])
+                self.LocalLog.info(f"获取的推荐列表长度:{len(related_list)}")
                 for related_obj in related_list:
-                    related_content_link = related_obj.get("share_url")
+                    # if not self.limit_flag:
+                    #     self.LocalLog.info(f"今日视频数量已达最大量{self.download_cnt}")
+                    #     return
+                    related_content_link = related_obj.get("share_info", {}).get("share_url")
+                    self.LocalLog.info(f"related_content_link == {related_content_link}")
                     if related_content_link:
-                        await self.req_detail(related_content_link, **related_obj)
+                        time.sleep(random.randint(5, 10))
+                        self.req_detail(related_content_link, **related_obj)
             except Exception as e:
                 tb_info = traceback.format_exc()
                 self.aliyun_log.logging(
@@ -204,7 +209,7 @@ class ZhongQingKanDianRelated:
                 )
                 self.LocalLog.info(f"控制相关推荐视频请求和处理时发生异常:\n{tb_info}")
 
-    async def process_video_obj(self, video_obj):
+    def process_video_obj(self, video_obj):
         """
         处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
         :param video_obj: 视频对象,包含视频的各种信息
@@ -243,9 +248,7 @@ class ZhongQingKanDianRelated:
                 self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
                 self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
 
-            if video_duration > self.rule_dict.get("duration", {}).get("max",
-                                                                       1200) or video_duration < self.rule_dict.get(
-                "duration", {}).get("min", 30):
+            if video_duration > self.rule_dict.get("duration", {}).get("max", 1200) or video_duration < self.rule_dict.get("duration", {}).get("min", 30):
                 self.aliyun_log.logging(
                     code="3005",
                     message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
@@ -256,14 +259,15 @@ class ZhongQingKanDianRelated:
 
             item.add_video_info("video_id", video_obj['channel_content_id'])
             item.add_video_info("video_title", video_obj["title"])
-            item.add_video_info("play_cnt", int(video_obj["read_num"]))
+
+            item.add_video_info("play_cnt", int(video_obj["read_count"]))
             item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
             item.add_video_info("out_user_id", video_obj["channel_account_id"])
             item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
             item.add_video_info("like_cnt", 0)
-            item.add_video_info("collection_cnt", int(video_obj['collect_num']))
-            item.add_video_info("share_cnt", int(video_obj["share_num"]))
-            item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
+            item.add_video_info("collection_cnt", 0)
+            item.add_video_info("share_cnt", int(video_obj["share_count"]))
+            item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
             item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
             item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
             item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
@@ -280,7 +284,7 @@ class ZhongQingKanDianRelated:
                 rule_dict=self.rule_dict,
                 env=self.env,
                 item=mq_obj,
-                trace_id=trace_id
+                trace_id=traceback.format_exc()
             )
             if pipeline.process_item():
                 title_list = self.title_rule.split(",")
@@ -325,42 +329,21 @@ class ZhongQingKanDianRelated:
             )
             self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
 
-    async def run(self):
-        """
-        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
-
-        :return: 无返回值,程序运行的主逻辑
-        """
-        self.LocalLog.info("开始执行中青看点推荐抓取...")
-        await asyncio.gather(
-            self.control_request_related()
-        )
 
-    async def run(self):
+    def run(self):
         """
-        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
+        运行主流程,执行相关推荐视频的请求,直到达到下载数量限制。
 
         :return: 无返回值,程序运行的主逻辑
         """
-        self.LocalLog.info("开始执行中青看点推荐抓取...")
-        await asyncio.gather(
-            self.control_request_related()
-        )
+        self.LocalLog.info("开始执行中青看点相关推荐抓取...")
+        self.control_request_related()
 
 
 if __name__ == '__main__':
-    asyncio.run(ZhongQingKanDianRelated(
+    ZhongQingKanDianRelated(
         platform="zhongqingkandian",
         mode="recommend",
-        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
-        user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
-    ).run())
-    # content_link = "https://vol.youth.cn/4X32ftEV6SsA9Mq9?signature=6y30XlmbkL9oxwAjJd1PXOBX0idx0ZD1gMQE2nZKW8RNpvPrqz"
-    # asyncio.run(ZhongQingKanDian(
-    #     platform="zhongqingkandian",
-    #     mode="recommend",
-    #     rule_dict={
-    #         {"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}
-    #     },
-    #     user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
-    # ).req_detail(content_link,"测试"))
+        rule_dict={"videos_cnt": {"min": 3, "max": 0}},
+        user_list=[{"uid": 81525095, "link": "中青看点推荐", "nick_name": "善惡"}]
+    ).run()