Jelajahi Sumber

把pipeline 里面的print取消了, 完善了众妙音信线下爬虫(已经接入了新后台), 新增文件 resend_msg,把 ETL 下载失败的消息重新发送;

罗俊辉 1 tahun lalu
induk
melakukan
7e7bea6ab6

+ 0 - 1
common/pipeline.py

@@ -139,7 +139,6 @@ class PiaoQuanPipeline:
                 message="重复的视频",
                 data=self.item,
             )
-            print("video_repeat")
             return False
         return True
 

+ 61 - 0
resend_msg.py

@@ -0,0 +1,61 @@
+import json
+from common.mq import MQ
+from tqdm import tqdm
+
+if __name__ == "__main__":
+    # path = 'message.txt'
+    # with open(path, "r", encoding="utf-8") as f:
+    #     datas = f.readlines()
+    # for line in tqdm(datas):
+    # # for line in datas:
+    #     video_dict = {}
+    #     msg = json.loads(line)['msg']
+    #     # print(msg)
+    #     platform = json.loads(line)['platform']
+    #     strategy = json.loads(line)['strategy']
+    #     strs = msg.replace("CrawlerEtlParam", "")[1: -1]
+    #     str_list = strs.split(", ")
+    #     str_list = [i.replace("'", "") for i in str_list]
+    #     key_dict = {
+    #         "crawlerRule": "crawler_rule",
+    #         "userId": "user_id",
+    #         "outUserId": "out_user_id",
+    #         "userName": "user_name",
+    #         "avatarUrl": "avatar_url",
+    #         "outVideoId": "out_video_id",
+    #         "videoTitle": "video_title",
+    #         "coverUrl": "cover_url",
+    #         "videoUrl": "video_url",
+    #         "publishTime": "publish_time",
+    #         "playCnt": "play_cnt",
+    #         "likeCnt": "like_cnt",
+    #         "shareCnt": "share_cnt",
+    #         "collectionCnt": "collection_cnt",
+    #         "commentCnt": "comment_cnt",
+    #         "strategyType": "strategy"
+    #     }
+    #     for index, i in enumerate(str_list[:-4]):
+    #         if "=" not in i:
+    #             continue
+    #         else:
+    #             key = i.split("=")[0]
+    #             value = i[len(key) + 1:]
+    #             new_key = key_dict.get(key, key)
+    #             video_dict[new_key] = value.replace("null", "")
+    #
+    #     video_dict['strategy'] = strategy
+    #     video_dict['platform'] = platform
+    #     video_dict['crawler_rule'] = json.dumps({})
+    #     print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+    #     mq = MQ(topic_name="topic_crawler_etl_" + "prod")
+    #     mq.send_msg(video_dict)
+
+    video_dict = {'video_title': '吴尊友因病去世!吴老师,您真的不容易,千言万语,汇成一句话您走好❗️', 'video_id': '5262651713', 'duration': 49, 'play_cnt': 71, 'like_cnt': 0, 'comment_cnt': 0, 'share_cnt': 1, 'user_name': '夏日❤️莲莲', 'publish_time_stamp': 1698398572, 'publish_time_str': '2023-10-27 17:22:52', 'video_width': 537, 'video_height': 954, 'avatar_url': 'https://cdn-xphoto2.xiaoniangao.cn/4987933869?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=wopOmtlcp9tGyWHYW9uy7DIXO%2Bg%3D&x-oss-process=image%2Fresize%2Cw_200%2Ch_200%2Climit_0%2Finterlace%2C1%2Fquality%2Cq_50%2Fcrop%2Cw_200%2Ch_200%2Cg_center%2Fformat%2Cjpg%2Fauto-orient%2C0', 'profile_id': 55888345, 'profile_mid': 185546, 'cover_url': 'https://cdn-xphoto2.xiaoniangao.cn/5262652619?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=qIIRzRICgyv40n3uMFeMwHCY8JY%3D&x-oss-process=image%2Fresize%2Cw_690%2Ch_385%2Climit_0%2Finterlace%2C1%2Fformat%2Cjpg%2Fauto-orient%2C0', 'video_url': 'https://cdn-xalbum2.xiaoniangao.cn/5262651713?Expires=1704038400&OSSAccessKeyId=LTAI5tB7cRkYiqHcTdkVprwb&Signature=hFGFAB49mmgUYwYcF4679bE%2BgLg%3D', 'session': 'xiaoniangao-author-1698402882'}
+    video_dict['strategy'] = "author"
+    video_dict['platform'] = "xiaoniangao"
+    video_dict['user_id'] = 58528269
+    video_dict['out_video_id'] = video_dict['video_id']
+    print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+    mq = MQ(topic_name="topic_crawler_etl_" + "prod")
+    mq.send_msg(video_dict)
+

