瀏覽代碼

中青看点

zhangliang 5 天之前
父節點
當前提交
77dc29dfb8

+ 7 - 0
application/common/mysql/mysql_helper.py

@@ -44,6 +44,12 @@ class MysqlHelper(object):
         data = cursor.fetchall()
         return data
 
+    def select_params(self, sql, params=None):
+        cursor = self.connection.cursor()
+        cursor.execute(sql, params or ())  # 支持参数化查询
+        data = cursor.fetchall()
+        return data
+
     def update(self, sql):
         """
         插入
@@ -66,6 +72,7 @@ class MysqlHelper(object):
         self.connection.close()
 
 
+
 class RedisHelper:
     @classmethod
     def connect_redis(cls, env):

+ 2 - 2
application/config/topic_group_queue.py

@@ -44,8 +44,8 @@ class TopicGroup(object):
             ('xlzf', 'recommend', 'xuanlanzhufu'),
             ('zzhxzfy', 'recommend', 'zhaozhaohuanxizhufuyu'),
             ('zqkd', 'recommend', 'zhongqingkandian'),
-            ('zqkd', 'related_recommend', 'zhongqingkandian'),
-            ('zqkd', 'author', 'zhongqingkandian')
+            ('zqkd', 'related_recommend', 'zhongqingkandianrelated'),
+            ('zqkd', 'author', 'zhongqingkandianauthor')
         ]
 
     def produce(self):

+ 40 - 16
application/functions/zqkd_db_redis.py

@@ -17,6 +17,8 @@ class DatabaseOperations:
     def __init__(self, mode, platform):
         self.mysql = MysqlHelper(mode=mode, platform=platform)
         self.LocalLog = Local.logger(platform, mode)
+        self.mode = mode
+        self.platform = platform
 
     def check_user_id(self, uid):
         """
@@ -26,7 +28,7 @@ class DatabaseOperations:
         :return:如果用户ID存在于表中返回True,否则返回False
         """
         try:
-            query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
+            query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """
             result = self.mysql.select(sql=query_sql)
             return bool(result)
         except Exception as e:
@@ -44,7 +46,7 @@ class DatabaseOperations:
         :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
         """
         try:
-            update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
+            update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
             return self.mysql.update(sql=update_sql)
         except Exception as e:
             tb = traceback.format_exc()
@@ -53,32 +55,55 @@ class DatabaseOperations:
 
     def insert_user(self, uid, user_name, avatar_url):
         """
-        向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。
+        向数据库的zqkd_user表中插入或更新用户信息
 
-        :param uid:新用户的ID
-        :param user_name:新用户的用户名
-        :param avatar_url:新用户的头像URL
-        :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
+        :param uid: 用户ID(数值类型)
+        :param user_name: 用户名
+        :param avatar_url: 头像URL
+        :return: 成功返回影响的行数,失败返回None
         """
         try:
-            current_time = datetime.now()
-            formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-            insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
+            # 直接拼接SQL(不推荐,有SQL注入风险)
+            insert_sql = f"""
+                INSERT INTO zqkd_user (uid, avatar_url, user_name) 
+                VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}') 
+                ON DUPLICATE KEY UPDATE 
+                user_name = '{user_name.replace("'", "''")}', 
+                avatar_url = '{avatar_url.replace("'", "''")}'
+            """
             return self.mysql.update(sql=insert_sql)
         except Exception as e:
             tb = traceback.format_exc()
             self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}")
             return None
-
+    def get_today_videos(self):
+        try:
+            # 手动转义单引号(仅缓解部分风险)
+
+            sql = """
+                        SELECT count(*) as cnt
+                        FROM crawler_video 
+                        WHERE create_time >= CURDATE() 
+                          AND create_time < CURDATE() + INTERVAL 1 DAY 
+                          AND platform = %s 
+                          AND strategy = %s
+                    """
+            result = self.mysql.select_params(sql, (self.platform,self.mode))
+            if result and len(result) > 0:
+                return result[0][0]  # 返回第一行第一列的计数值
+            return 0  # 无结果时返回0
+        except Exception as e:
+            self.LocalLog.error(f"查询失败: {e}")
+            return 0
     def select_user(self, last_scanned_id=0):
         """
