""" @author: luojunhui """ import hashlib from datetime import datetime, timezone, date, timedelta from requests import RequestException from urllib.parse import urlparse, parse_qs from tenacity import ( stop_after_attempt, wait_exponential, retry_if_exception_type, ) def str_to_md5(strings): """ 字符串转化为 md5 值 :param strings: :return: """ # 将字符串转换为字节 original_bytes = strings.encode("utf-8") # 创建一个md5 hash对象 md5_hash = hashlib.md5() # 更新hash对象,传入原始字节 md5_hash.update(original_bytes) # 获取16进制形式的MD5哈希值 md5_value = md5_hash.hexdigest() return md5_value def proxy(): """ 快代理 """ # 隧道域名:端口号 tunnel = "j685.kdltps.com:15818" # 用户名密码方式 username = "t14070979713487" password = "hqwanfvy" proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, } return proxies def request_retry(retry_times, min_retry_delay, max_retry_delay): """ :param retry_times: :param min_retry_delay: :param max_retry_delay: """ common_retry = dict( stop=stop_after_attempt(retry_times), wait=wait_exponential(min=min_retry_delay, max=max_retry_delay), retry=retry_if_exception_type((RequestException, TimeoutError)), reraise=True, # 重试耗尽后重新抛出异常 ) return common_retry def yield_batch(data, batch_size): """ 生成批次数据 :param data: :param batch_size: :return: """ for i in range(0, len(data), batch_size): yield data[i : i + batch_size] def extract_root_source_id(path: str) -> dict: """ 提取path参数 :param path: :return: """ params = parse_qs(urlparse(path).query) jump_page = params.get("jumpPage", [None])[0] if jump_page: params2 = parse_qs(jump_page) res = { "video_id": params2["pages/user-videos?id"][0], "root_source_id": params2["rootSourceId"][0], } return res else: return {} def show_desc_to_sta(show_desc): def decode_show_v(show_v): """ :param show_v: :return: """ foo = show_v.replace("千", "e3").replace("万", "e4").replace("亿", "e8") foo = eval(foo) return int(foo) def decode_show_k(show_k): """ :param show_k: :return: """ this_dict = { "阅读": "show_view_count", # 文章 "看过": "show_view_count", # 图文 "观看": "show_view_count", # 视频 "赞": "show_like_count", "付费": "show_pay_count", "赞赏": "show_zs_count", } if show_k not in this_dict: print(f"error from decode_show_k, show_k not found: {show_k}") return this_dict.get(show_k, "show_unknown") show_desc = show_desc.replace("+", "") sta = {} for show_kv in show_desc.split("\u2004\u2005"): if not show_kv: continue show_k, show_v = show_kv.split("\u2006") k = decode_show_k(show_k) v = decode_show_v(show_v) sta[k] = v res = { "show_view_count": sta.get("show_view_count", 0), "show_like_count": sta.get("show_like_count", 0), "show_pay_count": sta.get("show_pay_count", 0), "show_zs_count": sta.get("show_zs_count", 0), } return res def generate_gzh_id(url): biz = url.split("biz=")[1].split("&")[0] idx = url.split("&idx=")[1].split("&")[0] sn = url.split("&sn=")[1].split("&")[0] url_bit = "{}-{}-{}".format(biz, idx, sn).encode() md5_hash = hashlib.md5() md5_hash.update(url_bit) md5_value = md5_hash.hexdigest() return md5_value def timestamp_to_str(timestamp, string_format="%Y-%m-%d %H:%M:%S") -> str: """ :param string_format: :param timestamp: """ dt_object = ( datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone() ) date_string = dt_object.strftime(string_format) return date_string def days_remaining_in_month(): # 获取当前日期 today = date.today() # 获取下个月的第一天 if today.month == 12: next_month = today.replace(year=today.year + 1, month=1, day=1) else: next_month = today.replace(month=today.month + 1, day=1) # 计算本月最后一天(下个月第一天减去1天) last_day_of_month = next_month - timedelta(days=1) # 计算剩余天数 remaining_days = (last_day_of_month - today).days return remaining_days