Kaynağa Gözat

1. 视频号代码开发完成

罗俊辉 1 yıl önce
ebeveyn
işleme
cd64ae2988

+ 1 - 0
shipinhao/shipinhao_author/__init__.py

@@ -0,0 +1 @@
+from .shipinhao_scheduling import ShiPinHaoAccount

+ 34 - 34
shipinhao/shipinhao_author/shipinhao_author_test.py

@@ -16,29 +16,29 @@ class SphAuthor:
             "buffer": "",
             "buffer": "",
             "query": self.name,
             "query": self.name,
             "count": "21",
             "count": "21",
-            "token":  self.token,
+            "token": self.token,
             "lang": "zh_CN",
             "lang": "zh_CN",
             "f": "json",
             "f": "json",
-            "ajax": "1"
+            "ajax": "1",
         }
         }
         headers = {
         headers = {
-            'authority': 'mp.weixin.qq.com',
-            'accept': '*/*',
-            'accept-language': 'en,zh-CN;q=0.9,zh;q=0.8',
-            'cookie': self.cookie,
-            'referer': 'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token=1678001807&lang=zh_CN',
-            'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
-            'sec-ch-ua-mobile': '?0',
-            'sec-ch-ua-platform': '"macOS"',
-            'sec-fetch-dest': 'empty',
-            'sec-fetch-mode': 'cors',
-            'sec-fetch-site': 'same-origin',
-            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
-            'x-requested-with': 'XMLHttpRequest'
+            "authority": "mp.weixin.qq.com",
+            "accept": "*/*",
+            "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+            "cookie": self.cookie,
+            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token=1678001807&lang=zh_CN",
+            "sec-ch-ua": '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
+            "sec-ch-ua-mobile": "?0",
+            "sec-ch-ua-platform": '"macOS"',
+            "sec-fetch-dest": "empty",
+            "sec-fetch-mode": "cors",
+            "sec-fetch-site": "same-origin",
+            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+            "x-requested-with": "XMLHttpRequest",
         }
         }
         response = requests.request("GET", url, headers=headers, params=params)
         response = requests.request("GET", url, headers=headers, params=params)
         user_list = response.json()
         user_list = response.json()
-        target_user = user_list['acct_list'][0]  # 可以优化
+        target_user = user_list["acct_list"][0]  # 可以优化
         return target_user
         return target_user
 
 
     def get_video_list(self):
     def get_video_list(self):
@@ -46,39 +46,39 @@ class SphAuthor:
         url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
         url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
         params = {
         params = {
             "action": "get_feed_list",
             "action": "get_feed_list",
-            "username": user_info['username'],
+            "username": user_info["username"],
             "buffer": "",
             "buffer": "",
             "count": "15",
             "count": "15",
             "scene": "1",
             "scene": "1",
             "token": self.token,
             "token": self.token,
             "lang": "zh_CN",
             "lang": "zh_CN",
             "f": "json",
             "f": "json",
-            "ajax": "1"
+            "ajax": "1",
         }
         }
         headers = {
         headers = {
-            'authority': 'mp.weixin.qq.com',
-            'accept': '*/*',
-            'accept-language': 'en,zh-CN;q=0.9,zh;q=0.8',
-            'cookie': self.cookie,
-            'referer': 'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token=1678001807&lang=zh_CN',
-            'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
-            'sec-ch-ua-mobile': '?0',
-            'sec-ch-ua-platform': '"macOS"',
-            'sec-fetch-dest': 'empty',
-            'sec-fetch-mode': 'cors',
-            'sec-fetch-site': 'same-origin',
-            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
-            'x-requested-with': 'XMLHttpRequest'
+            "authority": "mp.weixin.qq.com",
+            "accept": "*/*",
+            "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+            "cookie": self.cookie,
+            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token=1678001807&lang=zh_CN",
+            "sec-ch-ua": '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
+            "sec-ch-ua-mobile": "?0",
+            "sec-ch-ua-platform": '"macOS"',
+            "sec-fetch-dest": "empty",
+            "sec-fetch-mode": "cors",
+            "sec-fetch-site": "same-origin",
+            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+            "x-requested-with": "XMLHttpRequest",
         }
         }
 
 
         response = requests.request("GET", url, headers=headers, params=params)
         response = requests.request("GET", url, headers=headers, params=params)
         video_list = response.json()
         video_list = response.json()
         # print(json.dumps(video_list, ensure_ascii=False, indent=4))
         # print(json.dumps(video_list, ensure_ascii=False, indent=4))
         # print(len(video_list['list']))
         # print(len(video_list['list']))
-        for obj in video_list['list']:
-            print(obj['desc'])
+        for obj in video_list["list"]:
+            print(obj["desc"])
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     Sph = SphAuthor("心煤")
     Sph = SphAuthor("心煤")
-    Sph.get_video_list()
+    Sph.get_video_list()

+ 183 - 46
shipinhao/shipinhao_author/shipinhao_scheduling.py

@@ -1,11 +1,21 @@
+import os
+import sys
+import datetime
+import time
+import uuid
+
 import requests
 import requests
