#!/usr/bin/env python3
"""Render run_log text into a collapsible HTML page.

Edit INPUT_LOG_PATH / OUTPUT_HTML_PATH inside the script, then run:
python examples/piaoquan_needs/render_log_html.py
"""
from __future__ import annotations
import argparse
import json
import html
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from dotenv import load_dotenv
# Load the .env that sits next to this script (works regardless of the CWD).
load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)
@dataclass
class Node:
    """One node of the parsed log tree.

    The root and every ``[FOLD:title]`` section are nodes; plain log lines
    live inside ``entries`` as strings, nested sections as child ``Node``s.
    """

    # None marks the synthetic root; any other value is a fold title.
    title: str | None = None
    entries: list[str | "Node"] = field(default_factory=list)

    @property
    def is_fold(self) -> bool:
        """True when this node is a titled [FOLD] section (not the root)."""
        return self.title is not None
def parse_log(content: str) -> Node:
    """Parse raw log text into a tree of :class:`Node` objects.

    Lines of the form ``[FOLD:title]`` open a nested section and ``[/FOLD]``
    closes it; every other line is kept verbatim as a text entry. Stray and
    missing ``[/FOLD]`` markers are both tolerated.
    """
    root = Node(title=None)
    stack: list[Node] = [root]
    for raw_line in content.splitlines():
        line = raw_line.rstrip("\n")
        tag = line.strip()
        if tag.startswith("[FOLD:") and tag.endswith("]"):
            node = Node(title=tag[len("[FOLD:"):-1])
            # Attach to the current parent immediately, then descend into it.
            stack[-1].entries.append(node)
            stack.append(node)
            continue
        if tag == "[/FOLD]":
            # Tolerance: ignore a surplus [/FOLD] instead of treating it as text.
            if len(stack) > 1:
                stack.pop()
            continue
        stack[-1].entries.append(line)
    # Tolerance: with missing [/FOLD] markers, simply unwind the stack. Each
    # unclosed node was already appended to its parent when it was opened, so
    # no content is lost. (Bug fix: the previous code re-appended the popped
    # node here, which duplicated the whole section in the rendered output.)
    while len(stack) > 1:
        stack.pop()
    return root
DEFAULT_COLLAPSE_PREFIXES = ["🔧", "📥", "📤"]
DEFAULT_COLLAPSE_KEYWORDS = ["调用参数", "返回内容"]
# Tool capability summaries (static mapping, used only for log visualization).
TOOL_DESCRIPTION_MAP: dict[str, str] = {
    "think_and_plan": "系统化记录思考、计划与下一步行动(只记录不获取新信息)。",
    "douyin_search": "通过关键词在抖音上搜索视频内容。",
    "douyin_search_tikhub": "通过关键词在抖音上搜索视频内容(Tikhub 接口)。",
    "douyin_user_videos": "通过账号/作者 sec_uid 获取其历史作品列表。",
    "get_content_fans_portrait": "获取视频点赞用户画像(热点宝),判断 metadata.has_portrait。",
    "get_account_fans_portrait": "获取作者粉丝画像(热点宝),用于内容画像缺失兜底。",
    "store_results_mysql": "将 output.json 写入 MySQL(作者表与内容表)。",
    "create_crawler_plan_by_douyin_content_id": "为入选视频生成 AIGC 爬取计划。",
    "create_crawler_plan_by_douyin_account_id": "为入选账号生成 AIGC 爬取计划。",
}
# =========================
# Runtime configuration (defaults read from .env)
# =========================
INPUT_LOG_PATH = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
# When None, the HTML is generated next to the input file with a .html suffix.
OUTPUT_HTML_PATH: str | None = os.getenv("OUTPUT_HTML_PATH") or None
# Artifact output directory (content_finder's standard output directory).
OUTPUT_DIR = os.getenv("OUTPUT_DIR", ".cache/output")
# Optional data sources for the pinned summary table. When unset, defaults to
# process_trace.json / output.json in the same directory as the input log.
PROCESS_TRACE_PATH: str | None = os.getenv("PROCESS_TRACE_PATH") or None
OUTPUT_JSON_PATH: str | None = os.getenv("OUTPUT_JSON_PATH") or None
# If PROCESS_TRACE_PATH / OUTPUT_JSON_PATH are unset and the files are absent
# next to the log, try deriving .cache/output/{trace_id}/... from this id.
TRACE_ID: str | None = os.getenv("TRACE_ID") or None
# Whether to collapse every [FOLD] block by default.
COLLAPSE_ALL_FOLDS = False
# Fold blocks whose titles match these prefixes/keywords start collapsed.
COLLAPSE_PREFIXES = DEFAULT_COLLAPSE_PREFIXES
COLLAPSE_KEYWORDS = DEFAULT_COLLAPSE_KEYWORDS
logger = logging.getLogger(__name__)
def resolve_config_path(path_str: str) -> Path:
    """Resolve a configured path, whether run from the project root or script dir.

    Tries, in order: the absolute path as-is, then the path relative to the
    CWD, the script directory, and the project root. When nothing exists, the
    project-root candidate is returned so error messages stay deterministic.
    """
    candidate = Path(path_str).expanduser()
    if candidate.is_absolute():
        return candidate.resolve()
    script_dir = Path(__file__).resolve().parent
    search_bases = (Path.cwd(), script_dir, script_dir.parent.parent)
    resolved = [(base / candidate).resolve() for base in search_bases]
    for path in resolved:
        if path.exists():
            return path
    # Nothing exists: fall back to the project-root candidate for stable errors.
    return resolved[-1]
