| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- #!/usr/bin/env python3
- """
- 批量渲染并上传 trace 日志 HTML,然后回写 MySQL。
- 约定:
- - output 目录下每个子目录名为 trace_id
- - 每个 trace_id 目录下有 log 文件(优先 log.txt,其次 run_log_*.txt,再其次任意 *.txt)
- 流程(对每个 trace_id):
- 1) log.txt -> render_log_html.generate_html(...) 生成 HTML 到同目录
- 2) 上传 HTML 到阿里云 OSS,拿到公网 URL
- 3) UPDATE demand_find_content_result.web_html_url = <url> WHERE trace_id = <trace_id>
- 安全默认:
- - 默认只 dry-run 打印候选,不生成/上传/写库
- - 加 --apply 才会执行生成 + 上传 + 写库
- """
- from __future__ import annotations
- import argparse
- import logging
- import os
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Iterable, Optional
- from dotenv import load_dotenv
- logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class TraceJob:
    """One unit of work: a trace directory plus the files derived from it.

    Attributes (instances are frozen, hence hashable):
    - trace_id: name of the trace directory; also the DB lookup key.
    - trace_dir: the trace's directory under the output root.
    - log_path: the log file selected for rendering.
    - html_path: destination path for the rendered HTML report.
    """

    trace_id: str
    trace_dir: Path
    log_path: Path
    html_path: Path
def _load_env() -> None:
    """Load environment variables so the script works from any CWD.

    First honors already-exported variables / a CWD-local .env, then falls
    back to the .env next to this script (examples/content_finder/.env).
    Neither call overrides variables that are already set.
    """
    load_dotenv(override=False)
    script_env = Path(__file__).resolve().parent / ".env"
    load_dotenv(dotenv_path=script_env, override=False)
- def _resolve_output_dir(output_dir: Optional[str]) -> Path:
- """
- Resolve output directory.
- - If --output-dir provided:
- - absolute path: use it
- - relative path: resolve against current working directory
- - Else:
- - ENV OUTPUT_DIR (absolute/relative-to-cwd)
- - fallback to examples/content_finder/output (script sibling)
- """
- if output_dir is not None and str(output_dir).strip() != "":
- p = Path(output_dir).expanduser()
- return p.resolve() if p.is_absolute() else (Path.cwd() / p).resolve()
- raw_env = (os.getenv("OUTPUT_DIR") or "").strip()
- if raw_env:
- p = Path(raw_env).expanduser()
- return p.resolve() if p.is_absolute() else (Path.cwd() / p).resolve()
- base = Path(__file__).resolve().parent
- return (base / "output").resolve()
- def _iter_trace_dirs(output_dir: Path) -> Iterable[Path]:
- if not output_dir.exists() or not output_dir.is_dir():
- return []
- return (p for p in output_dir.iterdir() if p.is_dir())
- def _pick_log_file(trace_dir: Path) -> Optional[Path]:
- preferred = trace_dir / "log.txt"
- if preferred.exists() and preferred.is_file():
- return preferred
- candidates = sorted(
- trace_dir.glob("run_log_*.txt"),
- key=lambda p: p.stat().st_mtime,
- reverse=True,
- )
- if candidates:
- return candidates[0]
- candidates = sorted(
- trace_dir.glob("*.txt"),
- key=lambda p: p.stat().st_mtime,
- reverse=True,
- )
- if candidates:
- return candidates[0]
- return None
def _build_job(trace_dir: Path) -> Optional[TraceJob]:
    """Assemble a TraceJob for *trace_dir*, or None when it holds no log file.

    The directory name doubles as the trace_id; the HTML is always rendered
    to log.html inside the same directory.
    """
    log_path = _pick_log_file(trace_dir)
    if log_path is None:
        return None
    return TraceJob(
        trace_id=trace_dir.name,
        trace_dir=trace_dir,
        log_path=log_path,
        html_path=trace_dir / "log.html",
    )
def _render_html(job: TraceJob) -> None:
    """Render job.log_path into job.html_path using render_log_html."""
    # Imported lazily so the dry-run path never needs the renderer module.
    import render_log_html as rlh

    rlh.generate_html(
        input_path=job.log_path,
        output_path=job.html_path,
        collapse_prefixes=rlh.COLLAPSE_PREFIXES,
        collapse_keywords=rlh.COLLAPSE_KEYWORDS,
        collapse_all=rlh.COLLAPSE_ALL_FOLDS,
    )
def _upload_html(job: TraceJob) -> str:
    """Upload the rendered HTML to Aliyun OSS and return its public URL.

    The object key is built inside upload_html_to_oss from its own prefix
    plus the task_id (here, the trace_id).
    """
    from utils.oss_upload import upload_html_to_oss

    url = upload_html_to_oss(job.html_path, task_id=job.trace_id)
    return url
def _update_web_html_url(trace_id: str, url: str) -> int:
    """Persist *url* into demand_find_content_result.web_html_url for *trace_id*.

    Returns the row count reported by the underlying db helper.
    """
    from db import update_web_html_url

    rows = update_web_html_url(trace_id=trace_id, web_html_url=url)
    return rows
def _parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse sys.argv."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Output directory containing trace_id subdirectories. Default: examples/content_finder/output",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually generate HTML, upload to OSS, and update MySQL. Without this flag, dry-run only.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Process at most N trace dirs (0 means no limit).",
    )
    return parser.parse_args()


def _gather_jobs(output_dir: Path, limit: int) -> "tuple[list[Path], list[TraceJob]]":
    """Scan *output_dir*; return (all trace dirs, runnable jobs newest-first).

    Directories without any usable log file are dropped; *limit* > 0 caps
    the job list after sorting by directory mtime (newest first).
    """
    trace_dirs = list(_iter_trace_dirs(output_dir))
    jobs = [j for d in trace_dirs if (j := _build_job(d)) is not None]
    jobs.sort(key=lambda job: job.trace_dir.stat().st_mtime, reverse=True)
    if limit > 0:
        jobs = jobs[:limit]
    return trace_dirs, jobs


def _process_jobs(jobs: "list[TraceJob]") -> "tuple[int, int]":
    """Render, upload, and write back each job; return (ok, failed) counts.

    Per-job failures are reported and logged (with traceback) but do not
    abort the batch.
    """
    ok = 0
    failed = 0
    for job in jobs:
        try:
            _render_html(job)
            url = _upload_html(job)
            rows = _update_web_html_url(trace_id=job.trace_id, url=url)
            print(f"[ok] trace_id={job.trace_id} url={url} rows={rows}")
            ok += 1
        except Exception as e:
            print(f"[failed] trace_id={job.trace_id} err={e}")
            logger.exception("job failed: %s", job.trace_id)
            failed += 1
    return ok, failed


def main() -> None:
    """CLI entry point: dry-run by default; pass --apply to do the work."""
    _load_env()
    args = _parse_args()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    output_dir = _resolve_output_dir(args.output_dir)
    trace_dirs, jobs = _gather_jobs(output_dir, args.limit)
    print(f"[output_dir] {output_dir}")
    print(f"[trace_dirs] {len(trace_dirs)}")
    print(f"[jobs] {len(jobs)}")
    if not jobs:
        return
    if not args.apply:
        print("[dry-run] Add --apply to generate+upload+update.")
        for j in jobs:
            print(f"- trace_id={j.trace_id} log={j.log_path.name} -> html={j.html_path.name}")
        return
    ok, failed = _process_jobs(jobs)
    print(f"[done] ok={ok} failed={failed}")


if __name__ == "__main__":
    main()
|