# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/9/13
import json
import os
import random
import re
import shutil
import sys
import time
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.webdriver import WebDriver
from selenium.common import NoSuchElementException
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.publish import Publish
from common.scheduling_db import MysqlHelper


class HTZFRecommend:
    platform = "海豚祝福"
    i = 0

    @classmethod
    def today_download_cnt(cls, log_type, crawler, env):
        select_sql = """SELECT COUNT(*) FROM crawler_video
                        WHERE platform IN ("haitunzhufu", "海豚祝福")
                        AND DATE(create_time) = CURDATE();"""
        today_download_cnt = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")[0]['COUNT(*)']
        return today_download_cnt

    @classmethod
    def start_wechat(cls, log_type, crawler, videos_cnt, env):
        if env == "dev":
            chromedriverExecutable = "/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver"
        else:
            chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
        Common.logger(log_type, crawler).info("Starting WeChat")
        caps = {
            "platformName": "Android",
            "platformVersion": "11",
            "deviceName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "noReset": True,
            "resetKeyboard": True,
            "unicodeKeyboard": True,
            "showChromedriverLog": True,
            "autoGrantPermissions": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriverExecutable,
            # WebView debugging attaches to WeChat's mini-program process.
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        driver.implicitly_wait(20)

        # Wait for the WeChat home screen, dismissing system pop-ups along the way.
        for i in range(120):
            try:
                if driver.find_element(By.ID, "com.tencent.mm:id/f2s"):
                    break
                elif driver.find_element(By.ID, "com.android.system:id/dismiss_view"):
                    Common.logger(log_type, crawler).info("Found and dismissed the system pull-down menu")
                else:
                    pass
            except NoSuchElementException:
                pass

        Common.logger(log_type, crawler).info("Swiping down to open the mini-program panel")
        size = driver.get_window_size()
        driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.2),
                     int(size["width"] * 0.5), int(size["height"] * 0.8), 200)
        time.sleep(3)
        Common.logger(log_type, crawler).info('Opening the mini program "海豚祝福"')
        driver.find_elements(By.XPATH, '//*[@text="海豚祝福"]')[-1].click()
        time.sleep(5)

        cls.get_videoList(log_type, crawler, driver, videos_cnt, env)
        time.sleep(1)
        driver.quit()

    @classmethod
    def search_elements(cls, driver: WebDriver, xpath):
        time.sleep(1)
        # The mini program may render in any window handle, so probe each one.
        windowHandles = driver.window_handles
        for handle in windowHandles:
            driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    @classmethod
    def repeat_out_video_id(cls, log_type, crawler, out_video_id, env):
        sql = f"""select * from crawler_video
                  where platform in ("众妙音信", "刚刚都传", "吉祥幸福", "知青天天看", "zhufuquanzi", "祝福圈子", "haitunzhufu", "海豚祝福")
                  and out_video_id="{out_video_id}";"""
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def get_video_url(cls, driver: WebDriver, video_element):
        video_element.click()
        time.sleep(5)
        video_url_elements = cls.search_elements(driver, '//*[@id="myVideo"]')
        if video_url_elements:
            return video_url_elements[0].get_attribute("src")
        return None
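
    # Illustrative sketch only — a hypothetical helper, not part of the original
    # module and not called by the crawler. It restates the "万" (ten-thousand)
    # play-count conversion that get_videoList() performs inline below, as a
    # pure function that is easy to unit-test.
    @staticmethod
    def parse_play_cnt(plat_cnt_str: str) -> int:
        """Convert a display count such as "3.2万" or "1024" to an integer."""
        digits = re.sub(r"[^\d.]", "", plat_cnt_str)
        if not digits:
            return 0
        if "万" in plat_cnt_str:
            # "万" means ten-thousands, e.g. "3.2万" -> 32000
            return int(float(digits) * 10000)
        return int(float(digits))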
    @classmethod
    def get_videoList(cls, log_type, crawler, driver: WebDriver, videos_cnt, env):
        driver.implicitly_wait(20)
        # contexts[0] is NATIVE_APP; contexts[1] is the mini program's WebView.
        webviews = driver.contexts
        Common.logger(log_type, crawler).info(f"webviews:{webviews}")
        driver.switch_to.context(webviews[1])
        windowHandles = driver.window_handles
        for handle in windowHandles:
            driver.switch_to.window(handle)
            time.sleep(1)
            try:
                if cls.search_elements(driver, '//*[@class="bottom_scroll"]'):
                    Common.logger(log_type, crawler).info("Switched into the mini program")
                    break
            except NoSuchElementException:
                time.sleep(1)
        cls.search_elements(driver, '//*[@class="nav cur"]')[-1].click()
        Common.logger(log_type, crawler).info('Clicked the "推荐" (recommend) tab\n')

        index = 0
        while True:
            if cls.search_elements(driver, '//*[@class="list"]') is None:
                Common.logger(log_type, crawler).info("The list-page window has been destroyed\n")
                cls.i = 0
                return

            videoList_elements = cls.search_elements(driver, '//*[@class="img_bf"]')
            if videoList_elements is None or len(videoList_elements) == 0:
                cls.i = 0
                Common.logger(log_type, crawler).warning(f"videoList_elements:{videoList_elements}")
                return

            video_list = videoList_elements[index:]
            if len(video_list) == 0:
                Common.logger(log_type, crawler).info("Reached the end of the list\n")
                cls.i = 0
                return

            for i, video_element in enumerate(video_list):
                try:
                    today_download_cnt = cls.today_download_cnt(log_type, crawler, env)
                    Common.logger(log_type, crawler).info(f"Downloaded {today_download_cnt} videos today\n")
                    if today_download_cnt >= videos_cnt:
                        cls.i = 0
                        return
                    if video_element is None:
                        Common.logger(log_type, crawler).info("No more data\n")
                        cls.i = 0
                        return

                    cls.i += 1
                    # Re-probe the window handles before interacting with the card.
                    cls.search_elements(driver, '//*[@class="img_bf"]')
                    Common.logger(log_type, crawler).info(f"Scrolling video {cls.i} to the center of the screen")
                    driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'})", video_element)
                    time.sleep(3)

                    # The XPATHs below match document-wide, so index into the
                    # full card list with i + index.
                    video_title = video_element.find_elements(By.XPATH, '//*[@class="title"]')[i + index].text
                    plat_cnt_str = video_element.find_elements(By.XPATH, '//*[@class="wan"]')[i + index].text
                    # "3.2万"-style counts are in units of ten thousand; otherwise the text is a plain number.
                    if "万" in plat_cnt_str:
                        play_cnt = int(float(re.sub(r"[^\d.]", "", plat_cnt_str)) * 10000)
                    else:
                        play_cnt = int(re.sub(r"\D", "", plat_cnt_str) or 0)
                    cover_url = video_element.find_elements(By.XPATH, '//*[@class="img"]')[i + index].get_attribute('src')
                    play_icon = video_element.find_elements(By.XPATH, '//*[@class="bf"]')[i + index]
                    # The page exposes no stable id, so the title's md5 serves as out_video_id.
                    out_video_id = md5(video_title.encode('utf8')).hexdigest()
                    video_dict = {
                        "video_title": video_title,
                        'video_id': out_video_id,
                        "plat_cnt_str": plat_cnt_str,
                        "play_cnt": play_cnt,
                        'comment_cnt': 0,
                        'like_cnt': 0,
                        'share_cnt': 0,
                        'publish_time_stamp': int(time.time()),
                        'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
                        'user_name': "haitunzhufu",
                        'user_id': "haitunzhufu",
                        "cover_url": cover_url,
                        'avatar_url': cover_url,
                        'session': f"haitunzhufu-{int(time.time())}"
                    }
                    for k, v in video_dict.items():
                        Common.logger(log_type, crawler).info(f"{k}:{v}")

                    if video_title is None or cover_url is None:
                        Common.logger(log_type, crawler).info("Invalid video\n")
                    elif cls.repeat_out_video_id(log_type, crawler, out_video_id, env) != 0:
                        Common.logger(log_type, crawler).info("Video already downloaded\n")
                    else:
                        video_url = cls.get_video_url(driver, play_icon)
                        if video_url is None:
                            Common.logger(log_type, crawler).info("Failed to get the video playback URL\n")
                            # driver.press_keycode(AndroidKey.BACK)
                            continue
                        video_dict["video_url"] = video_url
                        Common.logger(log_type, crawler).info(f"video_url:{video_url}")
                        cls.download_publish(log_type, crawler, video_dict, env)
                        driver.press_keycode(AndroidKey.BACK)
                except Exception as e:
                    Common.logger(log_type, crawler).error(f"Error crawling a single video: {e}\n")
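            # Pagination: the list page only ever appends cards, so after each
            # batch `index` advances by the number of cards just processed and
            # the next pass slices the re-queried element list from there.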
            Common.logger(log_type, crawler).info("Finished one batch, sleeping for 10 seconds\n")
            time.sleep(10)
            index = index + len(video_list)

    @classmethod
    def get_our_uid(cls, log_type, crawler, env):
        select_sql = f"""SELECT uid FROM crawler_user_v3 WHERE `source`="{crawler}";"""
        uids = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
        uid_list = [uid_dict["uid"] for uid_dict in uids]
        return random.choice(uid_list)

    @classmethod
    def download_publish(cls, log_type, crawler, video_dict, env):
        # Download the video
        Common.download_method(log_type=log_type, crawler=crawler, text='video',
                               title=video_dict['video_title'], url=video_dict['video_url'])
        ffmpeg_dict = Common.ffmpeg(log_type, crawler, f"./{crawler}/videos/{video_dict['video_title']}/video.mp4")
        if ffmpeg_dict is None:
            md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
            shutil.rmtree(f"./{crawler}/videos/{md_title}/")
            Common.logger(log_type, crawler).info("Video size is 0, deleted\n")
            return
        video_dict["duration"] = ffmpeg_dict["duration"]
        video_dict["video_width"] = ffmpeg_dict["width"]
        video_dict["video_height"] = ffmpeg_dict["height"]
        # Download the cover
        Common.download_method(log_type=log_type, crawler=crawler, text='cover',
                               title=video_dict['video_title'], url=video_dict['cover_url'])
        # Save video info to a local txt file
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)

        # Upload the video
        Common.logger(log_type, crawler).info("Uploading video...")
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy="推荐榜爬虫策略",
                                                  our_uid=cls.get_our_uid(log_type, crawler, env),
                                                  env=env,
                                                  oss_endpoint="out")
        if env == 'dev':
            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
        else:
            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
        Common.logger(log_type, crawler).info("Video upload finished")

        if our_video_id is None:
            # Delete the video folder
            shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}")
            return

        # Save video info to Feishu: insert a new first row into the video-ID sheet
        Feishu.insert_columns(log_type, crawler, "d51d20", "ROWS", 1, 2)
        upload_time = int(time.time())
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "推荐榜爬虫策略",
                   video_dict["video_title"],
                   video_dict["video_id"],
                   video_dict["play_cnt"],
                   video_dict["duration"],
                   f'{video_dict["video_width"]}*{video_dict["video_height"]}',
                   our_video_link,
                   video_dict["cover_url"],
                   video_dict["video_url"]]]
        time.sleep(1)
        Feishu.update_values(log_type, crawler, "d51d20", "F2:V2", values)
        Common.logger(log_type, crawler).info("Video info saved to the Feishu sheet\n")

        rule_dict = {}
        # Save video info to the database.
        # Note: values are interpolated directly into the SQL string, so a title
        # containing double quotes would break this statement.
        insert_sql = f"""insert into crawler_video(video_id,
                                                   out_user_id,
                                                   platform,
                                                   strategy,
                                                   out_video_id,
                                                   video_title,
                                                   cover_url,
                                                   video_url,
                                                   duration,
                                                   publish_time,
                                                   play_cnt,
                                                   crawler_rule,
                                                   width,
                                                   height)
                         values({our_video_id},
                                "{video_dict['user_id']}",
                                "{cls.platform}",
                                "推荐榜爬虫策略",
                                "{video_dict['video_id']}",
                                "{video_dict['video_title']}",
                                "{video_dict['cover_url']}",
                                "{video_dict['video_url']}",
                                {int(video_dict['duration'])},
                                "{video_dict['publish_time_str']}",
                                {int(video_dict['play_cnt'])},
                                '{json.dumps(rule_dict)}',
                                {int(video_dict['video_width'])},
                                {int(video_dict['video_height'])})"""
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env, action='')
        Common.logger(log_type, crawler).info("Video info written to the database\n")


if __name__ == "__main__":
    # HTZFRecommend.start_wechat("recommend", "haitunzhufu", 5, "dev")
    # HTZFRecommend.today_download_cnt("recommend", "haitunzhufu", "dev")
    # HTZFRecommend.get_play_cnt()
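    # Example debug run — assumes an Appium server is listening on
    # http://localhost:4723/wd/hub with an Android device attached, plus a
    # matching chromedriver at the path hard-coded in start_wechat():
    # HTZFRecommend.start_wechat("recommend", "haitunzhufu", videos_cnt=5, env="dev")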
    print(HTZFRecommend.get_our_uid("recommend", "haitunzhufu", "prod"))