"""
@author: luojunhui
"""
import hashlib
import html
import logging
import os
import re
import threading
from datetime import datetime, timezone
from urllib.parse import urlparse, parse_qs
from uuid import uuid4

import oss2
import requests
from fake_useragent import FakeUserAgent


class Functions(object):
    """
    Namespace of stateless helper classmethods: display-count parsing,
    MD5 ids, time conversion, URL parsing, and video download / upload.
    """

    # (connect, read) timeout in seconds applied to every outbound HTTP
    # request so a stalled server cannot block the caller forever.
    _REQUEST_TIMEOUT = (10, 60)
    # A downloaded payload must be strictly larger than this to be accepted.
    _MIN_VIDEO_BYTES = 1024 * 10

    @classmethod
    def show_desc_to_sta(cls, show_desc):
        """
        Parse a display-statistics string into integer counters.

        The input is a sequence of ``key U+2006 value`` pairs separated by
        ``U+2004 U+2005``. Values may carry a trailing ``+`` and the unit
        suffixes 千 (1e3), 万 (1e4) and 亿 (1e8).

        :param show_desc: raw statistics string
        :return: dict with the keys ``show_view_count``, ``show_like_count``,
            ``show_pay_count`` and ``show_zs_count`` (0 when absent)
        :raises ValueError: when a pair is malformed or a value is not numeric
        """
        def decode_show_v(show_v):
            """
            Convert a display value such as ``1.2万`` to an int.

            :param show_v: numeric text, optionally with a unit suffix
            :return: integer value
            """
            normalized = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
            # float() parses the same "1.2e4" literals the previous eval()
            # did, without executing arbitrary expressions from scraped text.
            return int(float(normalized))

        def decode_show_k(show_k):
            """
            Map a display label to its counter name.

            :param show_k: label text
            :return: counter name, ``'show_unknown'`` for unrecognised labels
            """
            this_dict = {
                '阅读': 'show_view_count',  # article
                '看过': 'show_view_count',  # image-text post
                '观看': 'show_view_count',  # video
                '赞': 'show_like_count',
                '付费': 'show_pay_count',
                '赞赏': 'show_zs_count',
            }
            if show_k not in this_dict:
                print(f'error from decode_show_k, show_k not found: {show_k}')
            return this_dict.get(show_k, 'show_unknown')

        show_desc = show_desc.replace('+', '')
        sta = {}
        for show_kv in show_desc.split('\u2004\u2005'):
            if not show_kv:
                continue
            show_k, show_v = show_kv.split('\u2006')
            sta[decode_show_k(show_k)] = decode_show_v(show_v)
        # Only the four known counters are reported; 'show_unknown' is dropped.
        return {
            'show_view_count': sta.get('show_view_count', 0),
            'show_like_count': sta.get('show_like_count', 0),
            'show_pay_count': sta.get('show_pay_count', 0),
            'show_zs_count': sta.get('show_zs_count', 0),
        }

    @classmethod
    def generateGzhId(cls, url):
        """
        Build a stable id for an article URL from its biz, idx and sn parameters.

        The raw (not percent-decoded) parameter text is hashed, so the id
        depends on the exact spelling of the URL.

        :param url: article URL containing ``biz=``, ``&idx=`` and ``&sn=``
        :return: hex MD5 of ``"<biz>-<idx>-<sn>"``
        :raises IndexError: when one of the three parameters is missing
        """
        biz = url.split("biz=")[1].split("&")[0]
        idx = url.split("&idx=")[1].split("&")[0]
        sn = url.split("&sn=")[1].split("&")[0]
        return cls.str_to_md5("{}-{}-{}".format(biz, idx, sn))

    @classmethod
    def job_with_thread(cls, job_func):
        """
        Run a job in its own thread.

        :param job_func: zero-argument callable
        :return: None; the thread is started and not joined
        """
        threading.Thread(target=job_func).start()

    @classmethod
    def str_to_md5(cls, strings):
        """
        Return the hex MD5 digest of a string.

        :param strings: text to hash (encoded as UTF-8)
        :return: 32-character lowercase hex digest
        """
        return hashlib.md5(strings.encode('utf-8')).hexdigest()

    @classmethod
    def float_to_percentage(cls, value, decimals=3) -> str:
        """
        Format a fraction as a percentage string.

        :param value: fraction, e.g. 0.5
        :param decimals: number of decimals kept after scaling by 100
        :return: e.g. ``'50.0%'``
        """
        return "{}%".format(round(value * 100, decimals))

    @classmethod
    def str_to_timestamp(cls, date_string, string_format='%Y-%m-%d') -> int:
        """
        Convert a date string to a Unix timestamp.

        The parsed datetime is naive, so it is interpreted in the local
        time zone.

        :param date_string: date text
        :param string_format: ``strptime`` format of *date_string*
        :return: whole seconds since the epoch
        """
        return int(datetime.strptime(date_string, string_format).timestamp())

    @classmethod
    def timestamp_to_str(cls, timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
        """
        Format a Unix timestamp in the local time zone.

        :param timestamp: seconds since the epoch
        :param string_format: ``strftime`` format of the result
        :return: formatted local time
        """
        # Aware UTC datetime converted to local time; replaces the
        # deprecated datetime.utcfromtimestamp() with identical output.
        local_dt = datetime.fromtimestamp(timestamp, tz=timezone.utc).astimezone()
        return local_dt.strftime(string_format)

    @classmethod
    def proxy(cls):
        """
        Build the ``proxies`` mapping for the Kuaidaili tunnel proxy.

        :return: dict with ``http`` and ``https`` entries for ``requests``
        """
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to configuration / environment and rotated.
        tunnel = "j685.kdltps.com:15818"
        username = "t14070979713487"
        password = "hqwanfvy"
        proxy_url = f"http://{username}:{password}@{tunnel}/"
        return {"http": proxy_url, "https": proxy_url}

    @classmethod
    def _decode_hex_escapes(cls, text):
        """
        Replace backslash-x two-digit hex escapes in *text* with the
        characters they encode.

        Consecutive escapes are decoded together as UTF-8 so multi-byte
        characters survive; a run that is not valid UTF-8 is left unchanged.

        :param text: text containing JavaScript-style hex escapes
        :return: text with the escapes decoded
        """
        def _decode_run(match):
            raw = bytes.fromhex(match.group().replace("\\x", ""))
            try:
                return raw.decode("utf-8")
            except UnicodeDecodeError:
                return match.group()

        return re.sub(r"(?:\\x[0-9a-fA-F]{2})+", _decode_run, text)

    @classmethod
    def _parse_video_url(cls, html_text):
        """
        Extract the video URL from an article page's HTML.

        :param html_text: page source
        :return: video URL with hex escapes and HTML entities decoded
        :raises AttributeError: when no ``mp_video_trans_info`` url entry is found
        """
        found = re.search(
            r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
        )
        # re.search returns None on a miss, so .group raises AttributeError,
        # which is the failure callers of get_video_url already see.
        escaped_url = found.group(1)
        return html.unescape(cls._decode_hex_escapes(escaped_url))

    @classmethod
    def get_video_url(cls, article_url):
        """
        Fetch an article page and return the video URL embedded in it.

        :param article_url: URL of the article page
        :return: decoded video URL
        :raises AttributeError: when the page has no video url entry
        """
        response = requests.get(
            url=article_url,
            headers={'User-Agent': FakeUserAgent().random},
            timeout=cls._REQUEST_TIMEOUT,
        )
        return cls._parse_video_url(response.text)

    @classmethod
    def _parse_source_account(cls, html_text):
        """
        Extract the account nickname and username from page HTML.

        :param html_text: page source
        :return: ``{'name': ..., 'gh_id': ...}``, or ``{}`` unless both are present
        """
        nickname = re.search(r"hit_nickname:\s*'([^']+)'", html_text)
        username = re.search(r"hit_username:\s*'([^']+)'", html_text)
        if not (nickname and username):
            return {}
        return {
            'name': nickname.group(1),
            'gh_id': username.group(1)
        }

    @classmethod
    def get_source_account(cls, article_url: str) -> dict:
        """
        Fetch an article page and return the source account's name and gh_id.

        :param article_url: URL of the article page
        :return: ``{'name': ..., 'gh_id': ...}``, or ``{}`` when either
            ``hit_nickname`` or ``hit_username`` is missing from the page
        """
        response = requests.get(
            url=article_url,
            headers={'User-Agent': FakeUserAgent().random},
            timeout=cls._REQUEST_TIMEOUT,
        )
        return cls._parse_source_account(response.text)

    @classmethod
    def _fetch_video(cls, video_url, headers, save_path):
        """
        Download *video_url* and store it at *save_path*.

        Nothing is written unless the response is successful and larger than
        ``_MIN_VIDEO_BYTES``, so a rejected download never leaves a file
        behind that could later be mistaken for a valid one.

        :param video_url: URL to download
        :param headers: request headers
        :param save_path: destination file path
        :return: *save_path* on success, otherwise None
        """
        response = requests.get(video_url, headers=headers, timeout=cls._REQUEST_TIMEOUT)
        payload = response.content
        if not response.ok or len(payload) <= cls._MIN_VIDEO_BYTES:
            return None
        target_dir = os.path.dirname(save_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(save_path, "wb") as f:
            f.write(payload)
        return save_path

    @classmethod
    def download_gzh_video(cls, article_url):
        """
        Download the video embedded in an official-account article.

        The file is stored as ``static/<md5 of video url>.mp4``.

        :param article_url: URL of the article page
        :return: path of the saved file, or None when the video URL cannot
            be resolved or the download is rejected
        """
        try:
            video_url = cls.get_video_url(article_url)
        except Exception as e:
            # Best-effort by design: report the reason instead of hiding it.
            logging.getLogger(__name__).warning(
                "download_gzh_video: cannot resolve video url for %s: %s", article_url, e
            )
            return None
        save_path = "static/{}.mp4".format(cls.str_to_md5(video_url))
        headers = {
            'Accept': '*/*',
            'Accept-Language': 'zh,zh-CN;q=0.9',
            'Connection': 'keep-alive',
            'Origin': 'https://mp.weixin.qq.com',
            'Referer': 'https://mp.weixin.qq.com/',
            'Sec-Fetch-Dest': 'video',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'cross-site',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
            'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"'
        }
        return cls._fetch_video(video_url, headers, save_path)

    @classmethod
    def upload_to_oss(cls, local_video_path):
        """
        Upload a local video file to OSS under a random key.

        :param local_video_path: path of the file to upload
        :return: object key, ``long_articles/video/<uuid4>``
        """
        oss_video_key = "long_articles/video/" + str(uuid4())
        # NOTE(review): access keys are hard-coded in source; they should be
        # moved to configuration / environment and rotated.
        access_key_id = "LTAIP6x1l3DXfSxm"
        access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
        endpoint = "oss-cn-hangzhou.aliyuncs.com"
        bucket_name = "art-pubbucket"
        bucket = oss2.Bucket(
            oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
        )
        bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
        return oss_video_key

    @classmethod
    def extract_path(cls, path: str):
        """
        Extract the video id and root source id from a page path.

        The ``jumpPage`` query parameter is itself parsed as a query string
        of the form ``pages/user-videos?id=<id>&rootSourceId=<id>``.

        :param path: page path with a query string
        :return: ``{'video_id': ..., 'root_source_id': ...}``, or ``{}``
            when there is no ``jumpPage`` parameter
        :raises KeyError: when ``jumpPage`` lacks either expected field
        """
        params = parse_qs(urlparse(path).query)
        jump_page = params.get('jumpPage', [None])[0]
        if not jump_page:
            return {}
        jump_params = parse_qs(jump_page)
        return {
            "video_id": jump_params['pages/user-videos?id'][0],
            "root_source_id": jump_params['rootSourceId'][0],
        }

    @classmethod
    def extract_params_from_url(cls, url: str, key: str):
        """
        Return the first value of a query parameter.

        :param url: URL to inspect
        :param key: query parameter name
        :return: first value, or None when the parameter is absent or blank
        """
        values = parse_qs(urlparse(url).query).get(key, [])
        return values[0] if values else None

    @classmethod
    def download_baidu_videos(cls, video_url, save_path):
        """
        Download a Baidu video unless a valid copy already exists.

        :param video_url: baidu video url
        :param save_path: save path
        :return: *save_path* on success or cache hit, otherwise None
        """
        # An existing file only counts as a cache hit when it passes the same
        # size check as a fresh download; otherwise fetch it again.
        if os.path.exists(save_path) and os.path.getsize(save_path) > cls._MIN_VIDEO_BYTES:
            return save_path
        headers = {
            'User-Agent': FakeUserAgent().chrome,
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
        return cls._fetch_video(video_url, headers, save_path)
|