Browse Source

Merge remote-tracking branch 'origin/master'

zhangyong 1 year ago
parent
commit
b1584cb933

+ 3 - 1
common/mq.py

@@ -18,7 +18,9 @@ class MQ:
         platform = video_dict["platform"]
         try:
             msg = TopicMessage(json.dumps(video_dict))
-            msg.set_message_key(platform + "-" + strategy + "-" + video_dict["out_video_id"])
+            message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
+            # msg.set_message_key(platform + "-" + strategy + "-" + video_dict["out_video_id"])
+            msg.set_message_key(message_key)
             re_msg = self.producer.publish_message(msg)
             Common.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
                                                    (re_msg.message_id, re_msg.message_body_md5))

+ 0 - 6
haitunzhufu/haitunzhufu_recommend/haitunzhufu_recommend3.py

@@ -112,12 +112,6 @@ class HTZFScheduling:
             Common.logger(self.log_type, self.crawler).info(
                 f"get_videoList:{response.text}\n"
             )
-            # Common.logging(
-            #     self.log_type,
-            #     self.crawler,
-            #     self.env,
-            #     f"get_videoList:{response.text}\n",
-            # )
             AliyunLogger.logging(
                 code="2000",
                 platform=self.crawler,

+ 0 - 0
shanhuzhufu/__init__.py


+ 27 - 0
shanhuzhufu/crypt/decrypt.py

@@ -0,0 +1,27 @@
+from base64 import b64encode, b64decode
+from Crypto.Cipher import AES
+from Crypto.Util.Padding import pad, unpad
+
+
+class ShanHuZhuFuAes:
+    def __init__(self):
+        self.key = 'xlc2ze7qnqg8xi1d'.encode('utf-8')  # 需要一个bytes类型的key
+        self.iv = self.key  # 在这个例子中,key和iv是相同的
+
+    def encrypt(self, data):
+        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
+        ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
+        ct = b64encode(ct_bytes).decode('utf-8')
+        return ct
+
+    def decrypt(self, data):
+        try:
+            ct = b64decode(data.encode('utf-8'))
+            cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
+            pt = unpad(cipher.decrypt(ct), AES.block_size)
+            return pt.decode('utf-8')
+        except Exception as e:
+            print("Incorrect decryption")
+            return None
+
+

+ 153 - 0
shanhuzhufu/shanhuzhufu_main/run_shzf_recommend.py

@@ -0,0 +1,153 @@
+import argparse
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
+from shanhuzhufu.shanhuzhufu_recommend import ShanHuZhuFuRecommend
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取"
+                )
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取祝珊瑚祝福——推荐",
+                )
+                main_process = ShanHuZhuFuRecommend(
+                    platform=crawler,
+                    mode=log_type,
+                    rule_dict=rule_dict,
+                    user_list=user_list,
+                    env=env
+                )
+                main_process.get_video_list()
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="完成抓取——珊瑚祝福",
+                )
+                AliyunLogger.logging(
+                    code="1004", platform=crawler, mode=log_type, env=env,message="结束一轮抓取"
+                )
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument("--log_type", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--crawler")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )

+ 1 - 0
shanhuzhufu/shanhuzhufu_recommend/__init__.py

@@ -0,0 +1 @@
+from .shanhuzhufu_recommend_scheduling import ShanHuZhuFuRecommend

+ 103 - 0
shanhuzhufu/shanhuzhufu_recommend/shanhuzhufu_recommend_dev.py

@@ -0,0 +1,103 @@
+import os
+import json
+import random
+import sys
+import time
+import uuid
+
+import requests
+import datetime
+
+sys.path.append(os.getcwd())
+from common.video_item import VideoItem
+from common import AliyunLogger, tunnel_proxies
+from common.pipeline import PiaoQuanPipelineTest
+from common.mq import MQ
+from shanhuzhufu.crypt.decrypt import ShanHuZhuFuAes as AES
+
+
+class ShanHuZhuFuRecommend(object):
+    def __init__(self, platform, mode, rule_dict, user_list, env):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.limit_flag = False
+        self.cryptor = AES()
+
+    def get_video_list(self):
+        base_url = "https://shanhu.nnapi.cn/videos/api.videos/getItem"
+        headers = {
+            'Host': 'shanhu.nnapi.cn',
+            'xweb_xhr': '1',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
+            'content-type': 'application/json',
+            'accept': '*/*',
+            'sec-fetch-site': 'cross-site',
+            'sec-fetch-mode': 'cors',
+            'sec-fetch-dest': 'empty',
+            'referer': 'https://servicewechat.com/wxc2088c70f666b45e/2/page-frame.html',
+            'accept-language': 'en-US,en;q=0.9',
+            'Cookie': 'PHPSESSID=562dc39e8e68ad3e76c237f687bd049b; lang=zh-cn'
+        }
+        for i in range(100):
+            params = {
+                "mark": "",
+                "page": i + 1
+            }
+            response = requests.get(url=base_url, headers=headers, params=params, proxies=tunnel_proxies())
+            encrypted_info = response.json()['data']
+            decrypted_info = json.loads(self.cryptor.decrypt(data=encrypted_info))
+            # print(json.dumps(decrypted_info, ensure_ascii=False, indent=4))
+            video_list = decrypted_info['list']
+            for index, video_obj in enumerate(video_list):
+                self.process_video_obj(video_obj)
+
+    def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
+        our_user = random.choice(self.user_list)
+        publish_time_stamp = datetime.datetime.strptime(video_obj['create_at'], "%Y-%m-%d %H:%M:%S").timestamp()
+        item = VideoItem()
+        item.add_video_info("user_id", our_user['uid'])
+        item.add_video_info("user_name", our_user['nick_name'])
+        item.add_video_info("video_id", video_obj['id'])
+        item.add_video_info("video_title", video_obj['name'])
+        item.add_video_info("publish_time_str", video_obj['create_at'])
+        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
+        item.add_video_info("video_url", video_obj['cover'])
+        item.add_video_info("cover_url", video_obj['cover'] + '&vframe/png/offset/1/w/200')
+        item.add_video_info("like_cnt", video_obj['num_like'])
+        item.add_video_info("play_cnt", video_obj['num_read'])
+        item.add_video_info("comment_cnt", video_obj['num_comment'])
+        item.add_video_info("out_video_id", video_obj['id'])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipelineTest(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            self.download_cnt += 1
+            print(json.dumps(mq_obj, ensure_ascii=False, indent=4))
+            if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 200)):
+                self.limit_flag = True
+
+
+if __name__ == '__main__':
+    S = ShanHuZhuFuRecommend(
+        platform="shanhuzhufu",
+        mode="recommend",
+        env="dev",
+        rule_dict={},
+        user_list=[{'nick_name': "Ivring", 'uid': "1997"}, {'nick_name': "paul", 'uid': "1998"}]
+    )
+    S.get_video_list()

+ 158 - 0
shanhuzhufu/shanhuzhufu_recommend/shanhuzhufu_recommend_scheduling.py

@@ -0,0 +1,158 @@
+import os
+import json
+import random
+import sys
+import time
+import uuid
+
+import requests
+import datetime
+
+sys.path.append(os.getcwd())
+from common.video_item import VideoItem
+from common import PiaoQuanPipeline, AliyunLogger, tunnel_proxies
+from common.mq import MQ
+from shanhuzhufu.crypt.decrypt import ShanHuZhuFuAes as AES
+
+
+class ShanHuZhuFuRecommend(object):
+    def __init__(self, platform, mode, rule_dict, user_list, env):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.limit_flag = False
+        self.cryptor = AES()
+
+    def get_video_list(self):
+        base_url = "https://shanhu.nnapi.cn/videos/api.videos/getItem"
+        headers = {
+            "Host": "shanhu.nnapi.cn",
+            "xweb_xhr": "1",
+            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100",
+            "content-type": "application/json",
+            "accept": "*/*",
+            "sec-fetch-site": "cross-site",
+            "sec-fetch-mode": "cors",
+            "sec-fetch-dest": "empty",
+            "referer": "https://servicewechat.com/wxc2088c70f666b45e/2/page-frame.html",
+            "accept-language": "en-US,en;q=0.9",
+            "Cookie": "PHPSESSID=562dc39e8e68ad3e76c237f687bd049b; lang=zh-cn",
+        }
+        for i in range(100):
+            time.sleep(random.randint(1, 10))
+            try:
+                if self.limit_flag:
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
+                    )
+                    return
+                else:
+                    params = {"mark": "", "page": i + 1}
+                    response = requests.get(
+                        url=base_url,
+                        headers=headers,
+                        params=params,
+                        proxies=tunnel_proxies(),
+                    )
+                    encrypted_info = response.json()["data"]
+                    decrypted_info = json.loads(
+                        self.cryptor.decrypt(data=encrypted_info)
+                    )
+                    video_list = decrypted_info["list"]
+                    for index, video_obj in enumerate(video_list, 1):
+                        try:
+                            AliyunLogger.logging(
+                                code="1001",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message="扫描到一条视频",
+                                data=video_obj,
+                            )
+                            self.process_video_obj(video_obj)
+                        except Exception as e:
+                            AliyunLogger.logging(
+                                code="3000",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                data=video_obj,
+                                message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
+                            )
+            except Exception as e:
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message="抓取第{}页时候出现错误, 报错信息是{}".format(i + 1, e),
+                )
+
+    def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
+        our_user = random.choice(self.user_list)
+        publish_time_stamp = datetime.datetime.strptime(
+            video_obj["create_at"], "%Y-%m-%d %H:%M:%S"
+        ).timestamp()
+        item = VideoItem()
+        item.add_video_info("user_id", our_user["uid"])
+        item.add_video_info("user_name", our_user["nick_name"])
+        item.add_video_info("video_id", video_obj["id"])
+        item.add_video_info("video_title", video_obj["name"])
+        item.add_video_info("publish_time_str", video_obj["create_at"])
+        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
+        item.add_video_info("video_url", video_obj["cover"])
+        item.add_video_info(
+            "cover_url", video_obj["cover"] + "&vframe/png/offset/1/w/200"
+        )
+        item.add_video_info("like_cnt", video_obj["num_like"])
+        item.add_video_info("play_cnt", video_obj["num_read"])
+        item.add_video_info("comment_cnt", video_obj["num_comment"])
+        item.add_video_info("out_video_id", video_obj["id"])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            self.download_cnt += 1
+            # print(mq_obj)
+            self.mq.send_msg(mq_obj)
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="成功发送至 ETL",
+                data=mq_obj,
+            )
+            if self.download_cnt >= int(
+                self.rule_dict.get("videos_cnt", {}).get("min", 200)
+            ):
+                self.limit_flag = True
+
+
+if __name__ == '__main__':
+    S = ShanHuZhuFuRecommend(
+        platform="shanhuzhufu",
+        mode="recommend",
+        env="dev",
+        rule_dict={},
+        user_list=[{'nick_name': "Ivring", 'uid': "1997"}, {'nick_name': "paul", 'uid': "1998"}]
+    )
+    S.get_video_list()

