Przeglądaj źródła

Merge remote-tracking branch 'origin/master'

zhangyong 1 rok temu
rodzic
commit
2ab3dbee91

+ 0 - 1
common/pipeline.py

@@ -139,7 +139,6 @@ class PiaoQuanPipeline:
                 message="重复的视频",
                 data=self.item,
             )
-            print("video_repeat")
             return False
         return True
 

+ 61 - 0
resend_msg.py

@@ -0,0 +1,61 @@
+import json
+from common.mq import MQ
+from tqdm import tqdm
+
+if __name__ == "__main__":
+    # One-off maintenance script: re-publishes crawled video records to the
+    # "topic_crawler_etl_*" MQ topic.
+    #
+    # The commented-out block below is the batch variant. It reads
+    # message.txt line by line; each line is a JSON object with "msg",
+    # "platform" and "strategy" keys. "msg" is a "CrawlerEtlParam(...)" style
+    # string that is split into key=value pairs, whose camelCase keys are
+    # mapped to snake_case through key_dict before the record is sent to MQ.
+    # path = 'message.txt'
+    # with open(path, "r", encoding="utf-8") as f:
+    #     datas = f.readlines()
+    # for line in tqdm(datas):
+    # # for line in datas:
+    #     video_dict = {}
+    #     msg = json.loads(line)['msg']
+    #     # print(msg)
+    #     platform = json.loads(line)['platform']
+    #     strategy = json.loads(line)['strategy']
+    #     strs = msg.replace("CrawlerEtlParam", "")[1: -1]
+    #     str_list = strs.split(", ")
+    #     str_list = [i.replace("'", "") for i in str_list]
+    #     key_dict = {
+    #         "crawlerRule": "crawler_rule",
+    #         "userId": "user_id",
+    #         "outUserId": "out_user_id",
+    #         "userName": "user_name",
+    #         "avatarUrl": "avatar_url",
+    #         "outVideoId": "out_video_id",
+    #         "videoTitle": "video_title",
+    #         "coverUrl": "cover_url",
+    #         "videoUrl": "video_url",
+    #         "publishTime": "publish_time",
+    #         "playCnt": "play_cnt",
+    #         "likeCnt": "like_cnt",
+    #         "shareCnt": "share_cnt",
+    #         "collectionCnt": "collection_cnt",
+    #         "commentCnt": "comment_cnt",
+    #         "strategyType": "strategy"
+    #     }
+    #     for index, i in enumerate(str_list[:-4]):
+    #         if "=" not in i:
+    #             continue
+    #         else:
+    #             key = i.split("=")[0]
+    #             value = i[len(key) + 1:]
+    #             new_key = key_dict.get(key, key)
+    #             video_dict[new_key] = value.replace("null", "")
+    #
+    #     video_dict['strategy'] = strategy
+    #     video_dict['platform'] = platform
+    #     video_dict['crawler_rule'] = json.dumps({})
+    #     print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+    #     mq = MQ(topic_name="topic_crawler_etl_" + "prod")
+    #     mq.send_msg(video_dict)
+
+    # Active path: resend one hard-coded video record.
+    video_dict = {'video_title': '吴尊友因病去世!吴老师,您真的不容易,千言万语,汇成一句话您走好❗️', 'video_id': '5262651713', 'duration': 49, 'play_cnt': 71, 'like_cnt': 0, 'comment_cnt': 0, 'share_cnt': 1, 'user_name': '夏日❤️莲莲', 'publish_time_stamp': 1698398572, 'publish_time_str': '2023-10-27 17:22:52', 'video_width': 537, 'video_height': 954, 'avatar_url': 'https://cdn-xphoto2.xiaoniangao.cn/4987933869?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=wopOmtlcp9tGyWHYW9uy7DIXO%2Bg%3D&x-oss-process=image%2Fresize%2Cw_200%2Ch_200%2Climit_0%2Finterlace%2C1%2Fquality%2Cq_50%2Fcrop%2Cw_200%2Ch_200%2Cg_center%2Fformat%2Cjpg%2Fauto-orient%2C0', 'profile_id': 55888345, 'profile_mid': 185546, 'cover_url': 'https://cdn-xphoto2.xiaoniangao.cn/5262652619?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=qIIRzRICgyv40n3uMFeMwHCY8JY%3D&x-oss-process=image%2Fresize%2Cw_690%2Ch_385%2Climit_0%2Finterlace%2C1%2Fformat%2Cjpg%2Fauto-orient%2C0', 'video_url': 'https://cdn-xalbum2.xiaoniangao.cn/5262651713?Expires=1704038400&OSSAccessKeyId=LTAI5tB7cRkYiqHcTdkVprwb&Signature=hFGFAB49mmgUYwYcF4679bE%2BgLg%3D', 'session': 'xiaoniangao-author-1698402882'}
+    # Fill in the routing fields that are set separately from the literal above.
+    video_dict['strategy'] = "author"
+    video_dict['platform'] = "xiaoniangao"
+    video_dict['user_id'] = 58528269
+    # The outgoing record carries the source video id under "out_video_id".
+    video_dict['out_video_id'] = video_dict['video_id']
+    print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+    # NOTE(review): the topic suffix is hard-coded to "prod" - presumably the
+    # production ETL topic; confirm before running this script.
+    mq = MQ(topic_name="topic_crawler_etl_" + "prod")
+    mq.send_msg(video_dict)
+

