Browse source

New official account (gongzhonghao) crawler code

zhangyong 1 year ago
parent
commit
dfe8a3720f

+ 418 - 0
gongzhonghao/gongzhonghao_author/gongzhonghao_url_author.py

@@ -0,0 +1,418 @@
+from datetime import datetime
+import json
+import os
+import random
+import re
+import sys
+import time
+import uuid
+import requests
+
+
+sys.path.append(os.getcwd())
+from selenium.webdriver import DesiredCapabilities
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.common.by import By
+from selenium import webdriver
+
+from common.scheduling_db import MysqlHelper
+from common.mq import MQ
+from common.common import Common
+from common.public import get_config_from_mysql
+from common import AliyunLogger
+
+class GongzhonghaoUrlAuthor:
+    platform = "公众号"
+
+    @classmethod
+    def get_all_videos(cls, log_type, crawler, task_dict, rule_dict, user_list, env):
+        total_s = 8 * 60 * 60  # each crawler runs ~12 h per day (8 h waiting + ~4 h crawling); total_s is the waiting budget
+        wait_average_time = int(total_s / len(user_list))
+        for user_dict in user_list:
+            Common.logger(log_type, crawler).info(f'抓取公众号:{user_dict["nick_name"]}\n')
+            Common.logging(log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}\n')
+            AliyunLogger.logging(
+                code="1003",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message="开始抓取公众号: {}".format(user_dict["nick_name"]),
+            )
+            try:
+                cls.get_videoList(
+                    log_type=log_type,
+                    crawler=crawler,
+                    task_dict=task_dict,
+                    rule_dict=rule_dict,
+                    user_dict=user_dict,
+                    env=env,
+                )
+                sleep_time = random.randint(
+                    wait_average_time - 120, wait_average_time - 60
+                )
+                Common.logger(log_type, crawler).info("休眠 {} 秒\n".format(sleep_time))
+                Common.logging(log_type, crawler, env, "休眠 {} 秒\n".format(sleep_time))
+                time.sleep(sleep_time)
+            except Exception as e:
+                Common.logger(log_type, crawler).info(
+                    f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n'
+                )
+                Common.logging(
+                    log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n'
+                )
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="抓取公众号: {} 时异常".format(user_dict["nick_name"]),
+                )
+            AliyunLogger.logging(
+                code="1004",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message="完成抓取公众号: {}".format(user_dict["nick_name"]),
+            )
+
+    @classmethod
+    def get_video_url(cls, article_url, env):
+        # Enable performance logging in the browser capabilities
+        ca = DesiredCapabilities.CHROME
+        ca["goog:loggingPrefs"] = {"performance": "ALL"}
+
+        # Run headless (no visible browser window)
+        chrome_options = webdriver.ChromeOptions()
+        chrome_options.add_argument("headless")
+        chrome_options.add_argument(
+            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"
+        )
+        chrome_options.add_argument("--no-sandbox")
+
+        # Initialize the driver (dev uses a local chromedriver binary)
+        if env == "prod":
+            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
+        else:
+            driver = webdriver.Chrome(
+                desired_capabilities=ca,
+                options=chrome_options,
+                service=Service(
+                    "/Users/tzld/Downloads/chromedriver_mac64/chromedriver"
+                ),
+            )
+
+        driver.implicitly_wait(10)
+        driver.get(article_url)
+        time.sleep(1)
+
+        if (
+                len(
+                    driver.find_elements(
+                        By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]'
+                    )
+                )
+                != 0
+        ):
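+            # Natively hosted video (mpvideo.qpic.cn): the <video> element exposes its src directly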
+            video_url = driver.find_element(
+                By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]'
+            ).get_attribute("src")
+        elif (
+                len(
+                    driver.find_elements(
+                        By.XPATH, '//span[@class="js_tx_video_container"]/*[1]'
+                    )
+                )
+                != 0
+        ):
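+            # Tencent Video embed: pull the vid from the iframe src, then resolve the playable URL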
+            iframe = driver.find_element(
+                By.XPATH, '//span[@class="js_tx_video_container"]/*[1]'
+            ).get_attribute("src")
+            video_id = iframe.split("vid=")[-1].split("&")[0]
+            video_url = cls.get_tencent_video_url(video_id)
+        else:
+            video_url = 0
+        driver.quit()
+        if "mpvideo.qpic.cn" in str(video_url):
+            time.sleep(random.randint(1, 3))
+        return video_url
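+
+    # NOTE: get_tencent_video_url is referenced above but is not included in this commit.
+    # A minimal sketch, assuming Tencent's public h5vv getinfo endpoint; treat this as
+    # illustrative, not authoritative:
+    @classmethod
+    def get_tencent_video_url(cls, video_id):
+        url = f"https://h5vv.video.qq.com/getinfo?vid={video_id}&platform=101001&charge=0&otype=json&defn=shd"
+        # Response is JSONP-like: 'QZOutputJson={...};' — strip the wrapper before parsing
+        text = requests.get(url=url).text.replace("QZOutputJson=", "").rstrip(";")
+        info = json.loads(text)
+        vi = info["vl"]["vi"][0]
+        return vi["ul"]["ui"][0]["url"] + str(video_id) + ".mp4?vkey=" + vi["fvkey"]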
+
+    @classmethod
+    def get_wechat_gh(cls, content_link):
+        payload = {}
+        headers = {
+            'authority': 'mp.weixin.qq.com',
+            'cookie': 'RK=kLuB01bYUa; ptcz=604f91ae284ed19ddcddda0c052312f03f096ccaa23994b0dc7aac856159a1d9; iip=0; rewardsn=; wxtokenkey=777; pac_uid=1_364544322; pgv_info=ssid=s5424148811; pgv_pvid=8423646400; o_cookie=364544322; wwapp.vid=; wwapp.cst=; wwapp.deviceid=; login_type=wxqrcode; tvfe_boss_uuid=97fe85e41c02f816; ua_id=FS5Q0DLf7QjeurnpAAAAAG3yjawqm2QVreYybCeE-bE=; wxuin=84408959445830; mm_lang=zh_CN; sig=h01d8310f2cf065f1baf641dec377a7cf209b3acd87e6f47e759a8eff53a83be44365d97fd1e013a7d4; uuid=626f86245f04d876538319d2b0ad00a8; xid=69967389815bcec44c878b4ec5f7d0cd; _clck=3891672333|1|fbt|0; qm_authimgs_id=1; qm_verifyimagesession=h014171884333ea321544e60514b652e6ea94b41703bea3db998ba552a68c3bb029407c558b3c658287; qqhxqqcomrouteLine=index; eas_sid=11q6j8S5i49225n2H271a3i9W6; ariaDefaultTheme=undefined; rewardsn=; wxtokenkey=777',
+            'referer': 'https://weixin.sogou.com/link?url=dn9a_-gY295K0Rci_xozVXfdMkSQTLW6cwJThYulHEtVjXrGTiVgS-jLzX0QJsZc9LKGBXLDqu6hcy8W9YK4n1qXa8Fplpd9kqb1on3XUORxrmoftjAEj_GbEcfeUOWbw4CoyV3mfI6CnS5wEgRgloC4xjPDiE6GeHrvBBz3sVJJqopuR3-XqA0a-_G6lnkfM41cvBft-VHFr1bNo2EnzytenNSxFGs7t5_16x7SsuyAXBbT1gj0mwfbwdmomkYm6Wv3FtUFWt3zAjcIGepUqA..&type=2&query=%E7%83%AD%E7%82%B9&token=BE1F165D212650B2999C655336D9D740998E8E7A6475BD69&k=43&h=9',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
+        }
+        try:
+            response = requests.request("GET", content_link, headers=headers, data=payload)
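+            # The article page embeds the account id as: var user_name = "gh_xxxxxxxx";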
+            wechat_gh = re.search(r'var user_name = "(.*?)"', response.text).group(1)
+            return wechat_gh
+        except Exception:
+            return None
+
+    @classmethod
+    def repeat_video(cls, log_type, crawler, video_id, env):
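+        # Dedupe check: count existing crawler_video rows with the same out_video_id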
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}" ; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+    # Fetch the article list for one account
+    @classmethod
+    def get_videoList(cls, log_type, crawler, task_dict, rule_dict, user_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        wechat_gh = cls.get_wechat_gh(user_dict["nick_name"])
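+        # NOTE: for this URL-based task, nick_name is expected to hold an article link (see get_wechat_gh)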
+        if wechat_gh is None:
+            Common.logging(
+                log_type,
+                crawler,
+                env,
+                f"获取用户主页id为空{task_dict}\n",
+            )
+            AliyunLogger.logging(
+                code="2004",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"获取用户主页id为空{task_dict}",
+            )
+            return
+        time.sleep(1)
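+        # Article-list service: POST the account's gh id and receive its recent articles as JSON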
+        url = "http://61.48.133.26:30001/GetGh_Doc"
+        payload = json.dumps({
+            "appid": wechat_gh,
+            "decode": "1"
+        })
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        r = requests.request("POST", url, headers=headers, data=payload)
+        if "list" not in r.json():
+            Common.logger(log_type, crawler).warning(
+                f"status_code:{r.status_code}, get_videoList:{r.text}\n"
+            )
+            Common.logging(
+                log_type,
+                crawler,
+                env,
+                f"status_code:{r.status_code}, get_videoList:{r.text}\n",
+            )
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"status_code:{r.status_code}, get_videoList:{r.text}\n",
+            )
+            time.sleep(60 * 15)
+            return
+        if len(r.json()["list"]) == 0:
+            Common.logger(log_type, crawler).info("没有更多视频了\n")
+            Common.logging(log_type, crawler, env, "没有更多视频了\n")
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message="没有更多视频了\n",
+            )
+            return
+        else:
+            user_name = r.json().get("gh_name")
+            app_msg_list = r.json()["list"]
+            for article in app_msg_list:
+                try:
+                    trace_id = crawler + str(uuid.uuid1())
+                    publish_time_str = article.get("published_time", "")
+                    date_format = "%Y-%m-%d %H:%M:%S"
+                    date_time_obj = datetime.strptime(publish_time_str, date_format)
+                    publish_time_stamp = int(date_time_obj.timestamp())
+
+                    article_url = article.get("url", "")
+                    video_dict = {
+                        "video_id": article.get("aid", ""),
+                        "video_title": article.get("title", "")
+                            .replace(" ", "")
+                            .replace('"', "")
+                            .replace("'", ""),
+                        "publish_time_stamp": publish_time_stamp,
+                        "publish_time_str": publish_time_str,
+                        "user_name": user_name,
+                        "play_cnt": 0,
+                        "comment_cnt": 0,
+                        "like_cnt": 0,
+                        "share_cnt": 0,
+                        "user_id": user_dict["uid"],
+                        "avatar_url": user_dict["avatar_url"],
+                        "cover_url": article.get("head_pic", ""),
+                        "article_url": article.get("head_pic", ""),
+                        "video_url": cls.get_video_url(article_url, env),
+                        "session": f"gongzhonghao-author1-{int(time.time())}",
+                    }
+                    for k, v in video_dict.items():
+                        Common.logger(log_type, crawler).info(f"{k}:{v}")
+                    Common.logging(
+                        log_type, crawler, env, f"video_dict:{video_dict}"
+                    )
+                    AliyunLogger.logging(
+                        code="1001",
+                        trace_id=trace_id,
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message="扫描到一条视频",
+                        data=video_dict,
+                    )
+                    if (
+                            int(time.time()) - publish_time_stamp
+                            > 3600
+                            * 24
+                            * int(rule_dict.get("period", {}).get("max", 1000))
+                    ):
+                        Common.logger(log_type, crawler).info(
+                            f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n"
+                        )
+                        Common.logging(
+                            log_type,
+                            crawler,
+                            env,
+                            f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n",
+                        )
+                        AliyunLogger.logging(
+                            code="2004",
+                            trace_id=trace_id,
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            data=video_dict,
+                            message="发布时间超过{}天".format(
+                                int(rule_dict.get("period", {}).get("max", 1000))
+                            ),
+                        )
+                        return
+
+                    if (
+                            video_dict["article_url"] in (0, "")
+                            or video_dict["video_url"] in (0, "")
+                    ):
+                        Common.logger(log_type, crawler).info("文章涉嫌违反相关法律法规和政策\n")
+                        Common.logging(log_type, crawler, env, "文章涉嫌违反相关法律法规和政策\n")
+                        AliyunLogger.logging(
+                            code="2005",
+                            trace_id=trace_id,
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            data=video_dict,
+                            message="无效文章或视频",
+                        )
+                    # Filter out titles containing blocked words
+                    elif any(
+                        str(word) in video_dict["video_title"]
+                        for word in get_config_from_mysql(
+                            log_type=log_type,
+                            source=crawler,
+                            env=env,
+                            text="filter",
+                            action="",
+                        )
+                    ):
+                        Common.logger(log_type, crawler).info("标题已中过滤词\n")
+                        Common.logging(log_type, crawler, env, "标题已中过滤词\n")
+                        AliyunLogger.logging(
+                            code="2003",
+                            trace_id=trace_id,
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            data=video_dict,
+                            message="标题已中过滤词\n",
+                        )
+                    # Skip videos that have already been downloaded
+                    elif (
+                            cls.repeat_video(
+                                log_type,
+                                crawler,
+                                video_dict["video_id"],
+                                env,
+                            )
+                            != 0
+                    ):
+                        Common.logger(log_type, crawler).info("视频已下载\n")
+                        Common.logging(log_type, crawler, env, "视频已下载\n")
+                        AliyunLogger.logging(
+                            code="2002",
+                            trace_id=trace_id,
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            data=video_dict,
+                            message="视频已下载",
+                        )
+                    else:
+                        video_dict["out_user_id"] = video_dict["user_id"]
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["width"] = 0
+                        video_dict["height"] = 0
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = user_dict[
+                            "uid"
+                        ]  # 站内 UID?爬虫获取不到了(随机发布到原 5 个账号中)
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+                        mq.send_msg(video_dict)
+                        AliyunLogger.logging(
+                            code="1002",
+                            trace_id=trace_id,
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            data=video_dict,
+                            message="成功发送 MQ 至 ETL",
+                        )
+                        time.sleep(random.randint(1, 8))
+                except Exception as e:
+                    Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
+                    Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"抓取单条视频异常:{e}\n",
+                    )
+            Common.logger(log_type, crawler).info("休眠 60 秒\n")
+            Common.logging(log_type, crawler, env, "休眠 60 秒\n")
+            time.sleep(60)
+
+
+if __name__ == '__main__':
+    log_type = "author"
+    crawler = "gongzhonghao"
+    env = "dev"
+    task_dict = {'createTime': 1688382816512, 'id': 54, 'interval': 200, 'machine': 'aliyun', 'mode': 'author',
+                 'operator': '王坤', 'rule': {'period': {'min': 1, 'max': 1}, 'duration': {'min': 20, 'max': 2700}},
+                 'source': 'gongzhonghao', 'spiderName': 'run_gzh_author', 'startTime': 1688456874000, 'status': 0,
+                 'taskName': '公众号账号', 'updateTime': 1688456876643}
+    rule_dict = {"period": {"min": 1, "max": 1}, "duration": {"min": 20, "max": 2700}}
+    task_id = 54
+    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+    GongzhonghaoUrlAuthor.get_all_videos(log_type=log_type,
+                                         crawler=crawler,
+                                         task_dict=task_dict,
+                                         rule_dict=rule_dict,
+                                         user_list=user_list,
+                                         env=env)

+ 203 - 0
gongzhonghao/gongzhonghao_main/run_newgzh_author.py

@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/30
+import argparse
+import os
+import sys
+import time
+from multiprocessing import Process
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.common import Common
+from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
+from gongzhonghao.gongzhonghao_author.gongzhonghao_url_author import GongzhonghaoUrlAuthor
+
+
+def get_author_videos(log_type, crawler, token_index, task_dict, rule_dict, user_list, env):
+    Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
+    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
+    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
+    # NOTE: GongzhonghaoUrlAuthor.get_all_videos does not accept token_index, so it is not forwarded here
+    GongzhonghaoUrlAuthor.get_all_videos(log_type=log_type,
+                                         crawler=crawler,
+                                         task_dict=task_dict,
+                                         rule_dict=rule_dict,
+                                         user_list=user_list,
+                                         env=env)
+    # Common.del_logs(log_type, crawler)
+    Common.logger(log_type, crawler).info('抓取一轮结束\n')
+    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # Long polling: if the topic has no messages, the request is held server-side for up
+    # to wait_seconds before returning; a message arriving in that window returns immediately.
+    # Maximum allowed is 30 seconds.
+    wait_seconds = 30
+    # Number of messages consumed per request (maximum allowed is 16).
+    batch = 1
+    Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                          f'WaitSeconds:{wait_seconds}\n'
+                                          f'TopicName:{topic_name}\n'
+                                          f'MQConsumer:{group_id}')
+    Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                           f'WaitSeconds:{wait_seconds}\n'
+                                           f'TopicName:{topic_name}\n'
+                                           f'MQConsumer:{group_id}')
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                f'WaitSeconds:{wait_seconds}\n'
+                f'TopicName:{topic_name}\n'
+                f'MQConsumer:{group_id}'
+    )
+    while True:
+        try:
+            # Consume messages via long polling.
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                Common.logger(log_type, crawler).info(f"Receive\n"
+                                                      f"MessageId:{msg.message_id}\n"
+                                                      f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                      f"MessageTag:{msg.message_tag}\n"
+                                                      f"ConsumedTimes:{msg.consumed_times}\n"
+                                                      f"PublishTime:{msg.publish_time}\n"
+                                                      f"Body:{msg.message_body}\n"
+                                                      f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                      f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                      f"Properties:{msg.properties}")
+                Common.logging(log_type, crawler, env, f"Receive\n"
+                                                       f"MessageId:{msg.message_id}\n"
+                                                       f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                       f"MessageTag:{msg.message_tag}\n"
+                                                       f"ConsumedTimes:{msg.consumed_times}\n"
+                                                       f"PublishTime:{msg.publish_time}\n"
+                                                       f"Body:{msg.message_body}\n"
+                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                       f"Properties:{msg.properties}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                            f"MessageId:{msg.message_id}\n"
+                            f"MessageBodyMD5:{msg.message_body_md5}\n"
+                            f"MessageTag:{msg.message_tag}\n"
+                            f"ConsumedTimes:{msg.consumed_times}\n"
+                            f"PublishTime:{msg.publish_time}\n"
+                            f"Body:{msg.message_body}\n"
+                            f"NextConsumeTime:{msg.next_consume_time}\n"
+                            f"ReceiptHandle:{msg.receipt_handle}\n"
+                            f"Properties:{msg.properties}"
+                )
+
+                # ack_mq_message
+                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
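+                # NOTE: the message is acked before the crawl below runs, so a crash loses the task (at-most-once delivery)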
+
+                # Parse task_dict
+                task_dict = task_fun_mq(msg.message_body)['task_dict']
+                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
+                Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"调度任务:{task_dict}"
+                )
+                # Parse rule_dict
+                rule_dict = task_fun_mq(msg.message_body)['rule_dict']
+                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}\n")
+                Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n"
+                )
+
+                # Look up user_list
+                task_id = task_dict['id']
+                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+
+                # Work out how many worker processes (crawler_num) to launch
+                user_num = len(user_list)
+                chunk_size = 100  # number of users handled per process
+                crawler_num = user_num // chunk_size  # floor division
+                if user_num % chunk_size != 0:
+                    crawler_num += 1
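+                # i.e. crawler_num = ceil(user_num / chunk_size)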
+                Common.logger(log_type, crawler).info(f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
+                Common.logging(log_type, crawler, env, f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务"
+
+                )
+                # Crawl in parallel, one process per chunk of users
+                processes = []
+                for i in range(crawler_num):
+                    start = i * chunk_size
+                    end = min((i + 1) * chunk_size, user_num)
+                    process = Process(target=get_author_videos, args=(
+                        f"{log_type}{i + 1}", crawler, i + 1, task_dict, rule_dict, user_list[start:end], env))
+                    process.start()
+                    processes.append(process)
+
+                for process in processes:
+                    process.join()
+
+        except MQExceptionBase as err:
+            # No messages available in the topic.
+            if err.type == "MessageNotExist":
+                Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
+                Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n"
+                )
+                continue
+
+            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
+            Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n"
+            )
+
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # create the argument parser
+    parser.add_argument('--log_type', type=str)  # declare each argument (and, optionally, its type)
+    parser.add_argument('--crawler')
+    parser.add_argument('--topic_name')
+    parser.add_argument('--group_id')
+    parser.add_argument('--env')
+    args = parser.parse_args()  # values are supplied on the command line
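+    # Example invocation (topic and group names below are placeholders):
+    #   python run_newgzh_author.py --log_type="author" --crawler="gongzhonghao" \
+    #       --topic_name="<topic_name>" --group_id="<group_id>" --env="dev"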
+    main(log_type=args.log_type,
+         crawler=args.crawler,
+         topic_name=args.topic_name,
+         group_id=args.group_id,
+         env=args.env)