-from common.aliyun_log import AliyunLogger
+
+sys.path.append(os.getcwd())
+from common import PiaoQuanPipeline, AliyunLogger
+from common.feishu import Feishu
 from common.db import MysqlHelper
 from common.db import MysqlHelper
+from common.mq import MQ
 
 
 
 
 def find_target_user(name, user_list):
 def find_target_user(name, user_list):
     for obj in user_list:
     for obj in user_list:
-        if obj['nickname'] == name:
+        if obj["nickname"] == name:
             return obj
             return obj
         else:
         else:
             continue
             continue
@@ -13,21 +23,33 @@ def find_target_user(name, user_list):
 
 
 
 
 class ShiPinHaoAccount:
 class ShiPinHaoAccount:
-    def __init__(self, token, cookie, account_name, platform, mode, rule_dict, env):
-        self.token = token
-        self.cookie = cookie
-        self.account_name = account_name
+    def __init__(self, platform, mode, rule_dict, user_dict, env):
+        # self.token = token
+        # self.cookie = cookie
+        self.account_name = user_dict["name"]
         self.platform = platform
         self.platform = platform
         self.mode = mode
         self.mode = mode
         self.rule_dict = rule_dict
         self.rule_dict = rule_dict
+        self.user_dict = user_dict
         self.env = env
         self.env = env
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+
+    def get_token_from_mysql(self):
+        self.token = ""
+        self.cookie = ""
 
 
     def get_history_id(self):
     def get_history_id(self):
         """
         """
         从数据库表中读取 id
         从数据库表中读取 id
         """
         """
         select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
         select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
-        name_id = MysqlHelper.get_values(log_type=self.mode, crawler=self.platform, sql=select_user_sql, env=self.env, machine="")
+        name_id = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_user_sql,
+            env=self.env,
+            machine="",
+        )
         if name_id:
         if name_id:
             return name_id[0]
             return name_id[0]
         else:
         else:
@@ -49,45 +71,53 @@ class ShiPinHaoAccount:
                 "token": self.token,
                 "token": self.token,
                 "lang": "zh_CN",
                 "lang": "zh_CN",
                 "f": "json",
                 "f": "json",
-                "ajax": "1"
+                "ajax": "1",
             }
             }
             headers = {
             headers = {
-                'authority': 'mp.weixin.qq.com',
-                'accept': '*/*',
-                'accept-language': 'en,zh-CN;q=0.9,zh;q=0.8',
-                'cookie': self.cookie,
-                'referer': 'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN'.format(
-                    self.token),
-                'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
-                'x-requested-with': 'XMLHttpRequest'
+                "authority": "mp.weixin.qq.com",
+                "accept": "*/*",
+                "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+                "cookie": self.cookie,
+                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
+                    self.token
+                ),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+                "x-requested-with": "XMLHttpRequest",
             }
             }
             response = requests.request("GET", url, headers=headers, params=params)
             response = requests.request("GET", url, headers=headers, params=params)
-            user_list = response.json()['acct_list']
+            user_list = response.json()["acct_list"]
             target_user = find_target_user(name=self.account_name, user_list=user_list)
             target_user = find_target_user(name=self.account_name, user_list=user_list)
             # 写入 MySql 数据库
             # 写入 MySql 数据库
             if target_user:
             if target_user:
                 update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
                 update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
                 print(update_sql)
                 print(update_sql)
-                MysqlHelper.update_values(log_type=self.mode, crawler=self.platform, sql=update_sql, env=self.env, machine="")
-                return target_user['username']
+                MysqlHelper.update_values(
+                    log_type=self.mode,
+                    crawler=self.platform,
+                    sql=update_sql,
+                    env=self.env,
+                    machine="",
+                )
+                return target_user["username"]
             else:
             else:
                 return False
                 return False
 
 
     def get_account_videos(self):
     def get_account_videos(self):
         user_id = self.get_account_id()
         user_id = self.get_account_id()
-        buffer = ""
         if user_id:
         if user_id:
             url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
             url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
             headers = {
             headers = {
-                'authority': 'mp.weixin.qq.com',
-                'accept': '*/*',
-                'accept-language': 'en,zh-CN;q=0.9,zh;q=0.8',
-                'cookie': self.cookie,
-                'referer': 'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN'.format(
-                    self.token),
-                'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
-                'x-requested-with': 'XMLHttpRequest'
+                "authority": "mp.weixin.qq.com",
+                "accept": "*/*",
+                "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+                "cookie": self.cookie,
+                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
+                    self.token
+                ),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+                "x-requested-with": "XMLHttpRequest",
             }
             }
+            buffer = ""  # 翻页指示器
             while True:
             while True:
                 params = {
                 params = {
                     "action": "get_feed_list",
                     "action": "get_feed_list",
@@ -98,29 +128,137 @@ class ShiPinHaoAccount:
                     "token": self.token,
                     "token": self.token,
                     "lang": "zh_CN",
                     "lang": "zh_CN",
                     "f": "json",
                     "f": "json",
-                    "ajax": "1"
+                    "ajax": "1",
                 }
                 }
                 response = requests.request("GET", url, headers=headers, params=params)
                 response = requests.request("GET", url, headers=headers, params=params)