+ 142 - 0
zhongmiaoyinxin/zhongmiaoyinxin_main/run_zmyx_recommend.py

@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/10/23
+import argparse
+import random
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common.aliyun_log import AliyunLogger
+from zhongmiaoyinxin.zhongmiaoyinxin_recommend import ZMYXRecommend
+
+
+def main(my_platform, mode, topic_name, group_id, env):
+    """Consume scheduling messages from MQ and start one crawl per message.
+
+    Polls the topic in an endless loop. Every received message is logged,
+    acknowledged, parsed into a task dict and a rule dict, and then used to
+    start a ``ZMYXRecommend`` run on behalf of a randomly chosen user that is
+    bound to the task in ``crawler_user_v3``.
+
+    This function never returns normally.
+
+    Args:
+        my_platform: Crawler/platform name; used for logging, the user
+            lookup and as the ``crawler`` argument of ``ZMYXRecommend``.
+        mode: Log type; also passed as ``log_type`` to ``ZMYXRecommend``.
+        topic_name: MQ topic to consume from.
+        group_id: MQ consumer group id.
+        env: Environment name passed through to logging, MySQL and the
+            crawler.
+    """
+    consumer = get_consumer(topic_name, group_id)
+    # Long polling: when the topic has no message the request is held on the
+    # server for up to ``wait_seconds`` and returns as soon as one arrives.
+    # 30 seconds is the maximum allowed value.
+    wait_seconds = 30
+    # Number of messages fetched per poll (at most 16 may be requested).
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=my_platform,
+        mode=mode,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # Long-polling consume.
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # The message is acknowledged before the crawl starts, so a
+                # failure further down does not lead to redelivery.
+                ack_message(
+                    log_type=mode,
+                    crawler=my_platform,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # Parse the message body once and reuse the result for both
+                # the task description and the crawl rules.
+                task_info = task_fun_mq(msg.message_body)
+                task_dict = task_info["task_dict"]
+                AliyunLogger.logging(
+                    "1000", my_platform, mode, env, f"调度任务:{task_dict}"
+                )
+                rule_dict = task_info["rule_dict"]
+                AliyunLogger.logging(
+                    "1000", my_platform, mode, env, f"抓取规则:{rule_dict}\n"
+                )
+                # Look up the users bound to this task.
+                # NOTE(review): task_id is interpolated straight into the SQL
+                # text; confirm it can only ever be a numeric id.
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    mode, my_platform, select_user_sql, env, action=""
+                )
+                our_uid_list = [user["uid"] for user in (user_list or [])]
+                if not our_uid_list:
+                    # random.choice() raises IndexError on an empty sequence,
+                    # which is not an MQExceptionBase and would terminate the
+                    # consumer loop. Skip this message instead.
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=my_platform,
+                        mode=mode,
+                        env=env,
+                        message=f"No user found for task_id:{task_id}, skip this round\n",
+                    )
+                    continue
+                our_uid = random.choice(our_uid_list)
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message="成功获取信息,启动爬虫,开始一轮抓取",
+                )
+                # Nothing else is called on the instance here, so the crawl is
+                # presumably driven from the constructor itself.
+                ZMYXRecommend(
+                    crawler=my_platform,
+                    log_type=mode,
+                    env=env,
+                    rule_dict=rule_dict,
+                    our_uid=our_uid,
+                )
+                AliyunLogger.logging(
+                    code="1004",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message="成功抓取完一轮",
+                )
+
+        except MQExceptionBase as err:
+            # No message available in the topic: poll again immediately.
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            # Any other MQ error: log it and back off briefly before retrying.
+            AliyunLogger.logging(
+                code="1000",
+                platform=my_platform,
+                mode=mode,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    # Command-line entry point: collect the consumer settings and hand them
+    # to main().
+    parser = argparse.ArgumentParser()
+    # --log_type is declared with an explicit string type.
+    parser.add_argument("--log_type", type=str)
+    # The remaining options use argparse's default handling.
+    for option in ("--crawler", "--topic_name", "--group_id", "--env"):
+        parser.add_argument(option)
+    cli_args = parser.parse_args()
+    main(
+        my_platform=cli_args.crawler,
+        mode=cli_args.log_type,
+        topic_name=cli_args.topic_name,
+        group_id=cli_args.group_id,
+        env=cli_args.env,
+    )

+ 1 - 3
zhongmiaoyinxin/zhongmiaoyinxin_recommend/__init__.py

@@ -1,3 +1 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/4/17
+from .zhongmiaoyinxin_recommend_new import ZMYXRecommend

+ 146 - 39
zhongmiaoyinxin/zhongmiaoyinxin_recommend/zhongmiaoyinxin_recommend_new.py

@@ -5,16 +5,15 @@ import json
 import os
 import sys
 import time
+import uuid
 from hashlib import md5
 
 from appium import webdriver
 from appium.webdriver.common.touch_action import TouchAction
 from appium.webdriver.extensions.android.nativekey import AndroidKey
 from appium.webdriver.webdriver import WebDriver
-from bs4 import BeautifulSoup
 from selenium.common.exceptions import NoSuchElementException
 from selenium.webdriver.common.by import By
-import multiprocessing
 
 sys.path.append(os.getcwd())
 from common.mq import MQ
@@ -29,7 +28,7 @@ class ZMYXRecommend:
 
     def __init__(self, log_type, crawler, env, rule_dict, our_uid):
         self.mq = None
-        self.platform = "众妙音信"
+        self.platform = "zhongmiaoyinxin"
         self.download_cnt = 0
         self.element_list = []
         self.count = 0
@@ -69,32 +68,46 @@ class ZMYXRecommend:
             # "chromeOptions": {"androidProcess": "com.tencent.mm:tools"},
             'browserName': ''
         }
