
add 快手

zhangyong 1 year ago
Parent
Current commit
bb9bbdbf38

+ 15 - 131
kuaishou/kuaishou_author/kuaishou_author_scheduling.py

@@ -2,10 +2,9 @@
 # @Author: wangkun
 # @Time: 2023/5/24
 import os
-import shutil
 import sys
 import time
-from hashlib import md5
+from datetime import date, timedelta
 import requests
 import json
 import urllib3
@@ -13,9 +12,7 @@ from requests.adapters import HTTPAdapter
 from common.mq import MQ
 sys.path.append(os.getcwd())
 from common.common import Common
-from common.feishu import Feishu
 from common.scheduling_db import MysqlHelper
-from common.publish import Publish
 from common.public import random_title, get_config_from_mysql, download_rule
 
 
@@ -75,8 +72,6 @@ class KuaishouauthorScheduling:
                     "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(config["update_time"] / 1000))),
                     "operator": config["operator"].strip()
                 }
-                # for k, v in cookie_dict.items():
-                #     print(f"{k}:{type(v)}, {v}")
                 return cookie_dict
 
     @classmethod
@@ -94,11 +89,12 @@ class KuaishouauthorScheduling:
                 },
                 "query": "fragment photoContent on PhotoEntity {\n  id\n  duration\n  caption\n  originCaption\n  likeCount\n  viewCount\n  commentCount\n  realLikeCount\n  coverUrl\n  photoUrl\n  photoH265Url\n  manifest\n  manifestH265\n  videoResource\n  coverUrls {\n    url\n    __typename\n  }\n  timestamp\n  expTag\n  animatedCoverUrl\n  distance\n  videoRatio\n  liked\n  stereoType\n  profileUserTopPhoto\n  musicBlocked\n  __typename\n}\n\nfragment feedContent on Feed {\n  type\n  author {\n    id\n    name\n    headerUrl\n    following\n    headerUrls {\n      url\n      __typename\n    }\n    __typename\n  }\n  photo {\n    ...photoContent\n    __typename\n  }\n  canAddComment\n  llsid\n  status\n  currentPcursor\n  tags {\n    type\n    name\n    __typename\n  }\n  __typename\n}\n\nquery visionProfilePhotoList($pcursor: String, $userId: String, $page: String, $webPageArea: String) {\n  visionProfilePhotoList(pcursor: $pcursor, userId: $userId, page: $page, webPageArea: $webPageArea) {\n    result\n    llsid\n    webPageArea\n    feeds {\n      ...feedContent\n      __typename\n    }\n    hostName\n    pcursor\n    __typename\n  }\n}\n"
             })
+            cookie = cls.get_cookie(log_type, crawler, env)["cookie"]
             headers = {
                 'Accept': '*/*',
                 'Content-Type': 'application/json',
                 'Origin': 'https://www.kuaishou.com',
-                'Cookie': cls.get_cookie(log_type, crawler, env)["cookie"],
+                'Cookie': cookie,
                 'Content-Length': '1260',
                 'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
                 'Host': 'www.kuaishou.com',
@@ -140,7 +136,6 @@ class KuaishouauthorScheduling:
             for i in range(len(feeds)):
                 try:
                     if cls.download_cnt >= cls.videos_cnt(rule_dict):
-                    # if cls.download_cnt >= 2:
                         Common.logger(log_type, crawler).info(f"已下载视频数:{cls.download_cnt}\n")
                         Common.logging(log_type, crawler, env, f"已下载视频数:{cls.download_cnt}\n")
                         return
@@ -156,7 +151,17 @@ class KuaishouauthorScheduling:
                         video_height = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("height", 0)
                     publish_time_stamp = int(int(feeds[i].get('photo', {}).get('timestamp', 0)) / 1000)
                     publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
-
+                    date_60_days_ago_string = (date.today() - timedelta(days=60)).strftime("%Y-%m-%d %H:%M:%S")
+                    is_within_60_days = publish_time_str > date_60_days_ago_string
+                    if not is_within_60_days:
+                        Common.logger(log_type, crawler).info(f"发布时间超过60天,发布时间:{publish_time_str}\n")
+                        Common.logging(log_type, crawler, env, f"发布时间超过60天,发布时间:{publish_time_str}\n")
+                        continue
+                    realLikeCount = int(feeds[i].get('photo', {}).get('realLikeCount', 0))
+                    if realLikeCount < 10000:
+                        Common.logger(log_type, crawler).info(f"点赞量不足10000,点赞量:{realLikeCount}\n")
+                        Common.logging(log_type, crawler, env, f"点赞量不足10000,点赞量:{realLikeCount}\n")
+                        continue
                     video_dict = {'video_title': video_title,
                                   'video_id': video_id,
                                   'play_cnt': int(feeds[i].get('photo', {}).get('viewCount', 0)),
@@ -212,12 +217,7 @@ class KuaishouauthorScheduling:
                         video_dict["strategy_type"] = log_type
                         mq.send_msg(video_dict)
                         cls.download_cnt += 1
-                        # cls.download_publish(log_type=log_type,
-                        #                      crawler=crawler,
-                        #                      user_dict=user_dict,
-                        #                      video_dict=video_dict,
-                        #                      rule_dict=rule_dict,
-                        #                      env=env)
+
                 except Exception as e:
                     Common.logger(log_type, crawler).warning(f"抓取单条视频异常:{e}\n")
                     Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
@@ -229,123 +229,7 @@ class KuaishouauthorScheduling:
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 
-    @classmethod
-    def download_publish(cls, log_type, crawler, user_dict, rule_dict, video_dict, env):
-        # 下载视频
-        Common.download_method(log_type=log_type, crawler=crawler, text='video', title=video_dict['video_title'], url=video_dict['video_url'])
-        md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
-        try:
-            if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0:
-                # 删除视频文件夹
-                shutil.rmtree(f"./{crawler}/videos/{md_title}")
-                Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
-                Common.logging(log_type, crawler, env, "视频size=0,删除成功\n")
-                return
-        except FileNotFoundError:
-            # 删除视频文件夹
-            shutil.rmtree(f"./{crawler}/videos/{md_title}")
-            Common.logger(log_type, crawler).info("视频文件不存在,删除文件夹成功\n")
-            Common.logging(log_type, crawler, env, "视频文件不存在,删除文件夹成功\n")
-            return
-        # 下载封面
-        Common.download_method(log_type=log_type, crawler=crawler, text='cover', title=video_dict['video_title'], url=video_dict['cover_url'])
-        # 保存视频信息至txt
-        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)
-
-        # 上传视频
-        Common.logger(log_type, crawler).info("开始上传视频...")
-        Common.logging(log_type, crawler, env, "开始上传视频...")
-        if env == "dev":
-            oss_endpoint = "out"
-            our_video_id = Publish.upload_and_publish(log_type=log_type,
-                                                      crawler=crawler,
-                                                      strategy="定向抓取策略",
-                                                      our_uid=user_dict["uid"],
-                                                      env=env,
-                                                      oss_endpoint=oss_endpoint)
-            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
-        else:
-            oss_endpoint = "inner"
-            our_video_id = Publish.upload_and_publish(log_type=log_type,
-                                                      crawler=crawler,
-                                                      strategy="定向抓取策略",
-                                                      our_uid=user_dict["uid"],
-                                                      env=env,
-                                                      oss_endpoint=oss_endpoint)
-
-            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
-
-        if our_video_id is None:
-            try:
-                # 删除视频文件夹
-                shutil.rmtree(f"./{crawler}/videos/{md_title}")
-                return
-            except FileNotFoundError:
-                return
-
-        # 视频信息保存数据库
-        insert_sql = f""" insert into crawler_video(video_id,
-                                                user_id,
-                                                out_user_id,
-                                                platform,
-                                                strategy,
-                                                out_video_id,
-                                                video_title,
-                                                cover_url,
-                                                video_url,
-                                                duration,
-                                                publish_time,
-                                                play_cnt,
-                                                crawler_rule,
-                                                width,
-                                                height)
-                                                values({our_video_id},
-                                                {user_dict["uid"]},
-                                                "{video_dict['user_id']}",
-                                                "{cls.platform}",
-                                                "定向爬虫策略",
-                                                "{video_dict['video_id']}",
-                                                "{video_dict['video_title']}",
-                                                "{video_dict['cover_url']}",
-                                                "{video_dict['video_url']}",
-                                                {int(video_dict['duration'])},
-                                                "{video_dict['publish_time_str']}",
-                                                {int(video_dict['play_cnt'])},
-                                                '{json.dumps(rule_dict)}',
-                                                {int(video_dict['video_width'])},
-                                                {int(video_dict['video_height'])}) """
-        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
-        Common.logging(log_type, crawler, env, f"insert_sql:{insert_sql}")
-        MysqlHelper.update_values(log_type, crawler, insert_sql, env, action="")
-        Common.logger(log_type, crawler).info('视频信息写入数据库成功')
-        Common.logging(log_type, crawler, env, '视频信息写入数据库成功')
 
-        # 视频写入飞书
-        Feishu.insert_columns(log_type, crawler, "fYdA8F", "ROWS", 1, 2)
-        upload_time = int(time.time())
-        values = [[our_video_id,
-                   time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
-                   "定向榜",
-                   str(video_dict['video_id']),
-                   video_dict['video_title'],
-                   our_video_link,
-                   video_dict['play_cnt'],
-                   video_dict['comment_cnt'],
-                   video_dict['like_cnt'],
-                   video_dict['share_cnt'],
-                   video_dict['duration'],
-                   f"{video_dict['video_width']}*{video_dict['video_height']}",
-                   video_dict['publish_time_str'],
-                   video_dict['user_name'],
-                   video_dict['user_id'],
-                   video_dict['avatar_url'],
-                   video_dict['cover_url'],
-                   video_dict['video_url']]]
-        time.sleep(1)
-        Feishu.update_values(log_type, crawler, "fYdA8F", "E2:Z2", values)
-        Common.logger(log_type, crawler).info(f"视频已保存至云文档\n")
-        Common.logging(log_type, crawler, env, f"视频已保存至云文档\n")
-        cls.download_cnt += 1
 
     @classmethod
     def get_author_videos(cls, log_type, crawler, user_list, rule_dict, env):
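
Note on the date filter added above: it compares formatted "%Y-%m-%d %H:%M:%S" strings, which happens to sort correctly because the fields are zero-padded, but the intent reads more directly on epoch timestamps. A minimal sketch of an equivalent check; the helper name passes_author_filter is illustrative, while the 60-day window and 10,000-like threshold mirror the values introduced in this commit:

import time

SIXTY_DAYS_SECONDS = 60 * 24 * 3600   # same 60-day window as the string comparison above
MIN_REAL_LIKE_COUNT = 10000           # same like threshold as above

def passes_author_filter(publish_time_stamp, real_like_count, now=None):
    # True when the video was published within the last 60 days
    # and has at least 10,000 real likes
    now = int(time.time()) if now is None else now
    if now - publish_time_stamp > SIXTY_DAYS_SECONDS:
        return False
    return real_like_count >= MIN_REAL_LIKE_COUNT

# example: a video published 10 days ago with 12,000 likes passes the filter
print(passes_author_filter(int(time.time()) - 10 * 24 * 3600, 12000))  # True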

+ 354 - 0
kuaishou/kuaishou_author/kuaishou_author_scheduling_new.py

@@ -0,0 +1,354 @@
+# -*- coding: utf-8 -*-
+# @Time: 2023/11/07
+import os
+import sys
+import time
+from datetime import date, timedelta
+import requests
+import json
+import urllib3
+
+sys.path.append(os.getcwd())
+from common.common import Common
+from common import AliyunLogger
+from common.mq import MQ
+from requests.adapters import HTTPAdapter
+from common.scheduling_db import MysqlHelper
+from common.public import random_title, get_config_from_mysql, download_rule
+
+
+class KuaishouauthorScheduling:
+    platform = "快手"
+    download_cnt = 0
+
+    @classmethod
+    def videos_cnt(cls, rule_dict):
+        videos_cnt = rule_dict.get("videos_cnt", {}).get("min", 0)
+        if videos_cnt == 0:
+            videos_cnt = 1000
+        return videos_cnt
+
+    @classmethod
+    def video_title(cls, log_type, crawler, env, title):
+        title_split1 = title.split(" #")
+        if title_split1[0] != "":
+            title1 = title_split1[0]
+        else:
+            title1 = title_split1[-1]
+
+        title_split2 = title1.split(" #")
+        if title_split2[0] != "":
+            title2 = title_split2[0]
+        else:
+            title2 = title_split2[-1]
+
+        title_split3 = title2.split("@")
+        if title_split3[0] != "":
+            title3 = title_split3[0]
+        else:
+            title3 = title_split3[-1]
+
+        video_title = title3.strip().replace("\n", "") \
+                          .replace("/", "").replace("快手", "").replace(" ", "") \
+                          .replace(" ", "").replace("&NBSP", "").replace("\r", "") \
+                          .replace("#", "").replace(".", "。").replace("\\", "") \
+                          .replace(":", "").replace("*", "").replace("?", "") \
+                          .replace("?", "").replace('"', "").replace("<", "") \
+                          .replace(">", "").replace("|", "").replace("@", "").replace('"', '').replace("'", '')[:40]
+        if video_title.replace(" ", "") == "" or video_title == "。。。" or video_title == "...":
+            return random_title(log_type, crawler, env, text='title')
+        else:
+            return video_title
+
+    @classmethod
+    def get_cookie(cls, log_type, crawler, env):
+        select_sql = f""" select * from crawler_config where source="{crawler}" """
+        configs = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
+        for config in configs:
+            if "cookie" in config["config"]:
+                cookie_dict = {
+                    "cookie_id": config["id"],
+                    "title": config["title"].strip(),
+                    "cookie": dict(eval(config["config"]))["cookie"].strip(),
+                    "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(config["update_time"] / 1000))),
+                    "operator": config["operator"].strip()
+                }
+                return cookie_dict
+
+    @classmethod
+    def get_videoList(cls, log_type, crawler, user_dict, rule_dict, env):
+        pcursor = ""
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        while True:
+            url = "https://www.kuaishou.com/graphql"
+            payload = json.dumps({
+                "operationName": "visionProfilePhotoList",
+                "variables": {
+                    "userId": user_dict["link"].replace("https://www.kuaishou.com/profile/", ""),
+                    "pcursor": pcursor,
+                    "page": "profile"
+                },
+                "query": "fragment photoContent on PhotoEntity {\n  id\n  duration\n  caption\n  originCaption\n  likeCount\n  viewCount\n  commentCount\n  realLikeCount\n  coverUrl\n  photoUrl\n  photoH265Url\n  manifest\n  manifestH265\n  videoResource\n  coverUrls {\n    url\n    __typename\n  }\n  timestamp\n  expTag\n  animatedCoverUrl\n  distance\n  videoRatio\n  liked\n  stereoType\n  profileUserTopPhoto\n  musicBlocked\n  __typename\n}\n\nfragment feedContent on Feed {\n  type\n  author {\n    id\n    name\n    headerUrl\n    following\n    headerUrls {\n      url\n      __typename\n    }\n    __typename\n  }\n  photo {\n    ...photoContent\n    __typename\n  }\n  canAddComment\n  llsid\n  status\n  currentPcursor\n  tags {\n    type\n    name\n    __typename\n  }\n  __typename\n}\n\nquery visionProfilePhotoList($pcursor: String, $userId: String, $page: String, $webPageArea: String) {\n  visionProfilePhotoList(pcursor: $pcursor, userId: $userId, page: $page, webPageArea: $webPageArea) {\n    result\n    llsid\n    webPageArea\n    feeds {\n      ...feedContent\n      __typename\n    }\n    hostName\n    pcursor\n    __typename\n  }\n}\n"
+            })
+            cookie = cls.get_cookie(log_type, crawler, env)["cookie"]
+            headers = {
+                'Accept': '*/*',
+                'Content-Type': 'application/json',
+                'Origin': 'https://www.kuaishou.com',
+                'Cookie': cookie,
+                'Content-Length': '1260',
+                'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
+                'Host': 'www.kuaishou.com',
+                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6.1 Safari/605.1.15',
+                'Referer': f'https://www.kuaishou.com/profile/{user_dict["link"].replace("https://www.kuaishou.com/profile/", "")}',
+                'Accept-Encoding': 'gzip, deflate, br',
+                'Connection': 'keep-alive'
+            }
+            urllib3.disable_warnings()
+            s = requests.session()
+            # max_retries=3 重试3次
+            s.mount('http://', HTTPAdapter(max_retries=3))
+            s.mount('https://', HTTPAdapter(max_retries=3))
+            response = s.post(url=url, headers=headers, data=payload, proxies=Common.tunnel_proxies(), verify=False, timeout=10)
+            response.close()
+            # Common.logger(log_type, crawler).info(f"response:{response.text}\n")
+            if response.status_code != 200:
+                Common.logger(log_type, crawler).warning(f"response:{response.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"response:{response.text}\n"
+                )
+                return
+            elif "data" not in response.json():
+                Common.logger(log_type, crawler).warning(f"response:{response.json()}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"response:{response.json()}\n"
+                )
+                return
+            elif "visionProfilePhotoList" not in response.json()["data"]:
+                Common.logger(log_type, crawler).warning(f"response:{response.json()}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"response:{response.json()}\n"
+                )
+                return
+            elif "feeds" not in response.json()["data"]["visionProfilePhotoList"]:
+                Common.logger(log_type, crawler).warning(f"response:{response.json()}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"response:{response.json()}\n"
+                )
+                return
+            elif len(response.json()["data"]["visionProfilePhotoList"]["feeds"]) == 0:
+                Common.logger(log_type, crawler).warning(f"没有更多视频啦 ~\n")
+                AliyunLogger.logging(
+                    code="2001",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"没有更多视频啦 ~\n"
+                )
+                return
+            pcursor = response.json()['data']['visionProfilePhotoList']['pcursor']
+            feeds = response.json()['data']['visionProfilePhotoList']['feeds']
+            for i in range(len(feeds)):
+                try:
+                    if cls.download_cnt >= cls.videos_cnt(rule_dict):
+                        Common.logger(log_type, crawler).info(f"已下载视频数:{cls.download_cnt}\n")
+                        AliyunLogger.logging(
+                            code="2002",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message=f"已下载视频数:{cls.download_cnt}\n"
+                        )
+                        return
+                    video_title = feeds[i].get("photo", {}).get("caption", random_title(log_type, crawler, env, text='title'))
+                    video_title = cls.video_title(log_type, crawler, env, video_title)
+                    try:
+                        video_id = feeds[i].get("photo", {}).get("videoResource").get("h264", {}).get("videoId", "")
+                        video_width = feeds[i].get("photo", {}).get("videoResource").get("h264", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("width", 0)
+                        video_height = feeds[i].get("photo", {}).get("videoResource").get("h264", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("height", 0)
+                    except KeyError:
+                        video_id = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("videoId", "")
+                        video_width = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("width", 0)
+                        video_height = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("height", 0)
+                    publish_time_stamp = int(int(feeds[i].get('photo', {}).get('timestamp', 0)) / 1000)
+                    publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
+                    date_60_days_ago_string = (date.today() - timedelta(days=60)).strftime("%Y-%m-%d %H:%M:%S")
+                    is_within_60_days = publish_time_str > date_60_days_ago_string
+                    if not is_within_60_days:
+                        Common.logger(log_type, crawler).info(f"发布时间超过60天,发布时间:{publish_time_str}\n")
+                        AliyunLogger.logging(
+                            code="2004",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message=f"发布时间超过60天,发布时间:{publish_time_str}\n"
+                        )
+                        continue
+                    realLikeCount = int(feeds[i].get('photo', {}).get('realLikeCount', 0))
+                    if realLikeCount < 10000:
+                        Common.logger(log_type, crawler).info(f"点赞量不足10000,点赞量:{realLikeCount}\n")
+                        AliyunLogger.logging(
+                            code="2004",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message=f"点赞量不足10000,点赞量:{realLikeCount}\n"
+                        )
+                        continue
+                    video_dict = {'video_title': video_title,
+                                  'video_id': video_id,
+                                  'play_cnt': int(feeds[i].get('photo', {}).get('viewCount', 0)),
+                                  'like_cnt': int(feeds[i].get('photo', {}).get('realLikeCount', 0)),
+                                  'comment_cnt': 0,
+                                  'share_cnt': 0,
+                                  'video_width': video_width,
+                                  'video_height': video_height,
+                                  'duration': int(int(feeds[i].get('photo', {}).get('duration', 0)) / 1000),
+                                  'publish_time_stamp': publish_time_stamp,
+                                  'publish_time_str': publish_time_str,
+                                  'user_name': feeds[i].get('author', {}).get('name', ""),
+                                  'user_id': feeds[i].get('author', {}).get('id', ""),
+                                  'avatar_url': feeds[i].get('author', {}).get('headerUrl', ""),
+                                  'cover_url': feeds[i].get('photo', {}).get('coverUrl', ""),
+                                  'video_url': feeds[i].get('photo', {}).get('photoUrl', ""),
+                                  'session': f"kuaishou-{int(time.time())}"}
+                    for k, v in video_dict.items():
+                        Common.logger(log_type, crawler).info(f"{k}:{v}")
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"{video_dict}\n"
+                    )
+                    if int((int(time.time()) - int(publish_time_stamp)) / (3600*24)) > int(rule_dict.get("period", {}).get("max", 1000)):
+                        Common.logger(log_type, crawler).info(f'发布时间超过{int(rule_dict.get("period", {}).get("max", 1000))}天\n')
+                        AliyunLogger.logging(
+                            code="2004",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message=f'发布时间超过{int(rule_dict.get("period", {}).get("max", 1000))}天\n'
+                        )
+                        return
+                    if video_dict["video_id"] == '' or video_dict["cover_url"] == '' or video_dict["video_url"] == '':
+                        Common.logger(log_type, crawler).info('无效视频\n')
+                        AliyunLogger.logging(
+                            code="2004",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message='无效视频\n'
+                        )
+                    elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
+                        Common.logger(log_type, crawler).info("不满足抓取规则\n")
+                        AliyunLogger.logging(
+                            code="2004",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message='不满足抓取规则\n'
+                        )
+                    elif any(str(word) if str(word) in video_dict["video_title"] else False
+                             for word in get_config_from_mysql(log_type=log_type,
+                                                               source=crawler,
+                                                               env=env,
+                                                               text="filter",
+                                                               action="")) is True:
+                        Common.logger(log_type, crawler).info('已中过滤词\n')
+                        AliyunLogger.logging(
+                            code="2004",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message='已中过滤词\n'
+                        )
+                    elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
+                        Common.logger(log_type, crawler).info('视频已下载\n')
+                        AliyunLogger.logging(
+                            code="2002",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message='视频已下载\n'
+                        )
+                    else:
+                        video_dict["out_user_id"] = video_dict["user_id"]
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["width"] = video_dict["video_width"]
+                        video_dict["height"] = video_dict["video_height"]
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = user_dict["uid"]
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+                        video_dict["strategy_type"] = log_type
+                        mq.send_msg(video_dict)
+                        cls.download_cnt += 1
+
+                except Exception as e:
+                    Common.logger(log_type, crawler).warning(f"抓取单条视频异常:{e}\n")
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"抓取单条视频异常:{e}\n"
+                    )
+
+    @classmethod
+    def repeat_video(cls, log_type, crawler, video_id, env):
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+
+
+    @classmethod
+    def get_author_videos(cls, log_type, crawler, user_list, rule_dict, env):
+        for user_dict in user_list:
+            try:
+                Common.logger(log_type, crawler).info(f"开始抓取 {user_dict['nick_name']} 主页视频")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"开始抓取 {user_dict['nick_name']} 主页视频"
+                )
+                cls.download_cnt = 0
+                cls.get_videoList(log_type=log_type,
+                                  crawler=crawler,
+                                  user_dict=user_dict,
+                                  rule_dict=rule_dict,
+                                  env=env)
+            except Exception as e:
+                Common.logger(log_type, crawler).warning(f"抓取用户{user_dict['nick_name']}主页视频时异常:{e}\n")
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取用户{user_dict['nick_name']}主页视频时异常:{e}\n"
+                )
+
+
+if __name__ == "__main__":
+    print(KuaishouauthorScheduling.get_cookie("author", "kuaishou", "prod")["cookie"])
+    pass
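
A side note on get_cookie above: the config column is parsed with dict(eval(...)), which executes whatever string is stored in crawler_config. A hedged sketch of a stricter parse; it assumes the column holds either JSON or a Python dict literal (an assumption about the stored format, not something this commit guarantees), and parse_cookie is an illustrative name:

import ast
import json

def parse_cookie(config_str):
    # illustrative alternative to dict(eval(config["config"]))["cookie"]:
    # try JSON first, fall back to a Python literal, never execute code
    try:
        return json.loads(config_str)["cookie"].strip()
    except json.JSONDecodeError:
        return ast.literal_eval(config_str)["cookie"].strip()

print(parse_cookie('{"cookie": "example=1; token=abc"}'))  # example=1; token=abc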

+ 67 - 22
kuaishou/kuaishou_main/run_ks_author.py

@@ -1,15 +1,17 @@
 # -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/6/7
+# @Time: 2023/11/07
 import argparse
 from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
 from mq_http_sdk.mq_exception import MQExceptionBase
+
+from common import AliyunLogger
+
 sys.path.append(os.getcwd())
 from common.common import Common
 from common.public import get_consumer, ack_message, task_fun_mq
 from common.scheduling_db import MysqlHelper
-from kuaishou.kuaishou_author.kuaishou_author_scheduling import KuaishouauthorScheduling
+from kuaishou.kuaishou_author.kuaishou_author_scheduling_new import KuaishouauthorScheduling
 
 
 def main(log_type, crawler, topic_name, group_id, env):
@@ -43,16 +45,22 @@ def main(log_type, crawler, topic_name, group_id, env):
                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
                                                       f"Properties:{msg.properties}")
-                Common.logging(log_type, crawler, env, f"Receive\n"
-                                                       f"MessageId:{msg.message_id}\n"
-                                                       f"MessageBodyMD5:{msg.message_body_md5}\n"
-                                                       f"MessageTag:{msg.message_tag}\n"
-                                                       f"ConsumedTimes:{msg.consumed_times}\n"
-                                                       f"PublishTime:{msg.publish_time}\n"
-                                                       f"Body:{msg.message_body}\n"
-                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
-                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
-                                                       f"Properties:{msg.properties}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                            f"MessageId:{msg.message_id}\n"
+                            f"MessageBodyMD5:{msg.message_body_md5}\n"
+                            f"MessageTag:{msg.message_tag}\n"
+                            f"ConsumedTimes:{msg.consumed_times}\n"
+                            f"PublishTime:{msg.publish_time}\n"
+                            f"Body:{msg.message_body}\n"
+                            f"NextConsumeTime:{msg.next_consume_time}\n"
+                            f"ReceiptHandle:{msg.receipt_handle}\n"
+                            f"Properties:{msg.properties}"
+                )
                 # ack_mq_message
                 ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
 
@@ -63,34 +71,71 @@ def main(log_type, crawler, topic_name, group_id, env):
                 select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
                 user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
                 Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
-                Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"调度任务:{task_dict}")
                 Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
-                Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
-                # Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}")
                 Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
-                Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f'开始抓取:{task_dict["taskName"]}\n')
                 KuaishouauthorScheduling.get_author_videos(log_type=log_type,
                                                            crawler=crawler,
                                                            rule_dict=rule_dict,
                                                            user_list=user_list,
                                                            env=env)
-                # Common.del_logs(log_type, crawler)
                 Common.logger(log_type, crawler).info('抓取一轮结束\n')
-                Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+                AliyunLogger.logging(
+                    code="1004",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message='抓取一轮结束\n'
+                )
                 kuaishou_author_end_time = int(time.time())
                 kuaishou_author_duration = kuaishou_author_start_time - kuaishou_author_end_time
                 Common.logger(log_type, crawler).info(f"duration {kuaishou_author_duration}")
-                Common.logging(log_type, crawler, env, f"duration {kuaishou_author_duration}")
+                AliyunLogger.logging(
+                    code="1004",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"duration {kuaishou_author_duration}"
+                )
 
         except MQExceptionBase as err:
             # Topic中没有消息可消费。
             if err.type == "MessageNotExist":
                 Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
-                Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n"
+                )
                 continue
 
             Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
-            Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+            AliyunLogger.logging(
+                code="1000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n"
+            )
             time.sleep(2)
             continue
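
The runner above replaces Common.logging with AliyunLogger.logging, keeping the keyword signature used throughout this commit. A minimal sketch of the call shape; the code values listed are simply how this diff uses them, not a documented table:

from common import AliyunLogger  # repo-local module; this sketch only runs inside the repo

# codes as used in this commit: 1000 info, 1003 round start, 1004 round end,
# 2000 unexpected response / per-user start, 2001 no more videos,
# 2002 download limit reached or duplicate, 2004 filtered by rule, 3000 exception
AliyunLogger.logging(
    code="1000",
    platform="kuaishou",
    mode="author",
    env="prod",
    message="example message",
)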
 

+ 110 - 0
kuaishou/kuaishou_main/run_ks_author1.py

@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/7
+import argparse
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.public import get_consumer, ack_message, task_fun_mq
+from common.scheduling_db import MysqlHelper
+from kuaishou.kuaishou_author.kuaishou_author_scheduling import KuaishouauthorScheduling
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起30秒,30秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间30秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费1条(最多可设置为16条)。
+    batch = 1
+    Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                          f'WaitSeconds:{wait_seconds}\n'
+                                          f'TopicName:{topic_name}\n'
+                                          f'MQConsumer:{group_id}')
+    Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                           f'WaitSeconds:{wait_seconds}\n'
+                                           f'TopicName:{topic_name}\n'
+                                           f'MQConsumer:{group_id}')
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                kuaishou_author_start_time = int(time.time())
+                Common.logger(log_type, crawler).info(f"Receive\n"
+                                                      f"MessageId:{msg.message_id}\n"
+                                                      f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                      f"MessageTag:{msg.message_tag}\n"
+                                                      f"ConsumedTimes:{msg.consumed_times}\n"
+                                                      f"PublishTime:{msg.publish_time}\n"
+                                                      f"Body:{msg.message_body}\n"
+                                                      f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                      f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                      f"Properties:{msg.properties}")
+                Common.logging(log_type, crawler, env, f"Receive\n"
+                                                       f"MessageId:{msg.message_id}\n"
+                                                       f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                       f"MessageTag:{msg.message_tag}\n"
+                                                       f"ConsumedTimes:{msg.consumed_times}\n"
+                                                       f"PublishTime:{msg.publish_time}\n"
+                                                       f"Body:{msg.message_body}\n"
+                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                       f"Properties:{msg.properties}")
+                # ack_mq_message
+                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
+
+                # 处理爬虫业务
+                task_dict = task_fun_mq(msg.message_body)['task_dict']
+                rule_dict = task_fun_mq(msg.message_body)['rule_dict']
+                task_id = task_dict['id']
+                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
+                Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
+                Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
+                # Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
+                Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+                KuaishouauthorScheduling.get_author_videos(log_type=log_type,
+                                                           crawler=crawler,
+                                                           rule_dict=rule_dict,
+                                                           user_list=user_list,
+                                                           env=env)
+                # Common.del_logs(log_type, crawler)
+                Common.logger(log_type, crawler).info('抓取一轮结束\n')
+                Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+                kuaishou_author_end_time = int(time.time())
+                kuaishou_author_duration = kuaishou_author_end_time - kuaishou_author_start_time
+                Common.logger(log_type, crawler).info(f"duration {kuaishou_author_duration}")
+                Common.logging(log_type, crawler, env, f"duration {kuaishou_author_duration}")
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
+                Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                continue
+
+            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
+            Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # 新建参数解释器对象
+    parser.add_argument('--log_type', type=str)  # 添加参数,注明参数类型
+    parser.add_argument('--crawler')  # 添加参数
+    parser.add_argument('--topic_name')  # 添加参数
+    parser.add_argument('--group_id')  # 添加参数
+    parser.add_argument('--env')  # 添加参数
+    args = parser.parse_args()  # 参数赋值,也可以通过终端赋值
+    main(log_type=args.log_type,
+         crawler=args.crawler,
+         topic_name=args.topic_name,
+         group_id=args.group_id,
+         env=args.env)
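
For local testing, the entry point above can also be driven without the command line; the topic and group names here are placeholders, not values taken from the repo:

# equivalent to:
#   python run_ks_author1.py --log_type author --crawler kuaishou \
#       --topic_name <topic> --group_id <group> --env dev
main(log_type="author",
     crawler="kuaishou",
     topic_name="<consumer topic>",   # placeholder
     group_id="<consumer group id>",  # placeholder
     env="dev")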

+ 2 - 2
kuaishou/kuaishou_main/run_ks_author_dev.py

@@ -5,7 +5,7 @@ import os
 import sys
 sys.path.append(os.getcwd())
 from common.common import Common
-from kuaishou.kuaishou_author.kuaishou_author_scheduling import KuaishouauthorScheduling
+from kuaishou.kuaishou_author.kuaishou_author_scheduling_new import KuaishouauthorScheduling
 
 
 def kuaishou_recommend_main(log_type, crawler, env):
@@ -32,4 +32,4 @@ def kuaishou_recommend_main(log_type, crawler, env):
 
 
 if __name__ == "__main__":
-    kuaishou_recommend_main("author", "kuaishou", "dev")
+    kuaishou_recommend_main("author", "kuaishou", "prod")