""" @author: luojunhui """ import os import re import html import hashlib import threading import oss2 import requests from uuid import uuid4 from datetime import datetime, timezone from fake_useragent import FakeUserAgent from urllib.parse import urlparse, parse_qs class Functions(object): """ functions class """ @classmethod def show_desc_to_sta(cls, show_desc): """ :return: """ def decode_show_v(show_v): """ :param show_v: :return: """ foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8') foo = eval(foo) return int(foo) def decode_show_k(show_k): """ :param show_k: :return: """ this_dict = { '阅读': 'show_view_count', # 文章 '看过': 'show_view_count', # 图文 '观看': 'show_view_count', # 视频 '赞': 'show_like_count', '付费': 'show_pay_count', '赞赏': 'show_zs_count', } if show_k not in this_dict: print(f'error from decode_show_k, show_k not found: {show_k}') return this_dict.get(show_k, 'show_unknown') show_desc = show_desc.replace('+', '') sta = {} for show_kv in show_desc.split('\u2004\u2005'): if not show_kv: continue show_k, show_v = show_kv.split('\u2006') k = decode_show_k(show_k) v = decode_show_v(show_v) sta[k] = v res = { 'show_view_count': sta.get('show_view_count', 0), 'show_like_count': sta.get('show_like_count', 0), 'show_pay_count': sta.get('show_pay_count', 0), 'show_zs_count': sta.get('show_zs_count', 0), } return res @classmethod def generateGzhId(cls, url): """ generate url :param url: :return: """ biz = url.split("biz=")[1].split("&")[0] idx = url.split("&idx=")[1].split("&")[0] sn = url.split("&sn=")[1].split("&")[0] url_bit = "{}-{}-{}".format(biz, idx, sn).encode() md5_hash = hashlib.md5() md5_hash.update(url_bit) md5_value = md5_hash.hexdigest() return md5_value @classmethod def job_with_thread(cls, job_func): """ 每个任务放到单个线程中 :param job_func: :return: """ job_thread = threading.Thread(target=job_func) job_thread.start() @classmethod def str_to_md5(cls, strings): """ 字符串转化为 md5 值 :param strings: :return: """ # 将字符串转换为字节 original_bytes = strings.encode('utf-8') # 创建一个md5 hash对象 md5_hash = hashlib.md5() # 更新hash对象,传入原始字节 md5_hash.update(original_bytes) # 获取16进制形式的MD5哈希值 md5_value = md5_hash.hexdigest() return md5_value @classmethod def float_to_percentage(cls, value, decimals=3) -> str: """ 把小数转化为百分数 :param value: :param decimals: :return: """ percentage_value = round(value * 100, decimals) return "{}%".format(percentage_value) @classmethod def str_to_timestamp(cls, date_string, string_format='%Y-%m-%d') -> int: """ :param string_format: :param date_string: :return: """ date_obj = datetime.strptime(date_string, string_format) timestamp = date_obj.timestamp() return int(timestamp) @classmethod def timestamp_to_str(cls, timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str: """ :param string_format: :param timestamp: """ dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone() date_string = dt_object.strftime(string_format) return date_string @classmethod def proxy(cls): """ 快代理 """ # 隧道域名:端口号 tunnel = "j685.kdltps.com:15818" # 用户名密码方式 username = "t14070979713487" password = "hqwanfvy" proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel} } return proxies @classmethod def get_video_url(cls, article_url): """ :param article_url: :return: """ response = requests.get( url=article_url, headers={'User-Agent': FakeUserAgent().random}, # proxies=cls.proxy() ) html_text = response.text w = re.search( r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M ).group(1) url = html.unescape( re.sub( r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w ) ) return url @classmethod def get_source_account(cls, article_url: str) -> dict: """ 获取公众号名称和头像 :param article_url: :return: """ response = requests.get( url=article_url, headers={'User-Agent': FakeUserAgent().random}, # proxies=cls.proxy() ) html_text = response.text # 正则表达式用于提取 hit_nickname 和 hit_username regex_nickname = r"hit_nickname:\s*'([^']+)'" regex_username = r"hit_username:\s*'([^']+)'" # 提取 hit_nickname 和 hit_username nickname = re.search(regex_nickname, html_text) username = re.search(regex_username, html_text) # 输出提取的结果 if nickname and username: return { 'name': nickname.group(1), 'gh_id': username.group(1) } else: return {} @classmethod def download_gzh_video(cls, article_url): """ 下载公众号视频 :param article_url: :return: """ try: video_url = cls.get_video_url(article_url) except Exception as e: return save_path = "static/{}.mp4".format(cls.str_to_md5(video_url)) headers = { 'Accept': '*/*', 'Accept-Language': 'zh,zh-CN;q=0.9', 'Connection': 'keep-alive', 'Origin': 'https://mp.weixin.qq.com', 'Referer': 'https://mp.weixin.qq.com/', 'Sec-Fetch-Dest': 'video', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'cross-site', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36', 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"' } res = requests.get(video_url, headers=headers) with open(save_path, "wb") as f: f.write(res.content) TEN_KB = 1024 * 10 if os.path.getsize(save_path) > TEN_KB: return save_path else: return None @classmethod def upload_to_oss(cls, local_video_path): """ 把视频上传到 oss :return: """ oss_video_key = "long_articles/video/" + str(uuid4()) access_key_id = "LTAIP6x1l3DXfSxm" access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon" endpoint = "oss-cn-hangzhou.aliyuncs.com" bucket_name = "art-pubbucket" bucket = oss2.Bucket( oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name ) bucket.put_object_from_file(key=oss_video_key, filename=local_video_path) return oss_video_key @classmethod def extract_path(cls, path: str): """ 提取path参数 :param path: :return: """ params = parse_qs(urlparse(path).query) jump_page = params.get('jumpPage', [None])[0] if jump_page: params2 = parse_qs(jump_page) res = { "video_id": params2['pages/user-videos?id'][0], "root_source_id": params2['rootSourceId'][0], } return res else: return {} @classmethod def extract_params_from_url(cls, url: str, key: str): """ extract params from url """ params = parse_qs(urlparse(url).query) info = params.get(key, []) return info[0] if info else None @classmethod def download_baidu_videos(cls, video_url, save_path): """ :param video_url: baidu video url :param save_path: save path """ if os.path.exists(save_path): return save_path response = requests.get( video_url, headers={ 'User-Agent': FakeUserAgent().chrome, "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9" } ) with open(save_path, 'wb') as f: f.write(response.content) TEN_KB = 1024 * 10 if os.path.getsize(save_path) > TEN_KB: return save_path else: return None