# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/1/16
import difflib
import json
import random
# import shutil
import time
import ffmpeg
import requests
import urllib3
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium import webdriver
from main.common import Common
from main.feishu_lib import Feishu
from main.publish import Publish


class GongZongHao:
    """Crawl videos from WeChat official accounts, then download and publish them.

    All methods are classmethods; the only state is the paging offset
    ``begin`` shared across calls.
    """

    # Paging offset sent as the "begin" parameter of the article-list request.
    begin = 0

    # Seconds to wait for an HTTP response before giving up, so that a stalled
    # connection cannot block the crawler indefinitely.
    REQUEST_TIMEOUT = 30

    @classmethod
    def get_video_info_from_ffmpeg(cls, log_type, video_path):
        """Probe a downloaded video file for its width, height and duration.

        :param log_type: passed through to ``Common.logger``.
        :param video_path: path of the video file handed to ``ffmpeg.probe``.
        :return: ``(width, height, duration)`` as ``(int, int, float)``, or
            ``None`` when the probe result contains no video stream.
        """
        probe = ffmpeg.probe(video_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
            None,
        )
        if video_stream is None:
            Common.logger(log_type).info('No video stream found!')
            return None
        width = int(video_stream['width'])
        height = int(video_stream['height'])
        duration = float(video_stream['duration'])
        return width, height, duration

    @classmethod
    def filter_words(cls, log_type):
        """Load the title filter-word list from the Feishu sheet ``BwN8mo``.

        :return: flat list of every non-``None`` cell in the sheet, or ``None``
            when reading the sheet raised (the error is logged).
        """
        try:
            filter_sheet = Feishu.get_values_batch(log_type, 'gongzhonghao', 'BwN8mo')
            return [cell for row in filter_sheet for cell in row if cell is not None]
        except Exception as e:
            Common.logger(log_type).info(f'filter_words异常:{e}\n')
            return None

    @classmethod
    def title_like(cls, log_type, title):
        """Tell whether ``title`` closely resembles an already recorded title.

        Compares against column index 7 of every row after the first in the
        Feishu sheet ``47e39d``.

        :return: ``True`` when some recorded title has a
            ``SequenceMatcher.quick_ratio()`` of at least 0.8, else ``False``.
        """
        sheet = Feishu.get_values_batch(log_type, 'gongzhonghao', '47e39d')
        for row in sheet[1:]:
            video_title = row[7]
            if video_title is None:
                continue
            if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
                return True
        return False

    @classmethod
    def get_token(cls, log_type):
        """Read the WeChat token and cookie from the Feishu sheet ``OjyJqs``.

        :return: ``{'token': ..., 'cookie': ...}`` taken from cells ``[0][1]``
            and ``[1][1]``, or ``None`` when reading failed (error is logged).
        """
        try:
            sheet = Feishu.get_values_batch(log_type, "gongzhonghao", "OjyJqs")
            return {'token': sheet[0][1], 'cookie': sheet[1][1]}
        except Exception as e:
            Common.logger(log_type).error(f"get_cookie_token异常:{e}\n")
            return None

    @classmethod
    def _build_headers(cls, cookie, referer_token):
        """Build the request headers shared by the mp.weixin.qq.com API calls.

        :param cookie: value for the ``cookie`` header.
        :param referer_token: token embedded in the ``referer`` URL.
        """
        return {
            "accept": "*/*",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
                       "t=media/appmsg_edit_v2&action=edit&isNew=1"
                       "&type=77&createType=5&token=" + str(referer_token) + "&lang=zh_CN",
            'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                          " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
            "x-requested-with": "XMLHttpRequest",
            'cookie': cookie,
        }

    @classmethod
    def get_fakeid(cls, log_type, user, index):
        """Search for an official account and return its fakeid and avatar URL.

        :param user: account name used as the search query.
        :param index: 1-based position of the wanted account in the results.
        :return: ``{'fakeid': ..., 'head_url': ...}``; ``None`` when the search
            returned no list (after sleeping 3-5 minutes) or when an exception
            was raised (the error is logged).
        """
        try:
            # Fetch the credentials once; each call is a Feishu round trip.
            token_dict = cls.get_token(log_type)
            url = "https://mp.weixin.qq.com/cgi-bin/searchbiz?"
            # NOTE(review): this referer carries a hard-coded token while
            # get_articles uses the live one; kept as-is, confirm intent.
            headers = cls._build_headers(token_dict['cookie'], "1011071554")
            params = {
                "action": "search_biz",
                "begin": "0",
                "count": "5",
                "query": str(user),
                "token": token_dict['token'],
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=params, verify=False,
                             timeout=cls.REQUEST_TIMEOUT)
            data = r.json()
            if "list" not in data or len(data["list"]) == 0:
                Common.logger(log_type).warning(f"get_fakeid:{r.text},随机休眠 3-5 分钟\n")
                time.sleep(random.randint(60 * 3, 60 * 5))
                return None
            account = data["list"][int(index) - 1]
            return {'fakeid': account["fakeid"], 'head_url': account["round_head_img"]}
        except Exception as e:
            Common.logger(log_type).error(f"get_fakeid异常:{e}\n")
            return None

    @classmethod
    def get_tencent_video_url(cls, log_type, video_id):
        """Build the download URL of a Tencent Video item from its id.

        :return: the download URL, or ``None`` when the request or parsing
            failed (the error is logged).
        """
        try:
            url = 'https://vv.video.qq.com/getinfo?vids=' + str(video_id) + '&platform=101001&charge=0&otype=json'
            # The endpoint answers with "QZOutputJson={...};"; strip the
            # wrapper so the remainder parses as JSON.
            raw = requests.get(url=url, timeout=cls.REQUEST_TIMEOUT).text
            response = json.loads(raw.replace('QZOutputJson=', '').replace('"};', '"}'))
            video_info = response['vl']['vi'][0]
            base_url = video_info['ul']['ui'][0]['url']
            fvkey = video_info['fvkey']
            return base_url + str(video_id) + '.mp4?vkey=' + fvkey
        except Exception as e:
            Common.logger(log_type).error(f"get_tencent_video_url异常:{e}\n")
            return None

    @classmethod
    def get_video_url(cls, log_type, article_url):
        """Open an article in headless Chrome and extract its video URL.

        :return: the video URL; ``0`` when the page contains neither of the
            two known video containers; ``None`` when an exception was raised
            (the error is logged).
        """
        driver = None
        try:
            # Copy the capabilities: DesiredCapabilities.CHROME is a shared
            # class-level dict and must not be mutated in place.
            ca = DesiredCapabilities.CHROME.copy()
            ca["goog:loggingPrefs"] = {"performance": "ALL"}
            # Run without opening a browser window.
            chrome_options = webdriver.ChromeOptions()
            chrome_options.add_argument("headless")
            chrome_options.add_argument(
                'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
            chrome_options.add_argument("--no-sandbox")
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
            driver.implicitly_wait(10)
            driver.get(article_url)
            time.sleep(5)

            # Query each XPath once; a miss costs the full implicit wait.
            posters = driver.find_elements(
                By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]')
            if len(posters) != 0:
                return posters[0].get_attribute('src')

            tx_frames = driver.find_elements(
                By.XPATH, '//span[@class="js_tx_video_container"]/*[1]')
            if len(tx_frames) != 0:
                iframe_src = tx_frames[0].get_attribute('src')
                video_id = iframe_src.split('vid=')[-1].split('&')[0]
                return cls.get_tencent_video_url(log_type, video_id)

            return 0
        except Exception as e:
            Common.logger(log_type).info(f'get_video_url异常:{e}\n')
            return None
        finally:
            # Always shut the browser down, otherwise every article leaves a
            # Chrome process behind.
            if driver is not None:
                try:
                    driver.quit()
                except Exception as e:
                    Common.logger(log_type).warning(f'get_video_url关闭浏览器异常:{e}\n')

    @classmethod
    def get_articles(cls, log_type, user, index, env):
        """Page through an account's articles and download/publish new videos.

        Stops when the API response lacks ``app_msg_list``, when a page is
        empty, when an article older than 3 days is reached, or when an
        exception is raised while handling a page.

        :param user: account name, passed to ``get_fakeid``.
        :param index: 1-based search-result position, passed to ``get_fakeid``.
        :param env: passed through to ``download_publish``.
        """
        fakeid_dict = cls.get_fakeid(log_type, user, index)
        if fakeid_dict is None:
            # Without a fakeid every page request would fail the same way.
            Common.logger(log_type).warning(f"get_articles:未获取到 {user} 的 fakeid\n")
            return

        while True:
            try:
                # Fetch the credentials once per page.
                token_dict = cls.get_token(log_type)
                url = "https://mp.weixin.qq.com/cgi-bin/appmsg?"
                headers = cls._build_headers(token_dict['cookie'], token_dict['token'])
                params = {
                    "action": "list_ex",
                    "begin": str(cls.begin),
                    "count": "5",
                    "fakeid": fakeid_dict['fakeid'],
                    "type": "9",
                    "query": "",
                    "token": str(token_dict['token']),
                    "lang": "zh_CN",
                    "f": "json",
                    "ajax": "1",
                }
                urllib3.disable_warnings()
                r = requests.get(url=url, headers=headers, params=params, verify=False,
                                 timeout=cls.REQUEST_TIMEOUT)
                cls.begin += 5
                data = r.json()

                if 'app_msg_list' not in data:
                    Common.logger(log_type).warning(f"get_gzh_url:{r.text}\n")
                    break

                app_msg_list = data['app_msg_list']
                if len(app_msg_list) == 0:
                    # End of the listing: stop instead of requesting further
                    # empty pages forever.
                    Common.logger(log_type).info('没有更多视频了\n')
                    cls.begin = 0
                    return

                for article in app_msg_list:
                    if 'title' in article:
                        title = article['title'].replace('/', '').replace('\n', '') \
                            .replace('.', '').replace('“', '').replace('”', '').replace(' ', '')
                    else:
                        title = 0
                    aid = article.get('aid', 0)
                    create_time = article.get('create_time', 0)
                    head_url = fakeid_dict['head_url']
                    cover_url = article.get('cover', 0)
                    article_url = article.get('link', 0)

                    Common.logger(log_type).info(f"title:{title}")
                    Common.logger(log_type).info(f"article_url:{article_url}")

                    # Check the age cutoff before resolving the video URL so
                    # that no browser is started for an article that will be
                    # discarded anyway.
                    if int(time.time()) - create_time >= 3600*24*3:
                        Common.logger(log_type).info(
                            f'发布时间{time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(create_time))} > 3 天\n')
                        cls.begin = 0
                        return

                    # No link means there is nothing to open in the browser.
                    if article_url == 0:
                        video_url = 0
                    else:
                        video_url = cls.get_video_url(log_type, article_url)
                    Common.logger(log_type).info(f"video_url:{video_url}")

                    video_dict = {
                        'video_title': title,
                        'aid': aid,
                        'create_time': create_time,
                        'user_name': user,
                        'user_id': fakeid_dict['fakeid'],
                        'head_url': head_url,
                        'cover_url': cover_url,
                        'article_url': article_url,
                        'video_url': video_url
                    }
                    cls.download_publish(log_type, video_dict, env)

                # Pause between page requests.
                Common.logger(log_type).info('休眠 10 秒\n')
                time.sleep(10)
            except Exception as e:
                Common.logger(log_type).error("get_gzh_url异常:{}\n", e)
                # Leave the loop: retrying immediately with no exit condition
                # would spin forever on a persistent error.
                break

    @classmethod
    def download_publish(cls, log_type, video_dict, env):
        """Filter one video, then download it, publish it and record it in Feishu.

        The video is skipped (with a log line) when it has no article or video
        URL, when its title contains a filter word, when its ``aid`` already
        appears in the Feishu sheet ``47e39d``, or when its title is too
        similar to a recorded one. Any exception is logged and swallowed.

        :param video_dict: dict with the keys ``video_title``, ``aid``,
            ``create_time``, ``user_name``, ``user_id``, ``head_url``,
            ``cover_url``, ``article_url`` and ``video_url``.
        :param env: ``'prod'`` selects the production admin link; any other
            value selects the test admin link.
        """
        try:
            title = video_dict['video_title']

            if video_dict['article_url'] == 0 or video_dict['video_url'] == 0:
                Common.logger(log_type).info("文章涉嫌违反相关法律法规和政策\n")
                return
            # get_video_url returns None when it failed; such a value must not
            # reach the downloader.
            if video_dict['video_url'] is None:
                Common.logger(log_type).info("获取视频下载链接失败\n")
                return
            # Title filter words (an empty word never matches).
            if any(word and word in title for word in cls.filter_words(log_type)):
                Common.logger(log_type).info("标题已中过滤词\n")
                return
            # Already downloaded?
            recorded_cells = [
                cell
                for row in Feishu.get_values_batch(log_type, 'gongzhonghao', '47e39d')
                for cell in row
            ]
            if video_dict['aid'] in recorded_cells:
                Common.logger(log_type).info("视频已下载\n")
                return
            # Title similarity.
            if cls.title_like(log_type, title) is True:
                Common.logger(log_type).info('标题相似度>=80%:{}\n', title)
                return

            # Download the video.
            Common.download_method(log_type, "video", title, video_dict['video_url'])
            # Read width, height and duration of the downloaded file.
            video_info = cls.get_video_info_from_ffmpeg(
                log_type, "./videos/" + title + "/video.mp4")
            if video_info is None:
                Common.logger(log_type).info("未获取到视频流信息,跳过\n")
                return
            video_width = str(video_info[0])
            video_height = str(video_info[1])
            duration = video_info[2]

            # Download the cover.
            Common.download_method(log_type, 'cover', title, video_dict['cover_url'])

            # Save the video details to "./videos/{video_title}/info.txt",
            # one field per line.
            info_lines = [
                str(video_dict['aid']),
                title,
                str(int(duration)),
                '100000',
                '100000',
                '100000',
                '100000',
                video_width + '*' + video_height,
                str(video_dict['create_time']),
                video_dict['user_name'],
                video_dict['head_url'],
                video_dict['video_url'],
                video_dict['cover_url'],
                "gongzhonghao_xinxin" + str(time.time()),
            ]
            with open("./videos/" + title + "/" + "info.txt", "a", encoding="UTF-8") as f_a:
                f_a.write("\n".join(info_lines) + "\n")
            Common.logger(log_type).info("==========视频信息已保存至info.txt==========")

            # Upload the video.
            Common.logger(log_type).info("开始上传视频")
            our_video_id = Publish.upload_and_publish(log_type, env, "play")
            if env == 'prod':
                our_video_link = "https://admin.piaoquantv.com/cms/post-detail/" + str(our_video_id) + "/info"
            else:
                our_video_link = "https://testadmin.piaoquantv.com/cms/post-detail/" + str(our_video_id) + "/info"
            Common.logger(log_type).info("视频上传完成")

            # Record the video in the Feishu sheet: insert a new first data
            # row, then fill it.
            Common.logger(log_type).info("保存视频信息至云文档")
            Feishu.insert_columns(log_type, "gongzhonghao", "47e39d", "ROWS", 1, 2)
            upload_time = int(time.time())
            values = [[
                time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(upload_time)),
                "公众号_信欣",
                title,
                video_dict['aid'],
                our_video_link,
                int(duration),
                video_width + '*' + video_height,
                time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(video_dict['create_time'])),
                video_dict['user_name'],
                video_dict['user_id'],
                video_dict['head_url'],
                video_dict['cover_url'],
                video_dict['article_url'],
                video_dict['video_url'],
            ]]
            time.sleep(1)
            Feishu.update_values(log_type, "gongzhonghao", "47e39d", "F2:Z2", values)
            Common.logger(log_type).info("视频下载/上传成功\n")
        except Exception as e:
            Common.logger(log_type).error(f"download_publish异常:{e}\n")

    @classmethod
    def get_all_videos(cls, log_type, env):
        """Crawl every account listed in the Feishu sheet ``Bzv72P``.

        Each row after the first supplies the account name (column 0) and the
        search-result index (column 1); rows missing either value are skipped.
        Any exception is logged and swallowed.
        """
        try:
            user_sheet = Feishu.get_values_batch(log_type, 'gongzhonghao', 'Bzv72P')
            for i in range(1, len(user_sheet)):
                user_name = user_sheet[i][0]
                index = user_sheet[i][1]
                if user_name is None or index is None:
                    Common.logger(log_type).info(f'第{i+1}行,空行\n')
                    continue
                Common.logger(log_type).info(f'获取 {user_name} 公众号视频\n')
                cls.get_articles(log_type, user_name, index, env)
                # Start the next account from its first page.
                cls.begin = 0
                Common.logger(log_type).info('休眠1分钟\n')
                time.sleep(60)
        except Exception as e:
            Common.logger(log_type).info(f'get_all_videos异常:{e}\n')


if __name__ == "__main__":
    pass