+ 39 - 35
zhuwanwufusu/zhuwanwufusu_recommend/zwwfs_recommend.py

@@ -286,41 +286,41 @@ class ZhuWanWuFuSuRecommend(object):
                         message="本轮已经抓取足够数量的视频"
                     )
                     return
-                query = {
-                    "groupId": "1650323161797439489",
-                    "pageNo": page_index,
-                    "pageSize": 10,
-                    # "videoId": "1681138763919003650",
-                    "appid": "wx0afdc2669ed8df2f",
-                    "type": 3,
-                    "hxid": "1556555457243828666"
-                }
-                params = {
-                    "v": self.cryptor.aes_encrypt(data=json.dumps(query))
-                }
-                response = requests.request("GET", url, headers=headers, params=params)
-                result = json.loads(self.cryptor.aes_decrypt(response.text))
-                total_page = result['list']['pages']
-                page_index = result['list']['current'] + 1
-                for index, video_obj in enumerate(result['list']['records'], 1):
-                    try:
-                        AliyunLogger.logging(
-                            code="1001",
-                            platform=self.platform,
-                            mode=self.mode,
-                            env=self.env,
-                            message="扫描到一条视频",
-                            data=video_obj
-                        )
-                        self.process_video_obj(video_obj)
-                    except Exception as e:
-                        AliyunLogger.logging(
-                            code="3000",
-                            platform=self.platform,
-                            mode=self.mode,
-                            env=self.env,
-                            message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
-                        )
+                else:
+                    query = {
+                        "groupId": "1650323161797439489",
+                        "pageNo": page_index,
+                        "pageSize": 10,
+                        "appid": "wx0afdc2669ed8df2f",
+                        "type": 3,
+                        "hxid": "1556555457243828666"
+                    }
+                    params = {
+                        "v": self.cryptor.aes_encrypt(data=json.dumps(query))
+                    }
+                    response = requests.request("GET", url, headers=headers, params=params)
+                    result = json.loads(self.cryptor.aes_decrypt(response.text))
+                    total_page = result['list']['pages']
+                    page_index = result['list']['current'] + 1
+                    for index, video_obj in enumerate(result['list']['records'], 1):
+                        try:
+                            AliyunLogger.logging(
+                                code="1001",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message="扫描到一条视频",
+                                data=video_obj
+                            )
+                            self.process_video_obj(video_obj)
+                        except Exception as e:
+                            AliyunLogger.logging(
+                                code="3000",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
+                            )
             except Exception as e:
                 AliyunLogger.logging(
                     code="3000",
@@ -336,8 +336,12 @@ class ZhuWanWuFuSuRecommend(object):
         先抓取推荐列表的视频, 等待 2 分钟后抓取 detail 页面,等待 5 分钟后,抓取账号视频
         """
         self.get_recommend_list()
+        if self.limit_flag:
+            return
         time.sleep(2 * 60)
         self.get_detail_video_list()
+        if self.limit_flag:
+            return
         time.sleep(5 * 60)
         self.mode = "author"
         user_list = self.get_user_list()