Ver Fonte

增加同期需求

xueyiming há 1 semana atrás
pai
commit
05ea2c3b04

+ 82 - 0
examples/demand/data_query_tools.py

@@ -33,6 +33,59 @@ def get_odps_data(sql):
         return None
         return None
 
 
 
 
def execute_odps_sql(sql) -> bool:
    """Execute a SQL statement on ODPS (MaxCompute) and wait for completion.

    Args:
        sql: The SQL statement to run.

    Returns:
        True when the instance finishes successfully, False on any ODPS error
        (the error is printed, never raised).
    """
    import os  # local import so this fix does not depend on file-level imports

    # SECURITY: the access id/key used to be hard-coded here and are now in
    # version-control history — they must be rotated. Read from the
    # environment first, falling back to the legacy values so existing
    # deployments keep working until rotation happens.
    access_id = os.getenv("ODPS_ACCESS_ID", "LTAI9EBa0bd5PrDa")
    access_key = os.getenv("ODPS_ACCESS_KEY", "vAalxds7YxhfOA2yVv8GziCg3Y87v5")
    project = os.getenv("ODPS_PROJECT", "loghubods")
    endpoint = os.getenv("ODPS_ENDPOINT", "http://service.odps.aliyun.com/api")

    o = ODPS(access_id, access_key, project, endpoint=endpoint)
    try:
        instance = o.execute_sql(sql)
        instance.wait_for_success()
        return True
    except ODPSError as e:
        print(f"ODPS 错误: {e}")
        return False
+
+
def write_feature_point_data_to_hive(names: list[str]) -> int:
    """Write demand names into the Hive table ``feature_point_data``.

    Rows go into today's partition (Beijing time) and only these columns are
    populated:
    - 特征点 (the demand name)
    - 总分发曝光pv (fixed 5000)
    - 质bn_rovn (fixed 0.1)

    Args:
        names: Candidate names; None entries and blank strings are dropped.

    Returns:
        The number of rows written, or 0 when there was nothing to write or
        the SQL execution failed.
    """
    cleaned: list[str] = []
    for raw in names:
        if raw is None:
            continue
        text = str(raw).strip()
        if text:
            cleaned.append(text)
    if not cleaned:
        return 0

    partition_day = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")

    # One SELECT per name, joined with UNION ALL into a single INSERT.
    # Single quotes are doubled so the SQL string literals stay well-formed.
    selects: list[str] = []
    for text in cleaned:
        escaped = text.replace("'", "''")
        selects.append(
            f"SELECT '{escaped}' AS `特征点`, 5000 AS `总分发曝光pv`, 0.1 AS `质bn_rovn`"
        )

    union_sql = "\nUNION ALL\n".join(selects)
    insert_sql = f"""
INSERT INTO TABLE feature_point_data
PARTITION (dt='{partition_day}')
(`特征点`, `总分发曝光pv`, `质bn_rovn`)
{union_sql}
"""
    if not execute_odps_sql(insert_sql):
        return 0
    return len(cleaned)
+
+
 def get_demand_merge_level2_names():
 def get_demand_merge_level2_names():
     date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
     date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
     day = date_time.strftime("%Y%m%d")
     day = date_time.strftime("%Y%m%d")
@@ -157,6 +210,35 @@ GROUP BY
     return result_dict
     return result_dict
 
 
 
 
def get_rov_by_tree_and_video_ids(video_ids):
    """Fetch the average rov_t0 per video id over a 7-day window one year ago.

    Videos whose summed 当日分发曝光pv over the window is below 1000 get 0.

    Args:
        video_ids: Iterable of video ids to look up.

    Returns:
        Dict mapping video id (string) -> avg rov_t0; empty dict when the
        query returns no rows.
    """
    video_ids_in_clause = ", ".join(f"'{vid}'" for vid in video_ids)
    window_start = date.today() - timedelta(days=365)
    start_date = window_start.strftime("%Y%m%d")
    end_date = (window_start + timedelta(days=7)).strftime("%Y%m%d")
    sql_query = f'''
SELECT
    CAST(t3.视频id AS STRING) AS 视频id_str,
    CASE
        WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
        ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
    END AS avg_rov_t0
FROM
loghubods.video_dimension_detail_add_column t3

WHERE t3.视频id in ({video_ids_in_clause})
AND t3.dt >= '{start_date}'
AND t3.dt <= '{end_date}'
GROUP BY
    t3.视频id
;
    '''
    rows = get_odps_data(sql_query)
    if not rows:
        return {}
    return {row[0]: row[1] for row in rows}