-        self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
+        try:
+            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
+        except:
+            AliyunLogger.logging(
+                code="3002",
+                platform=self.platform,
+                mode=self.log_type,
+                env=self.env,
+                message="appium 启动异常"
+            )
+            return
         self.driver.implicitly_wait(30)
+        wechat_flag = self.check_wechat()
+        if wechat_flag:
+            size = self.driver.get_window_size()
+            self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
+                              int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
+            time.sleep(1)
+            self.driver.find_elements(By.XPATH, '//*[@text="西瓜悦"]')[-1].click()
+            AliyunLogger.logging(
+                code="1000",
+                platform=self.platform,
+                env=self.env,
+                mode=self.log_type,
+                message="打开小程序西瓜悦成功"
 
-        for i in range(120):
-            try:
-                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
-                    print("启动微信成功")
-                    break
-                elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
-                    print("发现并关闭系统下拉菜单")
-                    self.driver.find_element(By.ID, "com.android.system:id/dismiss_view").click()
-                else:
-                    pass
-            except NoSuchElementException:
-                time.sleep(1)
-
-        size = self.driver.get_window_size()
-        self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
-                          int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
-        time.sleep(1)
-        self.driver.find_elements(By.XPATH, '//*[@text="西瓜悦"]')[-1].click()
-        print("打开小程序成功")
-        time.sleep(5)
-        self.get_videoList()
-        time.sleep(100)
-        self.driver.quit()
+            )
+            time.sleep(5)
+            self.get_videoList()
+            time.sleep(100)
+            self.driver.quit()
+        else:
+            AliyunLogger.logging(
+                code="3001",
+                platform=self.platform,
+                mode=self.log_type,
+                env=self.env,
+                message="打开微信异常"
+            )
+            return
 
     def search_elements(self, xpath):
         time.sleep(1)
@@ -109,6 +122,35 @@ class ZMYXRecommend:
             except NoSuchElementException:
                 pass
 
+    # 检查是否打开微信
+    def check_wechat(self):
+        """Wait for WeChat's main screen, dismissing the system drawer if shown.
+
+        Makes up to 10 attempts. On each attempt it first looks for the
+        WeChat element ``com.tencent.mm:id/f2s``; if that is absent it looks
+        for the system-UI ``dismiss_view`` element and clicks it. When
+        neither element is present the lookup raises
+        ``NoSuchElementException`` and the method sleeps 10 seconds before
+        the next attempt.
+
+        Returns:
+            True as soon as the WeChat element is found, False if it was not
+            found within 10 attempts.
+        """
+        for i in range(10):
+            try:
+                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=self.platform,
+                        mode=self.log_type,
+                        env=self.env,
+                        message="启动微信成功"
+                    )
+                    return True
+                # find_element raises NoSuchElementException when the drawer
+                # is not shown, so reaching the next line means it was found.
+                dismiss_view = self.driver.find_element(
+                    By.ID, "com.android.systemui:id/dismiss_view"
+                )
+                print("发现并关闭系统下拉菜单")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="第{}次错误打开了通知栏".format(i + 1)
+                )
+                # Click the element that was just located. The previous code
+                # looked it up again under "com.android.system:id/...", an id
+                # that differs from the one detected above, so the click
+                # could never succeed.
+                dismiss_view.click()
+            except NoSuchElementException:
+                time.sleep(10)
+        return False
+
     def check_to_applet(self):
         while True:
             webview = self.driver.contexts
