Преглед изворни кода

1、小年糕和祝福圈子增加曝光

zhangliang пре 1 месец
родитељ
комит
da26fb7f33

+ 4 - 0
application/config/config.py

@@ -1,3 +1,7 @@
 # api 配置
 crawler_api_domain = 'http://8.217.192.46:8889'
 zhufuquanzi_view_api = crawler_api_domain + '/crawler/zhu_fu_quan_zi/detail_exposure'
+zhufuquanzi_history_api = crawler_api_domain + '/crawler/zhu_fu_quan_zi/detail_history'
+xiaoniangao_view_api = crawler_api_domain + '/crawler/zhu_fu_quan_zi/detail_exposure'
+xiaoniangao_history_api = crawler_api_domain + '/crawler/zhu_fu_quan_zi/detail_history'
+

+ 42 - 48
spider/crawler_author/zhongqingkandian_author.py

@@ -63,56 +63,50 @@ class ZhongQingKanDianAuthor:
         self.session = requests.session()
 
     def send_request(self, path, data):
-        """
-        同步发送 POST 请求到指定路径,带有重试机制。
-        :param path: 请求的 API 路径
-        :param data: 请求的数据
-        :return: 响应的 JSON 数据,如果请求失败则返回 None
-        """
-        full_url = f"{self.API_BASE_URL}{path}"
-
-        for retry in range(self.MAX_RETRIES):
+        """发送带重试机制的API请求"""
+        for attempt in range(self.MAX_RETRIES):
             try:
-                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
-                response.raise_for_status()
-                self.LocalLog.info(f"{path}响应数据:{response.json()}")
-                return response.json()
+                response = self.session.post(
+                    f"{self.API_BASE_URL}{path}",
+                    data=data,
+                    timeout=self.TIMEOUT,
+                    headers=self.COMMON_HEADERS
+                )
+                resp_data = response.json()
+                # 检查响应格式
+                if 'code' not in resp_data:
+                    self.LocalLog.warning(f"{path}响应缺少code字段,尝试重试")
+                    raise ValueError("Missing 'code' in response")
+
+                code = resp_data['code']
+
+                # 成功情况 (code=0)
+                if code == 0:
+                    self.LocalLog.info(f"{path}请求成功:{resp_data}")
+                    return resp_data
+
+                # 特定错误码不重试
+                if code == 29036:
+                    self.LocalLog.warning(f"{path}返回code:29036,消息:{resp_data}")
+                    return None
+
+                # 其他错误码重试
+                self.LocalLog.warning(f"{path}返回错误码{code},尝试重试,响应内容:{resp_data}")
+
             except Exception as e:
                 tb_info = traceback.format_exc()
-                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
-                self.aliyun_log.logging(
-                    code="3000",
-                    message=f"请求 {path} 失败,错误信息: {str(e)}",
-                    data={"path": path}
-                )
-                time.sleep(5)
-        return None
+                self.LocalLog.error(f"{path}请求异常: {str(e)} \n {tb_info}")
 
-    def is_response_valid(self, resp, url):
-        """
-        检查响应是否有效(状态码为 0 表示有效)。
-        :param resp: 响应数据
-        :param url: 请求的 URL
-        :return: 如果响应有效则返回响应数据,否则返回 None
-        """
-        try:
-            if resp and resp.get('code') != 0:
-                self.aliyun_log.logging(
-                    code="3000",
-                    message=f"抓取{url}失败,请求失败,响应:{resp}"
-                )
-                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
-                return None
-            return resp
-        except Exception as e:
-            tb_info = traceback.format_exc()
-            self.aliyun_log.logging(
-                code="3000",
-                message=f"检查响应有效性时出错,错误信息: {str(e)}",
-                data={"url": url, "resp": resp}
-            )
-            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
-            return None
+            time.sleep(random.randint(5, 10))
+
+        # 所有重试失败,记录错误并返回None
+        self.LocalLog.error(f"{path}达到最大重试次数")
+        self.aliyun_log.logging(
+            code="3000",
+            message=f"请求 {path} 失败,达到最大重试次数",
+            data=data
+        )
+        return None
 
     def req_user_list(self, account_id):
         """
@@ -129,7 +123,7 @@ class ZhongQingKanDianAuthor:
             })
             self.LocalLog.info(f"开始请求用户视频列表{body}")
             resp = self.send_request(url, body)
-            return self.is_response_valid(resp, url)
+            return resp
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -154,7 +148,7 @@ class ZhongQingKanDianAuthor:
                 "content_link": content_link
             })
             resp = self.send_request(url, body)
-            if not self.is_response_valid(resp, url):
+            if not resp:
                 return
             data = resp.get("data", {}).get("data", {})
             if data.get("content_type") != "video":

+ 84 - 1
spider/crawler_online/xiaoniangaotuijianliu.py

@@ -14,7 +14,7 @@ from application.common.feishu.feishu_utils import FeishuUtils
 from application.common.gpt import GPT4oMini
 from application.common.mysql.sql import Sql
 from application.common.redis.xng_redis import xng_in_video_data
-
+from application.config.config import xiaoniangao_view_api,xiaoniangao_history_api
 sys.path.append(os.getcwd())
 
 from application.items import VideoItem
@@ -25,6 +25,80 @@ from application.common.mysql import MysqlHelper
 
 
 
+video_view_count = 0
+video_view_lists = []
+def video_view(content_id, account_id):
+    """Report one exposure (view) of a video to the crawler service.
+
+    POSTs ``content_id`` and ``account_id`` (both sent as strings) to
+    ``xiaoniangao_view_api``. On a ``code == 0`` reply the module-level
+    ``video_view_count`` is incremented; every failure is printed, not raised.
+    """
+    global video_view_count
+    payload = {
+        "content_id": str(content_id),
+        "account_id": str(account_id)
+    }
+    try:
+        response = requests.post(
+            xiaoniangao_view_api,
+            headers={"Content-Type": "application/json"},
+            json=payload,  # requests serialises the dict to JSON
+            timeout=30  # without a timeout a stalled server blocks forever
+        )
+        if response.status_code == 200:
+            result = response.json()
+            code = result.get("code")
+            msg = result.get("msg")
+            if code == 0:
+                print("请求成功")
+                video_view_count += 1
+                # video_history() takes no arguments, so calling it here with
+                # content_id raised TypeError; callers flush history in batches.
+            else:
+                print(f"请求失败,错误码: {code}, 消息: {msg}")
+        else:
+            print(f"HTTP 请求失败,状态码: {response.status_code}")
+    except requests.exceptions.RequestException as e:
+        print(f"请求异常: {e}")
+    except json.JSONDecodeError:
+        print("响应不是有效的 JSON 格式")
+
+def video_history():
+    """Flush the buffered video ids to the crawler history endpoint.
+
+    POSTs the module-level ``video_view_lists`` to ``xiaoniangao_history_api``
+    and clears the buffer only on a ``code == 0`` reply, so ids that failed to
+    upload stay queued for the next call. Failures are printed, not raised.
+    """
+    if not video_view_lists:
+        # Nothing buffered, so there is nothing to upload.
+        return
+    payload = {"content_id": video_view_lists}
+    try:
+        response = requests.post(
+            xiaoniangao_history_api,
+            headers={"Content-Type": "application/json"},
+            json=payload,  # requests serialises the dict to JSON
+            timeout=30  # without a timeout a stalled server blocks forever
+        )
+        if response.status_code == 200:
+            result = response.json()
+            code = result.get("code")
+            msg = result.get("msg")
+            if code == 0:
+                video_view_lists.clear()
+                print("请求成功")
+            else:
+                print(f"请求失败,错误码: {code}, 消息: {msg}")
+        else:
+            print(f"HTTP 请求失败,状态码: {response.status_code}")
+    except requests.exceptions.RequestException as e:
+        print(f"请求异常: {e}")
+    except json.JSONDecodeError:
+        print("响应不是有效的 JSON 格式")
+
+
 class XNGTJLRecommend(object):
 
     """