-        根据last_scanned_id分页查询用户数据
+        根据last_scanned_id查询用户数据
         :param last_scanned_id: 上次扫描的ID,0表示从头开始
         :return: 查询结果列表
         """
         try:
             # 构建查询(根据last_scanned_id过滤)
-            query = "SELECT id, uid FROM zqkd_uid"
+            query = "SELECT id, uid FROM zqkd_user"
             if last_scanned_id > 0:
                 query += f" WHERE id > {last_scanned_id}"
             query += " ORDER BY id ASC"
@@ -211,6 +236,5 @@ class RedisOperations:
 
 
 if __name__ == '__main__':
-    db = DatabaseOperations("12", "123")
-    user = db.select_user(10000)
-    print(user)
+    db = DatabaseOperations("recommend", "zhongqingkandian")
+    print(db.get_today_videos())

+ 13 - 4
spider/crawler_author/zhongqingkandian_author.py

@@ -186,6 +186,15 @@ class ZhongQingKanDianAuthor:
         """
         while self.limit_flag:
             try:
+                self.download_cnt = self.db_ops.get_today_videos()
+                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                    self.aliyun_log.logging(
+                        code="2010",
+                        message=f"今日已经达到最大量",
+                        data=self.download_cnt
+                    )
+                    self.LocalLog.info(f"当日视频已达到最大爬取量{self.download_cnt}")
+                    return
                 self.LocalLog.info(f"开始用户视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
                 if not self.zqkd_user_list:
                     self.LocalLog.info("没有用户数据")
@@ -197,7 +206,7 @@ class ZhongQingKanDianAuthor:
                     current_id, user_id = user_info
                     author_resp = self.req_user_list(user_id)
                     if current_id > self.last_scanned_id:
-                        self.last_scanned_id = current_id
+                        self.redis_ops.set_last_scanned_id(current_id)
                     if not author_resp:
                         continue
                     author_data = author_resp.get("data", {})
@@ -207,8 +216,8 @@ class ZhongQingKanDianAuthor:
                     video_data = author_data.get("data", [])
                     self.LocalLog.info(f"用户{user_id}第{self.curses}页数据长度{len(video_data)}")
                     for video_obj in video_data:
-                        if not self.limit_flag:
-                            return
+                        # if not self.limit_flag:
+                        #     return
                         video_content_link = video_obj.get("share_url")
                         if video_content_link:
                             self.req_detail(video_content_link, **video_obj)
@@ -330,7 +339,7 @@ class ZhongQingKanDianAuthor:
             if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 300) and self.zqkd_user_list:
                 self.LocalLog.info("视频数量已达到预期")
                 # 判断视频数量达到预期且用户列表没有轮训完
-                self.redis_ops.set_last_scanned_id(self.last_scanned_id)
+                # self.redis_ops.set_last_scanned_id(self.last_scanned_id)
                 self.limit_flag = False
             elif not self.zqkd_user_list:
                 # 如果数据没达到预期数量,则重新开始扫用户数据,扫所有用户下一页的数据,直到数量达到预期

+ 20 - 15
spider/crawler_online/zhongqingkandian.py

@@ -22,7 +22,7 @@ from application.pipeline import PiaoQuanPipeline
 from application.common.log import Local
 
 