@@ -138,30 +180,76 @@ class ZMYXRecommend:
         window_size = self.driver.get_window_size()
         TouchAction(self.driver).tap(x=int(window_size['width'] * 0.5), y=int(window_size['height'] * 0.1)).perform()
 
+    def get_video_url(self, video_element):
+        """Open a video and return the ``src`` of its ``wx-video`` player.
+
+        Clicks ``video_element``, waits, then visits each window handle in
+        turn looking for ``//wx-video[@class="videoh"]``. On the first handle
+        where the player is found, its ``src`` attribute is read,
+        ``self.close_ad()`` is called and the URL is returned.
+
+        Args:
+            video_element: The element to click in order to open the video.
+
+        Returns:
+            The player's ``src`` attribute, or None when no window handle
+            yields the player element.
+        """
+        video_element.click()
+        time.sleep(5)
+        for window in self.driver.window_handles:
+            self.driver.switch_to.window(window)
+            time.sleep(1)
+            try:
+                player = self.driver.find_element(By.XPATH, '//wx-video[@class="videoh"]')
+                src = player.get_attribute("src")
+                self.close_ad()
+            except NoSuchElementException:
+                # Not on this handle: pause briefly, then try the next one.
+                time.sleep(1)
+                continue
+            return src
+        return None
+
     def get_videoList(self):
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
         self.driver.implicitly_wait(20)
-        print("关闭广告")
         self.close_ad()
-        print("切换到 webview")
+        AliyunLogger.logging(
+            code="1000",
+            platform=self.platform,
+            mode=self.log_type,
+            env=self.env,
+            message="已经关闭广告"
+        )
         self.check_to_applet()
+        AliyunLogger.logging(
+            code="1000",
+            platform=self.platform,
+            mode=self.log_type,
+            env=self.env,
+            message="成功切换到 webview"
+        )
         index = 0
         while True:
             if self.search_elements('//*[@id="scrollContainer"]') is None:
-                print("窗口已销毁")
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="窗口已销毁"
+                )
                 return
-            print("获取视频列表")
             video_elements = self.search_elements('//wx-view[@class="cover"]')
             if video_elements is None:
-                print("视频列表为空列表")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="视频列表为空列表"
+                )
                 return
             video_element_temp = video_elements[index:]
             if len(video_element_temp) == 0:
-                print("视频已经到底")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="视频已经到底"
+                )
                 return
             for i, video_element in enumerate(video_element_temp):
                 if video_element is None:
                     return
+                # 获取 trace_id, 并且把该 id 当做视频生命周期唯一索引
+                trace_id = self.crawler + str(uuid.uuid1())
                 self.download_cnt += 1
                 self.search_elements('//wx-view[@class="cover"]')
                 time.sleep(3)
@@ -198,20 +286,39 @@ class ZMYXRecommend:
                     mode=self.log_type,
                     item=video_dict,
                     rule_dict=self.rule_dict,
-                    env=self.env
+                    env=self.env,
+                    trace_id=trace_id
                 )
                 flag = pipeline.process_item()
                 if flag:
                     print(video_dict)
-                else:
-                    print("被规则过滤")
-                # self.mq.send_msg(video_dict)
-                # print(video_dict)
+                    video_url = self.get_video_url(video_element)
+                    if video_url is None:
+                        self.driver.press_keycode(AndroidKey.BACK)
+                    else:
+                        video_dict["video_url"] = video_url
+                        video_dict['strategy'] = self.log_type
+                        video_dict["out_user_id"] = ""
+                        video_dict["platform"] = self.crawler
+                        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+                        video_dict["user_id"] = self.our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+                        self.mq.send_msg(video_dict)
+                        AliyunLogger.logging(
+                            code="1002",
+                            platform=self.platform,
+                            mode=self.log_type,
+                            env=self.env,
+                            data=video_dict,
+                            trace_id=trace_id,
+                            message="成功发送 MQ 至 ETL",
+                        )
+                        self.driver.press_keycode(AndroidKey.BACK)
 
 
 def run():
     rule_dict1 = {}
-    ZMYXRecommend("recommend", "zhongmiaoyinxin", "dev", rule_dict1, 6267141)
+    ZMYXRecommend("recommend", "zhongmiaoyinxin", "prod", rule_dict1, 6267141)
 
 
 if __name__ == "__main__":