"""
@author: luojunhui

Shared helpers: hashing, proxy settings, retry policy, URL and stat-text
parsing, date utilities, and thin ODPS / OSS wrappers.
"""

import calendar
import hashlib
import math
import os
import random
import re
import statistics
import string
from datetime import datetime, timezone, date, timedelta
from decimal import Decimal
from typing import List
from urllib.parse import urlparse, parse_qs

from scipy.stats import t

# The SDKs below are only needed by the helpers that talk to the matching
# service. Guarding the imports keeps the pure helpers (hashing, parsing,
# dates) importable in environments where an SDK is not installed; the
# helpers that need a missing SDK raise ImportError when they are called.
try:
    import oss2
except ImportError:
    oss2 = None

try:
    from odps import ODPS
except ImportError:
    ODPS = None

try:
    from requests import RequestException
except ImportError:
    RequestException = None

try:
    from tenacity import (
        stop_after_attempt,
        wait_exponential,
        retry_if_exception_type,
    )
except ImportError:
    stop_after_attempt = wait_exponential = retry_if_exception_type = None


# NOTE(security): the literal fallbacks below are credentials committed to
# source control. They are kept only so existing deployments keep working;
# supply the environment variables instead, then rotate and delete the literals.
_PROXY_TUNNEL = "j685.kdltps.com:15818"  # tunnel host:port
_PROXY_USERNAME = os.environ.get("KDL_PROXY_USERNAME", "t14070979713487")
_PROXY_PASSWORD = os.environ.get("KDL_PROXY_PASSWORD", "hqwanfvy")

_ODPS_ACCESS_ID = os.environ.get("ODPS_ACCESS_ID", "LTAIWYUujJAm7CbH")
_ODPS_SECRET_ACCESS_KEY = os.environ.get(
    "ODPS_SECRET_ACCESS_KEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
)
_ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
_ODPS_PROJECT = "loghubods"

_OSS_ACCESS_KEY_ID = os.environ.get("OSS_ACCESS_KEY_ID", "LTAIP6x1l3DXfSxm")
_OSS_ACCESS_KEY_SECRET = os.environ.get(
    "OSS_ACCESS_KEY_SECRET", "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
)
_OSS_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com"
_OSS_BUCKET_NAME = "art-pubbucket"

# "<number><unit>" where the unit is lowercase latin or CJK text.
_SHOW_VALUE_RE = re.compile(r"(\d+(?:\.\d+)?)([a-z\u4e00-\u9fa5]*)")
# Groups are separated by U+2004 / U+2005 spaces.
_SHOW_GROUP_SPLIT_RE = re.compile(r"[\u2004\u2005]+")
# Inside a group, key and value are separated by U+2006.
_SHOW_KV_SEPARATOR = "\u2006"

_SHOW_COUNT_KEYS = (
    "show_view_count",
    "show_like_count",
    "show_pay_count",
    "show_zs_count",
)

_SHOW_KEY_MAPPING = {
    # Chinese labels
    "阅读": "show_view_count",
    "看过": "show_view_count",
    "观看": "show_view_count",
    "赞": "show_like_count",
    "点赞": "show_like_count",
    "付费": "show_pay_count",
    "赞赏": "show_zs_count",
    # English labels
    "reads": "show_view_count",
    "views": "show_view_count",
    "view": "show_view_count",
    "likes": "show_like_count",
    "like": "show_like_count",
    "payments": "show_pay_count",
    "paid": "show_pay_count",
}


def _require(obj, package):
    """Return *obj*, or raise ImportError if the optional *package* is missing."""
    if obj is None:
        raise ImportError(f"the '{package}' package is required for this helper")
    return obj


def str_to_md5(strings):
    """
    Return the hexadecimal MD5 digest of a string.

    :param strings: text to hash; it is encoded as UTF-8 first
    :return: 32-character lowercase hex digest
    """
    return hashlib.md5(strings.encode("utf-8")).hexdigest()


def proxy():
    """
    Build the tunnel-proxy mapping (user/password auth) for HTTP clients.

    :return: dict with "http" and "https" keys, both pointing at the same
        ``http://user:password@host:port/`` proxy URL
    """
    proxy_url = f"http://{_PROXY_USERNAME}:{_PROXY_PASSWORD}@{_PROXY_TUNNEL}/"
    return {"http": proxy_url, "https": proxy_url}


