#!/usr/bin/env python3 """将 run_log 文本渲染为可折叠 HTML 页面。 直接在脚本内修改 INPUT_LOG_PATH / OUTPUT_HTML_PATH 后运行: python examples/piaoquan_needs/render_log_html.py """ from __future__ import annotations import argparse import json import html import logging import os from dataclasses import dataclass, field from pathlib import Path from dotenv import load_dotenv # 加载 examples/content_finder/.env(不依赖你从哪个目录运行) load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False) @dataclass class Node: title: str | None = None entries: list[str | "Node"] = field(default_factory=list) @property def is_fold(self) -> bool: return self.title is not None def parse_log(content: str) -> Node: root = Node(title=None) stack: list[Node] = [root] for raw_line in content.splitlines(): line = raw_line.rstrip("\n") tag = line.strip() if tag.startswith("[FOLD:") and tag.endswith("]"): title = tag[len("[FOLD:") : -1] node = Node(title=title) stack[-1].entries.append(node) stack.append(node) continue if tag == "[/FOLD]": # 容错:遇到多余的 [/FOLD] 时,忽略而不是把它当作正文 if len(stack) > 1: stack.pop() continue stack[-1].entries.append(line) while len(stack) > 1: unclosed = stack.pop() # 容错: 遇到缺失 [/FOLD] 时,保留原有内容,不丢日志 stack[-1].entries.append(unclosed) return root DEFAULT_COLLAPSE_PREFIXES = ["🔧", "📥", "📤"] DEFAULT_COLLAPSE_KEYWORDS = ["调用参数", "返回内容"] # 工具功能摘要(静态映射,用于日志可视化展示) TOOL_DESCRIPTION_MAP: dict[str, str] = { "think_and_plan": "系统化记录思考、计划与下一步行动(只记录不获取新信息)。", "douyin_search": "通过关键词在抖音上搜索视频内容。", "douyin_search_tikhub": "通过关键词在抖音上搜索视频内容(Tikhub 接口)。", "douyin_user_videos": "通过账号/作者 sec_uid 获取其历史作品列表。", "get_content_fans_portrait": "获取视频点赞用户画像(热点宝),判断 metadata.has_portrait。", "get_account_fans_portrait": "获取作者粉丝画像(热点宝),用于内容画像缺失兜底。", "store_results_mysql": "将 output.json 写入 MySQL(作者表与内容表)。", "create_crawler_plan_by_douyin_content_id": "为入选视频生成 AIGC 爬取计划。", "create_crawler_plan_by_douyin_account_id": "为入选账号生成 AIGC 爬取计划。", } # ========================= # 运行配置(默认从 .env 读取) # ========================= 
INPUT_LOG_PATH = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
# Set to None to default to an .html file next to the input with the same stem.
OUTPUT_HTML_PATH: str | None = os.getenv("OUTPUT_HTML_PATH") or None
# Artifact output directory (content_finder's standard output directory).
OUTPUT_DIR = os.getenv("OUTPUT_DIR", ".cache/output")
# Data sources for the pinned summary table (optional). When unset, defaults to
# process_trace.json / output.json in the same directory as the input log.
PROCESS_TRACE_PATH: str | None = os.getenv("PROCESS_TRACE_PATH") or None
OUTPUT_JSON_PATH: str | None = os.getenv("OUTPUT_JSON_PATH") or None
# If PROCESS_TRACE_PATH/OUTPUT_JSON_PATH are not set explicitly and the files are
# absent next to the log, try deriving .cache/output/{trace_id}/... from this trace id.
TRACE_ID: str | None = os.getenv("TRACE_ID") or None
# Whether to collapse every [FOLD] block by default.
COLLAPSE_ALL_FOLDS = False
# Fold blocks whose titles hit these prefixes/keywords start collapsed.
COLLAPSE_PREFIXES = DEFAULT_COLLAPSE_PREFIXES
COLLAPSE_KEYWORDS = DEFAULT_COLLAPSE_KEYWORDS

logger = logging.getLogger(__name__)


