#!/usr/bin/env python3
"""
批量渲染并上传 trace 日志 HTML,然后回写 MySQL。
约定:
- output 目录下每个子目录名为 trace_id
- 每个 trace_id 目录下有 log 文件(优先 log.txt,其次 run_log_*.txt,再其次任意 *.txt)
流程(对每个 trace_id):
1) log.txt -> render_log_html.generate_html(...) 生成 HTML 到同目录
2) 上传 HTML 到阿里云 OSS,拿到公网 URL
3) UPDATE demand_find_content_result.web_html_url = WHERE trace_id =
安全默认:
- 默认只 dry-run 打印候选,不生成/上传/写库
- 加 --apply 才会执行生成 + 上传 + 写库
"""
from __future__ import annotations
import argparse
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class TraceJob:
    """Immutable work item: one trace directory plus its chosen log file and HTML target."""
    # Name of the trace directory; doubles as the DB key and the OSS task_id.
    trace_id: str
    # Directory under the output dir that holds this trace's files.
    trace_dir: Path
    # Log file selected by _pick_log_file (log.txt preferred).
    log_path: Path
    # Destination for the rendered HTML (trace_dir / "log.html").
    html_path: Path
def _load_env() -> None:
    """Load .env files: process CWD first, then the .env next to this script.

    Both calls use override=False, so variables that are already set
    (including ones loaded by the first call) are kept.
    """
    # Makes the script work no matter which directory it is launched from.
    load_dotenv(override=False)
    script_local_env = Path(__file__).resolve().parent / ".env"
    load_dotenv(dotenv_path=script_local_env, override=False)
def _resolve_output_dir(output_dir: Optional[str]) -> Path:
"""
Resolve output directory.
- If --output-dir provided:
- absolute path: use it
- relative path: resolve against current working directory
- Else:
- ENV OUTPUT_DIR (absolute/relative-to-cwd)
- fallback to examples/content_finder/output (script sibling)
"""
if output_dir is not None and str(output_dir).strip() != "":
p = Path(output_dir).expanduser()
return p.resolve() if p.is_absolute() else (Path.cwd() / p).resolve()
raw_env = (os.getenv("OUTPUT_DIR") or "").strip()
if raw_env:
p = Path(raw_env).expanduser()
return p.resolve() if p.is_absolute() else (Path.cwd() / p).resolve()
base = Path(__file__).resolve().parent
return (base / "output").resolve()
def _iter_trace_dirs(output_dir: Path) -> Iterable[Path]:
if not output_dir.exists() or not output_dir.is_dir():
return []
return (p for p in output_dir.iterdir() if p.is_dir())
def _pick_log_file(trace_dir: Path) -> Optional[Path]:
preferred = trace_dir / "log.txt"
if preferred.exists() and preferred.is_file():
return preferred
candidates = sorted(
trace_dir.glob("run_log_*.txt"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if candidates:
return candidates[0]
candidates = sorted(
trace_dir.glob("*.txt"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if candidates:
return candidates[0]
return None
def _build_job(trace_dir: Path) -> Optional[TraceJob]:
    """Assemble a TraceJob for *trace_dir*, or None when it has no usable log file."""
    chosen_log = _pick_log_file(trace_dir)
    if not chosen_log:
        return None
    return TraceJob(
        trace_id=trace_dir.name,
        trace_dir=trace_dir,
        log_path=chosen_log,
        html_path=trace_dir / "log.html",
    )
def _render_html(job: TraceJob) -> None:
    """Render job.log_path into job.html_path using render_log_html's default collapse settings."""
    # Deferred import: only needed in --apply mode, keeps dry-run dependency-free.
    import render_log_html as rlh

    rlh.generate_html(
        input_path=job.log_path,
        output_path=job.html_path,
        collapse_prefixes=rlh.COLLAPSE_PREFIXES,
        collapse_keywords=rlh.COLLAPSE_KEYWORDS,
        collapse_all=rlh.COLLAPSE_ALL_FOLDS,
    )
def _upload_html(job: TraceJob) -> str:
    """Upload the rendered HTML to OSS and return its public URL."""
    # Deferred import: only needed in --apply mode.
    from utils.oss_upload import upload_html_to_oss

    # The object key is assembled inside upload_html_to_oss from prefix + task_id.
    public_url = upload_html_to_oss(job.html_path, task_id=job.trace_id)
    return public_url
def _update_web_html_url(trace_id: str, url: str) -> int:
    """Write *url* into demand_find_content_result.web_html_url for *trace_id*; return the row count."""
    # Deferred import: only needed in --apply mode.
    from db import update_web_html_url

    affected = update_web_html_url(trace_id=trace_id, web_html_url=url)
    return affected
def _parse_args() -> argparse.Namespace:
    """CLI: --output-dir (scan root), --apply (perform side effects), --limit (cap job count)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Output directory containing trace_id subdirectories. Default: examples/content_finder/output",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually generate HTML, upload to OSS, and update MySQL. Without this flag, dry-run only.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Process at most N trace dirs (0 means no limit).",
    )
    return parser.parse_args()


def _collect_jobs(output_dir: Path, limit: int) -> tuple[int, list[TraceJob]]:
    """Scan *output_dir*; return (total trace dirs found, jobs newest-first, capped at *limit*)."""
    trace_dirs = list(_iter_trace_dirs(output_dir))
    jobs = [j for d in trace_dirs if (j := _build_job(d)) is not None]
    # Newest trace directories first, so a --limit run handles recent work.
    jobs.sort(key=lambda x: x.trace_dir.stat().st_mtime, reverse=True)
    if limit > 0:
        jobs = jobs[:limit]
    return len(trace_dirs), jobs


def _apply_jobs(jobs: list[TraceJob]) -> tuple[int, int]:
    """Render + upload + DB-update each job; return (ok, failed) counts.

    A failure in one job is logged and does not stop the rest of the batch.
    """
    ok = 0
    failed = 0
    for job in jobs:
        try:
            _render_html(job)
            url = _upload_html(job)
            rows = _update_web_html_url(trace_id=job.trace_id, url=url)
            print(f"[ok] trace_id={job.trace_id} url={url} rows={rows}")
            ok += 1
        except Exception as e:
            print(f"[failed] trace_id={job.trace_id} err={e}")
            logger.exception("job failed: %s", job.trace_id)
            failed += 1
    return ok, failed


def main() -> None:
    """Entry point: dry-run listing by default; with --apply, render + upload + write back."""
    _load_env()
    args = _parse_args()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    output_dir = _resolve_output_dir(args.output_dir)
    dir_count, jobs = _collect_jobs(output_dir, args.limit)
    print(f"[output_dir] {output_dir}")
    print(f"[trace_dirs] {dir_count}")
    print(f"[jobs] {len(jobs)}")
    if not jobs:
        return
    if not args.apply:
        print("[dry-run] Add --apply to generate+upload+update.")
        for j in jobs:
            print(f"- trace_id={j.trace_id} log={j.log_path.name} -> html={j.html_path.name}")
        return
    ok, failed = _apply_jobs(jobs)
    print(f"[done] ok={ok} failed={failed}")
if __name__ == "__main__":
main()