Bladeren bron

好看视频代码正式上线

罗俊辉 1 jaar geleden
bovenliggende
commit
de7f90e054

+ 1 - 0
haokanshipin/haokanshipin_author/__init__.py

@@ -0,0 +1 @@
+from hksp_author import HaoKanVideoAccount

+ 18 - 1
haokanshipin/haokanshipin_author/hksp_author.py

@@ -31,7 +31,7 @@ def tunnel_proxies():
 
 class HaoKanVideoAccount(object):
     def __init__(self, platform, mode, rule_dict, user_dict, env):
-        self.account_id = user_dict["link"]
+        self.account_id = user_dict["link"].split("/")[-1]
         self.platform = platform
         self.mode = mode
         self.rule_dict = rule_dict
@@ -232,6 +232,23 @@ class HaoKanVideoAccount(object):
                 trace_id=trace_id,
             )
 
+    def schedule(self):
+        """
+        small: 只抓取小视频
+        big: 只抓取视频
+        both: 抓取小视频和视频, 两者之间休息 5 分钟
+        """
+        flag = self.user_dict['link'].split("_")[0]
+        match flag:
+            case "big":
+                self.get_video_list()
+            case "small":
+                self.get_tiny_video_list()
+            case "both":
+                self.get_tiny_video_list()
+                time.sleep(300)
+                self.get_video_list()
+
 
 if __name__ == "__main__":
     select_user_sql = f"""select * from crawler_user_v3 where source = 'haokanshipin';"""

+ 170 - 0
haokanshipin/haokanshipin_main/run_hksp_author.py

@@ -0,0 +1,170 @@
+import argparse
+import time
+import random
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
+from haokanshipin.haokanshipin_author import HaoKanVideoAccount
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取"
+                )
+                for user_dict in user_list:
+                    time.sleep(random.randint(10, 20))
+                    try:
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="开始抓取好看视频账号 id:{}".format(user_dict["link"]),
+                        )
+                        # 初始化
+                        Baidu = HaoKanVideoAccount(
+                            platform=crawler,
+                            mode=log_type,
+                            rule_dict=rule_dict,
+                            user_dict=user_dict,
+                            env=env,
+                        )
+                        Baidu.schedule()
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="完成抓取好看视频账号:{}".format(user_dict["link"]),
+                        )
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="抓取好看视频:{}出现问题, 报错为{}".format(user_dict["link"], e),
+                        )
+
+                AliyunLogger.logging(
+                    code="1004", platform=crawler, mode=log_type, env=env,message="结束一轮抓取"
+                )
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument("--log_type", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--crawler")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )