wangkun 2 年 前
コミット
5259ff045d

+ 35 - 0
common/public.py

@@ -104,6 +104,41 @@ def task_fun(task_str):
     return task_dict
 
 
+def get_consumer(topic_name, group_id):
+    # 初始化client。
+    mq_client = MQClient(
+        # 设置HTTP协议客户端接入点,进入云消息队列 RocketMQ 版控制台实例详情页面的接入点区域查看。
+        "${HTTP_ENDPOINT}",
+        # AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。
+        "${ACCESS_KEY}",
+        # AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。
+        "${SECRET_KEY}"
+    )
+    # 消息所属的Topic,在云消息队列 RocketMQ 版控制台创建。
+    # topic_name = "${TOPIC}"
+    topic_name = f"{topic_name}"
+    # 您在云消息队列 RocketMQ 版控制台创建的Group ID。
+    # group_id = "${GROUP_ID}"
+    group_id = f"{group_id}"
+    # Topic所属的实例ID,在云消息队列 RocketMQ 版控制台创建。
+    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在云消息队列 RocketMQ 版控制台的实例详情页面查看。
+    instance_id = "${INSTANCE_ID}"
+
+    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
+    return consumer
+
+
+def ack_message(log_type, crawler, recv_msgs, consumer):
+    # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
+    # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
+    try:
+        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
+        consumer.ack_message(receipt_handle_list)
+        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
+    except MQExceptionBase as err:
+        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
+
+
 def download_rule(log_type, crawler, video_dict, rule_dict):
     """
     下载视频的基本规则

+ 10 - 1
gongzhonghao/gongzhonghao_author/gongzhonghao1_author.py

@@ -237,6 +237,12 @@ class GongzhonghaoAuthor1:
                     Feishu.bot(log_type, crawler,f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 10)
                 continue
+            if r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002:
+                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+                if 20 >= datetime.datetime.now().hour >= 10:
+                    Feishu.bot(log_type, crawler,f"公众号:{user_dict['user_name']}\n抓取异常, 请检查该公众号\n")
+                return
             if 'app_msg_list' not in r.json():
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
                 Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
@@ -437,6 +443,7 @@ class GongzhonghaoAuthor1:
     def get_all_videos(cls, log_type, crawler, rule_dict, env):
         while True:
             sheetid = "Bzv72P"
+            # sheetid = "SHRnwl"
             user_sheet = Feishu.get_values_batch(log_type, crawler, sheetid)
             if user_sheet is None:
                 Common.logger(log_type, crawler).warning(f"user_sheet:{user_sheet}, 2秒后重试")
@@ -463,6 +470,7 @@ class GongzhonghaoAuthor1:
                     time.sleep(60)
                 except Exception as e:
                     Common.logger(log_type, crawler).info(f'抓取{user_dict["user_name"]}公众号时异常:{e}\n')
+            break
 
 
 if __name__ == "__main__":
@@ -470,5 +478,6 @@ if __name__ == "__main__":
     # print(GongzhonghaoAuthor1.get_users("author", "gongzhonghao", "Bzv72P", "dev"))
     # print(get_config_from_mysql("author", "gongzhonghao", "dev", "filter", action=""))
     # print(title_like("author", "gongzhonghao", "公众号", "123", "dev"))
-    print(GongzhonghaoAuthor1.get_user_info("author", "gongzhonghao", "幸福花朵", "dev"))
+    # print(GongzhonghaoAuthor1.get_user_info("author", "gongzhonghao", "幸福花朵", "dev"))
+    GongzhonghaoAuthor1.get_all_videos("author", "gongzhonghao", {}, "dev")
     pass

+ 7 - 0
gongzhonghao/gongzhonghao_author/gongzhonghao2_author.py

@@ -236,6 +236,12 @@ class GongzhonghaoAuthor2:
                     Feishu.bot(log_type, crawler,f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 10)
                 continue
+            if r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002:
+                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+                if 20 >= datetime.datetime.now().hour >= 10:
+                    Feishu.bot(log_type, crawler,f"公众号:{user_dict['user_name']}\n抓取异常, 请检查该公众号\n")
+                return
             if 'app_msg_list' not in r.json():
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
                 Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
@@ -468,6 +474,7 @@ class GongzhonghaoAuthor2:
                     time.sleep(60)
                 except Exception as e:
                     Common.logger(log_type, crawler).info(f'抓取{user_dict["user_name"]}公众号时异常:{e}\n')
+            break
 
 
 if __name__ == "__main__":

+ 7 - 0
gongzhonghao/gongzhonghao_author/gongzhonghao3_author.py

@@ -241,6 +241,12 @@ class GongzhonghaoAuthor3:
                     Feishu.bot(log_type, crawler,f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 10)
                 continue
+            if r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002:
+                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+                if 20 >= datetime.datetime.now().hour >= 10:
+                    Feishu.bot(log_type, crawler,f"公众号:{user_dict['user_name']}\n抓取异常, 请检查该公众号\n")
+                return
             if 'app_msg_list' not in r.json():
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
                 Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
@@ -474,6 +480,7 @@ class GongzhonghaoAuthor3:
                     time.sleep(60)
                 except Exception as e:
                     Common.logger(log_type, crawler).info(f'抓取{user_dict["user_name"]}公众号时异常:{e}\n')
+            break
 
 
 if __name__ == "__main__":

+ 7 - 0
gongzhonghao/gongzhonghao_author/gongzhonghao4_author.py

@@ -238,6 +238,12 @@ class GongzhonghaoAuthor4:
                     Feishu.bot(log_type, crawler,f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 10)
                 continue
+            if r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002:
+                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+                if 20 >= datetime.datetime.now().hour >= 10:
+                    Feishu.bot(log_type, crawler,f"公众号:{user_dict['user_name']}\n抓取异常, 请检查该公众号\n")
+                return
             if 'app_msg_list' not in r.json():
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
                 Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
@@ -469,6 +475,7 @@ class GongzhonghaoAuthor4:
                     time.sleep(60)
                 except Exception as e:
                     Common.logger(log_type, crawler).info(f'抓取{user_dict["user_name"]}公众号时异常:{e}\n')
+            break
 
 if __name__ == "__main__":
     GongzhonghaoAuthor4.get_token("author", "gongzhonghao", "dev")

+ 7 - 0
gongzhonghao/gongzhonghao_author/gongzhonghao5_author.py

@@ -239,6 +239,12 @@ class GongzhonghaoAuthor5:
                     Feishu.bot(log_type, crawler,f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 10)
                 continue
+            if r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002:
+                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+                if 20 >= datetime.datetime.now().hour >= 10:
+                    Feishu.bot(log_type, crawler,f"公众号:{user_dict['user_name']}\n抓取异常, 请检查该公众号\n")
+                return
             if 'app_msg_list' not in r.json():
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}")
                 Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
@@ -470,6 +476,7 @@ class GongzhonghaoAuthor5:
                     time.sleep(60)
                 except Exception as e:
                     Common.logger(log_type, crawler).info(f'抓取{user_dict["user_name"]}公众号时异常:{e}\n')
+            break
 
 
 if __name__ == "__main__":

+ 16 - 1
requirements.txt

@@ -1,13 +1,28 @@
-Appium_Python_Client==2.10.0
+# pip3 install Appium-Python-Client
+Appium_Python_Client==2.10.1
+# 翻墙, pip3 install git+https://github.com/pyatom/pyatom/
 atomac==1.2.0
+# pip3 install ffmpeg-python
 ffmpeg==1.4
+# pip3 install loguru
 loguru==0.6.0
+# pip3 install lxml
 lxml==4.9.1
+# pip3 install mq_http_sdk, 若您使用的SDK版本为v1.0.0,您需要安装大于等于2.5且小于3.0版本的Python。若您使用的SDK版本大于v1.0.0,您需要安装2.5及以上版本的Python。
+mq_http_sdk==1.0.3
+# sudo pip3 install oss2
 oss2==2.15.0
+# pip3 install psutil
 psutil==5.9.2
+# pip3 install PyExecJS
 PyExecJS==1.5.1
+# pip3 install PyMysql
 PyMySQL==1.0.2
+# pip3 install redis
 redis==4.5.1
+# pip3 install requests
 requests==2.27.1
+# pip3 install selenium
 selenium==4.9.1
+# pip3 install urllib3
 urllib3==1.26.9

+ 42 - 75
suisuiniannianyingfuqi/suisuiniannianyingfuqi_main/run_suisuiniannianyingfuqi_recommend_mq.py

@@ -4,40 +4,27 @@
 import argparse
 import random
 from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
 sys.path.append(os.getcwd())
-from common.public import task_fun
+from common.public import task_fun, get_consumer, ack_message
 from common.common import Common
 from common.scheduling_db import MysqlHelper
 from suisuiniannianyingfuqi.suisuiniannianyingfuqi_recommend.suisuiniannianyingfuqi_recommend_scheduling import \
     SuisuiniannianyingfuqiRecommendScheduling
 
 
-def main(log_type, crawler, task, env):
-    # 初始化client。
-    mq_client = MQClient(
-        # 设置HTTP协议客户端接入点,进入云消息队列 RocketMQ 版控制台实例详情页面的接入点区域查看。
-        "${HTTP_ENDPOINT}",
-        # AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。
-        "${ACCESS_KEY}",
-        # AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。
-        "${SECRET_KEY}"
-    )
-    # 消息所属的Topic,在云消息队列 RocketMQ 版控制台创建。
-    topic_name = "${TOPIC}"
-    # 您在云消息队列 RocketMQ 版控制台创建的Group ID。
-    group_id = "${GROUP_ID}"
-    # Topic所属的实例ID,在云消息队列 RocketMQ 版控制台创建。
-    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在云消息队列 RocketMQ 版控制台的实例详情页面查看。
-    instance_id = "${INSTANCE_ID}"
-
-    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
-
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
     # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
     # 长轮询时间3秒(最多可设置为30秒)。
     wait_seconds = 3
     # 一次最多消费3条(最多可设置为16条)。
     batch = 1
-    Common.logger(log_type, crawler).info("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
+    Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                          f'TopicName:{topic_name}\n'
+                                          f'MQConsumer:{group_id}\n'
+                                          f'WaitSeconds:{wait_seconds}\n')
     while True:
         try:
             # 长轮询消费消息。
@@ -53,71 +40,51 @@ def main(log_type, crawler, task, env):
                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
                                                       f"Properties:{msg.properties}\n")
-                print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
-                                  \nMessageTag: %s\nConsumedTimes: %s \
-                                  \nPublishTime: %s\nBody: %s \
-                                  \nNextConsumeTime: %s \
-                                  \nReceiptHandle: %s \
-                                  \nProperties: %s\n" % \
-                       (msg.message_id, msg.message_body_md5,
-                        msg.message_tag, msg.consumed_times,
-                        msg.publish_time, msg.message_body,
-                        msg.next_consume_time, msg.receipt_handle, msg.properties)))
-        except MQExceptionBase as e:
+                # ack_mq_message
+                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
+
+                # 处理爬虫业务
+                task_dict = task_fun(task)['task_dict']
+                rule_dict = task_fun(task)['rule_dict']
+                task_id = task_dict['task_id']
+                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+                our_uid_list = []
+                for user in user_list:
+                    our_uid_list.append(user["uid"])
+                our_uid = random.choice(our_uid_list)
+                Common.logger(log_type, crawler).info(f"调度任务:\n{task_dict}")
+                Common.logger(log_type, crawler).info(f"抓取规则:\n{rule_dict}")
+                Common.logger(log_type, crawler).info(f"用户列表:\n{user_list}")
+                Common.logger(log_type, crawler).info('开始抓取 岁岁年年迎福气小程序\n')
+                SuisuiniannianyingfuqiRecommendScheduling.get_videoList(log_type=log_type,
+                                                                        crawler=crawler,
+                                                                        our_uid=our_uid,
+                                                                        rule_dict=rule_dict,
+                                                                        env=env)
+                Common.del_logs(log_type, crawler)
+                Common.logger(log_type, crawler).info('抓取完一轮\n')
+
+        except MQExceptionBase as err:
             # Topic中没有消息可消费。
-            if e.type == "MessageNotExist":
-                print(("No new message! RequestId: %s" % e.req_id))
+            if err.type == "MessageNotExist":
+                Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
                 continue
 
-            print(("Consume Message Fail! Exception:%s\n" % e))
+            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
             time.sleep(2)
             continue
 
-        # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
-        # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
-        try:
-            receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
-            consumer.ack_message(receipt_handle_list)
-            print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
-        except MQExceptionBase as e:
-            print(("\nAk Message Fail! Exception:%s" % e))
-            # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
-            if e.sub_errors:
-                for sub_error in e.sub_errors:
-                    print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
-                           (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
-
-    task_dict = task_fun(task)['task_dict']
-    rule_dict = task_fun(task)['rule_dict']
-    task_id = task_dict['task_id']
-    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
-    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
-    our_uid_list = []
-    for user in user_list:
-        our_uid_list.append(user["uid"])
-    our_uid = random.choice(our_uid_list)
-    Common.logger(log_type, crawler).info(f"调度任务:\n{task_dict}")
-    Common.logger(log_type, crawler).info(f"抓取规则:\n{rule_dict}")
-    Common.logger(log_type, crawler).info(f"用户列表:\n{user_list}")
-    Common.logger(log_type, crawler).info('开始抓取 岁岁年年迎福气小程序\n')
-    SuisuiniannianyingfuqiRecommendScheduling.get_videoList(log_type=log_type,
-                                                            crawler=crawler,
-                                                            our_uid=our_uid,
-                                                            rule_dict=rule_dict,
-                                                            env=env)
-    Common.del_logs(log_type, crawler)
-    Common.logger(log_type, crawler).info('抓取完一轮\n')
-
-
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()  ## 新建参数解释器对象
     parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
     parser.add_argument('--crawler')  ## 添加参数
-    parser.add_argument('--task')  ## 添加参数
-    # parser.add_argument('--oss_endpoint')  ## 添加参数
+    parser.add_argument('--topic_name')  ## 添加参数
+    parser.add_argument('--group_id')  ## 添加参数
     parser.add_argument('--env')  ## 添加参数
     args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
     main(log_type=args.log_type,
          crawler=args.crawler,
-         task=args.task,
+         topic_name=args.topic_name,
+         group_id=args.group_id,
          env=args.env)