def async_proxy():
    """
    Return the same tunnel proxy as :func:`proxy`, split into separate fields.

    :return: dict with "url", "username" and "password" keys
    """
    return {
        "url": f"http://{_PROXY_TUNNEL}",
        "username": _PROXY_USERNAME,
        "password": _PROXY_PASSWORD,
    }


def request_retry(retry_times, min_retry_delay, max_retry_delay):
    """
    Build a dict of tenacity retry settings for network requests.

    :param retry_times: maximum number of attempts
    :param min_retry_delay: lower bound of the exponential back-off
    :param max_retry_delay: upper bound of the exponential back-off
    :return: dict with ``stop``, ``wait``, ``retry`` and ``reraise`` entries
    :raises ImportError: if ``tenacity`` or ``requests`` is not installed
    """
    request_error = _require(RequestException, "requests")
    return dict(
        stop=_require(stop_after_attempt, "tenacity")(retry_times),
        wait=_require(wait_exponential, "tenacity")(
            min=min_retry_delay, max=max_retry_delay
        ),
        retry=_require(retry_if_exception_type, "tenacity")(
            (request_error, TimeoutError)
        ),
        reraise=True,  # re-raise the last exception once retries are exhausted
    )


def yield_batch(data, batch_size):
    """
    Yield consecutive slices of *data* with at most *batch_size* items each.

    :param data: a sliceable sequence that supports ``len()``
    :param batch_size: maximum size of each slice; must be positive
    :return: generator of slices
    :raises ValueError: if *batch_size* is not positive (raised on first
        iteration, because this is a generator)
    """
    if batch_size <= 0:
        # A negative step would silently yield nothing; fail loudly instead.
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(data), batch_size):
        yield data[start : start + batch_size]


def extract_root_source_id(path: str) -> dict:
    """
    Extract the video id and root source id from a path's ``jumpPage`` parameter.

    :param path: URL or path whose query string may carry ``jumpPage``
    :return: ``{"video_id": ..., "root_source_id": ...}``, or ``{}`` when
        ``jumpPage`` is absent or does not contain both values
    """
    query = parse_qs(urlparse(path).query)
    jump_page = query.get("jumpPage", [None])[0]
    if not jump_page:
        return {}

    # jump_page looks like "pages/user-videos?id=..&rootSourceId=..". Parsing
    # the whole string as a query makes "pages/user-videos?id" the first key.
    inner = parse_qs(jump_page)
    video_ids = inner.get("pages/user-videos?id")
    root_source_ids = inner.get("rootSourceId")
    if not video_ids or not root_source_ids:
        return {}
    return {"video_id": video_ids[0], "root_source_id": root_source_ids[0]}


def _decode_show_value(show_v: str) -> int:
    """Parse a count such as "1.3万", "5亿", "13k" or "2.5m views" into an int."""
    if not show_v:
        return 0

    # Commas are treated as decimal points ("1,3k" -> "1.3k").
    text = show_v.strip().lower().replace(",", ".")
    match = _SHOW_VALUE_RE.search(text)
    if not match:
        return 0

    # Decimal keeps the scaling exact; float arithmetic can land just below
    # the true value and int() would then truncate it to one less.
    number = Decimal(match.group(1))
    unit = match.group(2)

    multiplier = 1
    # Chinese units
    if "亿" in unit:
        multiplier = 10**8
    elif "万" in unit:
        multiplier = 10**4
    elif "千" in unit:
        multiplier = 10**3
    # English units
    elif unit.startswith("k"):
        multiplier = 10**3
    elif unit.startswith("m"):
        multiplier = 10**6
    elif unit.startswith("b"):
        multiplier = 10**9

    return int(number * multiplier)  # truncates any remaining fraction


def _decode_show_key(show_k: str) -> str:
    """Map a Chinese or English label to its counter key, else "show_unknown"."""
    if not show_k:
        return "show_unknown"
    return _SHOW_KEY_MAPPING.get(show_k.strip().lower(), "show_unknown")