def should_collapse(
    title: str,
    collapse_prefixes: list[str],
    collapse_keywords: list[str],
    collapse_all: bool,
) -> bool:
    """Decide whether the fold titled *title* should start collapsed.

    Collapses everything when *collapse_all* is set; otherwise collapses on a
    matching title prefix or a keyword occurring anywhere in the title.
    """
    if collapse_all:
        return True
    prefix_hit = any(map(title.startswith, collapse_prefixes))
    keyword_hit = any(kw in title for kw in collapse_keywords)
    return prefix_hit or keyword_hit
def render_text_block(lines: list[str]) -> str:
    """Render a batch of plain log lines as one HTML ``<pre>`` block.

    Leading/trailing blank lines are stripped and interior runs of blank
    lines are collapsed to a single one before HTML-escaping. Returns ""
    for effectively-empty input.
    """
    trimmed = lines[:]
    while trimmed and trimmed[0].strip() == "":
        trimmed.pop(0)
    while trimmed and trimmed[-1].strip() == "":
        trimmed.pop()
    if not trimmed:
        return ""
    compact: list[str] = []
    empty_streak = 0
    for line in trimmed:
        if line.strip() == "":
            empty_streak += 1
            # Keep at most one blank line per run.
            if empty_streak <= 1:
                compact.append("")
        else:
            empty_streak = 0
            compact.append(line)
    escaped = html.escape("\n".join(compact))
    # NOTE(review): the original markup literal was mangled in this file; a
    # <pre> wrapper restores valid HTML output — confirm the intended class
    # name against the page's CSS.
    return f'<pre class="text-block">{escaped}</pre>'
def enrich_fold_title(title: str) -> str:
    """Append the tool's one-line description to a "🔧 <tool>" fold title.

    Titles without the tool prefix, or naming an unknown tool, are returned
    unchanged.
    """
    prefix = "🔧 "
    if not title.startswith(prefix):
        return title
    tool = title[len(prefix):].strip()
    desc = TOOL_DESCRIPTION_MAP.get(tool)
    return f"{prefix}{tool}({desc})" if desc else title
def render_node(
    node: Node,
    collapse_prefixes: list[str],
    collapse_keywords: list[str],
    collapse_all: bool,
) -> str:
    """Recursively render a :class:`Node` tree to an HTML fragment.

    Consecutive plain-text entries are batched into a single text block;
    fold children are rendered as nested disclosure sections, collapsed or
    expanded according to :func:`should_collapse`.
    """
    parts: list[str] = []
    pending_text: list[str] = []

    def flush() -> None:
        # Emit the accumulated plain-text run (if any) before a fold section.
        if pending_text:
            parts.append(render_text_block(pending_text))
            pending_text.clear()

    for entry in node.entries:
        if isinstance(entry, str):
            pending_text.append(entry)
            continue
        if entry.is_fold:
            flush()
            title = entry.title or ""
            collapsed = should_collapse(
                title=title,
                collapse_prefixes=collapse_prefixes,
                collapse_keywords=collapse_keywords,
                collapse_all=collapse_all,
            )
            css_class = "fold tool-fold" if collapsed else "fold normal-fold"
            open_attr = "" if collapsed else " open"
            display_title = enrich_fold_title(title)
            inner = render_node(
                entry,
                collapse_prefixes=collapse_prefixes,
                collapse_keywords=collapse_keywords,
                collapse_all=collapse_all,
            )
            # NOTE(review): the HTML literals here were mangled in this file;
            # a <details>/<summary> pair matches the collapse semantics and
            # the " open" attribute usage above — confirm against the
            # original template if it is recoverable.
            parts.append(
                f'<details class="{css_class}"{open_attr}>'
                f'<summary>{html.escape(display_title)}</summary>'
                f'<div class="fold-body">{inner}</div>'
                "</details>"
            )
    flush()
    return "".join(parts)
def _safe_str(v: object) -> str:
if v is None:
return ""
if isinstance(v, (str, int, float, bool)):
return str(v)
return json.dumps(v, ensure_ascii=False)
def _truncate(s: str, max_len: int) -> str:
s = s or ""
if len(s) <= max_len:
return s
return s[: max(0, max_len - 1)] + "…"
def _read_json_file(path: Path) -> dict:
return json.loads(path.read_text(encoding="utf-8"))
def _build_aweme_id_to_video_url(output_json_path: Path) -> dict[str, str]:
    """Build an ``{aweme_id: video_url}`` map from output.json's ``contents[]``.

    Convention: every content entry carries string ``aweme_id`` and
    ``video_url`` fields; entries missing either (or non-dict entries) are
    skipped silently.
    """
    payload = _read_json_file(output_json_path)
    contents = payload.get("contents") or []
    if not isinstance(contents, list):
        return {}
    result: dict[str, str] = {}
    for entry in contents:
        if not isinstance(entry, dict):
            continue
        aid = _safe_str(entry.get("aweme_id")).strip()
        url = _safe_str(entry.get("video_url")).strip()
        if aid and url:
            result[aid] = url
    return result
def _build_process_trace_table_html(*, process_trace_path: Path, output_json_path: Path) -> str:
    """Build the pinned process-trace summary table as an HTML fragment.

    Data sources:
      - process_trace.json: rows[]
      - output.json: contents[], used to backfill video_url by aweme_id

    Returns "" when either file is missing, process_trace.json is unreadable,
    or it contains no usable rows.

    NOTE(review): the HTML literals in this function were mangled in the
    source file; the <table>/<tr>/<td> markup below is a reconstruction —
    confirm class names against the page's CSS if recoverable.
    """
    if not process_trace_path.exists() or not output_json_path.exists():
        return ""
    try:
        trace_data = _read_json_file(process_trace_path)
    except Exception as e:
        logger.warning("read process_trace.json failed: path=%s err=%s", process_trace_path, e)
        return ""
    rows = trace_data.get("rows") or []
    if not isinstance(rows, list) or not rows:
        return ""
    aweme_to_url: dict[str, str] = {}
    try:
        aweme_to_url = _build_aweme_id_to_video_url(output_json_path)
    except Exception as e:
        # Best-effort: the table still renders without video links.
        logger.warning("read output.json failed: path=%s err=%s", output_json_path, e)
    # (row key, column label) pairs, in display order.
    headers: list[tuple[str, str]] = [
        ("input_features", "特征词"),
        ("aweme_id", "视频id"),
        ("title", "标题"),
        ("video_url", "视频链接"),
        ("author_nickname", "作者"),
        ("strategy_type", "策略"),
        ("from_case_point", "参考点"),
        ("channel", "渠道"),
        ("search_keyword", "搜索词"),
        ("decision_basis", "筛选依据"),
        ("decision_notes", "筛选理由"),
    ]

    def td(text: str, *, muted: bool = False, title: str | None = None) -> str:
        # One table cell; the optional title attribute holds the untruncated text.
        klass = "cell muted" if muted else "cell"
        title_attr = f' title="{html.escape(title)}"' if title else ""
        return f'<td class="{klass}"{title_attr}>{html.escape(text)}</td>'

    body_parts: list[str] = []
    for r in rows:
        if not isinstance(r, dict):
            continue
        aweme_id = _safe_str(r.get("aweme_id")).strip()
        video_url = aweme_to_url.get(aweme_id, "")
        values: dict[str, str] = {
            "strategy_type": _safe_str(r.get("strategy_type")),
            "from_case_point": _safe_str(r.get("from_case_point")),
            "search_keyword": _safe_str(r.get("search_keyword")),
            "aweme_id": aweme_id,
            "title": _safe_str(r.get("title")),
            "author_nickname": _safe_str(r.get("author_nickname")),
            "channel": _safe_str(r.get("channel")),
            "decision_basis": _safe_str(r.get("decision_basis")),
            "decision_notes": _safe_str(r.get("decision_notes")),
            "input_features": _safe_str(r.get("input_features")),
            "video_url": video_url,
        }
        tds: list[str] = []
        for key, _label in headers:
            val = values.get(key, "")
            if key == "decision_notes":
                # Long free text: truncate for display, keep full text in title.
                tds.append(td(_truncate(val, 80), title=val))
                continue
            if key == "title":
                tds.append(td(_truncate(val, 60), title=val))
                continue
            if key == "video_url":
                if video_url:
                    safe_url = html.escape(video_url, quote=True)
                    tds.append(
                        '<td class="cell">'
                        f'<a href="{safe_url}" target="_blank" rel="noopener">打开</a>'
                        "</td>"
                    )
                else:
                    tds.append(td("", muted=True))
                continue
            tds.append(td(val))
        body_parts.append("<tr>" + "".join(tds) + "</tr>")
    if not body_parts:
        return ""
    thead = "".join(f"<th>{html.escape(label)}</th>" for _key, label in headers)
    return (
        '<section class="summary">'
        '<div class="summary-title">过程追踪摘要</div>'
        f'<div class="summary-source">{html.escape(process_trace_path.name)}</div>'
        '<div class="table-wrap">'
        "<table>"
        "<thead><tr>"
        f"{thead}</tr></thead>"
        f"<tbody>{''.join(body_parts)}</tbody>"
        "</table>"
        "</div>"
        "</section>"
    )
def build_html(body: str, source_name: str, *, summary_table_html: str = "") -> str:
    """Wrap the rendered body (plus the optional summary table) in a full page.

    NOTE(review): the original page template, including its CSS, was mangled
    in the source file; this is a minimal reconstruction — restore the
    original styles if they are recoverable.
    """
    title = html.escape(source_name)
    return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>Run Log 可视化 - {title}</title>
<style>
body {{ font-family: -apple-system, "Segoe UI", "PingFang SC", sans-serif; margin: 16px; }}
pre.text-block {{ white-space: pre-wrap; word-break: break-all; }}
details.fold {{ margin: 4px 0; border-left: 2px solid #ccc; padding-left: 8px; }}
table {{ border-collapse: collapse; }}
td, th {{ border: 1px solid #ddd; padding: 4px 8px; }}
td.muted {{ color: #999; }}
</style>
</head>
<body>
{summary_table_html}
{body}
</body>
</html>
"""
def generate_html(
    input_path: Path,
    output_path: Path,
    collapse_prefixes: list[str],
    collapse_keywords: list[str],
    collapse_all: bool = False,
) -> None:
    """Read a run log, render it to collapsible HTML, and write *output_path*.

    The summary-table inputs come from the module-level PROCESS_TRACE_PATH /
    OUTPUT_JSON_PATH settings, falling back to files next to the input log,
    and finally to .cache/output/{TRACE_ID}/ when TRACE_ID is set.
    """
    tree = parse_log(input_path.read_text(encoding="utf-8"))
    body = render_node(
        tree,
        collapse_prefixes=collapse_prefixes,
        collapse_keywords=collapse_keywords,
        collapse_all=collapse_all,
    )
    process_trace_path = (
        resolve_config_path(PROCESS_TRACE_PATH)
        if PROCESS_TRACE_PATH
        else input_path.with_name("process_trace.json")
    )
    output_json_path = (
        resolve_config_path(OUTPUT_JSON_PATH)
        if OUTPUT_JSON_PATH
        else input_path.with_name("output.json")
    )
    # Fallback: derive missing paths from TRACE_ID when either file is absent.
    if TRACE_ID and (not process_trace_path.exists() or not output_json_path.exists()):
        trace_dir = resolve_config_path(f".cache/output/{TRACE_ID}")
        if not process_trace_path.exists():
            process_trace_path = trace_dir / "process_trace.json"
        if not output_json_path.exists():
            output_json_path = trace_dir / "output.json"
    summary_table_html = _build_process_trace_table_html(
        process_trace_path=process_trace_path,
        output_json_path=output_json_path,
    )
    page = build_html(body=body, source_name=input_path.name, summary_table_html=summary_table_html)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(page, encoding="utf-8")
def render_log_html_and_upload(*, trace_id: str, log_file_path: Path) -> str | None:
    """Render log.txt to HTML, upload it to OSS, and record the URL in MySQL.

    - Output file: log.html next to log.txt
    - Upload: utils/oss_upload.upload_html_to_oss

    Best-effort by design: every failure is logged and converted into a None
    return so the caller's main flow is never interrupted.

    Returns:
        The public URL on success, otherwise None.
    """
    tid = (trace_id or "").strip()
    if not tid or not log_file_path.exists():
        return None
    html_path = log_file_path.with_name("log.html")
    try:
        generate_html(
            input_path=log_file_path,
            output_path=html_path,
            collapse_prefixes=COLLAPSE_PREFIXES,
            collapse_keywords=COLLAPSE_KEYWORDS,
            collapse_all=COLLAPSE_ALL_FOLDS,
        )
    except Exception as e:
        logger.warning("render log.html failed: trace_id=%s err=%s", tid, e)
        return None
    try:
        from utils.oss_upload import upload_html_to_oss

        url = upload_html_to_oss(html_path, task_id=tid)
    except Exception as e:
        logger.warning("upload log.html failed: trace_id=%s err=%s", tid, e)
        return None
    # Write back to MySQL: demand_find_content_result.web_html_url.
    try:
        from db import update_web_html_url

        update_web_html_url(trace_id=tid, web_html_url=url)
    except Exception as e:
        logger.warning("update web_html_url failed: trace_id=%s err=%s", tid, e)
    return url
def _resolve_input_log_path_from_trace_id(*, trace_id: str, output_dir: Path) -> Path:
tid = (trace_id or "").strip()
if not tid:
raise ValueError("trace_id is required")
run_dir = (output_dir / tid).resolve()
if not run_dir.exists():
raise FileNotFoundError(f"OUTPUT_DIR 下未找到 trace_id 目录: {run_dir}")
log_path = run_dir / "log.txt"
if log_path.exists():
return log_path
# 兼容:部分任务可能用 run_log_*.txt 命名
candidates = sorted(
run_dir.glob("run_log_*.txt"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not candidates:
candidates = sorted(
run_dir.glob("*.txt"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not candidates:
raise FileNotFoundError(f"trace_id 目录下未找到可渲染日志文件: {run_dir}")
return candidates[0]
def _resolve_input_log_path_from_input_base(input_base: Path) -> Path:
if input_base.is_file():
return input_base
if input_base.is_dir():
# 优先渲染最新 run_log_*.txt,其次渲染任意 *.txt
candidates = sorted(
input_base.glob("run_log_*.txt"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not candidates:
candidates = sorted(
input_base.glob("*.txt"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not candidates:
raise FileNotFoundError(f"目录下未找到可渲染日志文件: {input_base}")
return candidates[0]
raise FileNotFoundError(f"输入日志路径不存在: {input_base}")
def main(argv: list[str] | None = None) -> None:
    """CLI entry point: render a run log to collapsible HTML.

    A trace id may be given as --trace-id or positionally; without one, the
    INPUT_LOG_PATH / OUTPUT_HTML_PATH module settings drive the run.
    """
    parser = argparse.ArgumentParser(description="Render run log text to collapsible HTML.")
    parser.add_argument("--trace-id", dest="trace_id", default="", help="trace_id in OUTPUT_DIR//")
    parser.add_argument("trace_id_pos", nargs="?", default="", help="trace_id (positional), same as --trace-id")
    args = parser.parse_args(argv)
    trace_id = (args.trace_id or "").strip() or (args.trace_id_pos or "").strip()
    if trace_id:
        input_path = _resolve_input_log_path_from_trace_id(
            trace_id=trace_id,
            output_dir=resolve_config_path(OUTPUT_DIR),
        )
        output_path = input_path.with_name("log.html")
    else:
        input_path = _resolve_input_log_path_from_input_base(resolve_config_path(INPUT_LOG_PATH))
        output_path = (
            resolve_config_path(OUTPUT_HTML_PATH)
            if OUTPUT_HTML_PATH
            else input_path.with_suffix(".html")
        )
    generate_html(
        input_path=input_path,
        output_path=output_path,
        collapse_prefixes=COLLAPSE_PREFIXES,
        collapse_keywords=COLLAPSE_KEYWORDS,
        collapse_all=COLLAPSE_ALL_FOLDS,
    )
    print(f"HTML 已生成: {output_path}")
# Script entry point.
if __name__ == "__main__":
    main()