Просмотр исходного кода

feat: 增加可视化自动上传并落库

jihuaqiang 15 часов назад
Родитель
Commit
78c626d063

+ 208 - 0
examples/content_finder/build_html.py

@@ -0,0 +1,208 @@
+#!/usr/bin/env python3
+"""
+批量渲染并上传 trace 日志 HTML,然后回写 MySQL。
+
+约定:
+- output 目录下每个子目录名为 trace_id
+- 每个 trace_id 目录下有 log 文件(优先 log.txt,其次 run_log_*.txt,再其次任意 *.txt)
+
+流程(对每个 trace_id):
+1) log.txt -> render_log_html.generate_html(...) 生成 HTML 到同目录
+2) 上传 HTML 到阿里云 OSS,拿到公网 URL
+3) UPDATE demand_find_content_result.web_html_url = <url> WHERE trace_id = <trace_id>
+
+安全默认:
+- 默认只 dry-run 打印候选,不生成/上传/写库
+- 加 --apply 才会执行生成 + 上传 + 写库
+"""
+
+from __future__ import annotations
+
+import argparse
+import logging
+import os
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Iterable, Optional
+
+from dotenv import load_dotenv
+
+
+logger = logging.getLogger(__name__)
+
+
@dataclass(frozen=True)
class TraceJob:
    """Immutable unit of work: one trace directory plus its derived paths."""

    trace_id: str    # subdirectory name under the output dir (doubles as DB key)
    trace_dir: Path  # the trace_id directory itself
    log_path: Path   # chosen source log (log.txt / newest run_log_*.txt / newest *.txt)
    html_path: Path  # render target: trace_dir / "log.html"
+
+
def _load_env() -> None:
    """Load .env files without clobbering variables that are already set.

    The default (cwd) .env is loaded first, then the one that lives next to
    this script (examples/content_finder/.env), so the tool behaves the same
    no matter which directory it is launched from.
    """
    script_env = Path(__file__).resolve().parent / ".env"
    for dotenv_path in (None, script_env):
        load_dotenv(dotenv_path=dotenv_path, override=False)
+
+
+def _resolve_output_dir(output_dir: Optional[str]) -> Path:
+    """
+    Resolve output directory.
+
+    - If --output-dir provided:
+      - absolute path: use it
+      - relative path: resolve against current working directory
+    - Else:
+      - ENV OUTPUT_DIR (absolute/relative-to-cwd)
+      - fallback to examples/content_finder/output (script sibling)
+    """
+    if output_dir is not None and str(output_dir).strip() != "":
+        p = Path(output_dir).expanduser()
+        return p.resolve() if p.is_absolute() else (Path.cwd() / p).resolve()
+
+    raw_env = (os.getenv("OUTPUT_DIR") or "").strip()
+    if raw_env:
+        p = Path(raw_env).expanduser()
+        return p.resolve() if p.is_absolute() else (Path.cwd() / p).resolve()
+
+    base = Path(__file__).resolve().parent
+    return (base / "output").resolve()
+
+
+def _iter_trace_dirs(output_dir: Path) -> Iterable[Path]:
+    if not output_dir.exists() or not output_dir.is_dir():
+        return []
+    return (p for p in output_dir.iterdir() if p.is_dir())
+
+
+def _pick_log_file(trace_dir: Path) -> Optional[Path]:
+    preferred = trace_dir / "log.txt"
+    if preferred.exists() and preferred.is_file():
+        return preferred
+
+    candidates = sorted(
+        trace_dir.glob("run_log_*.txt"),
+        key=lambda p: p.stat().st_mtime,
+        reverse=True,
+    )
+    if candidates:
+        return candidates[0]
+
+    candidates = sorted(
+        trace_dir.glob("*.txt"),
+        key=lambda p: p.stat().st_mtime,
+        reverse=True,
+    )
+    if candidates:
+        return candidates[0]
+
+    return None
+
+
def _build_job(trace_dir: Path) -> Optional[TraceJob]:
    """Build a TraceJob for one trace directory, or None when it has no log."""
    chosen_log = _pick_log_file(trace_dir)
    if chosen_log is None:
        return None
    return TraceJob(
        trace_id=trace_dir.name,
        trace_dir=trace_dir,
        log_path=chosen_log,
        html_path=trace_dir / "log.html",
    )