def show_desc_to_sta(show_desc: str):
    """
    Parse a display string of labelled counts into numeric statistics.

    Groups are separated by U+2004 / U+2005; inside a group the label and the
    number are separated by U+2006, in either order. "+" signs are dropped.
    Unknown labels and malformed groups are ignored.

    NOTE(review): commas are read as decimal points, so a thousands-separated
    value such as "1,234" parses as 1 — confirm the inputs never use that form.

    :param show_desc: raw display text; may be empty or None
    :return: dict with ``show_view_count``, ``show_like_count``,
        ``show_pay_count`` and ``show_zs_count`` (0 when not present)
    """
    counts = dict.fromkeys(_SHOW_COUNT_KEYS, 0)
    if not show_desc:
        return counts

    for group in _SHOW_GROUP_SPLIT_RE.split(show_desc.replace("+", "")):
        group = group.strip()
        if not group:
            continue

        parts = group.split(_SHOW_KV_SEPARATOR)
        if len(parts) != 2:
            continue
        first, second = parts

        # Whichever side contains a digit is the value.
        if re.search(r"\d", first):
            raw_value, raw_key = first, second
        else:
            raw_key, raw_value = first, second

        key = _decode_show_key(raw_key)
        if key != "show_unknown":
            counts[key] = _decode_show_value(raw_value)

    return counts


def generate_gzh_id(url):
    """
    Derive an id for an article URL from its ``biz``, ``idx`` and ``sn`` values.

    The values are taken by plain string splitting (no URL decoding) so that
    the resulting ids stay identical to previously generated ones.

    :param url: URL containing ``biz=``, ``&idx=`` and ``&sn=``
    :return: hex MD5 of ``"<biz>-<idx>-<sn>"``
    :raises IndexError: if one of the three markers is missing
    """
    biz = url.split("biz=")[1].split("&")[0]
    idx = url.split("&idx=")[1].split("&")[0]
    sn = url.split("&sn=")[1].split("&")[0]
    return hashlib.md5(f"{biz}-{idx}-{sn}".encode()).hexdigest()


def timestamp_to_str(timestamp, string_format="%Y-%m-%d %H:%M:%S") -> str:
    """
    Format a Unix timestamp in the machine's local timezone.

    :param timestamp: seconds since the epoch
    :param string_format: ``strftime`` format string
    :return: formatted local date/time
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; build the
    # aware UTC datetime directly and convert it to local time.
    local_dt = datetime.fromtimestamp(timestamp, tz=timezone.utc).astimezone()
    return local_dt.strftime(string_format)


def days_remaining_in_month():
    """
    Return how many days are left in the current month, excluding today.

    :return: 0 on the last day of the month
    """
    today = date.today()
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    return days_in_month - today.day


def generate_task_trace_id():
    """
    Build a trace id of the form ``Task-<YYYYmmddHHMMSS>-<16 random chars>``.

    The suffix uses lowercase letters and digits from the ``random`` module.
    """
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=16))
    return f"Task-{datetime.now().strftime('%Y%m%d%H%M%S')}-{suffix}"


def ci_lower(data: List[int], conf: float = 0.95) -> float:
    """
    Return the lower bound of the two-sided Student-t confidence interval
    for the mean of *data*.

    :param data: sample values; at least two are required
    :param conf: confidence level of the interval
    :return: ``mean + t_quantile * standard_error``
    :raises ValueError: if fewer than two samples are given
    """
    if len(data) < 2:
        raise ValueError("Sample length less than 2")

    n = len(data)
    mean = statistics.mean(data)
    std_err = statistics.stdev(data) / math.sqrt(n)
    # Left-tail t quantile; ppf returns a negative value here.
    t_left = t.ppf((1 - conf) / 2, df=n - 1)
    return mean + t_left * std_err


def init_odps_client():
    """
    Create an ODPS client for the configured project.

    :raises ImportError: if the ``odps`` package is not installed
    """
    return _require(ODPS, "odps")(
        access_id=_ODPS_ACCESS_ID,
        secret_access_key=_ODPS_SECRET_ACCESS_KEY,
        endpoint=_ODPS_ENDPOINT,
        project=_ODPS_PROJECT,
    )


def fetch_from_odps(query):
    """
    Run a SQL query on ODPS and return all result records.

    :param query: SQL text to execute
    :return: list of records; empty list when the reader is falsy
    """
    client = init_odps_client()
    with client.execute_sql(query).open_reader() as reader:
        return list(reader) if reader else []


def upload_to_oss(local_video_path, oss_key):
    """
    Upload a local file to the configured OSS bucket.

    :param local_video_path: path of the file to upload
    :param oss_key: object key to store the file under
    :return: *oss_key*
    :raises ImportError: if the ``oss2`` package is not installed
    """
    sdk = _require(oss2, "oss2")
    bucket = sdk.Bucket(
        sdk.Auth(_OSS_ACCESS_KEY_ID, _OSS_ACCESS_KEY_SECRET),
        _OSS_ENDPOINT,
        _OSS_BUCKET_NAME,
    )
    bucket.put_object_from_file(key=oss_key, filename=local_video_path)
    return oss_key