+
+
 def get_changwen_weight(account_name):
 def get_changwen_weight(account_name):
     bizdatemax_date = date.today() - timedelta(days=1)
     bizdatemax_date = date.today() - timedelta(days=1)
     bizdatemin_date = bizdatemax_date - timedelta(days=30)
     bizdatemin_date = bizdatemax_date - timedelta(days=30)

+ 2 - 2
examples/demand/demand_build_agent_tools.py

@@ -154,11 +154,11 @@ def create_demand_items(demand_items: List[Dict[str, Any]] = None) -> str:
 
 
 
 
 @tool(
 @tool(
-    "写入本次执行总结(在所有分类完成后调用)。"
+    "写入本次执行总结(在所有任务执行完成后调用)。"
     "\n\n该工具用于把最终总结记录到本地/trace输出中(框架侧通过返回值与日志落盘)。"
     "\n\n该工具用于把最终总结记录到本地/trace输出中(框架侧通过返回值与日志落盘)。"
 )
 )
 def write_execution_summary(summary: str) -> str:
 def write_execution_summary(summary: str) -> str:
-    """写入本次执行总结。在所有分类完成后调用。
+    """写入本次执行总结。在所有任务执行完成后调用。
 
 
     Args:
     Args:
         summary: 执行总结(Markdown 格式)。
         summary: 执行总结(Markdown 格式)。

+ 6 - 4
examples/demand/piaoquan_prepare.py

@@ -2,7 +2,7 @@ import json
 from collections import defaultdict
 from collections import defaultdict
 from pathlib import Path
 from pathlib import Path
 
 
-from examples.demand.data_query_tools import get_rov_by_merge_leve2_and_video_ids
+from examples.demand.data_query_tools import get_rov_by_merge_leve2_and_video_ids, get_rov_by_tree_and_video_ids
 from examples.demand.db_manager import DatabaseManager
 from examples.demand.db_manager import DatabaseManager
 from examples.demand.models import TopicPatternElement, TopicPatternExecution
 from examples.demand.models import TopicPatternElement, TopicPatternExecution
 from examples.demand.pattern_builds.pattern_service import run_mining
 from examples.demand.pattern_builds.pattern_service import run_mining
@@ -67,8 +67,10 @@ def prepare(execution_id):
 
 
         # 1) 去重 post_id 拉取 ROV
         # 1) 去重 post_id 拉取 ROV
         all_post_ids = sorted({r.post_id for r in rows if r.post_id})
         all_post_ids = sorted({r.post_id for r in rows if r.post_id})
