
Agent iteration

xueyiming · 5 days ago
commit 7e2ffd48d2

+ 13 - 11
examples/demand/changwen_prepare.py

@@ -82,12 +82,12 @@ def _build_score_by_videoid(cluster_name: str):
         ext_data = item.get("ext_data") or {}
         if not isinstance(ext_data, dict):
             continue
-        realplay = _safe_float(ext_data.get("推荐realplay"))
-        exposure = _safe_float(ext_data.get("推荐曝光数"))
+        realplay_uv = _safe_float(ext_data.get("分发realplay_uv"))
+        exposure_uv = _safe_float(ext_data.get("当日分发曝光uv"))
         norm_videoid = _normalize_post_id(videoid)
         if not norm_videoid:
             continue
-        score_map[norm_videoid] = (realplay / exposure) if exposure > 0 else 0.0
+        score_map[norm_videoid] = (realplay_uv / exposure_uv) if exposure_uv > 0 else 0.0
     return score_map
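
For reference, the guarded UV ratio introduced here behaves as in this standalone sketch (it assumes, as in this file, that missing or invalid values are coerced to 0.0):

# Sketch of the new per-video score (placeholder data; mirrors the guarded ratio above).
def uv_score(ext_data: dict) -> float:
    realplay_uv = float(ext_data.get("分发realplay_uv") or 0.0)
    exposure_uv = float(ext_data.get("当日分发曝光uv") or 0.0)
    return (realplay_uv / exposure_uv) if exposure_uv > 0 else 0.0

print(uv_score({"分发realplay_uv": 30, "当日分发曝光uv": 120}))  # 0.25
print(uv_score({"分发realplay_uv": 30, "当日分发曝光uv": 0}))    # 0.0, division guarded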
 
 
@@ -151,14 +151,14 @@ def changwen_data_prepare(cluster_name) -> int:
     if not video_ids:
         raise ValueError(f"未在文件中解析到有效 videoid: {json_path}")
 
-    execution_id = run_mining(post_ids=video_ids, cluster_name=cluster_name)
+    execution_id = run_mining(post_ids=video_ids, cluster_name=cluster_name, platform='changwen')
     return execution_id
 
 
-def prepare_by_json_score(execution_id: int, cluster_name: str = "奇观妙技有乾坤"):
+def prepare_by_json_score(execution_id: int, cluster_name: str = ""):
     """
     与 prepare.py 的输出结构保持一致,但分数来源改为:
-    score = 推荐realplay / 推荐曝光数
+    score = 分发realplay_uv / 当日分发曝光uv
     """
     session = db.get_session()
     try:
@@ -275,17 +275,19 @@ def prepare_by_json_score(execution_id: int, cluster_name: str = "奇观妙技
     finally:
         session.close()
 
+
 def changwen_prepare(cluster_name):
     get_changwen_weight(cluster_name)
     filter_low_exposure_records(cluster_name=cluster_name)
     execution_id = changwen_data_prepare(cluster_name)
-    print(f"execution_id={execution_id}")
-    print(prepare_by_json_score(execution_id, cluster_name))
-    return execution_id
+    if execution_id:
+        print(f"execution_id={execution_id}")
+        print(prepare_by_json_score(execution_id, cluster_name))
+        return execution_id
 
 
 if __name__ == "__main__":
-    cluster_name = '小阳看天下'
+    # 奇观妙技有乾坤 青史铁事漫谈
+    cluster_name = '青史铁事漫谈'
     execution_id = changwen_prepare(cluster_name=cluster_name)
     print(execution_id)
-
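
Taken together with the hunks above, the changwen entry point now runs roughly as in this sketch (illustrative call; assumes ODPS and MySQL access are configured as elsewhere in examples/demand):

# 1) get_changwen_weight(cluster_name)          pull per-video PV/UV stats from ODPS
# 2) filter_low_exposure_records(cluster_name)  drop low-exposure rows
# 3) changwen_data_prepare(cluster_name)        run_mining(..., platform='changwen')
# 4) prepare_by_json_score(execution_id, ...)   score = 分发realplay_uv / 当日分发曝光uv
execution_id = changwen_prepare("青史铁事漫谈")
if execution_id is None:
    print("run_mining returned no execution_id; nothing was prepared")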

+ 1 - 1
examples/demand/config.py

@@ -10,7 +10,7 @@ from agent.core.runner import RunConfig
 
 RUN_CONFIG = RunConfig(
     # 模型配置
-    model="claude-opus-4-6",
+    model="claude-sonnet-4.5",
     temperature=0.3,
     max_iterations=1000,
 

+ 125 - 13
examples/demand/data_query_tools.py

@@ -5,6 +5,7 @@ import json
 from pathlib import Path
 
 from agent import tool
+from examples.demand.mysql import mysql_db
 
 
 def get_odps_data(sql):
@@ -79,17 +80,24 @@ def get_changwen_weight(account_name):
     bizdatemin = bizdatemin_date.strftime("%Y%m%d")
 
     sql_query = f'''
-    SELECT  公众号名
+SELECT 
+        公众号名
         ,videoid
         ,一级品类
         ,二级品类
         ,头部曝光
+        ,头部曝光uv
         ,头部realplay
+        ,头部realplay_uv
         ,头部分享
+        ,头部分享uv
         ,头部回流人数 AS 头部回流数
         ,推荐曝光数
+        ,当日分发曝光uv
         ,推荐realplay
+        ,分发realplay_uv
         ,推荐分享数
+        ,当日分发分享uv
         ,推荐回流数
         ,当日回流进入分发曝光次数 AS vov分子
 FROM    (
@@ -116,6 +124,12 @@ FROM    (
                     ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
                     ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
                     ,d.头部回流人数
+                    ,当日分发曝光uv
+                    ,头部曝光uv
+                    ,当日分发分享uv
+                    ,头部分享uv
+                    ,分发realplay_uv
+                    ,头部realplay_uv
             FROM    (
                         SELECT  account_name AS 公众号名
                                 ,videoid
@@ -124,9 +138,15 @@ FROM    (
                                 ,COUNT(
                                       CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                                 ) AS 当日分发曝光pv
+                                ,COUNT(DISTINCT 
+                                      CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
+                                ) AS 当日分发曝光uv
                                 ,COUNT(
                                       CASE    WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                                 ) AS 头部曝光pv
+                                ,COUNT(DISTINCT 
+                                      CASE    WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
+                                ) AS 头部曝光uv
                                 ,COUNT(
                                       CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                                 ) AS 当日分发播放pv
@@ -136,9 +156,15 @@ FROM    (
                                 ,COUNT(
                                       CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                                 ) AS 当日分发分享pv
+                                 ,COUNT(DISTINCT 
+                                      CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
+                                ) AS 当日分发分享uv
                                 ,COUNT(
                                       CASE    WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                                 ) AS 头部分享pv
+                                 ,COUNT(DISTINCT 
+                                      CASE    WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
+                                ) AS 头部分享uv
                         FROM    (
                                     SELECT  DISTINCT a.mid
                                             ,a.videoid
@@ -238,12 +264,16 @@ FROM    (
                                           CASE    WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                                     ) AS 分发realplay_pv
                                     ,COUNT(CASE    WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
+                                     ,COUNT(DISTINCT 
+                                          CASE    WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
+                                    ) AS 分发realplay_uv
+                                    ,COUNT(DISTINCT CASE    WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
                             FROM    loghubods.ods_video_play_log_day a
                             LEFT JOIN   (
                                             SELECT  DISTINCT open_id
                                                     ,union_id
                                             FROM    loghubods.user_wechat_identity_info_ha
-                                            WHERE      dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
+                                            WHERE   dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                                         ) b
                             ON      a.mid = CONCAT('weixin_openid_',b.open_id)
                             LEFT JOIN loghubods.gzh_fans_info d
@@ -251,7 +281,7 @@ FROM    (
                             AND     d.dt = MAX_PT("loghubods.gzh_fans_info")
                             WHERE   a.dt >= '{bizdatemin}'
                             AND     a.dt <= '{bizdatemax}'
-                            AND     a.businesstype = 'videoRealPlay' 
+                            AND     a.businesstype = 'videoRealPlay'
                             AND     d.user_create_time IS NOT NULL
                             AND     d.account_name = '{account_name}'
                             GROUP BY d.account_name
@@ -315,14 +345,20 @@ ORDER BY 推荐曝光数 DESC
                     "二级品类": r[3],
                     "ext_data": {
                         "头部曝光": r[4],
-                        "头部realplay": r[5],
-                        "头部分享": r[6],
-                        "头部回流数": r[7],
-                        "推荐曝光数": r[8],
-                        "推荐realplay": r[9],
-                        "推荐分享数": r[10],
-                        "推荐回流数": r[11],
-                        "vov分子": r[12],
+                        "头部曝光uv": r[5],
+                        "头部realplay": r[6],
+                        "头部realplay_uv": r[7],
+                        "头部分享": r[8],
+                        "头部分享uv": r[9],
+                        "头部回流数": r[10],
+                        "推荐曝光数": r[11],
+                        "当日分发曝光uv": r[12],
+                        "推荐realplay": r[13],
+                        "分发realplay_uv": r[14],
+                        "推荐分享数": r[15],
+                        "当日分发分享uv": r[16],
+                        "推荐回流数": r[17],
+                        "vov分子": r[18],
                     },
                 }
             )
@@ -337,6 +373,82 @@ ORDER BY 推荐曝光数 DESC
     return result_list
 
 
+def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
+    result = {}
+    if not video_ids:
+        return result
+
+    normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
+    for i in range(0, len(normalized_ids), batch_size):
+        batch_ids = normalized_ids[i:i + batch_size]
+        escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
+        video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
+        sql_query = f'''
+            SELECT videoid, merge_leve2
+            FROM loghubods.video_merge_tag
+            WHERE videoid IN ({video_ids_in_clause})
+        '''
+        data = get_odps_data(sql_query)
+        if not data:
+            continue
+
+        for row in data:
+            result[str(row[0])] = row[1]
+
+    return result
+
+
+def get_all_decode_task_result_rows():
+    return mysql_db.select(
+        "workflow_decode_task_result",
+        columns="id, channel_content_id, merge_leve2",
+    )
+
+
+def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
+    return mysql_db.update(
+        "workflow_decode_task_result",
+        {"merge_leve2": str(merge_leve2)},
+        "channel_content_id = %s",
+        (str(channel_content_id),),
+    )
+
+
+def backfill_merge_leve2_for_decode_task_result():
+    rows = get_all_decode_task_result_rows()
+    updated_count = 0
+    skipped_count = 0
+    valid_content_ids = []
+
+    for row in rows:
+        channel_content_id = row.get("channel_content_id")
+        if channel_content_id is None:
+            skipped_count += 1
+            continue
+
+        channel_content_id = str(channel_content_id)
+        if len(channel_content_id) > 8:
+            skipped_count += 1
+            continue
+
+        valid_content_ids.append(channel_content_id)
+
+    merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
+
+    for channel_content_id in valid_content_ids:
+        merge_leve2 = merge_leve2_map.get(channel_content_id)
+        if not merge_leve2:
+            continue
+        affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
+        if affected > 0:
+            updated_count += affected
+
+    return {
+        "total": len(rows),
+        "updated": updated_count,
+        "skipped": skipped_count,
+    }
+
+
 if __name__ == '__main__':
-    result_list = get_changwen_weight('青史铁事漫谈')
-    print(result_list)
+    backfill_merge_leve2_for_decode_task_result()
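
A usage sketch for the new backfill helper (assumes mysql_db credentials and ODPS access are configured; the summary keys follow the implementation above):

stats = backfill_merge_leve2_for_decode_task_result()
# stats -> {"total": <rows scanned>, "updated": <rows written>,
#           "skipped": <rows with a missing id or an id longer than 8 chars>}
print(stats)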

+ 1 - 1
examples/demand/demand.md

@@ -1,5 +1,5 @@
 ---
-model: anthropic/claude-opus-4-6
+model: anthropic/claude-sonnet-4.5
 temperature: 0.5
 max_iterations: 200
 ---

+ 3 - 3
examples/demand/demand_agent_context.py

@@ -34,7 +34,7 @@ class TopicBuildAgentContext:
     @classmethod
     def set_execution_id(cls, execution_id: int):
         _execution_id_var.set(execution_id)
-        from log_capture import log
+        from examples.demand.log_capture import log
         log(f"[TopicBuildAgentContext] 设置 execution_id = {execution_id}")
 
     @classmethod
@@ -44,7 +44,7 @@ class TopicBuildAgentContext:
     @classmethod
     def set_topic_build_id(cls, topic_build_id: int):
         _topic_build_id_var.set(topic_build_id)
-        from log_capture import log
+        from examples.demand.log_capture import log
         log(f"[TopicBuildAgentContext] 设置 topic_build_id = {topic_build_id}")
 
     @classmethod
@@ -89,7 +89,7 @@ class TopicBuildAgentContext:
 
     @classmethod
     def clear(cls):
-        from log_capture import log
+        from examples.demand.log_capture import log
         log(f"[TopicBuildAgentContext] 清除上下文")
         _execution_id_var.set(None)
         _topic_build_id_var.set(None)

+ 10 - 3
examples/demand/demand_build_agent_tools.py

@@ -6,29 +6,33 @@ from agent import tool
 from examples.demand.demand_agent_context import TopicBuildAgentContext
 from examples.demand.demand_pattern_tools import _log_tool_output, _log_tool_input
 
+
 def _get_result_base_dir() -> Path:
     """输出到“当前工作目录/result/”下。"""
     return Path.cwd() / "result"
 
 
 @tool(
-    "存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)"
+    "存储需求到结果集。 - element_names - score(权重) - reason(原因)- desc(需求描述)"
 )
 def create_demand_item(
         element_names: List[str] = None,
+        score: float = 0.0,
         reason: str = None,
         desc: str = None) -> str:
     """
     每次调用向“execution_id 对应的本地 JSON 文件”追加一条记录。
 
-    写入对象仅包含三个字段:
+    写入对象包含以下字段:
       - element_names
+      - score(权重)
       - reason(原因)
       - desc(需求描述)
     """
     execution_id: Optional[int] = TopicBuildAgentContext.get_execution_id()
     params: Dict[str, Any] = {
         "execution_id": execution_id,
+        "score": score,
         "element_names": element_names,
         "reason": reason,
         "desc": desc,
@@ -40,6 +44,7 @@ def create_demand_item(
 
     record: Dict[str, Any] = {
         "element_names": element_names,
+        "score": score,
         "reason": reason,
         "desc": desc,
     }
@@ -79,7 +84,7 @@ def create_demand_item(
 
 
 @tool(
-    "批量存储需求到结果集。 - element_names - reason(原因)- desc(需求描述)"
+    "批量存储需求到结果集。 - element_names - score(权重) - reason(原因)- desc(需求描述)"
 )
 def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str:
     """
@@ -87,6 +92,7 @@ def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str:
 
     每条记录字段:
       - element_names
+      - score(权重)
       - reason(原因)
       - desc(需求描述)
     """
@@ -125,6 +131,7 @@ def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str:
             return _log_tool_output("create_demand_items", f"错误: demand_items[{i}] 必须为对象(dict)")
         record = {
             "element_names": di.get("element_names"),
+            "score": di.get("score", 0.0),
             "reason": di.get("reason"),
             "desc": di.get("desc"),
         }

+ 1 - 1
examples/demand/demand_pattern_tools.py

@@ -24,7 +24,7 @@ from typing import Any
 
 from agent import tool
 from examples.demand.demand_agent_context import TopicBuildAgentContext
-from log_capture import log
+from examples.demand.log_capture import log
 import pattern_service
 
 

+ 18 - 13
examples/demand/mysql/db_config.py

@@ -38,12 +38,13 @@ class DatabaseConfig:
                         env_path = path
                         break
 
+                # 兜底:若 .env 不存在,继续使用运行时环境变量,不在此处直接抛错
                 if not env_path:
-                    searched_paths = '\n'.join(possible_paths)
-                    raise FileNotFoundError(f"配置文件 {self.env_file} 不存在,已搜索以下路径:\n{searched_paths}")
+                    env_path = ""
 
             # 加载环境变量
-            load_dotenv(env_path)
+            if env_path:
+                load_dotenv(env_path)
 
         # 优先读取 DB_INFO(JSON 字符串)
         db_info_str = os.getenv('DB_INFO')
@@ -54,13 +55,13 @@ class DatabaseConfig:
             except json.JSONDecodeError as e:
                 raise ValueError(f"DB_INFO配置格式错误: {e}")
         else:
-            # 兼容分项环境变量(DB_HOST/DB_PORT/DB_USER/DB_PASSWORD/DB_NAME/DB_CHARSET
-            host = self._clean_env_value(os.getenv("DB_HOST"))
-            port_raw = self._clean_env_value(os.getenv("DB_PORT")) or "3306"
-            user = self._clean_env_value(os.getenv("DB_USER"))
-            passwd = self._clean_env_value(os.getenv("DB_PASSWORD"))
-            database = self._clean_env_value(os.getenv("DB_NAME"))
-            charset = self._clean_env_value(os.getenv("DB_CHARSET")) or "utf8mb4"
+            # 兼容分项环境变量(DB_*)和常见别名(MYSQL_*)
+            host = self._clean_env_value(os.getenv("DB_HOST") or os.getenv("MYSQL_HOST"))
+            port_raw = self._clean_env_value(os.getenv("DB_PORT") or os.getenv("MYSQL_PORT")) or "3306"
+            user = self._clean_env_value(os.getenv("DB_USER") or os.getenv("MYSQL_USER"))
+            passwd = self._clean_env_value(os.getenv("DB_PASSWORD") or os.getenv("MYSQL_PASSWORD"))
+            database = self._clean_env_value(os.getenv("DB_NAME") or os.getenv("MYSQL_DATABASE"))
+            charset = self._clean_env_value(os.getenv("DB_CHARSET") or os.getenv("MYSQL_CHARSET")) or "utf8mb4"
 
             missing = [k for k, v in {
                 "DB_HOST": host,
@@ -70,7 +71,10 @@ class DatabaseConfig:
             }.items() if not v]
             if missing:
                 raise ValueError(
-                    "未找到DB_INFO配置,且分项配置不完整,缺少: " + ", ".join(missing)
+                    "未找到 DB_INFO,且数据库环境变量不完整,缺少: "
+                    + ", ".join(missing)
+                    + "。请在 --env-file 中配置 DB_HOST/DB_USER/DB_PASSWORD/DB_NAME"
+                    + "(或 MYSQL_HOST/MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE)。"
                 )
 
             try:
@@ -105,8 +109,9 @@ class DatabaseConfig:
         if os.getenv("DB_INFO"):
             return True
 
-        required_keys = ("DB_HOST", "DB_USER", "DB_PASSWORD", "DB_NAME")
-        return all(os.getenv(k) for k in required_keys)
+        db_keys = ("DB_HOST", "DB_USER", "DB_PASSWORD", "DB_NAME")
+        mysql_keys = ("MYSQL_HOST", "MYSQL_USER", "MYSQL_PASSWORD", "MYSQL_DATABASE")
+        return all(os.getenv(k) for k in db_keys) or all(os.getenv(k) for k in mysql_keys)
 
     @staticmethod
     def _clean_env_value(value: str) -> str:
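
The alias fallback can be exercised without a .env file, e.g. (standard library only; placeholder values):

import os

# Placeholder credentials via the MYSQL_* aliases the config now also accepts.
os.environ.setdefault("MYSQL_HOST", "127.0.0.1")
os.environ.setdefault("MYSQL_USER", "demo")
os.environ.setdefault("MYSQL_PASSWORD", "demo")
os.environ.setdefault("MYSQL_DATABASE", "open_aigc")

host = os.getenv("DB_HOST") or os.getenv("MYSQL_HOST")
port = int(os.getenv("DB_PORT") or os.getenv("MYSQL_PORT") or "3306")
print(host, port)  # 127.0.0.1 3306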

+ 1 - 1
examples/demand/pattern_builds/db_manager.py

@@ -9,7 +9,6 @@ class DatabaseManager1:
     def __init__(self):
         connection_string = (
             f"mysql+pymysql://wx2016_longvideo:wx2016_longvideoP%40assword1234@rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com:3306/open_aigc?charset=utf8mb4"
-            # f"mysql+pymysql://root:aigc_admin@127.0.0.1:3306/open_aigc_pattern?charset=utf8mb4"
         )
         self.engine = create_engine(connection_string, pool_pre_ping=True, pool_recycle=3600)
         self.SessionLocal = sessionmaker(bind=self.engine, autoflush=False, autocommit=False)
@@ -33,3 +32,4 @@ class DatabaseManager2:
     def get_session(self) -> Session:
         """获取数据库会话"""
         return self.SessionLocal()
+

+ 161 - 9
examples/demand/pattern_builds/pattern_service.py

@@ -5,7 +5,7 @@ import traceback
 from datetime import datetime
 from typing import List
 
-from sqlalchemy import insert, select, bindparam
+from sqlalchemy import insert, select, bindparam, text
 
 from .db_manager import DatabaseManager2
 from .models2 import (
@@ -314,6 +314,154 @@ def _store_category_tree_snapshot(session, execution_id: int, categories: list,
     return path_to_cat
 
 
+def get_merge_level2_crawler_data(merge_level2) -> dict:
+    """用 merge_level2 匹配 crawler_execution.name,取最新成功记录的 id 作为 execution_id,再查 global_* 三表。"""
+    session = db.get_session()
+
+    empty = {
+        "success": False,
+        "message": "",
+        "post_count": 0,
+        "data": {},
+        "post_metadata": {},
+        "categories": [],
+        "elements": [],
+    }
+
+    try:
+        if merge_level2 is None or (isinstance(merge_level2, str) and not merge_level2.strip()):
+            empty["message"] = "merge_level2 为空"
+            return empty
+
+        exec_row = session.execute(
+            text("""
+                SELECT id
+                FROM crawler_execution
+                WHERE name LIKE :name AND status = 2
+                ORDER BY create_time DESC
+                LIMIT 1
+            """),
+            {"name": f"{merge_level2}%"},
+        ).mappings().first()
+
+        if not exec_row:
+            empty["message"] = (
+                f"未找到 crawler_execution: name={merge_level2!r} 且 status=2(按 create_time 最新)"
+            )
+            return empty
+
+        execution_id = exec_row["id"]
+
+        category_rows = session.execute(
+            text("""
+                SELECT
+                    id,
+                    execution_id,
+                    name,
+                    description,
+                    parent_id,
+                    source_type,
+                    category_nature,
+                    level,
+                    path
+                FROM global_category
+                WHERE execution_id = :execution_id
+                ORDER BY id
+            """),
+            {"execution_id": execution_id}
+        ).mappings().all()
+
+        element_rows = session.execute(
+            text("""
+                SELECT
+                    id,
+                    execution_id,
+                    name,
+                    description,
+                    category_id,
+                    source_type,
+                    element_sub_type,
+                    occurrence_count
+                FROM global_element
+                WHERE execution_id = :execution_id
+                ORDER BY id
+            """),
+            {"execution_id": execution_id}
+        ).mappings().all()
+
+        post_rows = session.execute(
+            text("""
+                SELECT
+                    id,
+                    execution_id,
+                    post_id,
+                    point_data
+                FROM global_post
+                WHERE execution_id = :execution_id
+                ORDER BY id
+            """),
+            {"execution_id": execution_id}
+        ).mappings().all()
+
+        categories = []
+        for c in category_rows:
+            categories.append({
+                "stable_id": c["id"],
+                "name": c["name"],
+                "description": c["description"] or "",
+                "category_nature": c["category_nature"],
+                "source_type": c["source_type"],
+                "path": c["path"],
+                "level": c["level"],
+                "parent_stable_id": c["parent_id"],
+            })
+
+        elements = []
+        for ge in element_rows:
+            elements.append({
+                "id": ge["id"],
+                "name": ge["name"],
+                "description": ge["description"] or "",
+                "element_type": ge["source_type"],
+                "element_sub_type": ge["element_sub_type"],
+                "belong_category_stable_id": ge["category_id"],
+                "occurrence_count": ge["occurrence_count"] or 1,
+            })
+
+        result = {}
+        post_obj_map = {}
+        for post in post_rows:
+            post_id = post["post_id"]
+            point_data = post["point_data"]
+            if isinstance(point_data, str):
+                try:
+                    point_data = json.loads(point_data)
+                except json.JSONDecodeError:
+                    point_data = {}
+
+            post_data = point_data if isinstance(point_data, dict) else {}
+            result[post_id] = post_data
+            post_obj_map[post_id] = post
+
+        return {
+            "success": True,
+            "post_count": len(result),
+            "data": result,
+            "post_metadata": {
+                post_id: {
+                    "account_name": '',
+                    "merge_leve2": '',
+                    "platform": '',
+                }
+                for post_id, post in post_obj_map.items()
+            },
+            "categories": categories,
+            "elements": elements,
+        }
+    finally:
+        session.close()
+
+
 def run_mining(
         post_ids: List[str] = None,
         cluster_name: str = None,
@@ -324,7 +472,7 @@ def run_mining(
         mining_configs: list = None,
         min_absolute_support: int = 3,
         classify_execution_id: int = None,
-) -> int:
+):
     """执行一次 pattern 挖掘,返回 execution_id
 
     Args:
@@ -360,13 +508,17 @@ def run_mining(
         # 2. 获取源数据(含分类树+元素快照)
         t0 = time.time()
         print(f"[Execution {execution_id}] 正在获取数据...")
-        result = export_post_elements(
-            post_ids,
-            merge_leve2=merge_leve2,
-            platform=platform,
-            account_name=account_name,
-            post_limit=post_limit,
-        )
+        result = None
+        if platform == 'changwen':
+            result = export_post_elements(
+                post_ids=post_ids,
+                post_limit=post_limit
+            )
+        elif platform == 'piaoquan':
+            result = get_merge_level2_crawler_data(cluster_name)
+
+        if result is None:
+            return None
         source_data = result['data']
         post_count = result['post_count']
         print(f"[Execution {execution_id}] 获取到 {post_count} 个帖子, 耗时 {time.time() - t0:.2f}s")

+ 9 - 5
examples/demand/piaoquan_prepare.py

@@ -163,12 +163,16 @@ def prepare(execution_id):
         session.close()
 
 def piaoquan_prepare(cluster_name):
-    execution_id = run_mining(cluster_name=cluster_name, merge_leve2=cluster_name)
-    prepare(execution_id)
-    return execution_id
+    execution_id = run_mining(cluster_name=cluster_name, merge_leve2=cluster_name, platform='piaoquan')
+    if execution_id:
+        prepare(execution_id)
+        return execution_id
 
 if __name__ == '__main__':
     cluster_name = '历史名人'
-    execution_id = piaoquan_prepare(cluster_name=cluster_name)
-    print(execution_id)
+
+    execution_id = run_mining(cluster_name=cluster_name, merge_leve2=cluster_name)
+
+    # execution_id = piaoquan_prepare(cluster_name=cluster_name)
+    # print(execution_id)
 

+ 7 - 6
examples/demand/run.py

@@ -1,5 +1,5 @@
 """demand 示例的最小可运行入口。"""
-
+import asyncio
 import copy
 import importlib
 import json
@@ -172,9 +172,9 @@ def _avg_score_for_joined_name(name: str, score_map: dict) -> float:
 
 
 def _create_demand_task(
-    execution_id: int,
-    name: Optional[str] = None,
-    platform: Optional[str] = None,
+        execution_id: int,
+        name: Optional[str] = None,
+        platform: Optional[str] = None,
 ) -> Optional[int]:
     """创建 demand_task 记录,返回任务ID。"""
     try:
@@ -269,7 +269,9 @@ def write_demand_items_to_mysql(execution_id: int, merge_level2: str) -> int:
         if not name:
             continue
 
-        score = _avg_score_for_joined_name(name, score_map)
+        score = di.get("score")
+        if score is None or score == 0.0:
+            score = _avg_score_for_joined_name(name, score_map)
         reason = di.get("reason")
         desc_value = di.get("desc")
         ext_data = {"reason": reason, "desc": desc_value}
@@ -297,7 +299,6 @@ async def run_once(execution_id, merge_level2, task_id: Optional[int] = None) ->
     task_status = 0
 
     TopicBuildAgentContext.set_execution_id(execution_id)
-    prepare(execution_id)
 
     base_dir = Path(__file__).parent
     output_dir = base_dir / "output"
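
The score precedence added to write_demand_items_to_mysql, isolated as a sketch (assumes _avg_score_for_joined_name from this file is in scope; the item shape is illustrative):

def resolve_score(di: dict, name: str, score_map: dict) -> float:
    # Agent-supplied score wins; a missing or 0.0 score falls back to the joined-name average.
    score = di.get("score")
    if score is None or score == 0.0:
        score = _avg_score_for_joined_name(name, score_map)
    return score

print(resolve_score({"score": 0.42}, "历史人物+奇闻轶事", {}))  # 0.42, agent score kept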

+ 12 - 2
examples/demand/web_api.py

@@ -16,7 +16,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 from examples.demand.changwen_prepare import changwen_prepare
 from examples.demand.mysql import mysql_db
 from examples.demand.piaoquan_prepare import piaoquan_prepare
-from examples.demand.run import _create_demand_task, main
+from examples.demand.run import _create_demand_task, main as run_demand
 
 app = FastAPI(title="demand web api")
 
@@ -48,7 +48,7 @@ async def demand_start(req: DemandStartRequest):
 
     async def _job():
         # run_once 内部会在 finally 里把 task 状态写回 MySQL。
-        await main(
+        await run_demand(
             req.cluster_name,
             req.platform_type,
             execution_id=execution_id,
@@ -147,3 +147,13 @@ def demand_tasks(
     # 返回分页结构(data + pagination),便于前端直接展示
     return data
 
+
+def run_server():
+    import uvicorn
+
+    uvicorn.run(app, host="0.0.0.0", port=7000)
+
+
+if __name__ == "__main__":
+    run_server()
+
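
The new entry point presumably lets the API be launched directly (port 7000 is hard-coded above); an equivalent standalone sketch:

# python examples/demand/web_api.py  -- or, equivalently:
import uvicorn
from examples.demand.web_api import app

uvicorn.run(app, host="0.0.0.0", port=7000)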