@@ -102,6 +176,9 @@ class XNGTJLRecommend(object):
         trace_id = self.platform + str(uuid.uuid1())
         our_user = random.choice(self.user_list)
         item = VideoItem()
+        vid = video_obj['id']
+        mid = int(video_obj['user']['mid'])
+        print(f"vid={vid},mid={mid}")
         try:
             mid = int(video_obj['user']['mid'])
             print(f"id:{mid}")
@@ -173,10 +250,16 @@ class XNGTJLRecommend(object):
                     FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "A2:Z2", values)
             self.download_cnt += 1
             self.mq.send_msg(mq_obj)
+            video_view(vid, mid)
+            video_view_count += 1
+            video_view_lists.append(str(vid))
+            if video_view_count % 4 == 0:
+                video_history()
             self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
             if self.download_cnt >= int(
                     self.rule_dict.get("videos_cnt", {}).get("min", 200)
             ):
+                video_history()
                 self.limit_flag = True
 
     """

+ 45 - 51
spider/crawler_online/zhongqingkandian.py

@@ -7,8 +7,7 @@ import time
 import traceback
 from datetime import datetime
 import requests
-from requests.adapters import HTTPAdapter
-from urllib3.util.retry import Retry
+
 
 sys.path.append(os.getcwd())
 from application.common.feishu import FsData
@@ -59,56 +58,51 @@ class ZhongQingKanDianRecommend:
         self.session = requests.session()
 
     def send_request(self, path, data):
-        """
-        同步发送 POST 请求到指定路径,带有重试机制。
-        :param path: 请求的 API 路径
-        :param data: 请求的数据
-        :return: 响应的 JSON 数据,如果请求失败则返回 None
-        """
-        full_url = f"{self.API_BASE_URL}{path}"
-
-        for retry in range(self.MAX_RETRIES):
+        """发送带重试机制的API请求"""
+        for attempt in range(self.MAX_RETRIES):
             try:
-                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
-                response.raise_for_status()
-                self.LocalLog.info(f"{path}响应数据:{response.json()}")
-                return response.json()
+                response = self.session.post(
+                    f"{self.API_BASE_URL}{path}",
+                    data=data,
+                    timeout=self.TIMEOUT,
+                    headers=self.COMMON_HEADERS
+                )
+                resp_data = response.json()
+                # 检查响应格式
+                if 'code' not in resp_data:
+                    self.LocalLog.warning(f"{path}响应缺少code字段,尝试重试")
+                    raise ValueError("Missing 'code' in response")
+
+                code = resp_data['code']
+
+                # 成功情况 (code=0)
+                if code == 0:
+                    self.LocalLog.info(f"{path}请求成功:{resp_data}")
+                    return resp_data
+
+                # 特定错误码不重试
+                if code == 29036:
+                    self.LocalLog.warning(f"{path}返回code:29036,消息:{resp_data}")
+                    return None
+
+                # 其他错误码重试
+                self.LocalLog.warning(f"{path}返回错误码{code},尝试重试,响应内容:{resp_data}")
+
             except Exception as e:
                 tb_info = traceback.format_exc()
-                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
-                self.aliyun_log.logging(
-                    code="3000",
-                    message=f"请求 {path} 失败,错误信息: {str(e)}",
-                    data={"path": path}
-                )
-                time.sleep(5)
+                self.LocalLog.error(f"{path}请求异常: {str(e)} \n {tb_info}")
+
+            time.sleep(random.randint(5, 10))
+
+        # 所有重试失败,记录错误并返回None
+        self.LocalLog.error(f"{path}达到最大重试次数")
+        self.aliyun_log.logging(
+            code="3000",
+            message=f"请求 {path} 失败,达到最大重试次数",
+            data=data
+        )
         return None
 
-    def is_response_valid(self, resp, url):
-        """
-        检查响应是否有效(状态码为 0 表示有效)。
-        :param resp: 响应数据
-        :param url: 请求的 URL
-        :return: 如果响应有效则返回响应数据,否则返回 None
-        """
-        try:
-            if resp and resp.get('code') != 0:
-                self.aliyun_log.logging(
-                    code="3000",
-                    message=f"抓取{url}失败,请求失败,响应:{resp}"
-                )
-                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
-                return None
-            return resp
-        except Exception as e:
-            tb_info = traceback.format_exc()
-            self.aliyun_log.logging(
-                code="3000",
-                message=f"检查响应有效性时出错,错误信息: {str(e)}",
-                data={"url": url, "resp": resp}
-            )
-            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
-            return None
 
     def req_recommend_list(self):
         """