-class ZhongQingKanDian:
+class ZhongQingKanDianRecommend:
     API_BASE_URL = "http://8.217.192.46:8889"
     COMMON_HEADERS = {
         "Content-Type": "application/json"
@@ -176,8 +176,16 @@ class ZhongQingKanDian:
         """
         while self.limit_flag:
             try:
+                self.download_cnt = self.db_ops.get_today_videos()
+                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                    self.aliyun_log.logging(
+                        code="2010",
+                        message=f"今日已经达到最大量",
+                        data=self.download_cnt
+                    )
+                    self.LocalLog.info(f"当日视频已达到最大爬取量{self.download_cnt}")
+                    return
                 self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬推荐 {self.download_cnt} 个视频")
-
                 recommend_resp = self.req_recommend_list()
                 if not recommend_resp:
                     time.sleep(random.randint(5, 10))
@@ -231,18 +239,10 @@ class ZhongQingKanDian:
             account_id = video_obj["channel_account_id"]
             account_name = video_obj["channel_account_name"]
             account_avatar = video_obj["avatar"]
-            # 检查用户ID是否存在
-            is_repeat_user = self.db_ops.check_user_id(account_id)
-            if is_repeat_user:
-                # 更新用户信息,使用异步方法并等待结果
-                self.LocalLog.info(f"用户{account_id}已经存在数据库中")
-                self.db_ops.update_user(account_id, account_name, account_avatar)
-            else:
-                self.LocalLog.info(f"用户{account_id}没在数据库中")
-                # 插入用户信息,使用异步方法并等待结果
-                self.db_ops.insert_user(account_id, account_name, account_avatar)
-                self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
-                self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
+            # 插入用户信息,使用异步方法并等待结果
+            self.db_ops.insert_user(account_id, account_name, account_avatar)
+            self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
+            self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
             if video_duration > self.rule_dict.get("duration", {}).get("max", 1200) or video_duration < self.rule_dict.get("duration", {}).get("min", 30):
                 self.aliyun_log.logging(
                     code="3005",
@@ -313,6 +313,11 @@ class ZhongQingKanDian:
                 # 保存视频ID
                 self.redis_ops.save_video_id(video_obj['channel_content_id'])
                 if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                    self.aliyun_log.logging(
+                        code="2010",
+                        message=f"今日已经达到最大量",
+                        data=self.download_cnt
+                    )
                     self.LocalLog.info("当日视频已达到最大爬取量")
                     self.limit_flag = False
         except Exception as e:
@@ -344,7 +349,7 @@ class ZhongQingKanDian:
 
 
 if __name__ == '__main__':
-    ZhongQingKanDian(
+    ZhongQingKanDianRecommend(
         platform="zhongqingkandian",
         mode="recommend",
         rule_dict={'videos_cnt': {'min': 2, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},

+ 31 - 19
spider/crawler_online/zhongqingkandian_related_recommend.py

@@ -22,7 +22,7 @@ from application.pipeline import PiaoQuanPipeline
 from application.common.log import Local
 
 
-class ZhongQingKanDianRelated:
+class ZhongQingKanDianRelatedRecommend:
     API_BASE_URL = "http://8.217.192.46:8889"
     COMMON_HEADERS = {
         "Content-Type": "application/json"
@@ -117,6 +117,7 @@ class ZhongQingKanDianRelated:
         :return: 相关推荐视频列表的有效响应数据,如果请求失败则返回 None
         """
         try:
+
             url = '/crawler/zhong_qing_kan_dian/related'
             body = json.dumps({
                 "content_id": f"{content_id}",
@@ -179,6 +180,15 @@ class ZhongQingKanDianRelated:
         """
         while self.limit_flag:
             try:
+                self.download_cnt = self.db_ops.get_today_videos()
+                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                    self.aliyun_log.logging(
+                        code="2010",
+                        message=f"今日已经达到最大量",
+                        data=self.download_cnt
+                    )
+                    self.LocalLog.info(f"当日视频已达到最大爬取量{self.download_cnt}")
+                    return
                 self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
 
                 content_id = self.redis_ops.get_recommend_video()
@@ -232,21 +242,23 @@ class ZhongQingKanDianRelated:
             account_id = video_obj["channel_account_id"]
             account_name = video_obj["channel_account_name"]
             account_avatar = video_obj["avatar"]
+            self.db_ops.insert_user(account_id, account_name, account_avatar)
+            self.LocalLog.info(f"用户{account_id}存入数据库")
             # 检查用户ID是否存在
-            """
-            需要改为判断redis
-            """
-            is_repeat_user = self.db_ops.check_user_id(account_id)
-            if is_repeat_user:
-                # 更新用户信息,使用异步方法并等待结果
-                self.LocalLog.info(f"用户{account_id}已经存在数据库中")
-                self.db_ops.update_user(account_id, account_name, account_avatar)
-            else:
-                self.LocalLog.info(f"用户{account_id}没在数据库中")
-                # 插入用户信息,使用异步方法并等待结果
-                self.db_ops.insert_user(account_id, account_name, account_avatar)
-                self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
-                self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
+            # """
+            # 需要改为判断redis
+            # """
+            # is_repeat_user = self.db_ops.check_user_id(account_id)
+            # if is_repeat_user:
+            #     # 更新用户信息,使用异步方法并等待结果
+            #     self.LocalLog.info(f"用户{account_id}已经存在数据库中")
+            #     self.db_ops.update_user(account_id, account_name, account_avatar)
+            # else:
+            #     self.LocalLog.info(f"用户{account_id}没在数据库中")
+            #     # 插入用户信息,使用异步方法并等待结果
+            #     self.db_ops.insert_user(account_id, account_name, account_avatar)
+            #     self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
+            #     self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
 
             if video_duration > self.rule_dict.get("duration", {}).get("max", 1200) or video_duration < self.rule_dict.get("duration", {}).get("min", 30):
                 self.aliyun_log.logging(
@@ -341,9 +353,9 @@ class ZhongQingKanDianRelated:
 
 
 if __name__ == '__main__':
-    ZhongQingKanDianRelated(
-        platform="zhongqingkandian",
-        mode="recommend",
-        rule_dict={"videos_cnt": {"min": 3, "max": 0}},
+    ZhongQingKanDianRelatedRecommend(
+        platform="zhongqingkandianrelated",
+        mode="related_recommend",
+        rule_dict={"videos_cnt": {"min": 8, "max": 0}},
         user_list=[{"uid": 81525095, "link": "中青看点推荐", "nick_name": "善惡"}]
     ).run()

+ 6 - 6
spider/spider_map.py

@@ -30,8 +30,8 @@ from spider.crawler_online.zhufuquanzituijianliu import ZFQZTJLRecommend
 from spider.crawler_online.chaojipiaoquan import CJPQRecommend
 from spider.crawler_online.xuanlanzhufu import XLZFRecommend
 from spider.crawler_online.zhaozhaohuanxizhufuyu import ZZHXZFYRecommend
-from spider.crawler_online.zhongqingkandian import ZhongQingKanDian
-from spider.crawler_online.zhongqingkandian_related_recommend import ZhongQingKanDianRelated
+from spider.crawler_online.zhongqingkandian import ZhongQingKanDianRecommend
+from spider.crawler_online.zhongqingkandian_related_recommend import ZhongQingKanDianRelatedRecommend
 from spider.crawler_author.zhongqingkandian_author import ZhongQingKanDianAuthor
 spider_map = {
     # 祝万物复苏
@@ -192,14 +192,14 @@ spider_map = {
     },
     # 中青看点推荐
     "zhongqingkandian": {
-        "recommend": ZhongQingKanDian
+        "recommend": ZhongQingKanDianRecommend
     },
     # 中青看点相关推荐
-    "zhongqingkandian":{
-        "related_recommend": ZhongQingKanDianRelated
+    "zhongqingkandianrelated":{
+        "related_recommend": ZhongQingKanDianRelatedRecommend
     },
     # 中青看点用户
-    "zhongqingkandian":{
+    "zhongqingkandianauthor":{
         "author": ZhongQingKanDianAuthor
     }
 }