+
+
def _render_html(job: TraceJob) -> None:
    """Render job.log_path into job.html_path using the shared renderer config.

    Imported inside the function (rather than at module top) — presumably so
    that dry-run listing still works when render_log_html's dependencies are
    unavailable; confirm against the renderer module.
    """
    from render_log_html import (
        COLLAPSE_ALL_FOLDS,
        COLLAPSE_KEYWORDS,
        COLLAPSE_PREFIXES,
        generate_html,
    )

    generate_html(
        input_path=job.log_path,
        output_path=job.html_path,
        collapse_prefixes=COLLAPSE_PREFIXES,
        collapse_keywords=COLLAPSE_KEYWORDS,
        collapse_all=COLLAPSE_ALL_FOLDS,
    )
+
+
def _upload_html(job: TraceJob) -> str:
    """Upload the rendered HTML to OSS and return its public URL."""
    from utils.oss_upload import upload_html_to_oss

    # The object key is assembled inside upload_html_to_oss from prefix + task_id.
    return upload_html_to_oss(job.html_path, task_id=job.trace_id)
+
+
def _update_web_html_url(trace_id: str, url: str) -> int:
    """Write the OSS URL back to MySQL; returns the affected row count."""
    from db import update_web_html_url

    return update_web_html_url(trace_id=trace_id, web_html_url=url)
+
+
def main() -> None:
    """CLI entry point: discover trace dirs, then dry-run list or process them.

    Safe by default: without --apply nothing is generated, uploaded, or
    written to the database.
    """
    _load_env()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Output directory containing trace_id subdirectories. Default: examples/content_finder/output",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually generate HTML, upload to OSS, and update MySQL. Without this flag, dry-run only.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Process at most N trace dirs (0 means no limit).",
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    output_dir = _resolve_output_dir(args.output_dir)
    trace_dirs = list(_iter_trace_dirs(output_dir))
    # Newest trace dirs first; dirs without a usable log are dropped.
    jobs = sorted(
        (job for job in map(_build_job, trace_dirs) if job is not None),
        key=lambda job: job.trace_dir.stat().st_mtime,
        reverse=True,
    )
    if args.limit > 0:
        jobs = jobs[: args.limit]

    print(f"[output_dir] {output_dir}")
    print(f"[trace_dirs] {len(trace_dirs)}")
    print(f"[jobs] {len(jobs)}")

    if not jobs:
        return

    if not args.apply:
        print("[dry-run] Add --apply to generate+upload+update.")
        for job in jobs:
            print(f"- trace_id={job.trace_id} log={job.log_path.name} -> html={job.html_path.name}")
        return

    succeeded = 0
    failed = 0
    for job in jobs:
        try:
            _render_html(job)
            url = _upload_html(job)
            rows = _update_web_html_url(trace_id=job.trace_id, url=url)
            print(f"[ok] trace_id={job.trace_id} url={url} rows={rows}")
        except Exception as e:
            # One bad trace must not abort the batch; count it and move on.
            print(f"[failed] trace_id={job.trace_id} err={e}")
            logger.exception("job failed: %s", job.trace_id)
            failed += 1
        else:
            succeeded += 1

    print(f"[done] ok={succeeded} failed={failed}")
+
+
# Script entry point.
if __name__ == "__main__":
    main()

+ 18 - 4
examples/content_finder/core.py

@@ -90,7 +90,7 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "毛泽东"
+DEFAULT_QUERY = "正能量"
 DEFAULT_DEMAND_ID = 1
 
 
