# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/28
import datetime
import difflib
import json
import os
import shutil
import sys
import time
from hashlib import md5

import requests
import urllib3
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium import webdriver

# Project-local packages are resolved relative to the working directory,
# so this must run before the ``common.*`` imports below.
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.public import get_config_from_mysql
from common.publish import Publish
from common.scheduling_db import MysqlHelper


class GongzhonghaoFollow3:
    """Crawl videos from a fixed list of WeChat official accounts.

    For every account the article list is paged through the
    ``mp.weixin.qq.com`` back-office endpoints, the video URL is extracted
    from each article page with headless Chrome, and qualifying videos are
    downloaded, published and recorded in MySQL and a Feishu sheet.
    """

    # Paging offset sent as ``begin`` to the article-list endpoint.
    begin = 0
    platform = "公众号"

    # Seconds to wait before retrying after an expired token / rate limit.
    _RETRY_WAIT_SECONDS = 60 * 10
    # Characters stripped from article titles before they are used.
    _TITLE_STRIP_TABLE = str.maketrans('', '', '/\n.“” "\'')

    # Basic threshold rule
    @staticmethod
    def download_rule(video_dict):
        """Return whether a downloaded video passes the basic rules.

        :param video_dict: video info; reads ``duration``, ``video_width``
            and ``video_height``
        :return: True when the duration is between 20 seconds and 45
            minutes (inclusive) and width or height is non-negative,
            otherwise False
        """
        duration = int(float(video_dict['duration']))
        if not 20 <= duration <= 60 * 45:
            return False
        return int(video_dict['video_width']) >= 0 or int(video_dict['video_height']) >= 0

    @classmethod
    def title_like(cls, log_type, crawler, title, env):
        """Return True when ``title`` is >= 80% similar to a stored title.

        Similarity is ``difflib.SequenceMatcher(...).quick_ratio()`` against
        the ``video_title`` of every ``crawler_video`` row of this platform.

        :return: True on the first sufficiently similar title, else False
            (also when the query yields no rows)
        """
        select_sql = """ select * from crawler_video where platform="公众号" """
        video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
        if not video_list:
            return False
        for video_dict in video_list:
            video_title = video_dict["video_title"]
            if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
                return True
        return False

    # Get token
    @classmethod
    def get_token(cls, log_type, crawler):
        """Read token, cookie, account name and renewal date from Feishu.

        Polls sheet ``l1VZki`` once per second until it can be read.

        :return: dict with keys ``token``, ``cookie``, ``gzh_name`` and
            ``gzh_time`` taken from column 1 of the first four rows
        """
        while True:
            sheet = Feishu.get_values_batch(log_type, crawler, "l1VZki")
            if sheet is None:
                time.sleep(1)
                continue
            return {
                'token': sheet[0][1],
                'cookie': sheet[1][1],
                'gzh_name': sheet[2][1],
                'gzh_time': sheet[3][1],
            }

    @staticmethod
    def _request_headers(cookie, referer_token):
        """Build the browser-like headers shared by both API requests."""
        return {
            "accept": "*/*",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
                       "t=media/appmsg_edit_v2&action=edit&isNew=1"
                       "&type=77&createType=5&token=" + referer_token + "&lang=zh_CN",
            'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                          " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
            "x-requested-with": "XMLHttpRequest",
            'cookie': cookie,
        }

    @classmethod
    def _alert_and_wait(cls, log_type, crawler, caller, response, token_dict, expired):
        """Log a rejected API response, alert via Feishu, then sleep.

        :param caller: name of the calling method, used as the log prefix
        :param response: the ``requests`` response that was rejected
        :param token_dict: result of :meth:`get_token`
        :param expired: True for an expired session, False for rate limiting
        """
        gzh_name = token_dict['gzh_name']
        gzh_time = token_dict['gzh_time']
        if expired:
            log_text = f"公众号_3:{gzh_name}, 更换日期:{gzh_time} 过期啦\n"
            bot_text = f"token_3:{gzh_name}\n更换日期:{gzh_time}\n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/"
        else:
            log_text = f"公众号_3:{gzh_name}, 更换日期:{gzh_time} 频控啦\n"
            bot_text = f"公众号_3:{gzh_name}\n更换日期:{gzh_time}\n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"

        Common.logger(log_type, crawler).warning(f"status_code:{response.status_code}")
        Common.logger(log_type, crawler).warning(f"{caller}:{response.text}\n")
        Common.logger(log_type, crawler).warning(log_text)
        # The bot is only notified between 10:00 and 20:59 local time.
        if 20 >= datetime.datetime.now().hour >= 10:
            Feishu.bot(log_type, crawler, bot_text)
        time.sleep(cls._RETRY_WAIT_SECONDS)

    # Get the account's fakeid
    @classmethod
    def get_fakeid(cls, log_type, crawler, user, index):
        """Search for an official account by name and return its identity.

        Retries (after alerting and sleeping 10 minutes) while the session
        is invalid, the request is rate limited, or the result is empty.

        :param user: account name used as the search query
        :param index: 1-based position of the wanted account in the results
        :return: dict with keys ``fakeid`` and ``head_url``
        :raises IndexError: if ``index`` exceeds the number of results
        """
        while True:
            token_dict = cls.get_token(log_type, crawler)
            url = "https://mp.weixin.qq.com/cgi-bin/searchbiz?"
            headers = cls._request_headers(token_dict['cookie'], "1011071554")
            params = {
                "action": "search_biz",
                "begin": "0",
                "count": "5",
                "query": str(user),
                "token": token_dict['token'],
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=params, verify=False)
            r.close()
            # Parse the body once instead of on every access.
            resp = r.json()
            err_msg = resp["base_resp"]["err_msg"]
            if err_msg == "invalid session":
                cls._alert_and_wait(log_type, crawler, "get_fakeid", r, token_dict, expired=True)
                continue
            # An absent or empty result list is treated like rate limiting.
            if err_msg == "freq control" or "list" not in resp or len(resp["list"]) == 0:
                cls._alert_and_wait(log_type, crawler, "get_fakeid", r, token_dict, expired=False)
                continue
            account = resp["list"][int(index) - 1]
            return {'fakeid': account["fakeid"], 'head_url': account["round_head_img"]}

    # Get the Tencent Video download link
    @classmethod
    def get_tencent_video_url(cls, video_id):
        """Resolve a Tencent Video id to an ``.mp4`` URL via ``getinfo``.

        :param video_id: the ``vid`` of the embedded Tencent video
        :return: download URL built from the first URL entry and ``fvkey``
        """
        url = 'https://vv.video.qq.com/getinfo?vids=' + str(video_id) + '&platform=101001&charge=0&otype=json'
        # The endpoint answers with ``QZOutputJson={...};``; strip the wrapper.
        response = requests.get(url=url).text.replace('QZOutputJson=', '').replace('"};', '"}')
        response = json.loads(response)
        video_info = response['vl']['vi'][0]
        base_url = video_info['ul']['ui'][0]['url']
        fvkey = video_info['fvkey']
        return base_url + str(video_id) + '.mp4?vkey=' + fvkey

    @classmethod
    def get_video_url(cls, article_url, env):
        """Open an article in headless Chrome and extract its video URL.

        :param article_url: URL of the article page
        :param env: ``"prod"`` uses the chromedriver on PATH; any other
            value uses the hard-coded local chromedriver path
        :return: the video URL, or 0 when the page has no known video element
        """
        # Copy so the shared DesiredCapabilities.CHROME dict is not mutated.
        ca = DesiredCapabilities.CHROME.copy()
        ca["goog:loggingPrefs"] = {"performance": "ALL"}
        # Run without opening a browser window
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("headless")
        chrome_options.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
        chrome_options.add_argument("--no-sandbox")
        # Driver initialisation
        if env == "prod":
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
        else:
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options, service=Service(
                '/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver'))
        # try/finally guarantees the Chrome process is released even when
        # navigation or element lookup raises.
        try:
            driver.implicitly_wait(10)
            driver.get(article_url)
            time.sleep(1)
            native_players = driver.find_elements(
                By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]')
            if len(native_players) != 0:
                return native_players[0].get_attribute('src')
            tencent_frames = driver.find_elements(
                By.XPATH, '//span[@class="js_tx_video_container"]/*[1]')
            if len(tencent_frames) != 0:
                iframe_src = tencent_frames[0].get_attribute('src')
                video_id = iframe_src.split('vid=')[-1].split('&')[0]
                return cls.get_tencent_video_url(video_id)
            return 0
        finally:
            driver.quit()

    # Get the article list
    @classmethod
    def get_videoList(cls, log_type, crawler, user, index, oss_endpoint, env):
        """Page through one account's articles and process each video.

        Each page holds up to 5 articles; ``cls.begin`` is advanced by 5 per
        page. Paging stops when a page is empty or when an article published
        3 or more days ago is reached (``cls.begin`` is then reset to 0).
        Sleeps 60 seconds between pages.

        :param user: account name
        :param index: 1-based position of the account in the search results
        :param oss_endpoint: passed through to the publish step
        :param env: environment name, passed through to downstream calls
        """
        while True:
            fakeid_dict = cls.get_fakeid(log_type, crawler, user, index)
            token_dict = cls.get_token(log_type, crawler)
            url = "https://mp.weixin.qq.com/cgi-bin/appmsg?"
            headers = cls._request_headers(token_dict['cookie'], str(token_dict['token']))
            params = {
                "action": "list_ex",
                "begin": str(cls.begin),
                "count": "5",
                "fakeid": fakeid_dict['fakeid'],
                "type": "9",
                "query": "",
                "token": str(token_dict['token']),
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=params, verify=False)
            r.close()
            # Parse the body once instead of on every access.
            resp = r.json()
            err_msg = resp["base_resp"]["err_msg"]
            if err_msg == "invalid session":
                cls._alert_and_wait(log_type, crawler, "get_videoList", r, token_dict, expired=True)
                continue
            # A response without an article list is treated like rate limiting.
            if err_msg == "freq control" or 'app_msg_list' not in resp:
                cls._alert_and_wait(log_type, crawler, "get_videoList", r, token_dict, expired=False)
                continue

            app_msg_list = resp['app_msg_list']
            if len(app_msg_list) == 0:
                Common.logger(log_type, crawler).info('没有更多视频了\n')
                return

            cls.begin += 5
            for article in app_msg_list:
                # Missing fields fall back to 0, which download_publish
                # checks for on article_url / video_url.
                if 'title' in article:
                    title = article['title'].translate(cls._TITLE_STRIP_TABLE)
                else:
                    title = 0
                aid = article.get('aid', 0)
                create_time = article.get('create_time', 0)
                publish_time_stamp = int(create_time)
                publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
                avatar_url = fakeid_dict['head_url']
                cover_url = article.get('cover', 0)
                article_link = article.get('link', 0)
                # Without a link there is nothing to open in the browser;
                # 0 lets download_publish skip the article.
                video_url = cls.get_video_url(article_link, env) if article_link else 0

                video_dict = {
                    'video_id': aid,
                    'video_title': title,
                    'publish_time_stamp': publish_time_stamp,
                    'publish_time_str': publish_time_str,
                    'user_name': user,
                    'play_cnt': 0,
                    'comment_cnt': 0,
                    'like_cnt': 0,
                    'share_cnt': 0,
                    'user_id': fakeid_dict['fakeid'],
                    'avatar_url': avatar_url,
                    'cover_url': cover_url,
                    'article_url': article_link,
                    'video_url': video_url,
                    'session': f'gongzhonghao-follow-{int(time.time())}'
                }
                for k, v in video_dict.items():
                    Common.logger(log_type, crawler).info(f"{k}:{v}")

                if int(time.time()) - publish_time_stamp >= 3600 * 24 * 3:
                    Common.logger(log_type, crawler).info(f'发布时间{publish_time_str} > 3 天\n')
                    cls.begin = 0
                    return
                cls.download_publish(log_type, crawler, video_dict, oss_endpoint, env)

            Common.logger(log_type, crawler).info('休眠 60 秒\n')
            time.sleep(60)

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many stored rows already have this ``out_video_id``.

        :return: number of matching ``crawler_video`` rows (0 means new)
        """
        # NOTE(review): the value is interpolated into the SQL text; switch
        # to a parameterised query if MysqlHelper offers one.
        sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @staticmethod
    def _video_dirs(crawler, title):
        """Return the candidate download directories for ``title``.

        NOTE(review): the original code addressed the download folder both
        by raw title and by the md5 of the title; which one
        ``Common.download_method`` creates is not visible here, so both are
        considered. An empty title is never used as a directory name because
        it would resolve to the shared ``videos`` folder itself.
        """
        md_title = md5(title.encode('utf8')).hexdigest()
        candidates = [f"./{crawler}/videos/{md_title}"]
        if title:
            candidates.insert(0, f"./{crawler}/videos/{title}")
        return candidates

    @classmethod
    def _remove_video_dirs(cls, crawler, title):
        """Delete whichever download directory exists for ``title``."""
        for path in cls._video_dirs(crawler, title):
            if os.path.isdir(path):
                shutil.rmtree(path)

    # Download / upload
    @classmethod
    def download_publish(cls, log_type, crawler, video_dict, oss_endpoint, env):
        """Filter, download, publish and record a single video.

        The video is skipped when the article or video URL is missing (0),
        the title contains a configured filter word, the video id is already
        stored, or the title is >= 80% similar to a stored one. Otherwise it
        is downloaded, probed with ffmpeg, checked against
        :meth:`download_rule`, uploaded, inserted into ``crawler_video`` and
        appended to the Feishu sheet ``47e39d``.

        :param video_dict: video info as built by :meth:`get_videoList`;
            ``video_width``, ``video_height`` and ``duration`` are added here
        :param oss_endpoint: passed through to ``Publish.upload_and_publish``
        :param env: ``"prod"`` or a test environment name
        """
        title = video_dict['video_title']

        if video_dict['article_url'] == 0 or video_dict['video_url'] == 0:
            Common.logger(log_type, crawler).info("文章涉嫌违反相关法律法规和政策\n")
            return
        # Title filter words; empty entries never match.
        filter_words = get_config_from_mysql(log_type=log_type, source=crawler, env=env, text="filter", action="")
        if any(word and word in title for word in filter_words):
            Common.logger(log_type, crawler).info("标题已中过滤词\n")
            return
        # Already downloaded?
        if cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0:
            Common.logger(log_type, crawler).info("视频已下载\n")
            return
        # Title similarity
        if cls.title_like(log_type, crawler, title, env) is True:
            Common.logger(log_type, crawler).info(f'标题相似度>=80%:{video_dict["video_title"]}\n')
            return

        # Download the video
        Common.download_method(log_type=log_type, crawler=crawler, text="video",
                               title=title, url=video_dict["video_url"])
        # Probe whichever download directory actually exists, falling back
        # to the title-named path used originally.
        video_dir = next((path for path in cls._video_dirs(crawler, title) if os.path.isdir(path)),
                         f"./{crawler}/videos/{title}")
        ffmpeg_dict = Common.ffmpeg(log_type, crawler, f"{video_dir}/video.mp4")
        if ffmpeg_dict is None:
            cls._remove_video_dirs(crawler, title)
            Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
            return
        video_dict["video_width"] = ffmpeg_dict["width"]
        video_dict["video_height"] = ffmpeg_dict["height"]
        video_dict["duration"] = ffmpeg_dict["duration"]
        video_size = ffmpeg_dict["size"]
        Common.logger(log_type, crawler).info(f'video_width:{video_dict["video_width"]}')
        Common.logger(log_type, crawler).info(f'video_height:{video_dict["video_height"]}')
        Common.logger(log_type, crawler).info(f'duration:{video_dict["duration"]}')
        Common.logger(log_type, crawler).info(f'video_size:{video_size}')
        # Empty file: delete it straight away
        if int(video_size) == 0:
            cls._remove_video_dirs(crawler, title)
            Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
            return
        # Rule failure gets its own message instead of the size=0 one.
        if cls.download_rule(video_dict) is False:
            cls._remove_video_dirs(crawler, title)
            Common.logger(log_type, crawler).info("不满足抓取规则,删除成功\n")
            return

        # Download the cover
        Common.download_method(log_type=log_type, crawler=crawler, text="cover",
                               title=title, url=video_dict["cover_url"])
        # Save the video info to "./videos/{video_title}/info.txt"
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)

        # Upload the video
        Common.logger(log_type, crawler).info("开始上传视频...")
        strategy = "定向爬虫策略"
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy=strategy,
                                                  our_uid="follow",
                                                  oss_endpoint=oss_endpoint,
                                                  env=env)
        if our_video_id is None:
            # Upload failed: remove the local files and stop.
            cls._remove_video_dirs(crawler, title)
            return
        if env == 'prod':
            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{str(our_video_id)}/info"
        else:
            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{str(our_video_id)}/info"
        Common.logger(log_type, crawler).info("视频上传完成")

        # Save the video info to the database
        rule_dict = {
            "duration": {"min": 20, "max": 45 * 60},
            "publish_day": {"min": 3}
        }
        insert_sql = f""" insert into crawler_video(video_id,
                            out_user_id,
                            platform,
                            strategy,
                            out_video_id,
                            video_title,
                            cover_url,
                            video_url,
                            duration,
                            publish_time,
                            play_cnt,
                            crawler_rule,
                            width,
                            height)
                            values({our_video_id},
                            "{video_dict['user_id']}",
                            "{cls.platform}",
                            "定向爬虫策略",
                            "{video_dict['video_id']}",
                            "{video_dict['video_title']}",
                            "{video_dict['cover_url']}",
                            "{video_dict['video_url']}",
                            {int(video_dict['duration'])},
                            "{video_dict['publish_time_str']}",
                            {int(video_dict['play_cnt'])},
                            '{json.dumps(rule_dict)}',
                            {int(video_dict['video_width'])},
                            {int(video_dict['video_height'])}) """
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env)
        Common.logger(log_type, crawler).info('视频信息插入数据库成功!')

        # Write the video to Feishu
        Feishu.insert_columns(log_type, crawler, "47e39d", "ROWS", 1, 2)
        # Video-ID worksheet: write the data into the first row
        upload_time = int(time.time())
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "用户主页",
                   video_dict['video_title'],
                   video_dict['video_id'],
                   our_video_link,
                   int(video_dict['duration']),
                   f"{video_dict['video_width']}*{video_dict['video_height']}",
                   video_dict['publish_time_str'],
                   video_dict['user_name'],
                   video_dict['user_id'],
                   video_dict['avatar_url'],
                   video_dict['cover_url'],
                   video_dict['article_url'],
                   video_dict['video_url']]]
        time.sleep(0.5)
        Feishu.update_values(log_type, crawler, "47e39d", "F2:Z2", values)
        Common.logger(log_type, crawler).info('视频下载/上传成功\n')

    @classmethod
    def get_users(cls):
        """Return the accounts to crawl.

        The list is hard-coded; a removed, commented-out snippet showed it
        being generated from Feishu sheet ``Bzv72P`` starting at row 81.

        :return: list of dicts with ``user_name`` and the 1-based ``index``
            of the account in the search results
        """
        user_list = [
            {'user_name': '方敏爱美食', 'index': 1},
            {'user_name': '针灸推拿特色技术', 'index': 1},
            {'user_name': '挺进天山', 'index': 1},
            {'user_name': '紫陌捻花', 'index': 1},
            {'user_name': '巨响养身', 'index': 1},
            {'user_name': '荣观世界', 'index': 1},
            {'user_name': 'Music音乐世界', 'index': 1},
            {'user_name': '微观调查组', 'index': 1},
            {'user_name': '用汉方拥抱世界', 'index': 1},
            {'user_name': '医学养身秘诀', 'index': 1},
            {'user_name': '医学老人养身', 'index': 1},
            {'user_name': '热文微观', 'index': 1},
            {'user_name': '医学养身秘笈', 'index': 1},
            {'user_name': '你未读消息', 'index': 1},
            {'user_name': '零点相聚', 'index': 2},
            {'user_name': '观念颠覆一切', 'index': 1},
            {'user_name': '侯老师说食疗精选', 'index': 1},
            {'user_name': '侯老师说食疗', 'index': 1},
            {'user_name': '今日看点收集', 'index': 1},
            {'user_name': '君拍', 'index': 1},
            {'user_name': '惊爆视频', 'index': 3},
            {'user_name': '绝美生活', 'index': 2},
            {'user_name': '新龙虎局势', 'index': 1},
            {'user_name': '行走的足音', 'index': 1},
            {'user_name': '月光下小夜曲', 'index': 1},
            {'user_name': '罪与罚的言', 'index': 1},
            {'user_name': '祝福音画', 'index': 1},
            {'user_name': '这年头儿', 'index': 1},
            {'user_name': '祝福励志正能量', 'index': 1},
            {'user_name': '出借人清查组', 'index': 1},
            {'user_name': '强哥来了', 'index': 1},
            {'user_name': '绝美相册', 'index': 1},
            {'user_name': '绝美立体相册', 'index': 1},
            {'user_name': '生活美相册', 'index': 1},
            {'user_name': '祝您生活幸福', 'index': 1},
            {'user_name': '完美生活', 'index': 3},
            {'user_name': '新龙虎局世', 'index': 1},
            {'user_name': '精美音画相册', 'index': 1},
            {'user_name': '音画场景', 'index': 1},
            {'user_name': '出借人投诉处', 'index': 1},
        ]
        return user_list

    @classmethod
    def get_all_videos(cls, log_type, crawler, oss_endpoint, env):
        """Crawl every account from :meth:`get_users` in turn.

        A failure while processing one account is logged and does not stop
        the remaining accounts. After a successful account the method sleeps
        60 seconds.
        """
        for user_dict in cls.get_users():
            try:
                user_name = user_dict['user_name']
                index = user_dict['index']
                Common.logger(log_type, crawler).info(f'获取 {user_name} 公众号视频\n')
                cls.get_videoList(log_type, crawler, user_name, index, oss_endpoint, env)
                Common.logger(log_type, crawler).info('休眠 60 秒\n')
                time.sleep(60)
            except Exception as e:
                Common.logger(log_type, crawler).error(f'get_all_videos异常:{e}\n')
            finally:
                # Always restart paging for the next account, even when this
                # one failed part-way through its article list.
                cls.begin = 0


if __name__ == "__main__":
    GongzhonghaoFollow3.get_users()