-        rov_by_post_id = get_rov_by_merge_leve2_and_video_ids(merge_leve2, all_post_ids) if all_post_ids else {}
-
+        if merge_leve2 == '全局树':
+            rov_by_post_id = get_rov_by_tree_and_video_ids(all_post_ids) if all_post_ids else {}
+        else:
+            rov_by_post_id = get_rov_by_merge_leve2_and_video_ids(merge_leve2, all_post_ids) if all_post_ids else {}
         # 2) 按 element_type 分组,计算 name 的平均 ROV 分
         # 2) 按 element_type 分组,计算 name 的平均 ROV 分
         grouped = {
         grouped = {
             "实质": {
             "实质": {
@@ -172,7 +174,7 @@ if __name__ == '__main__':
     # cluster_name = '贪污腐败'
     # cluster_name = '贪污腐败'
     #
     #
     # execution_id = run_mining(cluster_name=cluster_name, merge_leve2=cluster_name)
     # execution_id = run_mining(cluster_name=cluster_name, merge_leve2=cluster_name)
-    prepare(8)
+    prepare(270)
     # execution_id = piaoquan_prepare(cluster_name=cluster_name)
     # execution_id = piaoquan_prepare(cluster_name=cluster_name)
     # print(execution_id)
     # print(execution_id)
 
 

+ 1 - 1
examples/demand/render_log_html.py

@@ -73,7 +73,7 @@ TOOL_DESCRIPTION_MAP: dict[str, str] = {
 # =========================
 # =========================
 # 运行配置(直接改变量即可)
 # 运行配置(直接改变量即可)
 # =========================
 # =========================
-INPUT_LOG_PATH = "examples/demand/output/12/run_log_12_20260327_205003.txt"
+INPUT_LOG_PATH = "examples/demand/output/229/run_log_229_20260423_142547.txt"
 # 设为 None 则默认生成到输入文件同名 .html
 # 设为 None 则默认生成到输入文件同名 .html
 OUTPUT_HTML_PATH: str | None = None
 OUTPUT_HTML_PATH: str | None = None
 # 是否默认折叠所有 [FOLD] 块
 # 是否默认折叠所有 [FOLD] 块

+ 68 - 5
examples/demand/run.py

@@ -325,6 +325,66 @@ def write_demand_items_to_mysql(execution_id: int, merge_level2: str) -> int:
     return len(rows)
     return len(rows)
 
 
 
 
def write_global_tree_demand_items_to_hive(execution_id: int, merge_level2: str) -> int:
    """Export result/{execution_id}/execution_id_{execution_id}_demand_items.json
    to the Hive table feature_point_data (global-tree scenario only).

    Args:
        execution_id: Mining execution whose result JSON should be exported.
        merge_level2: Kept for signature parity with the MySQL writer; unused here.

    Returns:
        Number of rows written; 0 on any failure (errors are logged, never raised).
    """
    rel = Path("result") / str(execution_id) / f"execution_id_{execution_id}_demand_items.json"
    demand_items_path = Path.cwd() / rel
    if not demand_items_path.exists():
        # Fall back to a path relative to this module before giving up.
        alt_path = Path(__file__).parent / rel
        if not alt_path.exists():
            log(f"[hive] 未找到需求 JSON:{demand_items_path}(也未找到 {alt_path}),跳过写入")
            return 0
        demand_items_path = alt_path

    try:
        loaded = json.loads(demand_items_path.read_text(encoding="utf-8"))
    except Exception as e:
        log(f"[hive] 读取需求 JSON 失败:{demand_items_path},error={e}")
        return 0

    # Accept either a bare list or a {"items": [...]} wrapper.
    if isinstance(loaded, dict) and isinstance(loaded.get("items"), list):
        items = loaded["items"]
    else:
        items = loaded
    if not isinstance(items, list):
        log(f"[hive] 需求 JSON 非数组,跳过写入:type={type(items)}")
        return 0

    names: list[str] = []
    for entry in items:
        if not isinstance(entry, dict):
            continue
        joined = _join_element_names_to_name(entry.get("element_names"))
        if joined:
            names.append(joined)

    if not names:
        log("[hive] 无有效 name,跳过写入")
        return 0

    try:
        # Per the requirement, the Hive write itself lives in data_query_tools.py.
        from examples.demand.data_query_tools import write_feature_point_data_to_hive

        written = write_feature_point_data_to_hive(names=names)
        log(f"[hive] 写入 feature_point_data 完成,rows={written}")
        return written
    except Exception as e:
        log(f"[hive] 写入 feature_point_data 异常:{e}")
        return 0
+
+
 async def run_once(execution_id, merge_level2, count: int = 30, task_id: Optional[int] = None) -> str:
 async def run_once(execution_id, merge_level2, count: int = 30, task_id: Optional[int] = None) -> str:
     task_log_text = ""
     task_log_text = ""
     task_status = 0
     task_status = 0
@@ -397,12 +457,15 @@ async def run_once(execution_id, merge_level2, count: int = 30, task_id: Optiona
 
 
             log(f"[cost] total_tokens={total_tokens}, total_cost=${total_cost:.6f}")
             log(f"[cost] total_tokens={total_tokens}, total_cost=${total_cost:.6f}")
 
 
-            # agent 执行完成后:把本地 result JSON 写入 MySQL 表 demand_content
-            # element_names -> name(逗号分隔);reason -> demand_content.reason;desc -> demand_content.suggestion;dt -> demand_content.dt
+            # agent 执行完成后:全局树写 Hive,其他写 MySQL
             try:
             try:
-                write_demand_items_to_mysql(execution_id=execution_id, merge_level2=merge_level2)
+                if str(merge_level2).strip() == "全局树":
+                    write_global_tree_demand_items_to_hive(execution_id=execution_id, merge_level2=merge_level2)
+                else:
+                    # element_names -> name(逗号分隔);reason -> demand_content.reason;desc -> demand_content.suggestion;dt -> demand_content.dt
+                    write_demand_items_to_mysql(execution_id=execution_id, merge_level2=merge_level2)
             except Exception as e:
             except Exception as e:
-                log(f"[mysql] 写入 demand_content 异常:{e}")
+                log(f"[result-write] 写入结果异常:{e}")
 
 
             task_log_text = log_buffer.getvalue()
             task_log_text = log_buffer.getvalue()
             task_status = 1
             task_status = 1
@@ -456,5 +519,5 @@ async def main(
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
-    asyncio.run(main('贪污腐败', 'piaoquan'))
+    asyncio.run(main('全局树', 'piaoquan', 50))
     # write_demand_items_to_mysql(execution_id=8, merge_level2='贪污腐败')
     # write_demand_items_to_mysql(execution_id=8, merge_level2='贪污腐败')

+ 19 - 2
examples/demand/web_api.py

@@ -27,6 +27,12 @@ from examples.demand.run import _create_demand_task, main as run_demand
 
 
 app = FastAPI(title="demand web api")
 app = FastAPI(title="demand web api")
 
 
# Fixed schedule entry: the 全局树 (global tree) cluster must produce 50 demand
# items on piaoquan every day, regardless of the dynamically fetched list.
GLOBAL_TREE_SCHEDULE_ITEM = dict(
    cluster_name="全局树",
    platform_type="piaoquan",
    count=50,
)
+
 # APScheduler:使用动态导入避免环境未安装时直接导入失败
 # APScheduler:使用动态导入避免环境未安装时直接导入失败
 try:
 try:
     _aps_asyncio_mod = importlib.import_module("apscheduler.schedulers.asyncio")
     _aps_asyncio_mod = importlib.import_module("apscheduler.schedulers.asyncio")
@@ -46,13 +52,24 @@ class DemandStartRequest(BaseModel):
 
 
 def _get_demand_schedule_cluster_platform_list() -> list[dict]:
 def _get_demand_schedule_cluster_platform_list() -> list[dict]:
     """动态获取定时任务列表。"""
     """动态获取定时任务列表。"""
+    schedule_items: list[dict] = []
     try:
     try:
         # 延迟导入:避免服务启动时因 ODPS 依赖缺失直接失败
         # 延迟导入:避免服务启动时因 ODPS 依赖缺失直接失败
         from examples.demand.data_query_tools import get_demand_merge_level2_names
         from examples.demand.data_query_tools import get_demand_merge_level2_names
-        return get_demand_merge_level2_names() or []
+        schedule_items = get_demand_merge_level2_names() or []
     except Exception as e:  # pragma: no cover
     except Exception as e:  # pragma: no cover
         print(f"获取定时任务列表失败: {e}")
         print(f"获取定时任务列表失败: {e}")
-        return []
+
+    # 固定补充:全局树每天都要产出 50 条需求(以固定值覆盖动态值)
+    schedule_items = [
+        item for item in schedule_items
+        if not (
+            item.get("cluster_name") == GLOBAL_TREE_SCHEDULE_ITEM["cluster_name"]
+            and item.get("platform_type") == GLOBAL_TREE_SCHEDULE_ITEM["platform_type"]
+        )
+    ]
+    schedule_items.append(GLOBAL_TREE_SCHEDULE_ITEM.copy())
+    return schedule_items
 
 
 # 是否开启定时任务(可选,通过环境变量覆盖)
 # 是否开启定时任务(可选,通过环境变量覆盖)
 DEMAND_SCHEDULER_ENABLED: bool = os.getenv("DEMAND_SCHEDULER_ENABLED", "1").strip() == "1"
 DEMAND_SCHEDULER_ENABLED: bool = os.getenv("DEMAND_SCHEDULER_ENABLED", "1").strip() == "1"