# -*- coding: utf-8 -*-
# @Author: wang
# @Time: 2023/9/6
import json
import os
import sys
import time
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.webdriver import WebDriver
from selenium.common import NoSuchElementException
from selenium.webdriver.common.by import By

# The project-local ``common`` package is resolved relative to the working
# directory, so the path tweak must run before the imports below.
sys.path.append(os.getcwd())
from common.common import Common
from common.mq import MQ
from common.public import download_rule, get_config_from_mysql
from common.scheduling_db import MysqlHelper


class ZFQZRecommend:
    """Crawl the recommend feed of the "祝福圈子" WeChat mini-program via Appium.

    All state lives on the class: ``download_cnt`` counts the messages sent to
    MQ in the current round and ``i`` counts the feed items visited.
    """

    platform = "祝福圈子"
    download_cnt = 0
    i = 0

    # XPath of a single feed item inside the mini-program's video list.
    _FEED_ITEM_XPATH = '//*[@is="pages/discover/components/bless/dynamic/dynamic"]'

    @classmethod
    def _log(cls, log_type, crawler, env, message, level="info"):
        """Write ``message`` to both the local logger and ``Common.logging``.

        :param level: name of the logger method to call ("info", "warning", "error").
        """
        getattr(Common.logger(log_type, crawler), level)(message)
        Common.logging(log_type, crawler, env, message)

    @staticmethod
    def _parse_count(text, placeholder=None):
        """Convert a counter label into an integer.

        :param text: label text, e.g. ``"123"`` or ``"1.2万"``.
        :param placeholder: word shown instead of a number (e.g. "点赞");
            when it occurs in ``text`` the count is 0.
        :return: the count as ``int``.
        :raises ValueError: if the numeric part cannot be parsed.
        """
        if placeholder is not None and placeholder in text:
            return 0
        if "万" in text:
            # Values such as "1.2万" carry a decimal part, so int() alone fails;
            # round() guards against float artefacts like 22999.999...
            return int(round(float(text.split("万")[0]) * 10000))
        return int(text)

    @staticmethod
    def _parse_duration(text):
        """Convert ``"ss"``, ``"mm:ss"`` or ``"hh:mm:ss"`` into seconds.

        :raises ValueError: if any colon-separated field is not an integer.
        """
        seconds = 0
        for field in text.split(":"):
            seconds = seconds * 60 + int(field.strip())
        return seconds

    @classmethod
    def start_wechat(cls, log_type, crawler, env, rule_dict, our_uid):
        """Start WeChat, open the mini-program and crawl its video feed.

        :param log_type: strategy / log name passed through to ``Common``.
        :param crawler: crawler name passed through to ``Common`` and MQ payloads.
        :param env: ``"dev"`` selects the developer chromedriver path; any
            other value selects the production path.
        :param rule_dict: crawl rules forwarded to :meth:`get_videoList`.
        :param our_uid: user id written into every MQ message.
        """
        if env == "dev":
            chromedriverExecutable = "/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver"
        else:
            chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"

        cls._log(log_type, crawler, env, "启动微信")
        # NOTE(review): "devicesName", "resetkeyboard" and "unicodekeyboard" do
        # not match the usual Appium capability spellings — confirm before
        # changing, they are kept exactly as they were.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "platformVersion": "11",
            # "udid": "emulator-5554",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": "true",
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriverExecutable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        try:
            driver.implicitly_wait(30)

            # Poll up to 120 times until the WeChat main screen is visible.
            for _ in range(120):
                try:
                    if driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                        cls._log(log_type, crawler, env, "微信启动成功")
                        break
                    # find_element raises NoSuchElementException when the
                    # system pull-down is absent, which triggers the retry.
                    dismiss_view = driver.find_element(By.ID, "com.android.systemui:id/dismiss_view")
                    cls._log(log_type, crawler, env, "发现并关闭系统下拉菜单")
                    # Click the element that was just located; the previous
                    # code looked it up again under a misspelt resource id.
                    dismiss_view.click()
                except NoSuchElementException:
                    time.sleep(1)

            cls._log(log_type, crawler, env, "下滑,展示小程序选择面板")
            size = driver.get_window_size()
            center_x = int(size['width'] * 0.5)
            driver.swipe(center_x, int(size['height'] * 0.2),
                         center_x, int(size['height'] * 0.8), 200)
            time.sleep(5)

            cls._log(log_type, crawler, env, '打开小程序"祝福圈子"')
            driver.find_elements(By.XPATH, '//*[@text="祝福圈子"]')[-1].click()
            time.sleep(10)

            cls.get_videoList(log_type, crawler, driver, env, rule_dict, our_uid)
            time.sleep(3)
        finally:
            # Always release the Appium session, even when crawling fails.
            driver.quit()

    @classmethod
    def search_elements(cls, driver: WebDriver, xpath):
        """Return the elements matching ``xpath`` in the first window that has any.

        Switches through every window handle in turn and leaves the driver on
        the window where the match was found.

        :return: a non-empty list of elements, or ``None`` when no window
            contains a match.
        """
        time.sleep(1)
        for handle in driver.window_handles:
            driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    @classmethod
    def check_to_applet(cls, log_type, crawler, env, driver: WebDriver, xpath):
        """Switch to the second driver context and to the window containing ``xpath``.

        Returns silently (without logging success) when no window matches.

        :raises IndexError: if the driver reports fewer than two contexts.
        """
        time.sleep(1)
        webViews = driver.contexts
        cls._log(log_type, crawler, env, f"webViews:{webViews}")
        # presumably index 0 is the native context and index 1 the mini-program
        # WebView — TODO confirm against the device's context list.
        driver.switch_to.context(webViews[1])
        for handle in driver.window_handles:
            driver.switch_to.window(handle)
            time.sleep(1)
            try:
                driver.find_element(By.XPATH, xpath)
                cls._log(log_type, crawler, env, "切换到小程序成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many stored videos on the related platforms share ``video_id``."""
        # NOTE(review): the id is interpolated straight into the SQL text. The
        # only caller in this file passes an md5 hex digest, which is safe;
        # do not pass untrusted strings here.
        sql = f""" select * from crawler_video where platform in ("众妙音信", "刚刚都传", "吉祥幸福", "知青天天看", "zhufuquanzi", "祝福圈子", "haitunzhufu", "海豚祝福") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def _scrape_video(cls, video_element, position):
        """Read one feed item's fields from the page and build its ``video_dict``.

        :param video_element: the feed item element used as the search root.
        :param position: absolute index of the item in the feed. The XPaths
            start with ``//`` and therefore match across the whole document,
            so the item is picked out of the full result list by this index.
        :raises IndexError: if a field is missing at ``position``.
        :raises ValueError: if a counter or the duration cannot be parsed.
        """
        def text_at(xpath):
            return video_element.find_elements(By.XPATH, xpath)[position].text

        def src_at(xpath):
            return video_element.find_elements(By.XPATH, xpath)[position].get_attribute('src')

        video_title = text_at('//*[@class="dynamic--title"]')
        play_cnt_str = text_at('//*[@class="dynamic--views"]')
        duration_str = text_at('//*[@class="dynamic--duration"]')
        user_name = text_at('//*[@class="dynamic--nick-top"]')
        like_cnt_str = text_at('//*[@class="dynamic--commerce"]/*[1]/*[2]')
        comment_cnt_str = text_at('//*[@class="dynamic--commerce"]/*[2]/*[2]')
        cover_url = src_at('//*[@class="dynamic--bg-image"]')
        avatar_url = src_at('//*[@class="avatar--avatar"]')

        # One timestamp for the whole record keeps its time fields consistent.
        now = int(time.time())
        return {
            "video_title": video_title,
            # The page exposes no ids, so they are derived from title / nickname.
            "video_id": md5(video_title.encode('utf8')).hexdigest(),
            "duration": cls._parse_duration(duration_str),
            "play_cnt": cls._parse_count(play_cnt_str.replace("+", "").replace("次播放", "")),
            "like_cnt": cls._parse_count(like_cnt_str, placeholder="点赞"),
            "comment_cnt": cls._parse_count(comment_cnt_str, placeholder="评论"),
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": md5(user_name.encode('utf8')).hexdigest(),
            'publish_time_stamp': now,
            'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"zhufuquanzi-{now}"
        }

    @classmethod
    def get_videoList(cls, log_type, crawler, driver: WebDriver, env, rule_dict, our_uid):
        """Walk the feed, filter each video and send the accepted ones to MQ.

        Returns when the list window disappears, the feed is exhausted, or
        ``rule_dict["videos_cnt"]["min"]`` (default 10) messages have been
        sent in this round.

        :param driver: a running Appium session with the mini-program open.
        :param rule_dict: crawl rules; passed to ``download_rule`` and stored
            as JSON in each message.
        :param our_uid: value written to ``user_id`` of every message.
        """
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        driver.implicitly_wait(20)
        cls.check_to_applet(log_type=log_type, crawler=crawler, env=env, driver=driver,
                            xpath='//*[@class="tags--tag tags--tag-0 tags--checked"]')
        time.sleep(3)

        index = 0  # number of feed items already handled in earlier batches
        while True:
            if cls.search_elements(driver, '//*[@class="bless--list"]') is None:
                cls._log(log_type, crawler, env, "窗口已销毁\n")
                return

            video_list_elements = cls.search_elements(driver, cls._FEED_ITEM_XPATH)
            if video_list_elements is None:
                cls._log(log_type, crawler, env, f"当前视频列表为空:{video_list_elements}", level="warning")
                return

            video_list = video_list_elements[index:]
            if not video_list:
                cls._log(log_type, crawler, env, "到底啦~~~~~~~~~~\n")
                return

            for i, video_element in enumerate(video_list):
                try:
                    if cls.download_cnt >= int(rule_dict.get("videos_cnt", {}).get("min", 10)):
                        cls._log(log_type, crawler, env, f"本轮已抓取视频数:{cls.download_cnt}")
                        cls.download_cnt = 0
                        cls.i = 0
                        return

                    if video_element is None:
                        cls._log(log_type, crawler, env, "没有更多数据啦~\n")
                        return

                    cls.i += 1
                    # Result unused; presumably called to re-focus the window
                    # that holds the feed before scrolling — TODO confirm.
                    cls.search_elements(driver, cls._FEED_ITEM_XPATH)
                    cls._log(log_type, crawler, env, f"拖动第{cls.i}条视频至屏幕中间")
                    time.sleep(3)
                    driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'})",
                                          video_element)

                    video_dict = cls._scrape_video(video_element, index + i)
                    for k, v in video_dict.items():
                        Common.logger(log_type, crawler).info(f"{k}:{v}")
                    Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")

                    if video_dict["video_title"] is None or video_dict["cover_url"] is None:
                        cls._log(log_type, crawler, env, "无效视频\n")
                    elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict,
                                       rule_dict=rule_dict) is False:
                        cls._log(log_type, crawler, env, "不满足抓取规则\n")
                    elif any(
                        # Empty filter words are ignored: "" would match every title.
                        str(word) and str(word) in video_dict["video_title"]
                        for word in get_config_from_mysql(log_type=log_type, source=crawler, env=env,
                                                          text="filter", action="")
                    ):
                        cls._log(log_type, crawler, env, "已中过滤词\n")
                    elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
                        cls._log(log_type, crawler, env, "视频已下载\n")
                    else:
                        # Open the detail page to read the playable URL.
                        video_element.click()
                        time.sleep(3)
                        video_url_elements = cls.search_elements(
                            driver, '//*[@class="index--video-item index--video"]')
                        if not video_url_elements:
                            cls._log(log_type, crawler, env, "未获取到视频播放地址\n")
                            driver.press_keycode(AndroidKey.BACK)
                        else:
                            video_url = video_url_elements[0].get_attribute("src")
                            video_dict["video_url"] = video_url
                            Common.logger(log_type, crawler).info(f"video_url:{video_url}")
                            video_dict["platform"] = crawler
                            video_dict["strategy"] = log_type
                            video_dict["out_video_id"] = video_dict["video_id"]
                            video_dict["crawler_rule"] = json.dumps(rule_dict)
                            # Replaces the md5-derived author id with our_uid.
                            video_dict["user_id"] = our_uid
                            video_dict["publish_time"] = video_dict["publish_time_str"]
                            mq.send_msg(video_dict)
                            cls.download_cnt += 1
                            driver.press_keycode(AndroidKey.BACK)
                            Common.logger(log_type, crawler).info("符合抓取条件,mq send msg 成功\n")
                            Common.logging(log_type, crawler, env, "符合抓取条件,ACK MQ 成功\n")
                except Exception as e:
                    # Best effort per item: log the failure and move on to the next video.
                    cls._log(log_type, crawler, env, f"抓取单条视频异常:{e}\n", level="error")

            cls._log(log_type, crawler, env, "已抓取完一组,休眠 10 秒\n")
            time.sleep(10)
            index = index + len(video_list)


if __name__ == "__main__":
    rule_dict1 = {"period": {"min": 365, "max": 365},
                  "duration": {"min": 30, "max": 1800},
                  "favorite_cnt": {"min": 5000, "max": 0},
                  "videos_cnt": {"min": 10, "max": 20},
                  "share_cnt": {"min": 1000, "max": 0}}
    ZFQZRecommend.start_wechat("recommend", "zhufuquanzi", "dev", rule_dict1, 6267141)