#!/usr/bin/env python3
"""
Batch-render trace logs to HTML, upload them, and write the URL back to MySQL.

Conventions:
- Every subdirectory of the output directory is named after a trace_id.
- Each trace_id directory contains a log file (log.txt is preferred, then
  run_log_*.txt, then any other *.txt).

Workflow (for each trace_id):
1) log file -> render_log_html.generate_html(...) writes log.html into the
   same directory
2) Upload the HTML to Alibaba Cloud OSS and obtain its public URL
3) UPDATE demand_find_content_result.web_html_url = <url>
   WHERE trace_id = <trace_id>

Safe defaults:
- By default this is a dry run: candidates are printed, nothing is rendered,
  uploaded or written to the database.
- Pass --apply to actually render + upload + update the database.
"""

from __future__ import annotations

import argparse
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional

from dotenv import load_dotenv

logger = logging.getLogger(__name__)

# Glob patterns tried, in this order, when a trace directory has no log.txt.
_FALLBACK_LOG_PATTERNS = ("run_log_*.txt", "*.txt")


@dataclass(frozen=True)
class TraceJob:
    """Immutable description of one trace directory to process."""

    # Name of the trace directory; used as the trace_id for upload and DB update.
    trace_id: str
    # Directory the log was found in.
    trace_dir: Path
    # Log file selected by _pick_log_file.
    log_path: Path
    # Destination of the rendered HTML (trace_dir / "log.html").
    html_path: Path


def _load_env() -> None:
    """Load .env files so the script can be started from any directory.

    Both calls use override=False, so variables that are already set in the
    process environment (or were set by the first call) are never replaced.
    """
    # Default python-dotenv discovery first ...
    load_dotenv(override=False)
    # ... then the .env that sits next to this script.
    load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)


def _to_absolute(raw: str) -> Path:
    """Expand ``~`` in *raw* and resolve it; relative paths are anchored at the cwd."""
    path = Path(raw).expanduser()
    if not path.is_absolute():
        path = Path.cwd() / path
    return path.resolve()


def _resolve_output_dir(output_dir: Optional[str]) -> Path:
    """Resolve the directory that holds the trace_id subdirectories.

    Resolution order:
    1. ``output_dir`` (the --output-dir value) when it is not None/blank:
       absolute paths are used as-is, relative paths are resolved against
       the current working directory.
    2. The ``OUTPUT_DIR`` environment variable, resolved the same way.
    3. The ``output`` directory next to this script.

    Returns:
        The resolved absolute path. The directory is not required to exist.
    """
    if output_dir is not None and str(output_dir).strip() != "":
        return _to_absolute(output_dir)

    raw_env = (os.getenv("OUTPUT_DIR") or "").strip()
    if raw_env:
        return _to_absolute(raw_env)

    return (Path(__file__).resolve().parent / "output").resolve()


def _iter_trace_dirs(output_dir: Path) -> Iterable[Path]:
    """Return the immediate subdirectories of *output_dir*.

    Returns an empty list when *output_dir* is missing or is not a directory.
    """
    # is_dir() is already False for a path that does not exist.
    if not output_dir.is_dir():
        return []
    return (entry for entry in output_dir.iterdir() if entry.is_dir())


def _pick_log_file(trace_dir: Path) -> Optional[Path]:
    """Choose the log file to render for one trace directory.

    Preference order: ``log.txt``, then the most recently modified
    ``run_log_*.txt``, then the most recently modified ``*.txt``.

    Returns:
        The chosen file, or None when the directory has no candidate.
    """
    preferred = trace_dir / "log.txt"
    if preferred.is_file():
        return preferred

    for pattern in _FALLBACK_LOG_PATTERNS:
        # Only regular files qualify, mirroring the log.txt check above; a
        # directory that happens to be named "*.txt" must not be selected.
        candidates = [p for p in trace_dir.glob(pattern) if p.is_file()]
        if candidates:
            # Newest by modification time; on ties the first match wins.
            return max(candidates, key=lambda p: p.stat().st_mtime)
    return None


