# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/6/25
import base64
import json
import os
import random
import re
import string
import sys
import time
import requests
import urllib3
from requests.adapters import HTTPAdapter
from selenium.webdriver import DesiredCapabilities, ActionChains
from selenium.webdriver.chrome.service import Service
from selenium import webdriver
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())
from common.mq import MQ
from common.scheduling_db import MysqlHelper
from common.common import Common
from common.public import get_config_from_mysql, download_rule
from common.userAgent import get_random_user_agent


class XiguasearchDev:
    # Number of videos downloaded so far
    download_cnt = 0
    platform = "xigua"

    @classmethod
    def random_signature(cls):
        src_digits = string.digits  # digits
        src_uppercase = string.ascii_uppercase  # uppercase letters
        src_lowercase = string.ascii_lowercase  # lowercase letters
        digits_num = random.randint(1, 6)
        uppercase_num = random.randint(1, 26 - digits_num - 1)
        lowercase_num = 26 - (digits_num + uppercase_num)
        password = random.sample(src_digits, digits_num) + random.sample(src_uppercase, uppercase_num) + random.sample(
            src_lowercase, lowercase_num)
        random.shuffle(password)
        new_password = 'AAAAAAAAAA' + ''.join(password)[10:-4] + 'AAAB'
        new_password_start = new_password[0:18]
        new_password_end = new_password[-7:]
        if new_password[18] == '8':
            new_password = new_password_start + 'w' + new_password_end
        elif new_password[18] == '9':
            new_password = new_password_start + 'x' + new_password_end
        elif new_password[18] == '-':
            new_password = new_password_start + 'y' + new_password_end
        elif new_password[18] == '.':
            new_password = new_password_start + 'z' + new_password_end
        else:
            new_password = new_password_start + 'y' + new_password_end
        return new_password

    @classmethod
    def get_video_url(cls, video_info):
        video_url_dict = {}
        video_resource = video_info.get('videoResource', {})
        dash_120fps = video_resource.get('dash_120fps', {})
        normal = video_resource.get('normal', {})

        # Get the video_list dict from either dash_120fps or normal
        video_list = dash_120fps.get('video_list', {}) or normal.get('video_list', {})
        # Take video_4, video_3, video_2 or video_1 from video_list. If a non-empty
        # video URL is found, assign it to video_url; otherwise fall back to an empty string.
        video = video_list.get('video_4') or video_list.get('video_3') or video_list.get('video_2') or video_list.get(
            'video_1')

        video_url = video.get('backup_url_1', '') if video else ''
        audio_url = video.get('backup_url_1', '') if video else ''
        video_width = video.get('vwidth', 0) if video else 0
        video_height = video.get('vheight', 0) if video else 0

        video_url = re.sub(r'[^a-zA-Z0-9+/=]', '', video_url)  # strip special (non-base64) characters from the video URL
        audio_url = re.sub(r'[^a-zA-Z0-9+/=]', '', audio_url)  # strip special (non-base64) characters from the audio URL
        video_url = base64.b64decode(video_url).decode('utf8')  # decode the video URL
        audio_url = base64.b64decode(audio_url).decode('utf8')  # decode the audio URL

        video_url_dict["video_url"] = video_url
        video_url_dict["audio_url"] = audio_url
        video_url_dict["video_width"] = video_width
        video_url_dict["video_height"] = video_height

        return video_url_dict

    @classmethod
    def get_comment_cnt(cls, item_id):
        url = "https://www.ixigua.com/tlb/comment/article/v5/tab_comments/?"
        params = {
            "tab_index": "0",
            "count": "10",
            "offset": "10",
            "group_id": str(item_id),
            "item_id": str(item_id),
            "aid": "1768",
            "msToken": "50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==",
            "X-Bogus": "DFSzswVOyGtANVeWtCLMqR/F6q9U",
            "_signature": cls.random_signature(),
        }
        headers = {
            'authority': 'www.ixigua.com',
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'cache-control': 'no-cache',
            'cookie': 'MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; passport_csrf_token=72b2574f3c99f8ba670e42df430218fd; passport_csrf_token_default=72b2574f3c99f8ba670e42df430218fd; sid_guard=c7472b508ea631823ba765a60cf8757f%7C1680867422%7C3024002%7CFri%2C+12-May-2023+11%3A37%3A04+GMT; uid_tt=c13f47d51767f616befe32fb3e9f485a; uid_tt_ss=c13f47d51767f616befe32fb3e9f485a; sid_tt=c7472b508ea631823ba765a60cf8757f; sessionid=c7472b508ea631823ba765a60cf8757f; sessionid_ss=c7472b508ea631823ba765a60cf8757f; sid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; ssid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; odin_tt=b893608d4dde2e1e8df8cd5d97a0e2fbeafc4ca762ac72ebef6e6c97e2ed19859bb01d46b4190ddd6dd17d7f9678e1de; SEARCH_CARD_MODE=7168304743566296612_0; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=7Pux7s634-z8DYvCM20y7KigwH5u7Rh6D9C-RROpnT.aGMEcz6Vsxp.oai47wJqa4f86; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1683858689%7Ca5223fe1500578e01e138a0d71d6444692018296c4c24f5885af174a65873c95; ixigua-a-s=3; msToken=50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==; __ac_nonce=0645dcbf0005064517440; __ac_signature=_02B4Z6wo00f01FEGmAwAAIDBKchzCGqn-MBRJpyAAHAjieFC5GEg6gGiwz.I4PRrJl7f0GcixFrExKmgt6QI1i1S-dQyofPEj2ugWTCnmKUdJQv-wYuDofeKNe8VtMtZq2aKewyUGeKU-5Ud21; ixigua-a-s=3',
            'pragma': 'no-cache',
            'referer': f'https://www.ixigua.com/{item_id}?logTag=3c5aa86a8600b9ab8540',
            'sec-ch-ua': '"Microsoft Edge";v="113", "Chromium";v="113", "Not-A.Brand";v="24"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'tt-anti-token': 'cBITBHvmYjEygzv-f9c78c1297722cf1f559c74b084e4525ce4900bdcf9e8588f20cc7c2e3234422',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35',
            'x-secsdk-csrf-token': '000100000001f8e733cf37f0cd255a51aea9a81ff7bc0c09490cfe41ad827c3c5c18ec809279175e4d9f5553d8a5'
        }
        urllib3.disable_warnings()
        s = requests.session()
        # max_retries=3: retry each request up to 3 times
        s.mount('http://', HTTPAdapter(max_retries=3))
        s.mount('https://', HTTPAdapter(max_retries=3))
        response = s.get(url=url, headers=headers, params=params, verify=False,
                         proxies=Common.tunnel_proxies(), timeout=5)
        response.close()
        if response.status_code != 200 or 'total_number' not in response.json() or response.json() == {}:
            return 0
        return response.json().get("total_number", 0)

    # Get video details
    @classmethod
    def get_video_info(cls, log_type, crawler, item_id):
        url = 'https://www.ixigua.com/api/mixVideo/information?'
        headers = {
            "accept-encoding": "gzip, deflate",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "user-agent": get_random_user_agent('pc'),
            "referer": "https://www.ixigua.com/7102614741050196520?logTag=0531c88ac04f38ab2c62",
        }
        params = {
            'mixId': str(item_id),
            'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfC'
                       'NVVIOBNjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
            'X-Bogus': 'DFSzswVupYTANCJOSBk0P53WxM-r',
            '_signature': '_02B4Z6wo0000119LvEwAAIDCuktNZ0y5wkdfS7jAALThuOR8D9yWNZ.EmWHKV0WSn6Px'
                          'fPsH9-BldyxVje0f49ryXgmn7Tzk-swEHNb15TiGqa6YF.cX0jW8Eds1TtJOIZyfc9s5emH7gdWN94',
        }
        cookies = {
            'ixigua-a-s': '1',
            'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfCNVVIOB'
                       'NjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
            'ttwid': '1%7C_yXQeHWwLZgCsgHClOwTCdYSOt_MjdOkgnPIkpi-Sr8%7C1661241238%7Cf57d0c5ef3f1d7'
                     '6e049fccdca1ac54887c34d1f8731c8e51a49780ff0ceab9f8',
            'tt_scid': 'QZ4l8KXDG0YAEaMCSbADdcybdKbUfG4BC6S4OBv9lpRS5VyqYLX2bIR8CTeZeGHR9ee3',
            'MONITOR_WEB_ID': '0a49204a-7af5-4e96-95f0-f4bafb7450ad',
            '__ac_nonce': '06304878000964fdad287',
            '__ac_signature': '_02B4Z6wo00f017Rcr3AAAIDCUVxeW1tOKEu0fKvAAI4cvoYzV-wBhq7B6D8k0no7lb'
                              'FlvYoinmtK6UXjRIYPXnahUlFTvmWVtb77jsMkKAXzAEsLE56m36RlvL7ky.M3Xn52r9t1IEb7IR3ke8',
            'ttcid': 'e56fabf6e85d4adf9e4d91902496a0e882',
            '_tea_utm_cache_1300': 'undefined',
            'support_avif': 'false',
            'support_webp': 'false',
            'xiguavideopcwebid': '7134967546256016900',
            'xiguavideopcwebid.sig': 'xxRww5R1VEMJN_dQepHorEu_eAc',
        }
        urllib3.disable_warnings()
        s = requests.session()
        # max_retries=3: retry each request up to 3 times
        s.mount('http://', HTTPAdapter(max_retries=3))
        s.mount('https://', HTTPAdapter(max_retries=3))
        response = s.get(url=url, headers=headers, params=params, cookies=cookies, verify=False,
                         proxies=Common.tunnel_proxies(), timeout=5)
        response.close()
        if response.status_code != 200 or 'data' not in response.json() or response.json()['data'] == {}:
            Common.logger(log_type, crawler).warning(f"get_video_info:{response.status_code}, {response.text}\n")
            return None
        else:
            video_info = response.json()['data'].get("gidInformation", {}).get("packerData", {}).get("video", {})
            if video_info == {}:
                return None
            video_dict = {
                "video_title": video_info.get("title", ""),
                "video_id": video_info.get("videoResource", {}).get("vid", ""),
                "gid": str(item_id),
                "play_cnt": int(video_info.get("video_watch_count", 0)),
                "like_cnt": int(video_info.get("video_like_count", 0)),
                "comment_cnt": int(cls.get_comment_cnt(item_id)),
                "share_cnt": 0,
                "favorite_cnt": 0,
                "duration": int(video_info.get("video_duration", 0)),
                "video_width": int(cls.get_video_url(video_info)["video_width"]),
                "video_height": int(cls.get_video_url(video_info)["video_height"]),
                "publish_time_stamp": int(video_info.get("video_publish_time", 0)),
                "publish_time_str": time.strftime("%Y-%m-%d %H:%M:%S",
                                                  time.localtime(int(video_info.get("video_publish_time", 0)))),
                "user_name": video_info.get("user_info", {}).get("name", ""),
                "user_id": str(video_info.get("user_info", {}).get("user_id", "")),
                "avatar_url": str(video_info.get("user_info", {}).get("avatar_url", "")),
                "cover_url": video_info.get("poster_url", ""),
                "audio_url": cls.get_video_url(video_info)["audio_url"],
                "video_url": cls.get_video_url(video_info)["video_url"],
                "session": f"xigua-search-{int(time.time())}"
            }
            return video_dict

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env, action="")
        return len(repeat_video)

    @classmethod
    def get_search_videos(cls, log_type, crawler, user_list, rule_dict, env):
        Common.logger(log_type, crawler).info(f"Total search terms: {len(user_list)}\n")
        Common.logging(log_type, crawler, env, f"Total search terms: {len(user_list)}\n")
        for user_dict in user_list:
            try:
                cls.download_cnt = 0
                Common.logger(log_type, crawler).info(f"Start crawling videos for {user_dict['link']}\n")
                Common.logging(log_type, crawler, env, f"Start crawling videos for {user_dict['link']}\n")
                cls.get_videoList(log_type=log_type,
                                  crawler=crawler,
                                  user_dict=user_dict,
                                  rule_dict=rule_dict,
                                  env=env)
            except Exception as e:
                Common.logger(log_type, crawler).error(f"Exception while crawling videos for {user_dict['link']}: {e}\n")
                Common.logging(log_type, crawler, env, f"Exception while crawling videos for {user_dict['link']}: {e}\n")

    @classmethod
    def get_videoList(cls, log_type, crawler, user_dict, rule_dict, env):
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        # Enable Chrome performance (network request) logging
        ca = DesiredCapabilities.CHROME
        ca["goog:loggingPrefs"] = {"performance": "ALL"}
        # Run headless, without opening a browser window
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
        chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        if env == "dev":
            chromedriver = "/Users/wangkun/Downloads/chromedriver/chromedriver_v114/chromedriver"
        else:
            chromedriver = "/usr/bin/chromedriver"
        # Initialize the driver
        driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options, service=Service(chromedriver))
        driver.implicitly_wait(10)
        Common.logger(log_type, crawler).info(f"Opening search page: {user_dict['link']}")
        Common.logging(log_type, crawler, env, f"Opening search page: {user_dict['link']}")
        driver.get(f"https://www.ixigua.com/search/{user_dict['link']}/")
        time.sleep(2)
        # Common.logger(log_type, crawler).info("Closing the login popup")
        # Common.logging(log_type, crawler, env, "Closing the login popup")
        if len(driver.find_elements(By.XPATH, '//*[@class="xg-notification-close"]')) != 0:
            driver.find_element(By.XPATH, '//*[@class="xg-notification-close"]').click()

        while True:
            # Find the video list
            video_elements = driver.find_elements(By.XPATH,
                                                  '//*[@class="HorizontalFeedCard__coverWrapper disableZoomAnimation"]')
            if len(video_elements) == 0:
                Common.logger(log_type, crawler).warning('No videos found\n')
                Common.logging(log_type, crawler, env, 'No videos found\n')
                driver.quit()
                return
            elif len(video_elements) == 1000:
                Common.logger(log_type, crawler).info("Scanned 1000 videos\n")
                break
            elif driver.find_element(By.XPATH, '//*[@class="Feed-footer"]').text == "没有更多内容了":  # "No more content"
                Common.logger(log_type, crawler).info(f"Scanned {len(video_elements)} videos\n")
                break
            else:
                # Scroll the last video in the list to the middle of the screen to load more results
                Common.logger(log_type, crawler).info("Scrolling the last video in the list to the middle of the screen")
                action = ActionChains(driver)
                action.move_to_element(video_elements[-1]).perform()
                time.sleep(1)

        for i, video_element in enumerate(video_elements):
            try:
                if cls.download_cnt >= int(rule_dict.get("videos_cnt", {}).get("min", 30)):
                    Common.logger(log_type, crawler).info(
                        f"Search term: {user_dict['link']}, videos downloaded: {cls.download_cnt}\n")
                    Common.logging(log_type, crawler, env,
                                   f"Search term: {user_dict['link']}, videos downloaded: {cls.download_cnt}\n")
                    driver.quit()
                    return
                Common.logger(log_type, crawler).info(f'Crawling video {i + 1}')
                Common.logging(log_type, crawler, env, f'Crawling video {i + 1}')
                item_id = video_element.get_attribute('href').split("com/")[-1].split("?&")[0]
                # title = video_element.get_attribute('title')
                # Common.logger(log_type, crawler).info(f"Title: {title}")
                # Common.logging(log_type, crawler, env, f"Title: {title}")
                video_dict = cls.get_video_info(log_type, crawler, item_id)
                if video_dict is None:
                    Common.logger(log_type, crawler).info("Invalid video\n")
                    Common.logging(log_type, crawler, env, "Invalid video\n")
                    continue
                for k, v in video_dict.items():
                    Common.logger(log_type, crawler).info(f"{k}:{v}")
                Common.logging(log_type, crawler, env, f"{video_dict}")
                if download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
                    Common.logger(log_type, crawler).info("Does not meet the crawl rules\n")
                    Common.logging(log_type, crawler, env, "Does not meet the crawl rules\n")
                elif any(str(word) in video_dict["video_title"] for word in
                         get_config_from_mysql(log_type=log_type, source=crawler, env=env, text="filter", action="")):
                    Common.logger(log_type, crawler).info('Title contains a filter word\n')
                    Common.logging(log_type, crawler, env, 'Title contains a filter word\n')
                elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
                    Common.logger(log_type, crawler).info('Video already downloaded\n')
                    Common.logging(log_type, crawler, env, 'Video already downloaded\n')
                else:
                    # title_score = get_title_score(log_type, "kuaishou", "16QspO", "0usaDk", video_dict["video_title"])
                    # if title_score <= 0.3:
                    #     Common.logger(log_type, crawler).info(f"Title score: {title_score} <= 0.3\n")
                    #     Common.logging(log_type, crawler, env, f"Title score: {title_score} <= 0.3\n")
                    #     continue
                    # Common.logger(log_type, crawler).info(f"Title score: {title_score} > 0.3\n")
                    # Common.logging(log_type, crawler, env, f"Title score: {title_score} > 0.3\n")
                    # cls.download_publish(log_type=log_type,
                    #                      crawler=crawler,
                    #                      user_dict=user_dict,
                    #                      video_dict=video_dict,
                    #                      rule_dict=rule_dict,
                    #                      title_score=title_score,
                    #                      env=env)
                    video_dict["out_user_id"] = video_dict["user_id"]
                    video_dict["platform"] = crawler
                    video_dict["strategy"] = log_type
                    video_dict["out_video_id"] = video_dict["video_id"]
                    video_dict["width"] = video_dict["video_width"]
                    video_dict["height"] = video_dict["video_height"]
                    video_dict["crawler_rule"] = json.dumps(rule_dict)
                    video_dict["user_id"] = user_dict["uid"]
                    video_dict["publish_time"] = video_dict["publish_time_str"]
                    video_dict["strategy_type"] = log_type
                    mq.send_msg(video_dict)
                    cls.download_cnt += 1
                    Common.logger(log_type, crawler).info("Meets the download rules\n")
            except Exception as e:
                Common.logger(log_type, crawler).warning(f"Exception while crawling a single video: {e}\n")
                Common.logging(log_type, crawler, env, f"Exception while crawling a single video: {e}\n")


if __name__ == '__main__':
    pass
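    # A minimal, hypothetical invocation sketch (not part of the original module),
    # kept commented out so importing or running this file stays a no-op. The
    # argument shapes are assumptions read off get_search_videos/get_videoList:
    # each user_dict needs a "link" (the search term) and a "uid", and rule_dict
    # may supply a "videos_cnt" minimum used as the per-term download target.
    # The sample values below are placeholders, not real configuration.
    #
    # XiguasearchDev.get_search_videos(log_type="search",
    #                                  crawler="xigua",
    #                                  user_list=[{"link": "example-keyword", "uid": "123456"}],
    #                                  rule_dict={"videos_cnt": {"min": 30}},
    #                                  env="dev")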