+ 146 - 39
zhongmiaoyinxin/zhongmiaoyinxin_recommend/zhongmiaoyinxin_recommend_new.py

@@ -5,16 +5,15 @@ import json
 import os
 import sys
 import time
+import uuid
 from hashlib import md5
 
 from appium import webdriver
 from appium.webdriver.common.touch_action import TouchAction
 from appium.webdriver.extensions.android.nativekey import AndroidKey
 from appium.webdriver.webdriver import WebDriver
-from bs4 import BeautifulSoup
 from selenium.common.exceptions import NoSuchElementException
 from selenium.webdriver.common.by import By
-import multiprocessing
 
 sys.path.append(os.getcwd())
 from common.mq import MQ
@@ -29,7 +28,7 @@ class ZMYXRecommend:
 
     def __init__(self, log_type, crawler, env, rule_dict, our_uid):
         self.mq = None
-        self.platform = "众妙音信"
+        self.platform = "zhongmiaoyinxin"
         self.download_cnt = 0
         self.element_list = []
         self.count = 0
@@ -69,32 +68,46 @@ class ZMYXRecommend:
             # "chromeOptions": {"androidProcess": "com.tencent.mm:tools"},
             'browserName': ''
         }
-        self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
+        try:
+            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
+        except:
+            AliyunLogger.logging(
+                code="3002",
+                platform=self.platform,
+                mode=self.log_type,
+                env=self.env,
+                message="appium 启动异常"
+            )
+            return
         self.driver.implicitly_wait(30)
+        wechat_flag = self.check_wechat()
+        if wechat_flag:
+            size = self.driver.get_window_size()
+            self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
+                              int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
+            time.sleep(1)
+            self.driver.find_elements(By.XPATH, '//*[@text="西瓜悦"]')[-1].click()
+            AliyunLogger.logging(
+                code="1000",
+                platform=self.platform,
+                env=self.env,
+                mode=self.log_type,
+                message="打开小程序西瓜悦成功"
 
-        for i in range(120):
-            try:
-                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
-                    print("启动微信成功")
-                    break
-                elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
-                    print("发现并关闭系统下拉菜单")
-                    self.driver.find_element(By.ID, "com.android.system:id/dismiss_view").click()
-                else:
-                    pass
-            except NoSuchElementException:
-                time.sleep(1)
-
-        size = self.driver.get_window_size()
-        self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
-                          int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
-        time.sleep(1)
-        self.driver.find_elements(By.XPATH, '//*[@text="西瓜悦"]')[-1].click()
-        print("打开小程序成功")
-        time.sleep(5)
-        self.get_videoList()
-        time.sleep(100)
-        self.driver.quit()
+            )
+            time.sleep(5)
+            self.get_videoList()
+            time.sleep(100)
+            self.driver.quit()
+        else:
+            AliyunLogger.logging(
+                code="3001",
+                platform=self.platform,
+                mode=self.log_type,
+                env=self.env,
+                message="打开微信异常"
+            )
+            return
 
     def search_elements(self, xpath):
         time.sleep(1)
@@ -109,6 +122,35 @@ class ZMYXRecommend:
             except NoSuchElementException:
                 pass
 
+    # 检查是否打开微信
+    def check_wechat(self):
+        for i in range(10):
+            try:
+                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=self.platform,
+                        mode=self.log_type,
+                        env=self.env,
+                        message="启动微信成功"
+                    )
+                    return True
+                elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
+                    print("发现并关闭系统下拉菜单")
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=self.platform,
+                        mode=self.log_type,
+                        env=self.env,
+                        message="第{}次错误打开了通知栏".format(i + 1)
+                    )
+                    self.driver.find_element(By.ID, "com.android.system:id/dismiss_view").click()
+                else:
+                    pass
+            except NoSuchElementException:
+                time.sleep(10)
+        return False
+
     def check_to_applet(self):
         while True:
             webview = self.driver.contexts
@@ -138,30 +180,76 @@ class ZMYXRecommend:
         window_size = self.driver.get_window_size()
         TouchAction(self.driver).tap(x=int(window_size['width'] * 0.5), y=int(window_size['height'] * 0.1)).perform()
 
+    def get_video_url(self, video_element):
+        video_element.click()
+        time.sleep(5)
+        windowHandles = self.driver.window_handles
+        for handle in windowHandles:
+            self.driver.switch_to.window(handle)
+            time.sleep(1)
+            try:
+                video_url_element = self.driver.find_element(By.XPATH, '//wx-video[@class="videoh"]')
+                video_url = video_url_element.get_attribute("src")
+                self.close_ad()
+                return video_url
+            except NoSuchElementException:
+                time.sleep(1)
+
     def get_videoList(self):
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
         self.driver.implicitly_wait(20)
-        print("关闭广告")
         self.close_ad()
-        print("切换到 webview")
+        AliyunLogger.logging(
+            code="1000",
+            platform=self.platform,
+            mode=self.log_type,
+            env=self.env,
+            message="已经关闭广告"
+        )
         self.check_to_applet()
+        AliyunLogger.logging(
+            code="1000",
+            platform=self.platform,
+            mode=self.log_type,
+            env=self.env,
+            message="成功切换到 webview"
+        )
         index = 0
         while True:
             if self.search_elements('//*[@id="scrollContainer"]') is None:
-                print("窗口已销毁")
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="窗口已销毁"
+                )
                 return
-            print("获取视频列表")
             video_elements = self.search_elements('//wx-view[@class="cover"]')
             if video_elements is None:
-                print("视频列表为空列表")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="视频列表为空列表"
+                )
                 return
             video_element_temp = video_elements[index:]
             if len(video_element_temp) == 0:
-                print("视频已经到底")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.log_type,
+                    env=self.env,
+                    message="视频已经到底"
+                )
                 return
             for i, video_element in enumerate(video_element_temp):
                 if video_element is None:
                     return
+                # 获取 trace_id, 并且把该 id 当做视频生命周期唯一索引
+                trace_id = self.crawler + str(uuid.uuid1())
                 self.download_cnt += 1
                 self.search_elements('//wx-view[@class="cover"]')
                 time.sleep(3)
@@ -198,20 +286,39 @@ class ZMYXRecommend:
                     mode=self.log_type,
                     item=video_dict,
                     rule_dict=self.rule_dict,
-                    env=self.env
+                    env=self.env,
+                    trace_id=trace_id
                 )
                 flag = pipeline.process_item()
                 if flag:
                     print(video_dict)
-                else:
-                    print("被规则过滤")
-                # self.mq.send_msg(video_dict)
-                # print(video_dict)
+                    video_url = self.get_video_url(video_element)
+                    if video_url is None:
+                        self.driver.press_keycode(AndroidKey.BACK)
+                    else:
+                        video_dict["video_url"] = video_url
+                        video_dict['strategy'] = self.log_type
+                        video_dict["out_user_id"] = ""
+                        video_dict["platform"] = self.crawler
+                        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+                        video_dict["user_id"] = self.our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+                        self.mq.send_msg(video_dict)
+                        AliyunLogger.logging(
+                            code="1002",
+                            platform=self.platform,
+                            mode=self.log_type,
+                            env=self.env,
+                            data=video_dict,
+                            trace_id=trace_id,
+                            message="成功发送 MQ 至 ETL",
+                        )
+                        self.driver.press_keycode(AndroidKey.BACK)
 
 
 def run():
     rule_dict1 = {}
-    ZMYXRecommend("recommend", "zhongmiaoyinxin", "dev", rule_dict1, 6267141)
+    ZMYXRecommend("recommend", "zhongmiaoyinxin", "prod", rule_dict1, 6267141)
 
 
 if __name__ == "__main__":