def _build_job(trace_dir: Path) -> Optional[TraceJob]:
    """Build the TraceJob for *trace_dir*, or return None if it has no log file."""
    log_path = _pick_log_file(trace_dir)
    if not log_path:
        return None
    return TraceJob(
        trace_id=trace_dir.name,
        trace_dir=trace_dir,
        log_path=log_path,
        html_path=trace_dir / "log.html",
    )


def _render_html(job: TraceJob) -> None:
    """Render ``job.log_path`` to ``job.html_path`` via render_log_html.generate_html."""
    # Imported lazily so that a dry run does not need the renderer module.
    from render_log_html import (
        COLLAPSE_ALL_FOLDS,
        COLLAPSE_KEYWORDS,
        COLLAPSE_PREFIXES,
        generate_html,
    )

    generate_html(
        input_path=job.log_path,
        output_path=job.html_path,
        collapse_prefixes=COLLAPSE_PREFIXES,
        collapse_keywords=COLLAPSE_KEYWORDS,
        collapse_all=COLLAPSE_ALL_FOLDS,
    )


def _upload_html(job: TraceJob) -> str:
    """Upload the rendered HTML and return what upload_html_to_oss returns.

    The return value is used as the URL written to the database.
    """
    from utils.oss_upload import upload_html_to_oss

    # Per the original note, the object key is assembled inside
    # upload_html_to_oss from a prefix + task_id.
    return upload_html_to_oss(job.html_path, task_id=job.trace_id)


def _update_web_html_url(trace_id: str, url: str) -> int:
    """Store *url* for *trace_id* through db.update_web_html_url and return its result."""
    from db import update_web_html_url

    return update_web_html_url(trace_id=trace_id, web_html_url=url)


def main() -> None:
    """Command-line entry point.

    Collects one job per trace directory that has a log file, newest directory
    first, optionally truncated by --limit. Without --apply the jobs are only
    listed. With --apply each job is rendered, uploaded and written to the
    database; a failing job is reported and logged and does not stop the rest.
    """
    _load_env()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Output directory containing trace_id subdirectories. Default: examples/content_finder/output",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually generate HTML, upload to OSS, and update MySQL. Without this flag, dry-run only.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Process at most N trace dirs (0 means no limit).",
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    output_dir = _resolve_output_dir(args.output_dir)
    trace_dirs = list(_iter_trace_dirs(output_dir))
    jobs = [j for d in trace_dirs if (j := _build_job(d)) is not None]
    # Most recently modified trace directories first.
    jobs.sort(key=lambda x: x.trace_dir.stat().st_mtime, reverse=True)
    # Zero or a negative limit means "no limit".
    if args.limit and args.limit > 0:
        jobs = jobs[: args.limit]

    print(f"[output_dir] {output_dir}")
    print(f"[trace_dirs] {len(trace_dirs)}")
    print(f"[jobs] {len(jobs)}")
    if not jobs:
        return

    if not args.apply:
        print("[dry-run] Add --apply to generate+upload+update.")
        for j in jobs:
            print(f"- trace_id={j.trace_id} log={j.log_path.name} -> html={j.html_path.name}")
        return

    ok = 0
    failed = 0
    for job in jobs:
        try:
            _render_html(job)
            url = _upload_html(job)
            rows = _update_web_html_url(trace_id=job.trace_id, url=url)
            print(f"[ok] trace_id={job.trace_id} url={url} rows={rows}")
            if rows == 0:
                # NOTE(review): assumes the db helper returns the affected-row
                # count - confirm. Zero then means no row matched this
                # trace_id, or the stored URL was already identical.
                logger.warning(
                    "update affected 0 rows for trace_id=%s (no matching row, or URL unchanged)",
                    job.trace_id,
                )
            ok += 1
        except Exception as e:
            # Top-level boundary: report the failure and continue with the next job.
            print(f"[failed] trace_id={job.trace_id} err={e}")
            logger.exception("job failed: %s", job.trace_id)
            failed += 1

    print(f"[done] ok={ok} failed={failed}")


if __name__ == "__main__":
    main()