
欢欢喜喜祝福到: switch encryption method; reverse-engineer 祝福送老友

罗俊辉 · 1 year ago · commit d98f94d926

+ 2 - 0
gongzhonghao/gongzhonghao_author/gongzhonghao_author.py

@@ -718,6 +718,8 @@ class GongzhonghaoAuthor:
                 env=env,
                 message="完成抓取公众号: {}".format(user_dict["nick_name"]),
             )
+            # 判断该账号10天内是否有新进到数据
+
 
 
 if __name__ == "__main__":
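Reviewer note: the comment added above ("check whether this account produced new data within the last 10 days") is a TODO with no implementation in this commit. A minimal sketch of such a freshness check, reusing the repo's `MysqlHelper` and the `crawler_video` table seen elsewhere in this diff; the `user_name` and `create_time` column names are assumptions, not confirmed by the schema:

```python
import datetime

from common.scheduling_db import MysqlHelper


def has_data_within_days(log_type, crawler, nick_name, env, days=10):
    # Hypothetical sketch: crawler_video is used elsewhere in this repo,
    # but user_name/create_time are assumed column names.
    since = (datetime.datetime.now() - datetime.timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
    sql = f"""select id from crawler_video where user_name = "{nick_name}" and create_time >= "{since}" limit 1;"""
    return len(MysqlHelper.get_values(log_type, crawler, sql, env, action="")) > 0
```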

+ 137 - 124
huanhuanxixizhufudao/huanhuanxixizhufudao_main/run_hhxxzfd_recommend.py

@@ -1,140 +1,153 @@
-# -*- coding: utf-8 -*-
-# @Author: luojunhui
-# @Time: 2023/10/18
 import argparse
-import random
-import multiprocessing
-
 from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
 from mq_http_sdk.mq_exception import MQExceptionBase
 
 sys.path.append(os.getcwd())
-from common.common import Common
-from common.public import get_consumer, ack_message, task_fun_mq, get_rule_from_mysql
+from common.public import task_fun_mq, get_consumer, ack_message
 from common.scheduling_db import MysqlHelper
-from huanhuanxixizhufudao.huanhuanxixizhufudao_recommend import HHXXZFDScheduling
-
-
-def run(args1, args2, args3, args4, args5):
-    HHXXZFDScheduling(log_type=args1,
-                      crawler=args2,
-                      rule_dict=args3,
-                      our_uid=args4,
-                      env=args5)
-
-
-class HhxxzfdMain:
-    @classmethod
-    def main(cls, log_type, crawler, topic_name, group_id, env):
-        consumer = get_consumer(topic_name, group_id)
-        # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
-        # 长轮询时间3秒(最多可设置为30秒)。
-        wait_seconds = 30
-        # 一次最多消费3条(最多可设置为16条)。
-        batch = 1
-        Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-                                              f'WaitSeconds:{wait_seconds}\n'
-                                              f'TopicName:{topic_name}\n'
-                                              f'MQConsumer:{group_id}')
-        Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-                                               f'WaitSeconds:{wait_seconds}\n'
-                                               f'TopicName:{topic_name}\n'
-                                               f'MQConsumer:{group_id}')
-        while True:
-            try:
-                # 长轮询消费消息。
-                recv_msgs = consumer.consume_message(batch, wait_seconds)
-                for msg in recv_msgs:
-                    Common.logger(log_type, crawler).info(f"Receive\n"
-                                                          f"MessageId:{msg.message_id}\n"
-                                                          f"MessageBodyMD5:{msg.message_body_md5}\n"
-                                                          f"MessageTag:{msg.message_tag}\n"
-                                                          f"ConsumedTimes:{msg.consumed_times}\n"
-                                                          f"PublishTime:{msg.publish_time}\n"
-                                                          f"Body:{msg.message_body}\n"
-                                                          f"NextConsumeTime:{msg.next_consume_time}\n"
-                                                          f"ReceiptHandle:{msg.receipt_handle}\n"
-                                                          f"Properties:{msg.properties}")
-                    Common.logging(log_type, crawler, env, f"Receive\n"
-                                                           f"MessageId:{msg.message_id}\n"
-                                                           f"MessageBodyMD5:{msg.message_body_md5}\n"
-                                                           f"MessageTag:{msg.message_tag}\n"
-                                                           f"ConsumedTimes:{msg.consumed_times}\n"
-                                                           f"PublishTime:{msg.publish_time}\n"
-                                                           f"Body:{msg.message_body}\n"
-                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
-                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
-                                                           f"Properties:{msg.properties}")
-                    # ack_mq_message
-                    ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
+from common import AliyunLogger
+from huanhuanxixizhufudao.huanhuanxixizhufudao_recommend import HHXXZFDRecommend
 
-                    # 处理爬虫业务
-                    task_dict = task_fun_mq(msg.message_body)['task_dict']
-                    rule_dict = task_fun_mq(msg.message_body)['rule_dict']
-                    task_id = task_dict['id']
-                    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
-                    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
-                    our_uid_list = []
-                    for user in user_list:
-                        our_uid_list.append(user["uid"])
-                    our_uid = random.choice(our_uid_list)
-                    Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
-                    Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
-                    Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
-                    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
-                    Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
-                    Common.logging(log_type, crawler, env, f"用户列表:{user_list}\n")
-                    Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
-                    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
-                    new_r = get_rule_from_mysql(task_id=task_id, log_type=log_type, crawler=crawler, env=env)
-                    r_d = {}
-                    for item in new_r:
-                        for k, val in item.items():
-                            r_d[k] = val
-                    Common.logger(log_type, crawler).info(f"抓取规则:{r_d}")
-                    Common.logging(log_type, crawler, env, f"抓取规则:{r_d}")
-                    # 初始化
-                    HHXXZFD = HHXXZFDScheduling(
-                        log_type=log_type,
-                        crawler=crawler,
-                        env=env,
-                        rule_dict=r_d,
-                        our_uid=our_uid
-                    )
-                    for i in range(20):
-                        if HHXXZFD.download_count >= int(rule_dict.get("videos_cnt", {}).get("min", 10)):
-                            HHXXZFD.download_count = 0
-                            break
-                        else:
-                            HHXXZFD.get_videoList(page_id=i + 1, page_limit=10)
-                            time.sleep(60)
-                    Common.logger(log_type, crawler).info('抓取一轮结束\n')
-                    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
 
-            except MQExceptionBase as err:
-                # Topic中没有消息可消费。
-                if err.type == "MessageNotExist":
-                    Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
-                    Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
-                    continue
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询:如果 Topic 没有消息,客户端请求会在服务端挂起 wait_seconds 秒,期间一旦有消息可消费则立即返回。
+    # 长轮询时间 30 秒(最多可设置为 30 秒)。
+    wait_seconds = 30
+    # 一次最多消费 1 条(最多可设置为 16 条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取"
+                )
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取欢欢喜喜祝福到——推荐",
+                )
+                main_process = HHXXZFDRecommend(
+                    platform=crawler,
+                    mode=log_type,
+                    rule_dict=rule_dict,
+                    user_list=user_list,
+                    env=env
+                )
+                main_process.schedule()
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="完成抓取——祝万物复苏",
+                )
+                AliyunLogger.logging(
+                    code="1004", platform=crawler, mode=log_type, env=env,message="结束一轮抓取"
+                )
 
-                Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
-                Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
-                time.sleep(2)
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
                 continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+            time.sleep(2)
+            continue
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()  ## 新建参数解释器对象
-    parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
-    parser.add_argument('--crawler')  ## 添加参数
-    parser.add_argument('--topic_name')  ## 添加参数
-    parser.add_argument('--group_id')  ## 添加参数
-    parser.add_argument('--env')  ## 添加参数
+    parser.add_argument("--log_type", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--crawler")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
     args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
-    HhxxzfdMain.main(log_type=args.log_type,
-                     crawler=args.crawler,
-                     topic_name=args.topic_name,
-                     group_id=args.group_id,
-                     env=args.env)
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )
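Reviewer note: this runner (like the zhufusonglaoyou one later in the commit) calls `sys.path.append(os.getcwd())` and `time.sleep(...)` without ever importing `sys`, `os`, or `time` itself; it apparently works only because the `mq_http_sdk` wildcard imports happen to re-export those modules. A sketch of a prologue that does not depend on that side effect:

```python
import argparse
import os
import sys
import time

from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())  # repo-root imports keep working with explicit sys/os
```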

+ 0 - 161
huanhuanxixizhufudao/huanhuanxixizhufudao_main/run_hhxxzfd_recommend_1.py

@@ -1,161 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author:
-# @Time: 2023/9/7
-import argparse
-import random
-import multiprocessing
-
-from mq_http_sdk.mq_client import *
-from mq_http_sdk.mq_consumer import *
-from mq_http_sdk.mq_exception import MQExceptionBase
-
-
-sys.path.append(os.getcwd())
-from common.common import Common
-from common.public import get_consumer, ack_message, task_fun_mq, get_rule_from_mysql
-from common.scheduling_db import MysqlHelper
-from huanhuanxixizhufudao.huanhuanxixizhufudao_recommend.huanhuanxixizhufudao_recommend import HhxxzfdRecommend
-
-
-
-def run(args1, args2, args3, args4, args5):
-    HhxxzfdRecommend.start_wechat(log_type=args1,
-                               crawler=args2,
-                               rule_dict=args3,
-                               our_uid=args4,
-                               env=args5)
-
-
-class HhxxzfdMain:
-    @classmethod
-    def main(cls, log_type, crawler, topic_name, group_id, env):
-        consumer = get_consumer(topic_name, group_id)
-        # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
-        # 长轮询时间3秒(最多可设置为30秒)。
-        wait_seconds = 30
-        # 一次最多消费3条(最多可设置为16条)。
-        batch = 1
-        Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-                                              f'WaitSeconds:{wait_seconds}\n'
-                                              f'TopicName:{topic_name}\n'
-                                              f'MQConsumer:{group_id}')
-        Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-                                               f'WaitSeconds:{wait_seconds}\n'
-                                               f'TopicName:{topic_name}\n'
-                                               f'MQConsumer:{group_id}')
-        while True:
-            try:
-                # 长轮询消费消息。
-                recv_msgs = consumer.consume_message(batch, wait_seconds)
-                for msg in recv_msgs:
-                    Common.logger(log_type, crawler).info(f"Receive\n"
-                                                          f"MessageId:{msg.message_id}\n"
-                                                          f"MessageBodyMD5:{msg.message_body_md5}\n"
-                                                          f"MessageTag:{msg.message_tag}\n"
-                                                          f"ConsumedTimes:{msg.consumed_times}\n"
-                                                          f"PublishTime:{msg.publish_time}\n"
-                                                          f"Body:{msg.message_body}\n"
-                                                          f"NextConsumeTime:{msg.next_consume_time}\n"
-                                                          f"ReceiptHandle:{msg.receipt_handle}\n"
-                                                          f"Properties:{msg.properties}")
-                    Common.logging(log_type, crawler, env, f"Receive\n"
-                                                           f"MessageId:{msg.message_id}\n"
-                                                           f"MessageBodyMD5:{msg.message_body_md5}\n"
-                                                           f"MessageTag:{msg.message_tag}\n"
-                                                           f"ConsumedTimes:{msg.consumed_times}\n"
-                                                           f"PublishTime:{msg.publish_time}\n"
-                                                           f"Body:{msg.message_body}\n"
-                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
-                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
-                                                           f"Properties:{msg.properties}")
-                    # ack_mq_message
-                    ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
-
-                    # 处理爬虫业务
-                    task_dict = task_fun_mq(msg.message_body)['task_dict']
-                    rule_dict = task_fun_mq(msg.message_body)['rule_dict']
-                    task_id = task_dict['id']
-                    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
-                    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
-                    our_uid_list = []
-                    for user in user_list:
-                        our_uid_list.append(user["uid"])
-                    our_uid = random.choice(our_uid_list)
-                    Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
-                    Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
-                    Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
-                    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
-                    Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
-                    Common.logging(log_type, crawler, env, f"用户列表:{user_list}\n")
-                    Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
-                    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
-                    new_r = get_rule_from_mysql(task_id=task_id, log_type=log_type, crawler=crawler, env=env)
-                    r_d = {}
-                    for item in new_r:
-                        for k, val in item.items():
-                            r_d[k] = val
-                    Common.logger(log_type, crawler).info(f"抓取规则:{r_d}")
-                    Common.logging(log_type, crawler, env, f"抓取规则:{r_d}")
-                    process = multiprocessing.Process(
-                        target=run,
-                        args=(log_type, crawler, rule_dict, our_uid, env)
-                    )
-                    process.start()
-                    print("进程开始")
-
-                    for i in range(10):
-                        if not process.is_alive():
-
-                            print("进程异常,准备重启")
-                            process.terminate()
-                            os.system("adb forward --remove-all")
-                            time.sleep(60)
-                            new_r = get_rule_from_mysql(task_id=task_id, log_type=log_type, crawler=crawler, env=env)
-                            r_d = {}
-                            for item in new_r:
-                                for k, val in item.items():
-                                    r_d[k] = val
-                            Common.logger(log_type, crawler).info(f"抓取规则:{r_d}")
-                            Common.logging(log_type, crawler, env, f"抓取规则:{r_d}")
-                            process = multiprocessing.Process(
-                                target=run,
-                                args=(log_type, crawler, rule_dict, our_uid, env)
-                            )
-                            process.start()
-                        time.sleep(60)
-
-                    # # 抓取符合规则的视频列表
-                    # ZFQZRecommend.start_wechat(log_type=log_type,
-                    #                            crawler=crawler,
-                    #                            rule_dict=rule_dict,
-                    #                            our_uid=our_uid,
-                    #                            env=env)
-                    Common.logger(log_type, crawler).info('抓取一轮结束\n')
-                    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
-
-            except MQExceptionBase as err:
-                # Topic中没有消息可消费。
-                if err.type == "MessageNotExist":
-                    Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
-                    Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
-                    continue
-
-                Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
-                Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
-                time.sleep(2)
-                continue
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
-    parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
-    parser.add_argument('--crawler')  ## 添加参数
-    parser.add_argument('--topic_name')  ## 添加参数
-    parser.add_argument('--group_id')  ## 添加参数
-    parser.add_argument('--env')  ## 添加参数
-    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
-    HhxxzfdMain.main(log_type=args.log_type,
-                              crawler=args.crawler,
-                              topic_name=args.topic_name,
-                              group_id=args.group_id,
-                              env=args.env)
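Reviewer note: the deleted runner above wrapped the crawler in a `multiprocessing` watchdog that restarted the child process whenever it died; the rewritten runner earlier in this commit drops that in favor of a single in-process `schedule()` call. For reference, the dropped pattern distills to roughly this:

```python
import multiprocessing
import time


def run_with_watchdog(target, args, checks=10, interval=60):
    # Distilled from the deleted file: poll every `interval` seconds for
    # `checks` rounds, restarting the crawler process if it has exited.
    process = multiprocessing.Process(target=target, args=args)
    process.start()
    for _ in range(checks):
        if not process.is_alive():
            process.terminate()
            process = multiprocessing.Process(target=target, args=args)
            process.start()
        time.sleep(interval)
```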

+ 1 - 1
huanhuanxixizhufudao/huanhuanxixizhufudao_recommend/__init__.py

@@ -1 +1 @@
-from .huanhuanxixizhufudao_recommend_2 import HHXXZFDScheduling
+from .huanhuanxixizhufudao_recommend_2 import HHXXZFDRecommend

+ 346 - 237
huanhuanxixizhufudao/huanhuanxixizhufudao_recommend/huanhuanxixizhufudao_recommend_2.py

@@ -1,264 +1,373 @@
-# -*- coding: utf-8 -*-
-# @Author: luojunhui
-# @Time: 2023/10/18
-import json
 import os
+import json
 import random
 import sys
 import time
-import requests
-from hashlib import md5
-from datetime import datetime
-from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import padding
-import binascii
+import uuid
 
-from common.mq import MQ
+import requests
 
 sys.path.append(os.getcwd())
-from common.common import Common
-from common.scheduling_db import MysqlHelper
-from common.public import get_config_from_mysql, download_rule, download_rule_v2
-
-
-# 定义一个 AES 加密解密的类
-class AESCryptor:
-    def __init__(self):
-        # 初始化密钥和 IV, 在生产环境中,这些值不应该被硬编码
-        self.key = b"50102fa64073ad76"
-        self.iv = b"173d023138824bb0"
-
-    # AES 加密方法
-    def aes_encrypt(self, data):
-        # 使用 PKCS7 填充模式处理待加密的数据,使其长度满足 AES 加密的需求
-        padder = padding.PKCS7(128).padder()
-        padded_data = padder.update(data.encode('utf-8')) + padder.finalize()
-
-        # 初始化 AES 加密器,使用 CBC 模式和给定的密钥、IV
-        backend = default_backend()
-        cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=backend)
-        encryptor = cipher.encryptor()
-        ct = encryptor.update(padded_data) + encryptor.finalize()
-
-        # 将加密后的字节串转为十六进制字符串,并转为大写
-        return binascii.hexlify(ct).upper().decode('utf-8')
-
-    # AES 解密方法
-    def aes_decrypt(self, hex_data):
-        # 将十六进制字符串转为原始的字节串
-        ct = binascii.unhexlify(hex_data)
-
-        # 初始化 AES 解密器,使用相同的密钥和 IV
-        backend = default_backend()
-        cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=backend)
-        decryptor = cipher.decryptor()
-        padded_data = decryptor.update(ct) + decryptor.finalize()
-
-        # 使用 PKCS7 移除填充
-        unpadder = padding.PKCS7(128).unpadder()
-        data = unpadder.update(padded_data) + unpadder.finalize()
-
-        # 返回解密后,去掉填充的原始字符串
-        return data.decode('utf-8')
-
-
-def clean_title(strings):
-    return (
-        strings.strip()
-        .replace("\n", "")
-        .replace("/", "")
-        .replace("\r", "")
-        .replace("#", "")
-        .replace(".", "。")
-        .replace("\\", "")
-        .replace("&NBSP", "")
-        .replace(":", "")
-        .replace("*", "")
-        .replace("?", "")
-        .replace("?", "")
-        .replace('"', "")
-        .replace("<", "")
-        .replace(">", "")
-        .replace("|", "")
-        .replace(" ", "")
-        .replace('"', "")
-        .replace("'", "")
-    )
+from common.video_item import VideoItem
+from common import PiaoQuanPipeline, AliyunLogger, tunnel_proxies
+from common.mq import MQ
+from common.db import MysqlHelper
+from zhuwanwufusu.crypt import AESCipher as AES
 
 
-class HHXXZFDScheduling:
-    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
-        self.platform = "欢欢喜喜祝福到"
-        self.log_type = log_type
-        self.crawler = crawler
+class HHXXZFDRecommend(object):
+    def __init__(self, platform, mode, rule_dict, user_list, env):
+        self.platform = platform
+        self.mode = mode
         self.rule_dict = rule_dict
+        self.user_list = user_list
         self.env = env
-        self.our_uid = our_uid
+        self.download_cnt = 0
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
-        self.download_count = 0
-
-    def repeat_video(self, video_id):
-        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
-        repeat_video = MysqlHelper.get_values(
-            self.log_type, self.crawler, sql, self.env
-        )
-        return len(repeat_video)
+        self.limit_flag = False
+        self.cryptor = AES()
 
-    # 获取视频id_list
-    def get_videoList(self, page_id, page_limit):
-        time.sleep(random.randint(5, 10))
-        my_dict = {
-            "pageNo": page_id,  # 页数
-            "pageSize": page_limit,  # 每一页的视频数量
-            "groupId": "1650323161797439489",  # 分类
-            "vn": 1,
-            "gx": 1,
-            "appid": "wx9a60184c443f39af",  # 小程序id
-            "type": 2,
-            "hxid": "this may not be important",
+    def get_recommend_list(self):
+        url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2"
+        headers = {
+            'Host': 'api.lidongze.cn',
+            'xweb_xhr': '1',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
+            'token': '',
+            'content-type': 'application/json',
+            'accept': '*/*',
+            'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
+            'accept-language': 'en-US,en;q=0.9'
         }
-        my_str = AESCryptor().aes_encrypt(json.dumps(my_dict, ensure_ascii=False))
-        url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2?v={}".format(my_str)
-        # 请求头
+        page_index = 1
+        total_page = 2
+        while page_index <= total_page:
+            try:
+                query = {
+                    "pageNo": page_index,
+                    "pageSize": 10,
+                    "groupId": "1650323161797439489",  # 推荐流的 ID
+                    "vn": 1,
+                    "gx": 1,
+                    "appid": "wx9a60184c443f39af",
+                    "type": 0
+                }
+                params = {
+                    "v": self.cryptor.aes_encrypt(data=json.dumps(query))
+                }
+                response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
+                result = json.loads(self.cryptor.aes_decrypt(response.text))
+                total_page = result['list']['pages']
+                page_index = result['list']['current'] + 1
+                for index, video_obj in enumerate(result['list']['records'], 1):
+                    try:
+                        AliyunLogger.logging(
+                            code="1001",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            message="扫描到一条视频",
+                            data=video_obj
+                        )
+                        self.process_video_obj(video_obj)
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
+                        )
+            except Exception as e:
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
+                )
+            time.sleep(random.randint(5, 10))
+
+    def get_user_videos(self, user_id):
+        """
+        在抓取完推荐页之后,去抓每一个用户的主页视频
+        """
+        url = "https://api.lidongze.cn/jeecg-boot/ugc/getAuthVideoList"
         headers = {
-            "xweb_xhr": "1",
-            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
-            "content-type": "application/json",
-            "accept": "*/*",
-            "sec-fetch-site": "cross-site",
-            "sec-fetch-mode": "cors",
-            "sec-fetch-dest": "empty",
-            "referer": "https://servicewechat.com/wx9a60184c443f39af/9/page-frame.html",
-            "accept-encoding": "gzip, deflate, br",
-            "accept-language": "en",
+            'Host': 'api.lidongze.cn',
+            'xweb_xhr': '1',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
+            'token': '',
+            'content-type': 'application/json',
+            'accept': '*/*',
+            'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
+            'accept-language': 'en-US,en;q=0.9'
         }
-        response = requests.get(url, headers=headers)
-        result = json.loads(AESCryptor().aes_decrypt(response.text))
-        if "list" not in result or response.status_code != 200:
-            Common.logger(self.log_type, self.crawler).info(
-                f"get_videoList:{response.text}\n"
-            )
-            Common.logging(
-                self.log_type,
-                self.crawler,
-                self.env,
-                f"get_videoList:{response.text}\n",
-            )
-            return
-        elif len(result["list"]["records"]) == 0:
-            Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
-            Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
-            return
-        else:
-            data_list = result["list"]["records"]
-            for video_obj in data_list:
-                try:
-                    self.process_video_obj(video_obj)
-                except Exception as e:
-                    Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
-                    Common.logging(
-                        self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
-                    )
+        page_index = 1
+        total_page = 1
+        while page_index <= total_page:
+            query = {
+                "pageNo": page_index,
+                "pageSize": 10,
+                "authid": user_id
+            }
+            params = {
+                "v": self.cryptor.aes_encrypt(data=json.dumps(query))
+            }
+            response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
+            result = json.loads(self.cryptor.aes_decrypt(response.text))
+            total_page = result['list']['pages']
+            page_index = result['list']['current'] + 1
+            for index, video_temp in enumerate(result['list']['records']):
+                video_id = video_temp['id']
+                detail_query = {
+                    "videoId": video_id
+                }
+                detail_params = {
+                    "v": self.cryptor.aes_encrypt(data=json.dumps(detail_query))
+                }
+                url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideosDataEn"
+                headers = {
+                    'Host': 'api.lidongze.cn',
+                    'xweb_xhr': '1',
+                    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
+                    'token': '',
+                    'content-type': 'application/json',
+                    'accept': '*/*',
+                    'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
+                    'accept-language': 'en-US,en;q=0.9'
+                }
+                detail_response = requests.request("GET", url, headers=headers, params=detail_params,
+                                                   proxies=tunnel_proxies())
+                detail_video = json.loads(self.cryptor.aes_decrypt(detail_response.text))
+                if detail_video['success']:
+                    try:
+                        AliyunLogger.logging(
+                            code="1001",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            message="扫描到一条视频",
+                            data=detail_video['data']
+                        )
+                        self.process_video_obj(detail_video['data'])
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            message="抓取单条视频失败, 该视频位于第{}条报错原因是{}".format(index, e)
+                        )
 
     def process_video_obj(self, video_obj):
-        # print(type(video_obj))
-        video_id = video_obj.get("id", 0)
-        video_title = clean_title(video_obj.get("vname", "no title"))
-        video_time = video_obj.get("v_time", 0)
-        publish_time_stamp = int(time.time())
-        publish_time_str = time.strftime(
-            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
+        time.sleep(random.randint(3, 8))
+        trace_id = self.platform + str(uuid.uuid1())
+        if video_obj.get("playnum"):
+            play_cnt = int(video_obj['playnum'].replace("万+", "0000")) if "万+" in video_obj['playnum'] else int(
+                video_obj['playnum'])
+        else:
+            play_cnt = 0
+        our_user = random.choice(self.user_list)
+        item = VideoItem()
+        item.add_video_info("video_id", video_obj['id'])
+        item.add_video_info("video_title", video_obj['vname'])
+        item.add_video_info("play_cnt", play_cnt)
+        item.add_video_info("publish_time_stamp", int(time.time()))
+        item.add_video_info("out_user_id", video_obj['authid'])
+        item.add_video_info("cover_url", video_obj['shareimg'])
+        item.add_video_info("like_cnt", int(video_obj['likenum']))
+        item.add_video_info("video_url", video_obj['videoaddr'])
+        item.add_video_info("out_video_id", video_obj['id'])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        item.add_video_info("user_id", our_user['uid'])
+        item.add_video_info("user_name", our_user['nick_name'])
+        # 把扫描到的账号存到 accounts 表中
+        self.manage_auth_id(out_user_id=video_obj['authid'], out_user_name=video_obj['authname'])
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
         )
-        user_name = video_obj.get("authname", "")
-        video_dict = {
-            "video_title": video_title,
-            "video_id": video_id,
-            "duration": video_time,
-            "play_cnt": int(video_obj.get("playnum", 0).replace("万+", "0000") if "万+" in video_obj.get("playnum", 0) else video_obj.get("playnum", 0)),
-            "like_cnt": int(video_obj.get("likenum", 0)),
-            "comment_cnt": 0,
-            "share_cnt": 0,
-            "user_name": user_name,
-            "publish_time_stamp": publish_time_stamp,
-            "publish_time_str": publish_time_str,
-            "video_width": 0,
-            "video_height": 0,
-            "profile_id": 0,
-            "profile_mid": 0,
-            "session": f"huanhaunxixizhufudao-{int(time.time())}",
-        }
-        for k, v in video_dict.items():
-            Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
-        Common.logging(
-            self.log_type, self.crawler, self.env, f"{video_dict}"
+        if pipeline.process_item():
+            self.download_cnt += 1
+            self.mq.send_msg(mq_obj)
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="成功发送至 ETL",
+                data=mq_obj
+            )
+            if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 200)):
+                self.limit_flag = True
+
+    def manage_auth_id(self, out_user_id, out_user_name):
+        """
+        out_user_id: 外站视频的用户 id
+        out_user_name: 外站视频用户名字
+        逻辑: 对新扫描到的视频的用户 id 进行判断,若用户 id 不存在,则把视频 id 存到表中,
+              如果用户 id 存在,则判断用户是否修改名字,若名字修改则更新名字
+        """
+        select_user_sql = f"""select name, name_id from accounts where name_id = "{out_user_id}" and platform = "{self.platform}" and useful = 1 limit 1"""
+        out_user_info = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_user_sql,
+            env=self.env,
+            machine="",
         )
-        # 过滤无效视频
-        if video_title == "" or video_dict["video_id"] == "":
-            Common.logger(self.log_type, self.crawler).info("无效视频\n")
-            Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
-            # 抓取基础规则过滤
-        elif (
-                download_rule_v2(
-                    log_type=self.log_type,
-                    crawler=self.crawler,
-                    video_dict=video_dict,
-                    rule_dict=self.rule_dict,
+        if out_user_info:
+            name, name_id = out_user_info[0]
+            if name == out_user_name:
+                return
+            else:
+                update_sql = f"""update accounts set name = "{out_user_name}" where name_id = "{out_user_id}";"""
+                MysqlHelper.update_values(
+                    log_type=self.mode,
+                    crawler=self.platform,
+                    sql=update_sql,
+                    env=self.env,
+                    machine=""
                 )
-                is False
-        ):
-            Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
-            Common.logging(
-                self.log_type, self.crawler, self.env, "不满足抓取规则\n"
+        else:
+            insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{out_user_name}", "{out_user_id}", "{self.platform}", 1 )"""
+            MysqlHelper.update_values(
+                log_type=self.mode,
+                crawler=self.platform,
+                sql=insert_sql,
+                env=self.env,
+                machine="",
             )
-        elif (
-                any(
-                    str(word)
-                    if str(word) in video_dict["video_title"]
-                    else False
-                    for word in get_config_from_mysql(
-                        log_type=self.log_type,
-                        source=self.crawler,
+
+    def get_user_list(self):
+        select_user_sql = f"""select name_id from accounts where platform = "{self.platform}" and useful = 1"""
+        out_user_info = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_user_sql,
+            env=self.env,
+            machine="",
+        )
+        if out_user_info:
+            result = []
+            for i in out_user_info:
+                result.append(i[0])
+            return result
+        else:
+            return []
+
+    def get_detail_video_list(self):
+        url = "https://api.lidongze.cn/jeecg-boot/ugc/getDetailVideoListsEn2"
+        headers = {
+            'Host': 'api.lidongze.cn',
+            'xweb_xhr': '1',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
+            'token': '',
+            'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
+            'accept-language': 'en-US,en;q=0.9'
+        }
+        page_index = 1
+        total_page = 2
+        while page_index <= total_page:
+            try:
+                if self.limit_flag:
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
                         env=self.env,
-                        text="filter",
-                        action="",
+                        message="本轮已经抓取足够数量的视频"
                     )
+                    return
+                else:
+                    query = {
+                        "groupId": "1650323161797439489",
+                        "pageNo": page_index,
+                        "pageSize": 10,
+                        "appid": "wx9a60184c443f39af",
+                        "type": 3,
+                        "hxid": "1556555457243828666"
+                    }
+                    params = {
+                        "v": self.cryptor.aes_encrypt(data=json.dumps(query))
+                    }
+                    response = requests.request("GET", url, headers=headers, params=params)
+                    result = json.loads(self.cryptor.aes_decrypt(response.text))
+                    total_page = result['list']['pages']
+                    page_index = result['list']['current'] + 1
+                    for index, video_obj in enumerate(result['list']['records'], 1):
+                        try:
+                            AliyunLogger.logging(
+                                code="1001",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message="扫描到一条视频",
+                                data=video_obj
+                            )
+                            self.process_video_obj(video_obj)
+                        except Exception as e:
+                            AliyunLogger.logging(
+                                code="3000",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
+                            )
+            except Exception as e:
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
                 )
-                is True
-        ):
-            Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
-            Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
-        elif self.repeat_video(video_dict["video_id"]) != 0:
-            Common.logger(self.log_type, self.crawler).info("视频已下载\n")
-            Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
-        else:
-            # out_video_id = md5(video_title.encode('utf8')).hexdigest()
-            # out_user_id = md5(user_name.encode('utf8')).hexdigest()
-            video_dict["out_user_id"] = video_obj.get("authid", 0)
-            video_dict["platform"] = self.crawler
-            video_dict["strategy"] = self.log_type
-            video_dict["out_video_id"] = str(video_dict["video_id"])
-            video_dict["width"] = video_dict["video_width"]
-            video_dict["height"] = video_dict["video_height"]
-            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
-            video_dict["user_id"] = self.our_uid
-            video_dict["publish_time"] = video_dict["publish_time_str"]
-            video_dict["video_url"] = video_obj['videoaddr']
-            video_dict["avatar_url"] = video_obj['authimg']
-            video_dict["cover_url"] = video_obj['indeximg']
-            # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
-            self.download_count += 1
-            self.mq.send_msg(video_dict)
+            time.sleep(random.randint(5, 10))
+
+    def schedule(self):
+        """
+        先抓取推荐列表的视频, 等待 2 分钟后抓取 detail 页面,等待 5 分钟后,抓取账号视频
+        """
+        self.get_recommend_list()
+        if self.limit_flag:
+            return
+        time.sleep(2 * 60)
+        self.get_detail_video_list()
+        if self.limit_flag:
+            return
+        time.sleep(5 * 60)
+        self.mode = "author"
+        user_list = self.get_user_list()
+        if user_list:
+            for index, user_id in enumerate(user_list):
+                try:
+                    if self.limit_flag:
+                        AliyunLogger.logging(
+                            code="2000",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            message="本轮已经抓取足够数量的视频"
+                        )
+                        return
+                    self.get_user_videos(user_id=user_id)
+                except Exception as e:
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="抓取账号视频出现异常,账号 id 是{}, 报错原因是{}".format(user_id, e)
+                    )
 
 
-if __name__ == "__main__":
-    ZL = HHXXZFDScheduling(
-        log_type="recommend",
-        crawler="hhxxzfd",
-        rule_dict={},
-        our_uid="luojunhuihaoshuai",
-        env="dev"
-    )
-    for i in range(4):
-        ZL.get_videoList(page_id=i + 1, page_limit=10)
-        print(ZL.download_count)
+if __name__ == '__main__':
+    pass
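Reviewer note: the `AESCipher` now imported from `zhuwanwufusu.crypt` is not included in this diff. Judging from its call sites (`aes_encrypt(data=...)`, `aes_decrypt(response.text)`) and from the deleted `AESCryptor` above, a compatible sketch is below; the key and IV are copied from the removed class and are placeholders that may not match what the shared module actually ships:

```python
import binascii

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


class AESCipher:
    """AES-128-CBC with PKCS7 padding; ciphertext as uppercase hex (mirrors AESCryptor)."""

    def __init__(self, key=b"50102fa64073ad76", iv=b"173d023138824bb0"):
        # Key/IV copied from the deleted class; treat them as placeholders.
        self.key = key
        self.iv = iv

    def aes_encrypt(self, data):
        # Pad to the AES block size, encrypt, then hex-encode in uppercase.
        padder = padding.PKCS7(128).padder()
        padded = padder.update(data.encode("utf-8")) + padder.finalize()
        encryptor = Cipher(algorithms.AES(self.key), modes.CBC(self.iv),
                           backend=default_backend()).encryptor()
        ct = encryptor.update(padded) + encryptor.finalize()
        return binascii.hexlify(ct).upper().decode("utf-8")

    def aes_decrypt(self, hex_data):
        # Reverse: hex-decode, decrypt, strip PKCS7 padding.
        ct = binascii.unhexlify(hex_data)
        decryptor = Cipher(algorithms.AES(self.key), modes.CBC(self.iv),
                           backend=default_backend()).decryptor()
        padded = decryptor.update(ct) + decryptor.finalize()
        unpadder = padding.PKCS7(128).unpadder()
        return (unpadder.update(padded) + unpadder.finalize()).decode("utf-8")
```

Call sites then build the query string with `params = {"v": cipher.aes_encrypt(data=json.dumps(query))}` and decode responses with `json.loads(cipher.aes_decrypt(response.text))`, as seen throughout the file above.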

+ 16 - 15
xiaoniangaoplus/xiaoniangaoplus/xiaoniangao_plus_scheduling.py

@@ -33,7 +33,7 @@ class XiaoNianGaoPlusRecommend:
     @classmethod
     def start_wechat(cls, log_type, crawler, env, rule_dict, our_uid):
         if env == "dev":
-            chromedriverExecutable = "/Users/a123456/Downloads/chromedriver_v111/chromedriver"
+            chromedriverExecutable = "/usr/bin/chromedriver"
         else:
             chromedriverExecutable = "/Users/a123456/Downloads/chromedriver_v111/chromedriver"
 
@@ -42,29 +42,28 @@ class XiaoNianGaoPlusRecommend:
         # 微信的配置文件
         caps = {
             "platformName": "Android",
-            "devicesName": "Android",
-            "platformVersion": "13",
-            # "udid": "emulator-5554",
+            "deviceName": "Android",
             "appPackage": "com.tencent.mm",
             "appActivity": ".ui.LauncherUI",
-            "autoGrantPermissions": "true",
-            "noReset": True,
-            "resetkeyboard": True,
+            "autoGrantPermissions": True,
             "unicodekeyboard": True,
-            "showChromedriverLog": True,
-            "printPageSourceOnFailure": True,
+            "resetkeyboard": True,
+            "noReset": True,
             "recreateChromeDriverSessions": True,
-            "enableWebviewDetailsCollection": True,
-            "setWebContentsDebuggingEnabled": True,
+            "printPageSourceOnFailure": True,
             "newCommandTimeout": 6000,
             "automationName": "UiAutomator2",
-            "chromedriverExecutable": chromedriverExecutable,
-            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
+            "showChromedriverLog": True,
+            "chromeOptions": {
+                "androidProcess": "com.tencent.mm:tools"
+            },
+            "enableWebviewDetailsCollection": True,
+            "setWebContentsDebuggingEnabled": True,
+            "chromedriverExecutable": chromedriverExecutable
         }
-        driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
+        driver = webdriver.Remote("http://localhost:4750/wd/hub", caps)
         driver.implicitly_wait(30)
         # action = TouchAction(driver)
-
         for i in range(120):
             try:
                 if driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
@@ -115,8 +114,10 @@ class XiaoNianGaoPlusRecommend:
     def check_to_applet(cls, log_type, crawler, env, driver: WebDriver, xpath):
         time.sleep(1)
         webViews = driver.contexts
+        print(webViews)
         driver.switch_to.context(webViews[1])
         windowHandles = driver.window_handles
+        print(windowHandles)
         for handle in windowHandles:
             driver.switch_to.window(handle)
             time.sleep(1)
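Reviewer note: the capability cleanup above fixes `devicesName` to `deviceName`, but Appium capability names are case-sensitive, so the all-lowercase `unicodekeyboard`/`resetkeyboard` entries are likely still ignored. The canonical UiAutomator2 spellings, offered as a hedged correction rather than as part of this commit:

```python
caps.update({
    "unicodeKeyboard": True,  # canonical spelling of "unicodekeyboard"
    "resetKeyboard": True,    # canonical spelling of "resetkeyboard"
})
```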

+ 3 - 0
zhufusonglaoyou/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/4/13

+ 3 - 0
zhufusonglaoyou/zhufusonglaoyou_main/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/4/13

+ 161 - 0
zhufusonglaoyou/zhufusonglaoyou_main/run_zfsly_recommend.py

@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/10/20
+import argparse
+import random
+
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.public import get_consumer, ack_message, task_fun_mq, get_rule_from_mysql
+from common.scheduling_db import MysqlHelper
+from ganggangdouchuan.ganggangdouchuan_recommend import GGDCScheduling
+from common.aliyun_log import AliyunLogger
+
+class GGDCMain:
+    @classmethod
+    def main(cls, log_type, crawler, topic_name, group_id, env):
+        consumer = get_consumer(topic_name, group_id)
+        # 长轮询:如果 Topic 没有消息,客户端请求会在服务端挂起 wait_seconds 秒,期间一旦有消息可消费则立即返回。
+        # 长轮询时间 30 秒(最多可设置为 30 秒)。
+        wait_seconds = 30
+        # 一次最多消费 1 条(最多可设置为 16 条)。
+        batch = 1
+        AliyunLogger.logging(
+            code="1000",
+            platform=crawler,
+            mode=log_type,
+            env=env,
+            message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                    f"WaitSeconds:{wait_seconds}\n"
+                    f"TopicName:{topic_name}\n"
+                    f"MQConsumer:{group_id}",
+        )
+        while True:
+            try:
+                # 长轮询消费消息。
+                recv_msgs = consumer.consume_message(batch, wait_seconds)
+                for msg in recv_msgs:
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"Receive\n"
+                        f"MessageId:{msg.message_id}\n"
+                        f"MessageBodyMD5:{msg.message_body_md5}\n"
+                        f"MessageTag:{msg.message_tag}\n"
+                        f"ConsumedTimes:{msg.consumed_times}\n"
+                        f"PublishTime:{msg.publish_time}\n"
+                        f"Body:{msg.message_body}\n"
+                        f"NextConsumeTime:{msg.next_consume_time}\n"
+                        f"ReceiptHandle:{msg.receipt_handle}\n"
+                        f"Properties:{msg.properties}",
+                    )
+                    # ack_mq_message
+                    ack_message(
+                        log_type=log_type,
+                        crawler=crawler,
+                        recv_msgs=recv_msgs,
+                        consumer=consumer,
+                    )
+
+                    # 处理爬虫业务
+                    task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                    AliyunLogger.logging(
+                        "1000", crawler, log_type, env, f"调度任务:{task_dict}"
+                    )
+                    rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                    task_id = task_dict["id"]
+                    select_user_sql = (
+                        f"""select * from crawler_user_v3 where task_id={task_id}"""
+                    )
+                    user_list = MysqlHelper.get_values(
+                        log_type, crawler, select_user_sql, env, action=""
+                    )
+                    our_uid_list = []
+                    for user in user_list:
+                        our_uid_list.append(user["uid"])
+                    our_uid = random.choice(our_uid_list)
+                    AliyunLogger.logging(
+                        code="1003",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message="成功获取信息,启动爬虫,开始一轮抓取",
+                    )
+                    new_r = get_rule_from_mysql(
+                        task_id=task_id, log_type=log_type, crawler=crawler, env=env
+                    )
+                    r_d = {}
+                    for item in new_r:
+                        for k, val in item.items():
+                            r_d[k] = val
+                    AliyunLogger.logging(
+                        "1000", crawler, log_type, env, f"抓取规则:{r_d}"
+                    )
+                    # 初始化
+                    GGDC = GGDCScheduling(
+                        log_type=log_type,
+                        crawler=crawler,
+                        env=env,
+                        rule_dict=r_d,
+                        our_uid=our_uid,
+                    )
+                    for i in range(50):
+                        if GGDC.download_count >= int(
+                            rule_dict.get("videos_cnt", {}).get("min", 10)
+                        ):
+                            GGDC.download_count = 0
+                            break
+                        else:
+                            GGDC.get_videoList(page_id=i + 1)
+                            time.sleep(60)
+                    AliyunLogger.logging(
+                        code="1004",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message="成功抓取完一轮",
+                    )
+
+            except MQExceptionBase as err:
+                # No message available in the topic.
+                if err.type == "MessageNotExist":
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"No new message! RequestId:{err.req_id}\n",
+                    )
+                    continue
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Consume Message Fail! Exception:{err}\n",
+                )
+                time.sleep(2)
+                continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # Create the argument parser
+    parser.add_argument("--log_type", type=str)  # Add arguments, with type where needed
+    parser.add_argument("--crawler")
+    parser.add_argument("--topic_name")
+    parser.add_argument("--group_id")
+    parser.add_argument("--env")
+    args = parser.parse_args()  # Parse values passed in from the command line
+    GGDCMain.main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )
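+    # Typical invocation (script name and topic/group values below are
+    # placeholders, not from this commit):
+    #   python run_xxx_recommend.py --log_type="recommend" \
+    #       --crawler="<crawler>" --topic_name="<topic>" \
+    #       --group_id="<group>" --env="prod"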

+ 1 - 0
zhufusonglaoyou/zhufusonglaoyou_recommend/__init__.py

@@ -0,0 +1 @@
+from .zhufusonglaoyou_recommend import ZFSLYScheduling

+ 262 - 0
zhufusonglaoyou/zhufusonglaoyou_recommend/zhufusonglaoyou_recommend.py

@@ -0,0 +1,262 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/12/14
+import json
+import os
+import random
+import sys
+import time
+import uuid
+
+import requests
+from base64 import b64encode, b64decode
+from Crypto.Cipher import AES
+from Crypto.Hash import MD5
+from Crypto.Util.Padding import pad, unpad
+
+sys.path.append(os.getcwd())
+from common.mq import MQ
+from common.common import Common
+from common.aliyun_log import AliyunLogger
+from common.pipeline import PiaoQuanPipeline
+from common.public import clean_title
+
+
+def decrypt(a, e, n):
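+    # Key material comes from the MD5 hex digest of the passphrase `e`:
+    # first 16 hex chars -> IV, last 16 -> AES-128 key (CBC mode).
+    # `n` selects the direction: True decrypts base64 input, False encrypts.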
+    e = MD5.new(e.encode()).hexdigest()
+    key = e[16:].encode()
+    iv = e[:16].encode()
+
+    cipher = AES.new(key, AES.MODE_CBC, iv)
+
+    if n:
+        encrypted_data = b64decode(a)
+        decrypted_data = unpad(cipher.decrypt(encrypted_data), AES.block_size)
+        return decrypted_data.decode()
+    else:
+        padded_data = pad(a.encode(), AES.block_size)
+        encrypted_data = cipher.encrypt(padded_data)
+        return b64encode(encrypted_data).decode()
+
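+# A minimal round-trip sketch of decrypt() (illustrative values, not from the API):
+#   token = decrypt("hello", "passphrase", False)  # encrypt: plaintext -> base64 ciphertext
+#   assert decrypt(token, "passphrase", True) == "hello"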
+
+def find_tencent_url(tx_vid):
+    headers = {
+        "Host": "h5vv.video.qq.com",
+        "xweb_xhr": "1",
+        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
+        "Content-Type": "application/x-www-form-urlencoded",
+        "Accept": "*/*",
+        "Sec-Fetch-Site": "cross-site",
+        "Sec-Fetch-Mode": "cors",
+        "Sec-Fetch-Dest": "empty",
+        "Referer": "https://servicewechat.com/wx5fcd817f3f80aece/3/page-frame.html",
+        "Accept-Language": "en",
+    }
+    video_id = tx_vid
+    url = "https://h5vv.video.qq.com/getinfo?vid={}&platform=101001&charge=0&otype=json&defn=shd".format(video_id)
+    response = requests.get(url, headers=headers)
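+    # The endpoint returns JSONP ("QZOutputJson=...;"); strip the wrapper
+    # and the trailing ";" before parsing.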
+    result = json.loads(response.text.replace("QZOutputJson=", "")[:-1])
+    vl = result["vl"]["vi"][0]
+    key = vl["fvkey"]
+    name = vl["fn"]
+    folder = vl["ul"]["ui"][0]["url"]
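+    # Playable URL = CDN folder + file name + "?vkey=" + fvkey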
+    url = folder + name + "?vkey=" + key
+    return url
+
+
+class ZFSLYScheduling:
+    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
+        self.platform = "zhufusonglaoyou"
+        self.log_type = log_type
+        self.crawler = crawler
+        self.rule_dict = rule_dict
+        self.env = env
+        self.our_uid = our_uid
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.download_count = 0
+
+    # Fetch the list of videos for the given page
+    def get_videoList(self, page_id):
+        time.sleep(random.randint(5, 10))
+        url = "https://zhufusonglaoyou2.mengniu99.com/api/getcatevideos"
+        params = {
+            "cateid": "video",
+            "page": page_id,
+            "timeline": 0,
+            "version": "9.0.2",
+        }
+        headers = {
+            'Host': 'zhufusonglaoyou2.mengniu99.com',
+            'xweb_xhr': '1',
+            'Authorization': 'o7hOQ5XsP-OtIuOK8qAXe368o45E',
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
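+            # NOTE: Sign appears to be a fixed value captured from the mini
+            # program; it may expire and need re-capturing.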
+            'Sign': '2c694618acd1218cb0876a825165ca45',
+            'Content-Type': 'application/json',
+            'Accept': '*/*',
+            'Sec-Fetch-Site': 'cross-site',
+            'Sec-Fetch-Mode': 'cors',
+            'Sec-Fetch-Dest': 'empty',
+            'Referer': 'https://servicewechat.com/wx5b38d01fa06bba64/4/page-frame.html',
+            'Accept-Language': 'en-US,en;q=0.9'
+        }
+        while True:
+            try:
+                response = requests.get(url, headers=headers, params=params)
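+                # The payload is AES-encrypted: "data" holds base64 ciphertext
+                # (a 2-char suffix is dropped) and "_yyy" holds the passphrase.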
+                resp_json = response.json()
+                decrypted_data = decrypt(resp_json["data"][:-2], resp_json["_yyy"], True)
+                result = json.loads(decrypted_data)
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=self.crawler,
+                    mode=self.log_type,
+                    env=self.env,
+                    data={},
+                    message="开始抓取第{}页".format(page_id),
+                )
+                break
+            except Exception:
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.crawler,
+                    mode=self.log_type,
+                    env=self.env,
+                    data={},
+                    message="抓取第{}页,未获取数据,编码错误".format(page_id),
+                )
+                Common.logger(self.log_type, self.crawler).info("编码不对,解密失败\n")
+                return
+
+        if "totalCount" not in result:
+            Common.logger(self.log_type, self.crawler).info(
+                f"get_videoList:{response.text}\n"
+            )
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                data={},
+                message="抓取第{}页,未获取数据".format(page_id),
+            )
+            return
+        elif len(result["videos"]) == 0:
+            Common.logger(self.log_type, self.crawler).info("没有更多数据啦~\n")
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                data={},
+                message="抓取第{}页,没有更多数据啦".format(page_id),
+            )
+            return
+        else:
+            data_list = result["videos"]
+            for index, video_obj in enumerate(data_list):
+                try:
+                    AliyunLogger.logging(
+                        code="1001",
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        data={},
+                        message="成功扫描到一条视频, 该视频位于第{}页{}条".format(page_id, index + 1),
+                    )
+                    self.process_video_obj(video_obj)
+                except Exception as e:
+                    Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        data=video_obj,
+                        message="抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
+                            e, page_id, index + 1
+                        ),
+                    )
+            AliyunLogger.logging(
+                code="1000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                data={},
+                message="完成抓取第{}页".format(page_id),
+            )
+
+    def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
+        video_id = video_obj.get("videoid", 0)
+        video_title = clean_title(video_obj.get("title", "no title"))
+        video_time = video_obj.get("v_time", 0)
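+        # Publish time is recorded as the crawl time; the feed's own
+        # timestamp is not used.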
+        publish_time_stamp = int(time.time())
+        publish_time_str = time.strftime(
+            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
+        )
+        user_name = video_obj["nickname"]
+        video_dict = {
+            "video_title": video_title,
+            "video_id": video_id,
+            "duration": video_time,
+            "play_cnt": 0,
+            "like_cnt": 0,
+            "comment_cnt": 0,
+            "share_cnt": 0,
+            "user_name": user_name,
+            "publish_time_stamp": publish_time_stamp,
+            "publish_time_str": publish_time_str,
+            "update_time_stamp": int(time.time()),
+            "video_width": 0,
+            "video_height": 0,
+            "profile_id": 0,
+            "profile_mid": 0,
+            "cover_url": video_obj["cover"],
+            "session": f"ganggangdouchuan-{int(time.time())}",
+        }
+        video_dict["out_video_id"] = str(video_dict["video_id"])
+        rule_pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.log_type,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video_dict,
+            trace_id=trace_id
+        )
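+        # process_item() validates the item against rule_dict; only items
+        # that pass are forwarded to the ETL queue below.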
+        flag = rule_pipeline.process_item()
+        if flag:
+            video_dict["out_user_id"] = video_dict["profile_id"]
+            video_dict["platform"] = self.crawler
+            video_dict["strategy"] = self.log_type
+            video_dict["width"] = video_dict["video_width"]
+            video_dict["height"] = video_dict["video_height"]
+            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+            video_dict["user_id"] = self.our_uid
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            video_dict["video_url"] = find_tencent_url(video_obj["txvid"])
+            video_dict["avatar_url"] = video_obj["avatarurl"]
+            video_dict["cover_url"] = video_obj["cover"]
+            self.download_count += 1
+            self.mq.send_msg(video_dict)
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+            )
+
+
+if __name__ == "__main__":
+    ZL = ZFSLYScheduling(
+        log_type="recommend",
+        crawler="zhufusonglaoyou",
+        rule_dict={},
+        our_uid="luojunhuihaoshuai",
+        env="dev",
+    )
+    for i in range(1):
+        ZL.get_videoList(page_id=i + 1)
+        print(ZL.download_count)