def resolve_config_path(path_str: str) -> Path:
    """Resolve a configured path, tolerating runs from the project root or the script directory."""
    raw = Path(path_str).expanduser()
    if raw.is_absolute():
        return raw.resolve()
    # Try, in order: CWD-relative, script-dir-relative, then project-root-relative.
    cwd_candidate = (Path.cwd() / raw).resolve()
    if cwd_candidate.exists():
        return cwd_candidate
    script_dir = Path(__file__).resolve().parent
    script_candidate = (script_dir / raw).resolve()
    if script_candidate.exists():
        return script_candidate
    project_root = script_dir.parent.parent
    project_candidate = (project_root / raw).resolve()
    if project_candidate.exists():
        return project_candidate
    # If none exists, return the project-root join so error messages stay stable.
    return project_candidate


def should_collapse(
    title: str,
    collapse_prefixes: list[str],
    collapse_keywords: list[str],
    collapse_all: bool,
) -> bool:
    """Decide whether the fold titled `title` should start collapsed."""
    if collapse_all:
        return True
    if any(title.startswith(prefix) for prefix in collapse_prefixes):
        return True
    return any(keyword in title for keyword in collapse_keywords)


def render_text_block(lines: list[str]) -> str:
    """Render a run of plain-text log lines as one HTML-escaped block.

    Leading/trailing blank lines are trimmed and interior blank-line runs are
    squeezed to a single blank line before escaping.
    """
    if not lines:
        return ""
    normalized = lines[:]
    # Trim leading and trailing blank lines.
    while normalized and normalized[0].strip() == "":
        normalized.pop(0)
    while normalized and normalized[-1].strip() == "":
        normalized.pop()
    if not normalized:
        return ""
    compact: list[str] = []
    empty_streak = 0
    # Collapse consecutive blank lines down to a single one.
    for line in normalized:
        if line.strip() == "":
            empty_streak += 1
            if empty_streak <= 1:
                compact.append("")
        else:
            empty_streak = 0
            compact.append(line)
    escaped = html.escape("\n".join(compact))
    # NOTE(review): the wrapping HTML markup of this f-string appears to have been
    # stripped from this copy of the file (angle-bracket tags missing, leaving an
    # unterminated literal) — restore the original literal from version control.
    return f'
{escaped}
'


def enrich_fold_title(title: str) -> str:
    """Append the tool's feature description to a tool-call fold title."""
    tool_prefix = "🔧 "
    if not title.startswith(tool_prefix):
        return title
    tool_name = title[len(tool_prefix):].strip()
    description = TOOL_DESCRIPTION_MAP.get(tool_name)
    if not description:
        return title
    return f"{tool_prefix}{tool_name}({description})"