-                video_list = response.json()
-                buffer = video_list['last_buff']
-                # print(json.dumps(video_list, ensure_ascii=False, indent=4))
-                # print(len(video_list['list']))
-                for obj in video_list['list']:
-                    print(obj['desc'])
+                res_json = response.json()
+                # 开始判断视频是否有信息,是否频控
+                if res_json["base_resp"]["err_msg"] == "invalid session":
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
+                    )
+                    if 20 >= datetime.datetime.now().hour >= 10:
+                        Feishu.bot(
+                            log_type=self.mode,
+                            crawler=self.platform,
+                            text="视频号Token 过期啦"
+                            # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
+                        )
+                    time.sleep(60 * 15)
+                    continue
+                if res_json["base_resp"]["err_msg"] == "freq control":
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
+                    )
+                    if 20 >= datetime.datetime.now().hour >= 10:
+                        Feishu.bot(
+                            log_type=self.mode,
+                            crawler=self.platform,
+                            text="视频号Token 过期啦"
+                            # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
+                        )
+                    time.sleep(60 * 15)
+                    continue
+                if not res_json["list"]:
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="没有更多视频了",
+                    )
+                    return
+                else:
+                    buffer = res_json["last_buff"]
+                    for obj in res_json["list"]:
+                        try:
+                            AliyunLogger.logging(
+                                code="1001",
+                                platform=self.platform,
+                                mode=self.mode,
+                                message="扫描到一条视频",
+                                env=self.env,
+                                data=obj,
+                            )
+                            repeat_flag = self.process_video_obj(obj)
+                            if not repeat_flag:
+                                return
+                        except Exception as e:
+                            AliyunLogger.logging(
+                                code="3000",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message=f"抓取单条视频异常:{e}\n",
+                            )
         else:
         else:
-            print("Did not find any user info")
+            AliyunLogger.logging(
+                code="3000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="{}\t获取 id 失败".format(self.account_name),
+            )
 
 
     def process_video_obj(self, video_obj):
     def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
         video_dict = {
         video_dict = {
-            "video_id": video_obj['nonce_id'],
-            "video_title": video_obj['desc'],
-            "cover_url": video_obj['media']["cover_url"],
-            "video_url": video_obj['media']['video_url'],
-            "avatar_url": video_obj['head_url'],
-            "width": video_obj['media']['width'],
-            "height": video_obj['media']['height']
+            "video_id": video_obj["nonce_id"],
+            "video_title": video_obj["desc"],
+            "publish_time_stamp": int(time.time()),
+            "publish_time_str": time.strftime(
+                "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
+            ),
+            "play_cnt": 0,
+            "comment_cnt": 0,
+            "like_cnt": 0,
+            "share_cnt": 0,
+            "user_id": self.user_dict["user_id"],
+            "cover_url": video_obj["media"]["cover_url"],
+            "video_url": video_obj["media"]["video_url"],
+            "avatar_url": video_obj["head_url"],
+            "width": video_obj["media"]["width"],
+            "height": video_obj["media"]["height"],
+            "duration": video_obj["media"]["video_play_len_s"],
+            "platform": self.platform,
+            "strategy": self.mode,
+            "crawler_rule": self.rule_dict,
+            "session": f"shipinhao-author-{int(time.time())}",
         }
         }
-        print(self.platform)
+        # 无更新时间,去重即可
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            item=video_dict,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            trace_id=trace_id,
+        )
+        if not pipeline.repeat_video():
+            return False
+        else:
+            video_dict["out_user_id"] = video_dict["user_id"]
+            video_dict["user_id"] = self.user_dict["uid"]
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            self.mq.send_msg(video_dict)
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+            )
+        return True
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
@@ -133,7 +271,6 @@ if __name__ == "__main__":
         platform="shipinhao",
         platform="shipinhao",
         mode="author",
         mode="author",
         rule_dict={},
         rule_dict={},
-        env="prod"
+        env="prod",
     )
     )
     SP.get_account_videos()
     SP.get_account_videos()
-

+ 167 - 0
shipinhao/shipinhao_main/run_shipinhao_author.py

@@ -0,0 +1,167 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/30
+import argparse
+from multiprocessing import Process
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
+from shipinhao.shipinhao_author import ShiPinHaoAccount
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                )
+                for user_dict in user_list:
+                    try:
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            message="开始抓取视频号{}".format(user_dict["name"]),
+                        )
+                        # 初始化
+                        SPHA = ShiPinHaoAccount(
+                            platform=crawler,
+                            mode=log_type,
+                            rule_dict=rule_dict,
+                            user_dict=user_dict,
+                            env=env,
+                        )
+                        SPHA.get_account_videos()
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            message="完成抓取视频号{}".format(user_dict["name"]),
+                        )
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=crawler,
+                            mode=log_type,
+                            message="抓取视频号{}出现问题, 报错为{}".format(user_dict["name"], e),
+                        )
+
+                AliyunLogger.logging(
+                    code="1004", platform=crawler, mode=log_type, message="结束一轮抓取"
+                )
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument("--log_type", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--crawler")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )