瀏覽代碼

小年糕+小程序代码

罗俊辉 1 年之前
父節點
當前提交
210390cdff

+ 115 - 0
xiaoniangao/xiaoniangao_main/run_xng_plus.py

@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/9/27
import argparse
import os
import random
import sys
import time

from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())
from common.public import get_consumer, ack_message, task_fun_mq
from common.common import Common
from common.scheduling_db import MysqlHelper
from xiaoniangao.xiaoniangao_plus.xiaoniangao_plus_scheduling import XiaoNianGaoPlusRecommend
+
+
def main(log_type, crawler, topic_name, group_id, env):
    """Consume crawler-task messages from MQ and run the XiaoNianGao+ crawl for each.

    For every received message: log it, ack it, load the task/rule payload,
    pick a random publishing uid for the task, and run one round of
    ``XiaoNianGaoPlusRecommend.start_wechat``. Loops forever.

    :param log_type: log category passed through to Common.logger/logging
    :param crawler: crawler name (log namespace)
    :param topic_name: MQ topic to consume from
    :param group_id: MQ consumer group id
    :param env: runtime environment ("dev"/"prod"), forwarded to DB and crawler
    """
    consumer = get_consumer(topic_name, group_id)
    # Long polling: if the topic is empty the request hangs server-side for up
    # to wait_seconds and returns as soon as a message becomes consumable.
    wait_seconds = 30
    # Messages fetched per request (service maximum is 16).
    batch = 1
    Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
                                          f'WaitSeconds:{wait_seconds}\n'
                                          f'TopicName:{topic_name}\n'
                                          f'MQConsumer:{group_id}')
    Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
                                           f'WaitSeconds:{wait_seconds}\n'
                                           f'TopicName:{topic_name}\n'
                                           f'MQConsumer:{group_id}')
    while True:
        try:
            # Long-poll for the next batch of messages.
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                xng_play_start_time = int(time.time())
                Common.logger(log_type, crawler).info(f"Receive\n"
                                                      f"MessageId:{msg.message_id}\n"
                                                      f"MessageBodyMD5:{msg.message_body_md5}\n"
                                                      f"MessageTag:{msg.message_tag}\n"
                                                      f"ConsumedTimes:{msg.consumed_times}\n"
                                                      f"PublishTime:{msg.publish_time}\n"
                                                      f"Body:{msg.message_body}\n"
                                                      f"NextConsumeTime:{msg.next_consume_time}\n"
                                                      f"ReceiptHandle:{msg.receipt_handle}\n"
                                                      f"Properties:{msg.properties}")
                Common.logging(log_type, crawler, env, f"Receive\n"
                                                       f"MessageId:{msg.message_id}\n"
                                                       f"MessageBodyMD5:{msg.message_body_md5}\n"
                                                       f"MessageTag:{msg.message_tag}\n"
                                                       f"ConsumedTimes:{msg.consumed_times}\n"
                                                       f"PublishTime:{msg.publish_time}\n"
                                                       f"Body:{msg.message_body}\n"
                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
                                                       f"Properties:{msg.properties}")
                # Ack up front so the (long-running) crawl below cannot push the
                # message past its next-consume time and cause redelivery.
                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)

                # Crawler business logic: parse the task payload once
                # (the original parsed the same body twice).
                message_payload = task_fun_mq(msg.message_body)
                task_dict = message_payload['task_dict']
                rule_dict = message_payload['rule_dict']
                task_id = task_dict['id']
                # NOTE(review): task_id is interpolated into SQL; if MysqlHelper
                # supports parameterized queries, prefer them here.
                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
                # Pick one publishing uid at random from the task's user list.
                our_uid_list = [user["uid"] for user in user_list]
                our_uid = random.choice(our_uid_list)
                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
                Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
                Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
                Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
                Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
                XiaoNianGaoPlusRecommend.start_wechat(log_type=log_type,
                                                      crawler=crawler,
                                                      rule_dict=rule_dict,
                                                      our_uid=our_uid,
                                                      env=env)
                Common.logger(log_type, crawler).info('抓取一轮结束\n')
                Common.logging(log_type, crawler, env, '抓取一轮结束\n')
                xng_play_end_time = int(time.time())
                # Fix: elapsed time is end - start (was start - end, always negative).
                xng_play_duration = xng_play_end_time - xng_play_start_time
                Common.logger(log_type, crawler).info(f"duration {xng_play_duration}")
                Common.logging(log_type, crawler, env, f"duration {xng_play_duration}")
        except MQExceptionBase as err:
            # Topic has no consumable message: keep polling quietly.
            if err.type == "MessageNotExist":
                Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
                Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
                continue

            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
            Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
            time.sleep(2)
            continue
+
+
if __name__ == "__main__":
    # Command-line entry point: every runtime option is supplied as a flag.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--log_type', type=str)
    arg_parser.add_argument('--crawler')
    arg_parser.add_argument('--topic_name')
    arg_parser.add_argument('--group_id')
    arg_parser.add_argument('--env')
    cli_args = arg_parser.parse_args()
    main(log_type=cli_args.log_type,
         crawler=cli_args.crawler,
         topic_name=cli_args.topic_name,
         group_id=cli_args.group_id,
         env=cli_args.env)

+ 27 - 0
xiaoniangao/xiaoniangao_main/run_xng_plus_dev.py

@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/14
+import os
+import sys
+
+sys.path.append(os.getcwd())
+from common.common import Common
+from xiaoniangao.xiaoniangao_plus.xiaoniangao_plus_scheduling import XiaoNianGaoPlusRecommend
+
+
def xiaoniangao_plus_recommend_main(log_type, crawler, env):
    """Run one crawl round of the XiaoNianGao+ recommend feed with a fixed rule set."""
    # Hard-coded filter rules used for this dev run.
    dev_rule_dict = {
        "duration": {"min": 40, "max": 0},
        "play_cnt": {"min": 20000, "max": 0},
        "period": {"min": 60, "max": 60},
    }
    Common.logger(log_type, crawler).info('开始抓取:小年糕播放\n')
    Common.logging(log_type, crawler, env, "开始抓取:小年糕播放\n")
    XiaoNianGaoPlusRecommend.start_wechat(log_type=log_type,
                                          crawler=crawler,
                                          rule_dict=dev_rule_dict,
                                          our_uid=6267140,
                                          env=env)
    Common.logger(log_type, crawler).info("抓取一轮结束\n")
    Common.logging(log_type, crawler, env, "抓取一轮结束\n")
+
+
if __name__ == "__main__":
    # Dev entry point: recommend feed of the xiaoniangao crawler against dev env.
    xiaoniangao_plus_recommend_main(log_type="recommend", crawler="xiaoniangao", env="dev")

+ 42 - 46
xiaoniangao/xiaoniangao_plus/xiaoniangao_plus_scheduling.py

@@ -35,7 +35,8 @@ class XiaoNianGaoPlusRecommend:
             chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
             chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
 
 
         Common.logger(log_type, crawler).info("启动微信")
         Common.logger(log_type, crawler).info("启动微信")
-        # Common.logging(log_type, crawler, env, '启动微信')
+        Common.logging(log_type, crawler, env, '启动微信')
+        # 微信的配置文件
         caps = {
         caps = {
             "platformName": "Android",
             "platformName": "Android",
             "devicesName": "Android",
             "devicesName": "Android",
@@ -85,11 +86,7 @@ class XiaoNianGaoPlusRecommend:
         # Common.logging(log_type, crawler, env, '打开小程序"小年糕+"')
         # Common.logging(log_type, crawler, env, '打开小程序"小年糕+"')
         driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
         driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
         time.sleep(5)
         time.sleep(5)
-
-        # print(driver.contexts)
-
         cls.get_videoList(log_type, crawler, driver, env, rule_dict, our_uid)
         cls.get_videoList(log_type, crawler, driver, env, rule_dict, our_uid)
-
         time.sleep(1)
         time.sleep(1)
         driver.quit()
         driver.quit()
 
 
@@ -110,52 +107,51 @@ class XiaoNianGaoPlusRecommend:
     @classmethod
     @classmethod
     def check_to_applet(cls, log_type, crawler, env, driver: WebDriver, xpath):
     def check_to_applet(cls, log_type, crawler, env, driver: WebDriver, xpath):
         time.sleep(1)
         time.sleep(1)
-        # print(driver.)
         webViews = driver.contexts
         webViews = driver.contexts
-        print(webViews)
-        # Common.logger(log_type, crawler).info(f"webViews:{webViews}")
-        # Common.logging(log_type, crawler, env, f"webViews:{webViews}")
-        driver.switch_to.context(webViews[2])
+        driver.switch_to.context(webViews[-1])
         windowHandles = driver.window_handles
         windowHandles = driver.window_handles
         for handle in windowHandles:
         for handle in windowHandles:
             driver.switch_to.window(handle)
             driver.switch_to.window(handle)
             time.sleep(1)
             time.sleep(1)
             try:
             try:
                 driver.find_element(By.XPATH, xpath)
                 driver.find_element(By.XPATH, xpath)
-                Common.logger(log_type, crawler).info("切换到小程序成功\n")
-                Common.logging(log_type, crawler, env, '切换到小程序成功\n')
+                Common.logger(log_type, crawler).info("切换到WebView成功\n")
+                Common.logging(log_type, crawler, env, '切换到WebView成功\n')
                 return
                 return
             except NoSuchElementException:
             except NoSuchElementException:
                 time.sleep(1)
                 time.sleep(1)
 
 
     @classmethod
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform in ("众妙音信", "刚刚都传", "吉祥幸福", "知青天天看", "zhufuquanzi", "祝福圈子", "haitunzhufu", "海豚祝福") and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("众妙音信", "刚刚都传", "吉祥幸福", "知青天天看", "zhufuquanzi", "祝福圈子", "haitunzhufu", "海豚祝福", "小年糕") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
         return len(repeat_video)
 
 
     @classmethod
     @classmethod
     def swipe_up(cls, driver: WebDriver):
     def swipe_up(cls, driver: WebDriver):
-        cls.search_elements(driver, '//*[@class="bless--list"]')
+        cls.search_elements(driver, '//*[@class="list-list--list"]')
         size = driver.get_window_size()
         size = driver.get_window_size()
         driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
         driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
                      int(size["width"] * 0.5), int(size["height"] * 0.4), 200)
                      int(size["width"] * 0.5), int(size["height"] * 0.4), 200)
 
 
     @classmethod
     @classmethod
-    def get_video_url(cls, log_type, crawler, driver: WebDriver, video_title_element):
+    def get_video_url(cls, log_type, crawler, driver: WebDriver, video_title_element, env):
         for i in range(3):
         for i in range(3):
-            cls.search_elements(driver, '//*[@class="bless--list"]')
+            cls.search_elements(driver, '//*[@class="list-list--list"]')
             Common.logger(log_type, crawler).info(f"video_title_element:{video_title_element[0]}")
             Common.logger(log_type, crawler).info(f"video_title_element:{video_title_element[0]}")
             time.sleep(1)
             time.sleep(1)
             Common.logger(log_type, crawler).info("滑动标题至可见状态")
             Common.logger(log_type, crawler).info("滑动标题至可见状态")
-            driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});", video_title_element[0])
+            driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});",
+                                  video_title_element[0])
             time.sleep(3)
             time.sleep(3)
             Common.logger(log_type, crawler).info("点击标题")
             Common.logger(log_type, crawler).info("点击标题")
             video_title_element[0].click()
             video_title_element[0].click()
-            # driver.execute_script("arguments[0].click();", video_title_element[0])
+            cls.check_to_applet(log_type=log_type, crawler=crawler, driver=driver, env=env,
+                                xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
             Common.logger(log_type, crawler).info("点击标题完成")
             Common.logger(log_type, crawler).info("点击标题完成")
-            time.sleep(1)
-            video_url_elements = cls.search_elements(driver, '//*[@class="index--video-item index--video"]')
+            time.sleep(10)
+            video_url_elements = cls.search_elements(driver,
+                                                     '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
             if video_url_elements:
             if video_url_elements:
                 return video_url_elements[0].get_attribute("src")
                 return video_url_elements[0].get_attribute("src")
 
 
@@ -163,31 +159,24 @@ class XiaoNianGaoPlusRecommend:
     def get_videoList(cls, log_type, crawler, driver: WebDriver, env, rule_dict, our_uid):
     def get_videoList(cls, log_type, crawler, driver: WebDriver, env, rule_dict, our_uid):
         mq = MQ(topic_name="topic_crawler_etl_" + env)
         mq = MQ(topic_name="topic_crawler_etl_" + env)
         driver.implicitly_wait(20)
         driver.implicitly_wait(20)
+        # 切换到 web_view
         cls.check_to_applet(log_type=log_type, crawler=crawler, env=env, driver=driver,
         cls.check_to_applet(log_type=log_type, crawler=crawler, env=env, driver=driver,
-                            xpath='//*[@class="tags--tag tags--tag-0 tags--checked"]')
+                            xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
         time.sleep(1)
         time.sleep(1)
-
         page = 0
         page = 0
         while True:
         while True:
-            # if cls.search_elements(driver, '//*[@class="bless--list"]') is None:
-            #     Common.logger(log_type, crawler).info("窗口已销毁\n")
-            #     Common.logging(log_type, crawler, env, '窗口已销毁\n')
-            #     cls.i = 0
-            #     cls.download_cnt = 0
-            #     cls.element_list = []
-            #     return
-            #
-            # cls.swipe_up(driver)
-
+            if cls.search_elements(driver, '//*[@class="list-list--list"]') is None:
+                Common.logger(log_type, crawler).info("窗口已销毁\n")
+                Common.logging(log_type, crawler, env, '窗口已销毁\n')
+                cls.i = 0
+                cls.download_cnt = 0
+                cls.element_list = []
+                return
 
 
+            cls.swipe_up(driver)
             page_source = driver.page_source
             page_source = driver.page_source
-            # print(page_source)
-            # return
             soup = BeautifulSoup(page_source, 'html.parser')
             soup = BeautifulSoup(page_source, 'html.parser')
             soup.prettify()
             soup.prettify()
-            print(soup)
-            return
-
             video_list_elements = soup.findAll("wx-view", class_="expose--adapt-parent")
             video_list_elements = soup.findAll("wx-view", class_="expose--adapt-parent")
             # video_list_elements 有,cls.element_list 中没有的元素
             # video_list_elements 有,cls.element_list 中没有的元素
             video_list_elements = list(set(video_list_elements).difference(set(cls.element_list)))
             video_list_elements = list(set(video_list_elements).difference(set(cls.element_list)))
@@ -198,7 +187,7 @@ class XiaoNianGaoPlusRecommend:
 
 
             if len(video_list_elements) == 0:
             if len(video_list_elements) == 0:
                 for i in range(10):
                 for i in range(10):
-                    Common.logger(log_type, crawler).info(f"向上滑动第{i+1}次")
+                    Common.logger(log_type, crawler).info(f"向上滑动第{i + 1}次")
                     cls.swipe_up(driver)
                     cls.swipe_up(driver)
                     time.sleep(0.5)
                     time.sleep(0.5)
                 continue
                 continue
@@ -215,16 +204,22 @@ class XiaoNianGaoPlusRecommend:
                     cls.i += 1
                     cls.i += 1
                     Common.logger(log_type, crawler).info(f"第{cls.i}条视频")
                     Common.logger(log_type, crawler).info(f"第{cls.i}条视频")
                     Common.logging(log_type, crawler, env, f"第{cls.i}条视频")
                     Common.logging(log_type, crawler, env, f"第{cls.i}条视频")
-
+                    # 标题
                     video_title = video_element.find("wx-view", class_="dynamic--title").text
                     video_title = video_element.find("wx-view", class_="dynamic--title").text
+                    # 播放量字符串
                     play_str = video_element.find("wx-view", class_="dynamic--views").text
                     play_str = video_element.find("wx-view", class_="dynamic--views").text
-                    like_str = video_element.findAll("wx-view", class_="dynamic--commerce-btn-text")[0].text
-                    comment_str = video_element.findAll("wx-view", class_="dynamic--commerce-btn-text")[1].text
+                    info_list = video_element.findAll("wx-view", class_="dynamic--commerce-btn-text")
+                    # 点赞数量
+                    like_str = info_list[1].text
+                    # 评论数量
+                    comment_str = info_list[2].text
+                    # 视频时长
                     duration_str = video_element.find("wx-view", class_="dynamic--duration").text
                     duration_str = video_element.find("wx-view", class_="dynamic--duration").text
                     user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
                     user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
+                    # 头像 URL
                     avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
                     avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
+                    # 封面 URL
                     cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
                     cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
-
                     play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
                     play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
                     duration = int(duration_str.split(":")[0].strip()) * 60 + int(duration_str.split(":")[-1].strip())
                     duration = int(duration_str.split(":")[0].strip()) * 60 + int(duration_str.split(":")[-1].strip())
                     if "点赞" in like_str:
                     if "点赞" in like_str:
@@ -300,7 +295,7 @@ class XiaoNianGaoPlusRecommend:
                             continue
                             continue
                         Common.logger(log_type, crawler).info("点击标题,进入视频详情页")
                         Common.logger(log_type, crawler).info("点击标题,进入视频详情页")
                         Common.logging(log_type, crawler, env, "点击标题,进入视频详情页")
                         Common.logging(log_type, crawler, env, "点击标题,进入视频详情页")
-                        video_url = cls.get_video_url(log_type, crawler, driver, video_title_element)
+                        video_url = cls.get_video_url(log_type, crawler, driver, video_title_element, env=env)
                         if video_url is None:
                         if video_url is None:
                             Common.logger(log_type, crawler).info("未获取到视频播放地址\n")
                             Common.logger(log_type, crawler).info("未获取到视频播放地址\n")
                             driver.press_keycode(AndroidKey.BACK)
                             driver.press_keycode(AndroidKey.BACK)
@@ -316,6 +311,7 @@ class XiaoNianGaoPlusRecommend:
                         video_dict["user_id"] = our_uid
                         video_dict["user_id"] = our_uid
                         video_dict["publish_time"] = video_dict["publish_time_str"]
                         video_dict["publish_time"] = video_dict["publish_time_str"]
                         mq.send_msg(video_dict)
                         mq.send_msg(video_dict)
+                        # print(video_dict)
                         cls.download_cnt += 1
                         cls.download_cnt += 1
                         driver.press_keycode(AndroidKey.BACK)
                         driver.press_keycode(AndroidKey.BACK)
                         time.sleep(5)
                         time.sleep(5)
@@ -332,7 +328,7 @@ class XiaoNianGaoPlusRecommend:
 if __name__ == "__main__":
 if __name__ == "__main__":
     rule_dict1 = {"period": {"min": 365, "max": 365},
     rule_dict1 = {"period": {"min": 365, "max": 365},
                   "duration": {"min": 30, "max": 1800},
                   "duration": {"min": 30, "max": 1800},
-                  "favorite_cnt": {"min": 5000, "max": 0},
-                  "videos_cnt": {"min": 10, "max": 20},
-                  "share_cnt": {"min": 1000, "max": 0}}
+                  "favorite_cnt": {"min": 0, "max": 0},
+                  "videos_cnt": {"min": 100, "max": 0},
+                  "share_cnt": {"min": 0, "max": 0}}
     XiaoNianGaoPlusRecommend.start_wechat("recommend", "xiaoniangao", "dev", rule_dict1, 6267141)
     XiaoNianGaoPlusRecommend.start_wechat("recommend", "xiaoniangao", "dev", rule_dict1, 6267141)