@@ -112,6 +112,7 @@ async def run_agent(
     query: Optional[str] = None,
     demand_id: Optional[int] = None,
     stream_output: bool = True,
+    log_assistant_text: bool = True,
 ) -> Dict[str, Any]:
     """
     执行 agent 任务
@@ -119,7 +120,8 @@ async def run_agent(
     Args:
         query: 查询内容(搜索词),None 则使用默认值
         demand_id: 本次搜索任务 id(int,关联 demand_content 表)
-        stream_output: 是否流式输出到 stdout(run.py 需要,server.py 不需要)
+        stream_output: 是否输出到 stdout(run.py 需要,server.py 不需要)
+        log_assistant_text: 是否将 assistant 文本写入 log.txt(server 建议开启)
 
     Returns:
         {
@@ -235,10 +237,12 @@ async def run_agent(
                         }
                         break
 
-                elif isinstance(item, Message) and stream_output:
+                elif isinstance(item, Message):
                     text = extract_assistant_text(item)
-                    if text:
+                    if text and log_assistant_text:
                         log(f"[assistant] {text}")
+                    if text and stream_output:
+                        print(text)
 
             if run_result is None:
                 run_result = {
@@ -258,6 +262,16 @@ async def run_agent(
             with open(log_file_path, "w", encoding="utf-8") as f:
                 f.write(full_log)
 
+            try:
+                from render_log_html import render_log_html_and_upload
+
+                if trace_id:
+                    url = render_log_html_and_upload(trace_id=trace_id, log_file_path=log_file_path)
+                    if url:
+                        logger.info(f"log.html 已上传: trace_id={trace_id}, url={url}")
+            except Exception as e:
+                logger.warning(f"渲染/上传 log.html 失败: trace_id={trace_id}, err={e}")
+
         return run_result
 
     except KeyboardInterrupt:

+ 7 - 1
examples/content_finder/db/__init__.py

@@ -15,7 +15,12 @@ from .schedule import (
     update_task_status,
     update_task_on_complete,
 )
-from .store_results import upsert_good_authors, insert_contents, update_content_plan_ids
+from .store_results import (
+    upsert_good_authors,
+    insert_contents,
+    update_content_plan_ids,
+    update_web_html_url,
+)
 
 __all__ = [
     "get_connection",
@@ -28,4 +33,5 @@ __all__ = [
     "upsert_good_authors",
     "insert_contents",
     "update_content_plan_ids",
+    "update_web_html_url",
 ]

+ 28 - 0
examples/content_finder/db/store_results.py

@@ -198,3 +198,31 @@ def update_content_plan_ids(
         return rows
     finally:
         conn.close()
+
+
def update_web_html_url(trace_id: str, web_html_url: str) -> int:
    """
    Write demand_find_content_result.web_html_url for a given trace_id.

    Conventions:
    - trace_id is the output subdirectory name
    - web_html_url is the public OSS URL
    - one trace_id may map to several content rows; all of them are updated

    Returns:
        Number of rows affected; 0 when either argument is blank.
    """
    t = (trace_id or "").strip()
    url = (web_html_url or "").strip()
    if not t or not url:
        return 0

    sql = """
    UPDATE demand_find_content_result
    SET web_html_url = %s
    WHERE trace_id = %s
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (url, t))
            rows = cur.rowcount
        # Persist explicitly: under the PEP 249 default (autocommit off, as in
        # PyMySQL), closing without commit() would discard the UPDATE. Harmless
        # if get_connection() already enables autocommit — TODO confirm there.
        conn.commit()
        return rows
    finally:
        conn.close()

+ 51 - 0
examples/content_finder/render_log_html.py

@@ -8,6 +8,7 @@
 from __future__ import annotations
 
 import html
+import logging
 import os
 from dataclasses import dataclass, field
 from pathlib import Path
@@ -82,6 +83,8 @@ COLLAPSE_ALL_FOLDS = False
 COLLAPSE_PREFIXES = DEFAULT_COLLAPSE_PREFIXES
 COLLAPSE_KEYWORDS = DEFAULT_COLLAPSE_KEYWORDS
 
+logger = logging.getLogger(__name__)
+
 
 def resolve_config_path(path_str: str) -> Path:
     """解析配置中的路径,兼容从项目根目录或脚本目录运行。"""