def render_node(
    node: Node,
    collapse_prefixes: list[str],
    collapse_keywords: list[str],
    collapse_all: bool,
) -> str:
    """Recursively render a Node subtree to HTML.

    Consecutive plain-text entries are buffered and emitted via
    render_text_block; child fold Nodes become collapsible sections whose
    initial open/closed state comes from should_collapse.
    """
    parts: list[str] = []
    text_buffer: list[str] = []

    def flush_text_buffer() -> None:
        # Emit any accumulated plain-text lines as one rendered block.
        if text_buffer:
            parts.append(render_text_block(text_buffer))
            text_buffer.clear()

    for entry in node.entries:
        if isinstance(entry, str):
            text_buffer.append(entry)
            continue
        child = entry
        if child.is_fold:
            flush_text_buffer()
            title = child.title or ""
            is_collapsed = should_collapse(
                title=title,
                collapse_prefixes=collapse_prefixes,
                collapse_keywords=collapse_keywords,
                collapse_all=collapse_all,
            )
            folded_class = "fold tool-fold" if is_collapsed else "fold normal-fold"
            open_attr = "" if is_collapsed else " open"
            display_title = enrich_fold_title(title)
            inner = render_node(
                child,
                collapse_prefixes=collapse_prefixes,
                collapse_keywords=collapse_keywords,
                collapse_all=collapse_all,
            )
            # NOTE(review): the collapsible-section markup (tags using
            # folded_class / open_attr) appears stripped from this copy of the
            # file, which is why those variables look unused — restore from VCS.
            parts.append( f'
' f'{html.escape(display_title)}' f"{inner}" "
" )
    flush_text_buffer()
    return "".join(parts)


def _safe_str(v: object) -> str:
    """Best-effort string form: scalars via str(), other objects as compact JSON."""
    if v is None:
        return ""
    if isinstance(v, (str, int, float, bool)):
        return str(v)
    return json.dumps(v, ensure_ascii=False)


def _truncate(s: str, max_len: int) -> str:
    """Truncate `s` to at most max_len characters, ending with an ellipsis when cut."""
    s = s or ""
    if len(s) <= max_len:
        return s
    return s[: max(0, max_len - 1)] + "…"


def _read_json_file(path: Path) -> dict:
    # Assumes the file holds a JSON object at top level — TODO confirm against producers.
    return json.loads(path.read_text(encoding="utf-8"))


def _build_aweme_id_to_video_url(output_json_path: Path) -> dict[str, str]:
    """Build an {aweme_id: video_url} mapping from contents[] in output.json.

    Convention: every content entry in output.json carries aweme_id and
    video_url (strings). Entries missing either field are skipped.
    """
    data = _read_json_file(output_json_path)
    contents = data.get("contents") or []
    if not isinstance(contents, list):
        return {}
    mapping: dict[str, str] = {}
    for item in contents:
        if not isinstance(item, dict):
            continue
        aweme_id = _safe_str(item.get("aweme_id")).strip()
        video_url = _safe_str(item.get("video_url")).strip()
        if aweme_id and video_url:
            mapping[aweme_id] = video_url
    return mapping


def _build_process_trace_table_html(*, process_trace_path: Path, output_json_path: Path) -> str:
    """Build the pinned summary table HTML.

    Data sources:
    - process_trace.json: rows[]
    - output.json: contents[], used to backfill video_url keyed by aweme_id

    Returns "" (render nothing) when either file is missing or unusable;
    read failures are logged as warnings rather than raised.
    """
    if not process_trace_path.exists() or not output_json_path.exists():
        return ""
    try:
        trace_data = _read_json_file(process_trace_path)
    except Exception as e:
        logger.warning("read process_trace.json failed: path=%s err=%s", process_trace_path, e)
        return ""
    rows = trace_data.get("rows") or []
    if not isinstance(rows, list) or not rows:
        return ""
    aweme_to_url: dict[str, str] = {}
    try:
        aweme_to_url = _build_aweme_id_to_video_url(output_json_path)
    except Exception as e:
        # Best-effort: table still renders, just without video links.
        logger.warning("read output.json failed: path=%s err=%s", output_json_path, e)
    # (key, column label) pairs — defines column order of the table.
    headers: list[tuple[str, str]] = [
        ("input_features", "特征词"),
        ("aweme_id", "视频id"),
        ("title", "标题"),
        ("video_url", "视频链接"),
        ("author_nickname", "作者"),
        ("strategy_type", "策略"),
        ("from_case_point", "参考点"),
        ("channel", "渠道"),
        ("search_keyword",
            "搜索词"),
        ("decision_basis", "筛选依据"),
        ("decision_notes", "筛选理由"),
    ]

    def td(text: str, *, muted: bool = False, title: str | None = None) -> str:
        # Render one table cell; `title` becomes a hover tooltip for truncated text.
        klass = "cell muted" if muted else "cell"
        title_attr = f' title="{html.escape(title)}"' if title else ""
        # NOTE(review): the cell markup interpolating klass/title_attr appears
        # stripped from this copy (hence they look unused) — restore from VCS.
        return f'{html.escape(text)}'

    body_parts: list[str] = []
    for r in rows:
        if not isinstance(r, dict):
            continue
        aweme_id = _safe_str(r.get("aweme_id")).strip()
        video_url = aweme_to_url.get(aweme_id, "")
        values: dict[str, str] = {
            "strategy_type": _safe_str(r.get("strategy_type")),
            "from_case_point": _safe_str(r.get("from_case_point")),
            "search_keyword": _safe_str(r.get("search_keyword")),
            "aweme_id": aweme_id,
            "title": _safe_str(r.get("title")),
            "author_nickname": _safe_str(r.get("author_nickname")),
            "channel": _safe_str(r.get("channel")),
            "decision_basis": _safe_str(r.get("decision_basis")),
            "decision_notes": _safe_str(r.get("decision_notes")),
            "input_features": _safe_str(r.get("input_features")),
            "video_url": video_url,
        }
        tds: list[str] = []
        for key, _label in headers:
            val = values.get(key, "")
            if key == "decision_notes":
                # Long free text: truncate for display, keep full text as tooltip.
                full = val
                val = _truncate(val, 80)
                tds.append(td(val, title=full))
                continue
            if key == "title":
                full = val
                val = _truncate(val, 60)
                tds.append(td(val, title=full))
                continue
            if key == "video_url":
                if video_url:
                    safe_url = html.escape(video_url, quote=True)
                    # NOTE(review): anchor markup using safe_url appears stripped
                    # from this copy — restore from VCS.
                    tds.append( '' f'打开' "" )
                else:
                    tds.append(td("", muted=True))
                continue
            tds.append(td(val))
        # NOTE(review): row-wrapping markup appears stripped here as well.
        body_parts.append("" + "".join(tds) + "")
    if not body_parts:
        return ""
    thead = "".join(f"{html.escape(label)}" for _key, label in headers)
    # NOTE(review): the table/card markup in the literals below was stripped
    # from this copy of the file (only quotes and interpolations remain,
    # leaving unterminated strings) — restore the original from VCS.
    return ( '
' '
过程追踪摘要
' f'
{html.escape(process_trace_path.name)}
' '
' '' f"{thead}" f"{''.join(body_parts)}" "
" "
" "
" )


def build_html(body: str, source_name: str, *, summary_table_html: str = "") -> str:
    """Assemble the final HTML page: title, header, optional summary table, log body."""
    # NOTE(review): the full page template (doctype, style block, layout tags)
    # appears stripped from this copy; only interpolations and visible text
    # remain. Restore the template from version control.
    return f""" Run Log 可视化 - {html.escape(source_name)}
Run Log 可视化
{html.escape(source_name)}
{summary_table_html}
{body}
"""


def generate_html(
    input_path: Path,
    output_path: Path,
    collapse_prefixes: list[str],
    collapse_keywords: list[str],
    collapse_all: bool = False,
) -> None:
    """Render the log at input_path to a standalone HTML file at output_path.

    Also tries to locate process_trace.json / output.json (explicit env paths,
    then siblings of the log, then a TRACE_ID-derived directory) to prepend the
    pinned summary table.
    """
    content = input_path.read_text(encoding="utf-8")
    tree = parse_log(content)
    body = render_node(
        tree,
        collapse_prefixes=collapse_prefixes,
        collapse_keywords=collapse_keywords,
        collapse_all=collapse_all,
    )
    if PROCESS_TRACE_PATH:
        process_trace_path = resolve_config_path(PROCESS_TRACE_PATH)
    else:
        process_trace_path = input_path.with_name("process_trace.json")
    if OUTPUT_JSON_PATH:
        output_json_path = resolve_config_path(OUTPUT_JSON_PATH)
    else:
        output_json_path = input_path.with_name("output.json")
    # Fallback: derive both paths from the TRACE_ID output directory when missing.
    if TRACE_ID and (not process_trace_path.exists() or not output_json_path.exists()):
        trace_dir = resolve_config_path(f".cache/output/{TRACE_ID}")
        if not process_trace_path.exists():
            process_trace_path = trace_dir / "process_trace.json"
        if not output_json_path.exists():
            output_json_path = trace_dir / "output.json"
    summary_table_html = _build_process_trace_table_html(
        process_trace_path=process_trace_path,
        output_json_path=output_json_path,
    )
    html_content = build_html(body=body, source_name=input_path.name, summary_table_html=summary_table_html)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(html_content, encoding="utf-8")


def render_log_html_and_upload(*, trace_id: str, log_file_path: Path) -> str | None:
    """Render log.txt to HTML and upload it to OSS.

    - Output file: log.html in the same directory as log.txt
    - Upload: via utils/oss_upload.upload_html_to_oss

    Returns:
        The public URL on success; None on any failure (never raises, so the
        caller's main flow is unaffected).
    """
    tid = (trace_id or "").strip()
    if not tid:
        return None
    if not log_file_path.exists():
        return None
    html_path = log_file_path.with_name("log.html")
    try:
        generate_html(
            input_path=log_file_path,
            output_path=html_path,
            collapse_prefixes=COLLAPSE_PREFIXES,
            collapse_keywords=COLLAPSE_KEYWORDS,
            collapse_all=COLLAPSE_ALL_FOLDS,
        )
    except Exception as e:
        logger.warning("render log.html failed: trace_id=%s err=%s", tid, e)
        return None
    try:
        # Imported lazily so the renderer stays usable without the OSS/DB deps.
        from utils.oss_upload import upload_html_to_oss
        url = upload_html_to_oss(html_path, task_id=tid)
        # Write back to MySQL: demand_find_content_result.web_html_url
        try:
            from db import update_web_html_url
            update_web_html_url(trace_id=tid, web_html_url=url)
        except Exception as e:
            # DB write-back is best-effort; the upload URL is still returned.
            logger.warning("update web_html_url failed: trace_id=%s err=%s", tid, e)
        return url
    except Exception as e:
        logger.warning("upload log.html failed: trace_id=%s err=%s", tid, e)
        return None


def _resolve_input_log_path_from_trace_id(*, trace_id: str, output_dir: Path) -> Path:
    """Locate the renderable log file for a trace id under output_dir.

    Prefers log.txt; falls back to the newest run_log_*.txt, then the newest
    *.txt. Raises ValueError for an empty trace id and FileNotFoundError when
    the directory or any candidate file is missing.
    """
    tid = (trace_id or "").strip()
    if not tid:
        raise ValueError("trace_id is required")
    run_dir = (output_dir / tid).resolve()
    if not run_dir.exists():
        raise FileNotFoundError(f"OUTPUT_DIR 下未找到 trace_id 目录: {run_dir}")
    log_path = run_dir / "log.txt"
    if log_path.exists():
        return log_path
    # Compatibility: some tasks may name the log run_log_*.txt.
    candidates = sorted(
        run_dir.glob("run_log_*.txt"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    if not candidates:
        candidates = sorted(
            run_dir.glob("*.txt"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
    if not candidates:
        raise FileNotFoundError(f"trace_id 目录下未找到可渲染日志文件: {run_dir}")
    return candidates[0]


def _resolve_input_log_path_from_input_base(input_base: Path) -> Path:
    """Resolve a file-or-directory input setting to a concrete log file.

    A file is used as-is; for a directory, prefer the newest run_log_*.txt,
    then any newest *.txt. Raises FileNotFoundError otherwise.
    """
    if input_base.is_file():
        return input_base
    if input_base.is_dir():
        # Prefer the newest run_log_*.txt, then fall back to any *.txt.
        candidates = sorted(
            input_base.glob("run_log_*.txt"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
        if not candidates:
            candidates = sorted(
                input_base.glob("*.txt"),
                key=lambda p: p.stat().st_mtime,
                reverse=True,
            )
        if not candidates:
            raise FileNotFoundError(f"目录下未找到可渲染日志文件: {input_base}")
        return candidates[0]
    raise FileNotFoundError(f"输入日志路径不存在: {input_base}")


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: render a log chosen by trace id or by INPUT_LOG_PATH."""
    parser = argparse.ArgumentParser(description="Render run log text to collapsible HTML.")
    # NOTE(review): the help string below likely had a placeholder path segment
    # (e.g. a <trace_id> token) stripped from this copy — verify against VCS.
    parser.add_argument("--trace-id", dest="trace_id", default="", help="trace_id in OUTPUT_DIR//")
    parser.add_argument("trace_id_pos", nargs="?", default="", help="trace_id (positional), same as --trace-id")
    args = parser.parse_args(argv)
    # Flag form wins over the positional form when both are supplied.
    trace_id = ((args.trace_id or "").strip() or (args.trace_id_pos or "").strip())
    if trace_id:
        output_dir = resolve_config_path(OUTPUT_DIR)
        input_path = _resolve_input_log_path_from_trace_id(trace_id=trace_id, output_dir=output_dir)
        output_path = input_path.with_name("log.html")
    else:
        input_base = resolve_config_path(INPUT_LOG_PATH)
        input_path = _resolve_input_log_path_from_input_base(input_base)
        if OUTPUT_HTML_PATH:
            output_path = resolve_config_path(OUTPUT_HTML_PATH)
        else:
            output_path = input_path.with_suffix(".html")
    generate_html(
        input_path=input_path,
        output_path=output_path,
        collapse_prefixes=COLLAPSE_PREFIXES,
        collapse_keywords=COLLAPSE_KEYWORDS,
        collapse_all=COLLAPSE_ALL_FOLDS,
    )
    print(f"HTML 已生成: {output_path}")


if __name__ == "__main__":
    main()