""" @author: luojunhui """ import hashlib from datetime import datetime, timezone from requests import RequestException from urllib.parse import urlparse, parse_qs from tenacity import ( stop_after_attempt, wait_exponential, retry_if_exception_type, ) def str_to_md5(strings): """ 字符串转化为 md5 值 :param strings: :return: """ # 将字符串转换为字节 original_bytes = strings.encode("utf-8") # 创建一个md5 hash对象 md5_hash = hashlib.md5() # 更新hash对象,传入原始字节 md5_hash.update(original_bytes) # 获取16进制形式的MD5哈希值 md5_value = md5_hash.hexdigest() return md5_value def proxy(): """ 快代理 """ # 隧道域名:端口号 tunnel = "j685.kdltps.com:15818" # 用户名密码方式 username = "t14070979713487" password = "hqwanfvy" proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, } return proxies def request_retry(retry_times, min_retry_delay, max_retry_delay): """ :param retry_times: :param min_retry_delay: :param max_retry_delay: """ common_retry = dict( stop=stop_after_attempt(retry_times), wait=wait_exponential(min=min_retry_delay, max=max_retry_delay), retry=retry_if_exception_type((RequestException, TimeoutError)), reraise=True, # 重试耗尽后重新抛出异常 ) return common_retry def yield_batch(data, batch_size): """ 生成批次数据 :param data: :param batch_size: :return: """ for i in range(0, len(data), batch_size): yield data[i : i + batch_size] def extract_root_source_id(path: str) -> dict: """ 提取path参数 :param path: :return: """ params = parse_qs(urlparse(path).query) jump_page = params.get("jumpPage", [None])[0] if jump_page: params2 = parse_qs(jump_page) res = { "video_id": params2["pages/user-videos?id"][0], "root_source_id": params2["rootSourceId"][0], } return res else: return {} def show_desc_to_sta(show_desc): def decode_show_v(show_v): """ :param show_v: :return: """ foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8') foo = eval(foo) return int(foo) def decode_show_k(show_k): """ :param show_k: :return: """ this_dict = { '阅读': 'show_view_count', # 文章 '看过': 'show_view_count', # 图文 '观看': 'show_view_count', # 视频 '赞': 'show_like_count', '付费': 'show_pay_count', '赞赏': 'show_zs_count', } if show_k not in this_dict: print(f'error from decode_show_k, show_k not found: {show_k}') return this_dict.get(show_k, 'show_unknown') show_desc = show_desc.replace('+', '') sta = {} for show_kv in show_desc.split('\u2004\u2005'): if not show_kv: continue show_k, show_v = show_kv.split('\u2006') k = decode_show_k(show_k) v = decode_show_v(show_v) sta[k] = v res = { 'show_view_count': sta.get('show_view_count', 0), 'show_like_count': sta.get('show_like_count', 0), 'show_pay_count': sta.get('show_pay_count', 0), 'show_zs_count': sta.get('show_zs_count', 0), } return res def generate_gzh_id(url): biz = url.split("biz=")[1].split("&")[0] idx = url.split("&idx=")[1].split("&")[0] sn = url.split("&sn=")[1].split("&")[0] url_bit = "{}-{}-{}".format(biz, idx, sn).encode() md5_hash = hashlib.md5() md5_hash.update(url_bit) md5_value = md5_hash.hexdigest() return md5_value def timestamp_to_str(timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str: """ :param string_format: :param timestamp: """ dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone() date_string = dt_object.strftime(string_format) return date_string