@@ -361,6 +364,54 @@ def generate_html(
     output_path.write_text(html_content, encoding="utf-8")
 
 
def render_log_html_and_upload(*, trace_id: str, log_file_path: Path) -> str | None:
    """
    Render log.txt to HTML, upload it to OSS, and best-effort record the URL.

    - Output file: log.html in the same directory as log.txt
    - Upload: utils/oss_upload.upload_html_to_oss

    Returns:
        The public URL on success; None on failure. Never raises, so the
        caller's main flow is unaffected.
    """
    tid = (trace_id or "").strip()
    if not tid or not log_file_path.exists():
        return None

    html_path = log_file_path.with_name("log.html")
    try:
        generate_html(
            input_path=log_file_path,
            output_path=html_path,
            collapse_prefixes=COLLAPSE_PREFIXES,
            collapse_keywords=COLLAPSE_KEYWORDS,
            collapse_all=COLLAPSE_ALL_FOLDS,
        )
    except Exception as e:
        logger.warning("render log.html failed: trace_id=%s err=%s", tid, e)
        return None

    try:
        from utils.oss_upload import upload_html_to_oss

        url = upload_html_to_oss(html_path, task_id=tid)
    except Exception as e:
        logger.warning("upload log.html failed: trace_id=%s err=%s", tid, e)
        return None

    # Best-effort write-back of demand_find_content_result.web_html_url; a DB
    # failure is logged but does not invalidate the already-obtained URL.
    try:
        from db import update_web_html_url

        update_web_html_url(trace_id=tid, web_html_url=url)
    except Exception as e:
        logger.warning("update web_html_url failed: trace_id=%s err=%s", tid, e)

    return url
+
+
 def main() -> None:
     input_base = resolve_config_path(INPUT_LOG_PATH)
     if input_base.is_file():

+ 1 - 1
examples/content_finder/server.py

@@ -122,7 +122,7 @@ async def execute_task(
 
         try:
             result = await core.run_agent(
-                query, demand_id=demand_id, stream_output=False
+                query, demand_id=demand_id, stream_output=False, log_assistant_text=True
             )
             duration = (datetime.now() - start_time).total_seconds()
 

+ 12 - 20
examples/content_finder/skills/content_filtering_strategy.md

@@ -5,11 +5,12 @@ description: 内容筛选方法论
 
 # 内容筛选方法论
 
-## 核心流程:基础筛选 -> 基于高赞case筛选 → 画像匹配 → 账号扩展 → 去重排序
+## 核心流程
+基础筛选 → 基于高赞case筛选 → 画像匹配 → 去重排序
 
 ---
-
-## 阶段零:基础筛选
+## 筛选步骤
+### 阶段1:基础筛选
 
 在获取画像前先快速过滤,减少不必要的 API 调用。
 
@@ -24,15 +25,13 @@ description: 内容筛选方法论
 
 评估维度:digg_count(点赞)、comment_count(评论)、share_count(分享)
 
----
-
-## 阶段一:**基于高赞case筛选**
+### 阶段2:**基于高赞case筛选**
 
 使用 `demand_analysis` 的结果里的**筛选方案**做高赞case筛选,目的在于:
 - 先把“明显不满足筛选方案”的内容尽早淘汰,减少无效画像调用
 - 在每条内容的 `reason` 中给出“需求对齐的依据”(来自标题/描述可读信息 + goodcase 选题点)
 
-### 需要使用`demand_analysis` 的结果里的**筛选方案**的字段对每条内容进行以下评估
+#### 需要使用`demand_analysis` 的结果里的**筛选方案**的字段对每条内容进行以下评估
 > 若 `demand_analysis` 输出的筛选方案为空,请承认不确定性,不要编造匹配结论。
 1. 目的点对齐(必须项)
    - 按照**筛选方案.目的点对齐规则**执行
@@ -50,22 +49,19 @@ description: 内容筛选方法论
    - 命中:加分或作为排序
    - 不命中:不直接淘汰,但在 `reason` 中标注“不匹配/不确定”
 
-### 在输出 reason 中必须包含的要素
+#### 在输出 reason 中必须包含的要素
 
 对于进入后续画像阶段的候选,在其 `reason` 中至少写明:
 至少包含四项:命中的 `目的点` 状态;命中的 `灵感点` 状态;`关键点`(命中/部分/缺失)与缺失说明或不确定点;形式规则是命中还是不确定(如无法从标题/描述判断)
 
----
-
-## 阶段二:画像匹配筛选
+### 阶段3:画像匹配筛选
 
 **分批处理**:先处理前 10 条候选内容,筛选后 >= M 则停止,不足再继续下一批。  
 **并行限制**:每次最多并行调用 3 个画像工具。  
 **停止条件**:已获取画像数量 >= M × 1.5 时,立即停止,进入下一阶段。
 不要无限循环获取画像,避免陷入"一直获取画像"的状态
 
-
-### 画像获取优先级
+#### 画像获取优先级
 
 **优先级 1:内容点赞用户画像**
 - 调用 `get_content_fans_portrait(content_id=aweme_id)`
@@ -78,15 +74,13 @@ description: 内容筛选方法论
 **优先级 3:无画像**
 - 两者均无画像,仅基于热度和相关性评估,标注来源 `none`
 
-### 画像评估标准
+#### 画像评估标准
 
 - **占比(ratio)**:目标人群在总体中的占比
 - **偏好度(tgi)**:> 100 高于平均,= 100 平均,< 100 低于平均
 - 示例:"适合50岁以上老年人" → 年龄分布"50岁以上"占比 > 40% 且 tgi > 100 视为符合
 
----
-
-## 阶段三:去重与排序
+### 阶段4:去重与排序
 
 **去重**:按 aweme_id 去重,保留第一次出现的版本。
 
@@ -96,11 +90,9 @@ description: 内容筛选方法论
 3. 热度(点赞、评论、分享综合)
 4. 数据来源可靠性(content_like > account_fans > none)
 
----
-
 ## 关键原则
 **标准来自需求**:评估维度随需求变化,不固化。
-**分阶段筛选**:先快速过滤,再精细评估,提高效率。
+**分阶段筛选**:先基础筛选,再基于高赞case筛选,最后画像匹配,提高效率。
 **画像兜底策略**:优先内容画像,缺失时用账号画像,确保数据覆盖。
 **说明评估逻辑**:让用户理解为什么这条内容被推荐或排除。
 **承认不确定性**:数据不足以判断时,如实说明,而不是强行打分。

+ 127 - 0
examples/content_finder/utils/oss_upload.py

@@ -0,0 +1,127 @@
+import logging
+import os
+from pathlib import Path
+from typing import Optional
+try:
+    import alibabacloud_oss_v2 as oss
+except Exception:  # pragma: no cover - import guard for optional dependency
+    oss = None
+
+logger = logging.getLogger("oss.upload")
+
+
class OssUploadError(RuntimeError):
    """Raised when OSS configuration is missing or an upload fails."""
    pass
+
+
+def _get_env(*keys: str, required: bool = False) -> Optional[str]:
+    for key in keys:
+        value = os.getenv(key)
+        if value:
+            return value
+    if required:
+        raise OssUploadError(f"missing env: {keys[0]}")
+    return None
+
+
+def _build_object_key(
+    html_path: Path,
+    object_key: Optional[str],
+    prefix: Optional[str],
+    task_id: Optional[str],
+) -> str:
+    if object_key:
+        key = object_key
+    elif task_id:
+        key = f"{task_id}.html"
+    else:
+        key = html_path.name
+    if prefix:
+        prefix_clean = prefix.strip().strip("/")
+        if prefix_clean:
+            return f"{prefix_clean}/{key.lstrip('/')}"
+    return key.lstrip("/")
+
+
def upload_html_to_oss(
    html_path: Path,
    *,
    object_key: Optional[str] = None,
    task_id: Optional[str] = None,
) -> str:
    """
    Upload a generated HTML file to OSS and return a public URL.

    Args:
        html_path: Path to an existing .html file.
        object_key: Explicit object key; overrides task_id / file name.
        task_id: Used as "<task_id>.html" when object_key is not given.

    Raises:
        OssUploadError: when the file is missing or not .html, when the
            optional OSS SDK is not installed, when a required env var is
            unset, or when the upload returns a non-2xx status.
    """
    if not html_path.exists():
        raise OssUploadError(f"html not found: {html_path}")
    if html_path.suffix.lower() != ".html":
        raise OssUploadError(f"invalid html suffix: {html_path}")
    if oss is None:
        raise OssUploadError("alibabacloud-oss-v2 not installed")

    # required=True raises "missing env: <key>" — the same message the
    # previous hand-written per-variable checks produced.
    access_key_id = _get_env("ALIYUN_OSS_ACCESS_KEY_ID", required=True)
    access_key_secret = _get_env("ALIYUN_OSS_ACCESS_KEY_SECRET", required=True)
    bucket_name = _get_env("ALIYUN_OSS_BUCKET", required=True)
    region = _get_env("ALIYUN_OSS_REGION", required=True)
    prefix = _get_env("ALIYUN_OSS_PREFIX", required=True)
    public_base_url = _get_env("ALIYUN_OSS_PUBLIC_BASE_URL", required=True)

    key = _build_object_key(html_path, object_key, prefix, task_id)
    # Shared structured-logging context for all three log lines below.
    log_ctx = {"bucket": bucket_name, "region": region, "key": key}
    logger.info("oss upload start", extra={**log_ctx, "html_path": str(html_path)})

    credentials_provider = oss.credentials.StaticCredentialsProvider(
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
    )
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = region
    client = oss.Client(cfg)

    request = oss.PutObjectRequest(bucket=bucket_name, key=key)
    result = client.put_object_from_file(request, str(html_path))
    if result.status_code >= 300:
        logger.error("oss upload failed", extra={**log_ctx, "status_code": result.status_code})
        raise OssUploadError(f"oss upload failed: status={result.status_code}")
    logger.info("oss upload success", extra={**log_ctx, "status_code": result.status_code})

    return f"{public_base_url.rstrip('/')}/{key}"

+ 4 - 1
requirements.txt

@@ -29,4 +29,7 @@ requests>=2.31.0
 aiohttp>=3.9.0
 
 # Image processing
-Pillow>=10.0.0
+Pillow>=10.0.0
+
+# Aliyun OSS (for HTML upload)
+alibabacloud-oss-v2