@@ -120,7 +114,7 @@ class ZhongQingKanDianRecommend:
             body = json.dumps({"cursor": ""})
             self.LocalLog.info(f"开始请求推荐{body}")
             resp = self.send_request(url, body)
-            return self.is_response_valid(resp, url)
+            return resp
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -146,7 +140,7 @@ class ZhongQingKanDianRecommend:
                 "content_link": content_link
             })
             resp = self.send_request(url, body)
-            if not self.is_response_valid(resp, url):
+            if not resp:
                 return
             data = resp.get("data", {}).get("data", {})
             if data.get("content_type") != "video":
@@ -278,7 +272,7 @@ class ZhongQingKanDianRecommend:
                 rule_dict=self.rule_dict,
                 env=self.env,
                 item=mq_obj,
-                trace_id=traceback.format_exc()
+                trace_id=trace_id
             )
             if pipeline.process_item():
                 title_list = self.title_rule.split(",")
@@ -352,6 +346,6 @@ if __name__ == '__main__':
     ZhongQingKanDianRecommend(
         platform="zhongqingkandian",
         mode="recommend",
-        rule_dict={'videos_cnt': {'min': 2, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
+        rule_dict={'videos_cnt': {'min': 110, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
         user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
     ).run()

+ 50 - 66
spider/crawler_online/zhongqingkandian_related_recommend.py

@@ -58,57 +58,57 @@ class ZhongQingKanDianRelatedRecommend:
         self.LocalLog = Local.logger(self.platform, self.mode)
         self.session = requests.session()
 
-    def send_request(self, path, data):
-        """
-        同步发送 POST 请求到指定路径,带有重试机制。
-        :param path: 请求的 API 路径
-        :param data: 请求的数据
-        :return: 响应的 JSON 数据,如果请求失败则返回 None
-        """
-        full_url = f"{self.API_BASE_URL}{path}"
+    def __del__(self):
+        if self.session:
+            self.LocalLog.info("session 被正确关闭")
+            self.session.close()
 
-        for retry in range(self.MAX_RETRIES):
+    def send_request(self, path, data):
+        """发送带重试机制的API请求"""
+        for attempt in range(self.MAX_RETRIES):
             try:
-                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT, headers=self.COMMON_HEADERS)
-                response.raise_for_status()
-                self.LocalLog.info(f"{path}响应数据:{response.json()}")
-                return response.json()
+                response = self.session.post(
+                    f"{self.API_BASE_URL}{path}",
+                    data=data,
+                    timeout=self.TIMEOUT,
+                    headers=self.COMMON_HEADERS
+                )
+                resp_data = response.json()
+                # 检查响应格式
+                if 'code' not in resp_data:
+                    self.LocalLog.warning(f"{path}响应缺少code字段,尝试重试")
+                    raise ValueError("Missing 'code' in response")
+
+                code = resp_data['code']
+
+                # 成功情况 (code=0)
+                if code == 0:
+                    self.LocalLog.info(f"{path}请求成功:{resp_data}")
+                    return resp_data
+
+                # 特定错误码不重试
+                if code == 29036:
+                    self.LocalLog.warning(f"{path}返回code:29036,消息:{resp_data}")
+                    return None
+
+                # 其他错误码重试
+                self.LocalLog.warning(f"{path}返回错误码{code},尝试重试,响应内容:{resp_data}")
+
             except Exception as e:
                 tb_info = traceback.format_exc()
-                self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
-                self.aliyun_log.logging(
-                    code="3000",
-                    message=f"请求 {path} 失败,错误信息: {str(e)}",
-                    data={"path": path}
-                )
-                time.sleep(5)
+                self.LocalLog.error(f"{path}请求异常: {str(e)} \n {tb_info}")
+
+            time.sleep(random.randint(5,10))
+
+        # 所有重试失败,记录错误并返回None
+        self.LocalLog.error(f"{path}达到最大重试次数")
+        self.aliyun_log.logging(
+            code="3000",
+            message=f"请求 {path} 失败,达到最大重试次数",
+            data=data
+        )
         return None
 
-    def is_response_valid(self, resp, url):
-        """
-        检查响应是否有效(状态码为 0 表示有效)。
-        :param resp: 响应数据
-        :param url: 请求的 URL
-        :return: 如果响应有效则返回响应数据,否则返回 None
-        """
-        try:
-            if resp and resp.get('code') != 0:
-                self.aliyun_log.logging(
-                    code="3000",
-                    message=f"抓取{url}失败,请求失败,响应:{resp}"
-                )
-                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
-                return None
-            return resp
-        except Exception as e:
-            tb_info = traceback.format_exc()
-            self.aliyun_log.logging(
-                code="3000",
-                message=f"检查响应有效性时出错,错误信息: {str(e)}",
-                data={"url": url, "resp": resp}
-            )
-            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
-            return None
 
     def req_related_recommend_list(self, content_id):
         """
@@ -124,8 +124,7 @@ class ZhongQingKanDianRelatedRecommend:
                 "cursor": ""
             })
             self.LocalLog.info(f"开始请求相关推荐{body}")
-            resp = self.send_request(url, body)
-            return self.is_response_valid(resp, url)
+            return self.send_request(url, body)
         except Exception as e:
             tb_info = traceback.format_exc()
             self.aliyun_log.logging(
@@ -150,7 +149,7 @@ class ZhongQingKanDianRelatedRecommend:
                 "content_link": content_link
             })
             resp = self.send_request(url, body)
-            if not self.is_response_valid(resp, url):
+            if not resp:
                 return
             data = resp.get("data", {}).get("data", {})
             if data.get("content_type") != "video":
@@ -244,21 +243,6 @@ class ZhongQingKanDianRelatedRecommend:
             account_avatar = video_obj["avatar"]
             self.db_ops.insert_user(account_id, account_name, account_avatar)
             self.LocalLog.info(f"用户{account_id}存入数据库")
-            # 检查用户ID是否存在
-            # """
-            # 需要改为判断redis
-            # """
-            # is_repeat_user = self.db_ops.check_user_id(account_id)
-            # if is_repeat_user:
-            #     # 更新用户信息,使用异步方法并等待结果
-            #     self.LocalLog.info(f"用户{account_id}已经存在数据库中")
-            #     self.db_ops.update_user(account_id, account_name, account_avatar)
-            # else:
-            #     self.LocalLog.info(f"用户{account_id}没在数据库中")
-            #     # 插入用户信息,使用异步方法并等待结果
-            #     self.db_ops.insert_user(account_id, account_name, account_avatar)
-            #     self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
-            #     self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
 
             if video_duration > self.rule_dict.get("duration", {}).get("max", 1200) or video_duration < self.rule_dict.get("duration", {}).get("min", 30):
                 self.aliyun_log.logging(
@@ -296,7 +280,7 @@ class ZhongQingKanDianRelatedRecommend:
                 rule_dict=self.rule_dict,
                 env=self.env,
                 item=mq_obj,
-                trace_id=traceback.format_exc()
+                trace_id=trace_id
             )
             if pipeline.process_item():
                 title_list = self.title_rule.split(",")
@@ -355,7 +339,7 @@ class ZhongQingKanDianRelatedRecommend:
 if __name__ == '__main__':
     ZhongQingKanDianRelatedRecommend(
         platform="zhongqingkandianrelated",
-        mode="related_recommend",
-        rule_dict={"videos_cnt": {"min": 8, "max": 0}},
+        mode="related",
+        rule_dict={"videos_cnt": {"min": 200, "max": 0}},
         user_list=[{"uid": 81525095, "link": "中青看点推荐", "nick_name": "善惡"}]
     ).run()

+ 49 - 3
spider/crawler_online/zhufuquanzituijianliu.py

@@ -14,7 +14,7 @@ from application.common.feishu.feishu_utils import FeishuUtils
 from application.common.gpt import GPT4oMini
 from application.common.mysql.sql import Sql
 from application.common.redis.xng_redis import xng_in_video_data
-from application.config.config import zhufuquanzi_view_api
+from application.config.config import zhufuquanzi_view_api,zhufuquanzi_history_api
 
 sys.path.append(os.getcwd())
 
@@ -24,8 +24,10 @@ from application.common.messageQueue import MQ
 from application.common.log import AliyunLogger
 from application.common.mysql import MysqlHelper
 
-
+video_view_count = 0
+video_view_lists = []
 def video_view(content_id, account_id):
+    global video_view_count
     headers = {
         "Content-Type": "application/json"
     }
@@ -50,6 +52,42 @@ def video_view(content_id, account_id):
             # 业务逻辑处理(示例)
             if code == 0:
                 print("请求成功")
+                video_view_count += 1
+                video_history(content_id)
+            else:
+                print(f"请求失败,错误码: {code}, 消息: {msg}")
+        else:
+            print(f"HTTP 请求失败,状态码: {response.status_code}")
+    except requests.exceptions.RequestException as e:
+        print(f"请求异常: {e}")
+    except json.JSONDecodeError:
+        print("响应不是有效的 JSON 格式")
+
+def video_history():
+    headers = {
+        "Content-Type": "application/json"
+    }
+    payload = {
+        "content_id": video_view_lists
+    }
+    try:
+        # 发送 POST 请求
+        response = requests.post(
+            zhufuquanzi_history_api,
+            headers=headers,
+            json=payload  # 自动将字典转换为 JSON
+        )
+        # 检查 HTTP 状态码
+        if response.status_code == 200:
+            # 解析 JSON 响应
+            result = response.json()
+            # 提取关键字段
+            code = result.get("code")
+            msg = result.get("msg")
+            # 业务逻辑处理(示例)
+            if code == 0:
+                video_view_lists.clear()
+                print("请求成功")
             else:
                 print(f"请求失败,错误码: {code}, 消息: {msg}")
         else:
@@ -60,6 +98,10 @@ def video_view(content_id, account_id):
         print("响应不是有效的 JSON 格式")
 
 
+
+
+
+
 class ZFQZTJLRecommend(object):
     """
     祝福圈子推荐流
@@ -213,10 +255,15 @@ class ZFQZTJLRecommend(object):
             self.mq.send_msg(mq_obj)
             self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
             video_view(vid, mid)
+            video_view_count += 1
+            video_view_lists.append(str(vid))
+            if video_view_count % 4 == 0:
+                video_history()
             self.aliyun_log.logging(code="1010", message="触发曝光", data=mq_obj)
             if self.download_cnt >= int(
                     self.rule_dict.get("videos_cnt", {}).get("min", 200)
             ):
+                video_history()
                 self.limit_flag = True
 
     def run(self):
@@ -229,7 +276,6 @@ if __name__ == '__main__':
         mode="recommend",
         rule_dict={},
         user_list=[{"uid": 75590470, "link": "zfqz推荐流_接口1", "nick_name": "做你的尾巴"}, {"uid": 75590471, "link": "zfqz推荐流_接口2", "nick_name": "能够相遇"}, {"uid": 75590472, "link": "zfqz推荐流_接口3", "nick_name": "一别两宽各生欢喜"}, {"uid": 75590473, "link": "zfqz推荐流_接口4", "nick_name": "惹火"}, {"uid": 75590475, "link": "zfqz推荐流_接口5", "nick_name": "顾九"}, {"uid": 75590476, "link": "zfqz推荐流_接口6", "nick_name": "宠一身脾气惯一身毛病"}],
-
     )
     J.get_recommend_list()
     # J.logic()