| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- """
- @author: luojunhui
- """
- import hashlib
- import json
- import re
- from typing import Dict, List, Optional
- from datetime import datetime, timezone, date, timedelta
- from requests import RequestException
- from urllib.parse import urlparse, parse_qs
- from tenacity import (
- stop_after_attempt,
- wait_exponential,
- retry_if_exception_type,
- )
- def str_to_md5(strings):
- """
- 字符串转化为 md5 值
- :param strings:
- :return:
- """
- # 将字符串转换为字节
- original_bytes = strings.encode("utf-8")
- # 创建一个md5 hash对象
- md5_hash = hashlib.md5()
- # 更新hash对象,传入原始字节
- md5_hash.update(original_bytes)
- # 获取16进制形式的MD5哈希值
- md5_value = md5_hash.hexdigest()
- return md5_value
- def proxy():
- """
- 快代理
- """
- # 隧道域名:端口号
- tunnel = "j685.kdltps.com:15818"
- # 用户名密码方式
- username = "t16899444538299"
- password = "5w5ersso"
- proxies = {
- "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
- % {"user": username, "pwd": password, "proxy": tunnel},
- "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
- % {"user": username, "pwd": password, "proxy": tunnel},
- }
- return proxies
- def request_retry(retry_times, min_retry_delay, max_retry_delay):
- """
- :param retry_times:
- :param min_retry_delay:
- :param max_retry_delay:
- """
- common_retry = dict(
- stop=stop_after_attempt(retry_times),
- wait=wait_exponential(min=min_retry_delay, max=max_retry_delay),
- retry=retry_if_exception_type((RequestException, TimeoutError)),
- reraise=True, # 重试耗尽后重新抛出异常
- )
- return common_retry
- def yield_batch(data, batch_size):
- """
- 生成批次数据
- :param data:
- :param batch_size:
- :return:
- """
- for i in range(0, len(data), batch_size):
- yield data[i : i + batch_size]
- def extract_root_source_id(path: str) -> dict:
- """
- 提取path参数
- :param path:
- :return:
- """
- params = parse_qs(urlparse(path).query)
- jump_page = params.get("jumpPage", [None])[0]
- if jump_page:
- params2 = parse_qs(jump_page)
- res = {
- "video_id": params2["pages/user-videos?id"][0],
- "root_source_id": params2["rootSourceId"][0],
- }
- return res
- else:
- return {}
- def show_desc_to_sta(show_desc):
- def decode_show_v(show_v):
- """
- :param show_v:
- :return:
- """
- foo = show_v.replace("千", "e3").replace("万", "e4").replace("亿", "e8")
- foo = eval(foo)
- return int(foo)
- def decode_show_k(show_k):
- """
- :param show_k:
- :return:
- """
- this_dict = {
- "阅读": "show_view_count", # 文章
- "看过": "show_view_count", # 图文
- "观看": "show_view_count", # 视频
- "赞": "show_like_count",
- "付费": "show_pay_count",
- "赞赏": "show_zs_count",
- }
- if show_k not in this_dict:
- print(f"error from decode_show_k, show_k not found: {show_k}")
- return this_dict.get(show_k, "show_unknown")
- show_desc = show_desc.replace("+", "")
- sta = {}
- for show_kv in show_desc.split("\u2004\u2005"):
- if not show_kv:
- continue
- show_k, show_v = show_kv.split("\u2006")
- k = decode_show_k(show_k)
- v = decode_show_v(show_v)
- sta[k] = v
- res = {
- "show_view_count": sta.get("show_view_count", 0),
- "show_like_count": sta.get("show_like_count", 0),
- "show_pay_count": sta.get("show_pay_count", 0),
- "show_zs_count": sta.get("show_zs_count", 0),
- }
- return res
- def generate_gzh_id(url):
- biz = url.split("biz=")[1].split("&")[0]
- idx = url.split("&idx=")[1].split("&")[0]
- sn = url.split("&sn=")[1].split("&")[0]
- url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
- md5_hash = hashlib.md5()
- md5_hash.update(url_bit)
- md5_value = md5_hash.hexdigest()
- return md5_value
- def timestamp_to_str(timestamp, string_format="%Y-%m-%d %H:%M:%S") -> str:
- """
- :param string_format:
- :param timestamp:
- """
- dt_object = (
- datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
- )
- date_string = dt_object.strftime(string_format)
- return date_string
- def days_remaining_in_month():
- # 获取当前日期
- today = date.today()
- # 获取下个月的第一天
- if today.month == 12:
- next_month = today.replace(year=today.year + 1, month=1, day=1)
- else:
- next_month = today.replace(month=today.month + 1, day=1)
- # 计算本月最后一天(下个月第一天减去1天)
- last_day_of_month = next_month - timedelta(days=1)
- # 计算剩余天数
- remaining_days = (last_day_of_month - today).days
- return remaining_days
- def safe_json_parse(text: str) -> Optional[Dict | List]:
- """多层降级解析 JSON:直接解析 → 提取代码块 → 提取 JSON 对象/数组
- 模型有时返回 ```json ... ``` 包裹的文本,或文本中夹杂 markdown 前缀/后缀。
- 先尝试直接解析(最常见路径),失败后逐层降级提取。
- """
- if not text:
- return None
- # 降级 1:直接解析
- try:
- return json.loads(text)
- except (json.JSONDecodeError, TypeError):
- pass
- clean = text.strip()
- # 降级 2:提取最外层 json 代码块 ```json ... ```
- # 优先匹配带语言标注的,再退到任意 code fence
- m = re.search(r"```json\s*(.*?)\s*```", clean, re.DOTALL)
- if m:
- try:
- return json.loads(m.group(1))
- except (json.JSONDecodeError, TypeError):
- pass
- else:
- m = re.search(r"```\s*(.*?)\s*```", clean, re.DOTALL)
- if m:
- try:
- return json.loads(m.group(1))
- except (json.JSONDecodeError, TypeError):
- pass
- # 降级 3:在文本中查找第一个完整 JSON 对象 { ... } 或数组 [ ... ]
- # 逐字符扫描,维护字符串状态机,正确处理内嵌括号和转义引号
- for bracket_pair in [("{}", "{", "}"), ("[]", "[", "]")]:
- opener, closer = bracket_pair[1], bracket_pair[2]
- start = clean.find(opener)
- if start == -1:
- continue
- depth = 0
- in_string = False
- escape_next = False
- for i in range(start, len(clean)):
- ch = clean[i]
- if escape_next:
- escape_next = False
- continue
- if ch == "\\":
- escape_next = True
- continue
- if ch == '"' and not escape_next:
- in_string = not in_string
- continue
- if in_string:
- continue
- if ch == opener:
- depth += 1
- elif ch == closer:
- depth -= 1
- if depth == 0:
- try:
- return json.loads(clean[start : i + 1])
- except (json.JSONDecodeError, TypeError):
- return None
- # 数组或对象未闭合时也尝试下
- try:
- return json.loads(clean[start:])
- except (json.JSONDecodeError, TypeError):
- pass
- return None
|