# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2022/8/16 import os import random import sys import time import ffmpeg import requests import urllib3 sys.path.append(os.getcwd()) from main.common import Common from main.feishu_lib import Feishu from main.publish import Publish class GZH: # 翻页参数 begin = 0 # 每个用户抓取文章数量 gzh_count = [] # 获取已下载视频宽高、时长等信息 @classmethod def get_video_info_from_local(cls, video_path): probe = ffmpeg.probe(video_path) # print('video_path: {}'.format(video_path)) # format1 = probe['format'] # bit_rate = int(format1['bit_rate']) / 1000 # duration = format['duration'] # size = int(format1['size']) / 1024 / 1024 video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) if video_stream is None: print('No video stream found!') return width = int(video_stream['width']) height = int(video_stream['height']) # num_frames = int(video_stream['nb_frames']) # fps = int(video_stream['r_frame_rate'].split('/')[0]) / int(video_stream['r_frame_rate'].split('/')[1]) duration = float(video_stream['duration']) # print('width: {}'.format(width)) # print('height: {}'.format(height)) # print('num_frames: {}'.format(num_frames)) # print('bit_rate: {}k'.format(bit_rate)) # print('fps: {}'.format(fps)) # print('size: {}MB'.format(size)) # print('duration: {}'.format(duration)) return width, height, duration # 获取 搜索词/token @classmethod def get_cookie_token(cls, log_type, text): try: sheet = Feishu.get_values_batch(log_type, "gzh", "pxHL2C") token = sheet[0][1] cookie = sheet[1][1] if text == "cookie": return cookie elif text == "token": return token except Exception as e: Common.logger(log_type).error("get_cookie_token:{}\n", e) # 获取视频下载链接 @classmethod def get_url(cls, log_type, url): try: payload = {} headers = { 'Cookie': 'rewardsn=; wxtokenkey=777' } urllib3.disable_warnings() response = requests.get(url=url, headers=headers, data=payload, verify=False) # Common.logger(log_type).info('gzh_response:{}', response.text) response_list = response.text.splitlines() video_url_list = [] for m in response_list: if "mpvideo.qpic.cn" in m: video_url = m.split("url: '")[1].split("',")[0].replace(r"\x26amp;", "&") video_url_list.append(video_url) # Common.logger(log_type).info('video_url_list:{}\n', video_url_list) if len(video_url_list) == 0: video_url = 0 else: video_url = video_url_list[0] return video_url except Exception as e: Common.logger(log_type).error("get_url异常:{}\n", e) # 获取公众号文章信息,并写入文章列表 @classmethod def get_gzh_url(cls, log_type, username, userid, head_url): while True: try: url = "https://mp.weixin.qq.com/cgi-bin/appmsg?" headers = { "accept": "*/*", "accept-encoding": "gzip, deflate, br", "accept-language": "zh-CN,zh;q=0.9", "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?" "t=media/appmsg_edit_v2&action=edit&isNew=1" "&type=77&createType=5&token="+str(cls.get_cookie_token(log_type, "token"))+"&lang=zh_CN", 'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36", "x-requested-with": "XMLHttpRequest", 'cookie': cls.get_cookie_token(log_type, "cookie"), } params = { "action": "list_ex", "begin": str(cls.begin), "count": "5", "fakeid": userid, "type": "9", "query": "", "token": str(cls.get_cookie_token(log_type, "token")), "lang": "zh_CN", "f": "json", "ajax": "1", } urllib3.disable_warnings() r = requests.get(url=url, headers=headers, params=params, verify=False) cls.begin += 5 if 'app_msg_list' not in r.json() or len(r.json()['app_msg_list']) == 0: Common.logger(log_type).warning("get_gzh_url:response:{}\n", r.text) break else: app_msg_list = r.json()['app_msg_list'] for gzh_url in app_msg_list: # print(gzh_url) # title if 'title' in gzh_url: title = gzh_url['title'] else: title = 0 # aid if 'aid' in gzh_url: aid = gzh_url['aid'] else: aid = 0 # create_time if 'create_time' in gzh_url: create_time = gzh_url['create_time'] else: create_time = 0 # duration if 'duration' in gzh_url: duration = gzh_url['duration'] else: duration = 0 # cover_url if 'cover' in gzh_url: cover_url = gzh_url['cover'] else: cover_url = 0 # gzh_url if 'link' in gzh_url: gzh_url = gzh_url['link'] else: gzh_url = 0 play_cnt = 0 like_cnt = 0 if cls.get_url(log_type, gzh_url) == 0: video_url = 0 else: video_url = cls.get_url(log_type, gzh_url) Common.logger(log_type).info("title:{}", title) Common.logger(log_type).info("aid:{}", aid) Common.logger(log_type).info("create_time:{}", create_time) Common.logger(log_type).info("duration:{}", duration) Common.logger(log_type).info("cover_url:{}", cover_url) Common.logger(log_type).info("gzh_url:{}", gzh_url) Common.logger(log_type).info("video_url:{}", video_url) # 判断无效文章 if gzh_url == 0 or video_url == 0: Common.logger(log_type).info("文章无视频 / 视频地址解析失败\n") elif int(time.time()) - int(create_time) > 3600*24*3: Common.logger(log_type).info( "发布时间{}超过 3 天\n", time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(create_time))) return # 时长判断 elif int(duration) < 60: Common.logger(log_type).info("时长:{}<60秒\n", duration) # 已下载表去重 elif str(aid) in [x for y in Feishu.get_values_batch(log_type, "gzh", "fCs3BT") for x in y]: Common.logger(log_type).info("文章已下载\n") # 文章去重 elif str(aid) in [x for y in Feishu.get_values_batch(log_type, "gzh", "P6GKb3") for x in y]: Common.logger(log_type).info("文章已存在\n") else: # 已抓取文章列表添加当前文章ID cls.gzh_count.append(aid) # 公众号文章表插入行 upload_time = time.time() Feishu.insert_columns(log_type, 'gzh', 'P6GKb3', 'ROWS', 1, 2) # 抓取到的文章写入飞书表 values = [[time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(upload_time)), '公众号', title, str(aid), play_cnt, like_cnt, duration, "宽*高", time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(create_time)), username, userid, head_url, cover_url, gzh_url, video_url]] time.sleep(1) Feishu.update_values(log_type, 'gzh', 'P6GKb3', 'F2:W2', values) Common.logger(log_type).info("文章写入文档成功\n") if len(cls.gzh_count) >= 1: Common.logger(log_type).info("当前用户已抓取:{}条数据\n", len(cls.gzh_count)) cls.gzh_count = [] return time.sleep(10) except Exception as e: Common.logger(log_type).error("get_gzh_url异常:{}\n", e) # 下载/上传 @classmethod def download_publish(cls, log_type, env): try: gzh_sheet = Feishu.get_values_batch(log_type, 'gzh', 'P6GKb3') for i in range(1, len(gzh_sheet)): download_title = gzh_sheet[i][7].strip().replace('"', '')\ .replace('“', '').replace('“', '…').replace("\n", "") \ .replace("/", "").replace("\r", "").replace("#", "") \ .replace(".", "。").replace("\\", "").replace("&NBSP", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace(" ", "") download_vid = gzh_sheet[i][8] download_play_cnt = gzh_sheet[i][9] download_like_cnt = gzh_sheet[i][10] download_duration = gzh_sheet[i][11] download_send_time = gzh_sheet[i][13] download_username = gzh_sheet[i][14] download_userid = gzh_sheet[i][15] download_head_url = gzh_sheet[i][16] download_cover_url = gzh_sheet[i][17] download_video_url = gzh_sheet[i][19] download_video_comment_cnt = 0 download_video_share_cnt = 0 Common.logger(log_type).info("download_title:{}", download_title) Common.logger(log_type).info("download_send_time:{}", download_send_time) Common.logger(log_type).info("download_username:{}", download_username) Common.logger(log_type).info("download_video_url:{}", download_video_url) # Common.logger(log_type).info("download_vid:{}", download_vid) # Common.logger(log_type).info("download_play_cnt:{}", download_play_cnt) # Common.logger(log_type).info("download_like_cnt:{}", download_like_cnt) # Common.logger(log_type).info("download_duration:{}", download_duration) # Common.logger(log_type).info("download_userid:{}", download_userid) # Common.logger(log_type).info("download_head_url:{}", download_head_url) # Common.logger(log_type).info("download_cover_url:{}", download_cover_url) # 判断空行 if download_video_url is None or download_title is None: Feishu.dimension_range(log_type, 'gzh', 'P6GKb3', 'ROWS', i+1, i+1) Common.logger(log_type).info("空行,删除成功\n") return # 已下载判断 elif str(download_vid) in [x for y in Feishu.get_values_batch(log_type, 'gzh', 'fCs3BT') for x in y]: Feishu.dimension_range(log_type, 'gzh', 'P6GKb3', 'ROWS', i + 1, i + 1) Common.logger(log_type).info("视频已下载\n") return # 已下载判断 elif str(download_title) in [x for y in Feishu.get_values_batch(log_type, 'gzh', 'fCs3BT') for x in y]: Feishu.dimension_range(log_type, 'gzh', 'P6GKb3', 'ROWS', i + 1, i + 1) Common.logger(log_type).info("视频已下载\n") return else: # 下载封面 Common.download_method(log_type=log_type, text="cover", d_name=str(download_title), d_url=str(download_cover_url)) # 下载视频 Common.download_method(log_type=log_type, text="video", d_name=str(download_title), d_url=str(download_video_url)) # 获取视频宽高 video_info = cls.get_video_info_from_local("./videos/" + download_title + "/video.mp4") download_video_resolution = str(video_info[0]) + "*" + str(video_info[1]) # 保存视频信息至 "./videos/{download_video_title}/info.txt" with open("./videos/" + download_title + "/" + "info.txt", "a", encoding="UTF-8") as f_a: f_a.write(str(download_vid) + "\n" + str(download_title) + "\n" + str(int(download_duration)) + "\n" + str(download_play_cnt) + "\n" + str(download_video_comment_cnt) + "\n" + str(download_like_cnt) + "\n" + str(download_video_share_cnt) + "\n" + str(download_video_resolution) + "\n" + str(int(time.mktime( time.strptime(download_send_time, "%Y/%m/%d %H:%M:%S")))) + "\n" + str(download_username) + "\n" + str(download_head_url) + "\n" + str(download_video_url) + "\n" + str(download_cover_url) + "\n" + "gongzhonghao\n") Common.logger(log_type).info("==========视频信息已保存至info.txt==========") # 上传视频 Common.logger(log_type).info("开始上传视频:{}".format(download_title)) our_video_id = Publish.upload_and_publish(log_type, env, "play") our_video_link = "https://admin.piaoquantv.com/cms/post-detail/" + str(our_video_id) + "/info" Common.logger(log_type).info("视频上传完成:{}", download_title) # 保存视频 ID 到云文档 Common.logger(log_type).info("保存视频ID至云文档:{}", download_title) # 视频ID工作表,插入首行 Feishu.insert_columns(log_type, "gzh", "fCs3BT", "ROWS", 1, 2) # 视频ID工作表,首行写入数据 upload_time = int(time.time()) values = [[time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(upload_time)), "公众号", str(download_title), str(download_vid), our_video_link, download_play_cnt, download_like_cnt, download_duration, str(download_video_resolution), str(download_send_time), str(download_username), str(download_userid), str(download_head_url), str(download_cover_url), str(download_video_url)]] time.sleep(1) Feishu.update_values(log_type, "gzh", "fCs3BT", "D2:W2", values) # 删除行或列,可选 ROWS、COLUMNS Feishu.dimension_range(log_type, "gzh", "P6GKb3", "ROWS", i + 1, i + 1) Common.logger(log_type).info("视频:{},下载/上传成功\n", download_title) return except Exception as e: Common.logger(log_type).error("download_publish异常:{}\n", e) Feishu.dimension_range(log_type, "gzh", "P6GKb3", "ROWS", 2, 2) # 执行下载/上传 @classmethod def run_download_publish(cls, log_type, env): try: while True: time.sleep(1) if len(Feishu.get_values_batch(log_type, 'gzh', 'P6GKb3')) == 1: Common.logger(log_type).info("下载/上传完成\n") break else: cls.download_publish(log_type, env) except Exception as e: Common.logger(log_type).error("run_download_publish异常:{}\n", e) # 根据关键字搜索 UP 主信息,并写入电影票(勿动) @classmethod def search_user_by_word(cls, log_type, env): try: sheet = Feishu.get_values_batch(log_type, "gzh", "pxHL2C") for i in range(3, len(sheet)): word = sheet[i][0] index = sheet[i][1] url = "https://mp.weixin.qq.com/cgi-bin/searchbiz?" headers = { "accept": "*/*", "accept-encoding": "gzip, deflate, br", "accept-language": "zh-CN,zh;q=0.9", "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?" "t=media/appmsg_edit_v2&action=edit&isNew=1" "&type=77&createType=5&token=1011071554&lang=zh_CN", 'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36", "x-requested-with": "XMLHttpRequest", 'cookie': cls.get_cookie_token(log_type, "cookie"), } params = { "action": "search_biz", "begin": "0", "count": "5", "query": word, "token": cls.get_cookie_token(log_type, "token"), "lang": "zh_CN", "f": "json", "ajax": "1", } urllib3.disable_warnings() r = requests.get(url=url, headers=headers, params=params, verify=False) if "list" not in r.json() or len(r.json()["list"]) == 0: Common.logger(log_type).warning("search_user_by_word,随机休眠 3-15 分钟:{}\n", r.text) time.sleep(random.randint(60 * 3, 60 * 15)) else: fakeid = r.json()["list"][int(index)-1]["fakeid"] head_url = r.json()["list"][int(index)-1]["round_head_img"] time.sleep(0.5) Feishu.update_values(log_type, 'gzh', 'pxHL2C', 'C'+str(i+1)+':C'+str(i+1), [[fakeid]]) Common.logger(log_type).info("{}的fakeid写入飞书成功成功", word) time.sleep(0.5) Feishu.update_values(log_type, 'gzh', 'pxHL2C', 'D'+str(i+1)+':D'+str(i+1), [[head_url]]) Common.logger(log_type).info("{}的头像写入飞书成功\n", word) cls.get_gzh_url(log_type, word, fakeid, head_url) Common.logger(log_type).info("下载/上传 {} 公众号视频\n", word) cls.run_download_publish(log_type, env) Common.logger(log_type).info('{}视频抓取完成,随机休眠 3-15 分钟\n', word) time.sleep(random.randint(60*3, 60*15)) Common.logger(log_type).info("获取所有用户视频完成\n") except Exception as e: Common.logger(log_type).error("search_user_by_word异常:{}\n", e) if __name__ == "__main__": # GZH.search_user_by_word("gzh") # GZH.get_all_gzh('gzh') # GZH.download_publish('gzh', 'dev') # print(GZH.get_cookie_token('gzh', 'token')) GZH.get_gzh_url('gzh', '何静同学', 'MzkyODMzODQ2Mg==', 'http://mmbiz.qpic.cn/mmbiz_png/go7km0I9Dg3NTxRdMs8MIC6DricCibEdH3OVnEFLmspaVB67iaLdje4lCHFsdjqdXpelf5EicPwHfLWibHWCg5R5urg/0?wx_fmt=png')