
Add multi-category demand support

xueyiming 1 month ago
parent
commit
f6e5ee3b52

+ 89 - 4
examples/demand/data_query_tools.py

@@ -1,6 +1,8 @@
+from zoneinfo import ZoneInfo
+
 from odps import ODPS
 from odps.errors import ODPSError
-from datetime import date, timedelta
+from datetime import date, datetime, timedelta
 import json
 from pathlib import Path
 
@@ -31,6 +33,89 @@ def get_odps_data(sql):
         return None
 
 
+def get_demand_merge_level2_names():
+    date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)  # yesterday, Beijing time
+    day = date_time.strftime("%Y%m%d")
+    count = 500  # minimum 缺量 (shortage) a category must reach to be returned
+    sql_query = f'''
+select *
+from (
+select
+dt,
+merge二级品类,
+sum(当日分发曝光pv) as 分发曝光pv,
+sum(累计分享回流uv) AS bn_总回流,
+sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
+
+case when sum(当日分发曝光pv)>=10000 then
+    case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
+        then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
+        else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
+    end
+else 0 end AS 总供需分,
+
+case when sum(当日分发曝光pv)>=10000 then
+    case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
+        then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
+        else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
+    end
+else 0 end AS 新供需分,
+
+count(DISTINCT 视频id) as 分发视频量,
+count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
+
+case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
+then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
+end as 缺量,
+
+case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
+then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
+end as 控量,
+
+avg(总日回流uv) AS 总日回流uv,
+avg(总日分发视频数) AS 总日分发视频数,
+avg(总日推荐视频数) AS 总日推荐视频数,
+
+COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
+sum(当日分发回流uv) AS bn_当日分发回流,
+sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
+sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
+count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
+COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
+
+from loghubods.video_dimension_detail_add_column
+where dt = '{day}'
+group by dt, merge二级品类
+) t1 
+where t1.缺量>= {count}
+
+'''
+
+    data = get_odps_data(sql_query)
+    result_list = []
+    if data:
+        for r in data:
+            lack_count = r[9]  # 缺量 (shortage), the 10th column in the SELECT list
+            if lack_count > 1000:
+                count = 70
+            elif 500 < lack_count <= 1000:
+                count = 50
+            elif 100 < lack_count <= 500:
+                count = 30
+            elif 50 < lack_count <= 100:
+                count = 10
+            else:
+                count = 0
+            if count == 0:
+                continue
+            result_list.append({
+                "cluster_name": r[1],
+                "platform_type": "piaoquan",
+                "count": count,
+            })
+    return result_list
+
+
 def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
     merge_level_in_clause = f"'{merge_leve2}'"
     video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
@@ -527,6 +612,6 @@ def backfill_merge_leve2_for_decode_task_result():
         "skipped": skipped_count,
     }
 
-
-if __name__ == '__main__':
-    backfill_merge_leve2_for_decode_task_result()
+#
+# if __name__ == '__main__':
+#     backfill_merge_leve2_for_decode_task_result()
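
A minimal sketch of the bucketing the new function applies to each returned row, pulled out as a pure function (the helper name is illustrative, not part of the commit). The thresholds match the loop above; because the SQL already filters on 缺量 >= 500, only the >1000, 500-1000, and exactly-500 branches can actually fire.

def bucket_count(lack_count: float) -> int:
    # Map a category's 缺量 (shortage) to the number of demands to request.
    if lack_count > 1000:
        return 70
    elif 500 < lack_count <= 1000:
        return 50
    elif 100 < lack_count <= 500:
        return 30
    elif 50 < lack_count <= 100:
        return 10
    return 0

assert bucket_count(1200) == 70
assert bucket_count(500) == 30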

+ 1 - 1
examples/demand/demand.md

@@ -109,5 +109,5 @@ $user$
 1. Locations used in co-occurrence queries must come from high-weight categories; categories must not be picked directly from the tree
 2. Co-occurrence category combinations must use categories returned by `get_weight_score_topn` as their starting point
 3. Any result kept in the final output must be supported by a weight score or a support value
-4. Generate as many demands as possible, aiming for roughly 30 demands in the final output
+4. Generate as many demands as possible, aiming for roughly 「%count%」 demands in the final output
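
The hard-coded 30 becomes a placeholder that run.py now fills through prompt.build_messages(merge_level2=..., count=count). The real build_messages is not part of this commit; a hypothetical sketch of the substitution, assuming a plain string template:

def render_user_prompt(template: str, merge_level2: str, count: int) -> str:
    # Hypothetical helper: only the 「%count%」 placeholder is confirmed by
    # this diff; the %merge_level2% placeholder is an assumption.
    return (template
            .replace("%merge_level2%", merge_level2)
            .replace("%count%", str(count)))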
 

+ 5 - 5
examples/demand/run.py

@@ -317,7 +317,7 @@ def write_demand_items_to_mysql(execution_id: int, merge_level2: str) -> int:
     return len(rows)
 
 
-async def run_once(execution_id, merge_level2, task_id: Optional[int] = None) -> str:
+async def run_once(execution_id, merge_level2, count: int = 30, task_id: Optional[int] = None) -> str:
     task_log_text = ""
     task_status = 0
 
@@ -347,7 +347,7 @@ async def run_once(execution_id, merge_level2, task_id: Optional[int] = None) ->
     run_config.knowledge.enable_injection = False
     run_config.trace_id = None
 
-    initial_messages = prompt.build_messages(merge_level2=merge_level2)
+    initial_messages = prompt.build_messages(merge_level2=merge_level2, count=count)
 
     store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
     runner = AgentRunner(
@@ -427,6 +427,7 @@ async def run_once(execution_id, merge_level2, task_id: Optional[int] = None) ->
 async def main(
         cluster_name: str,
         platform_type: str,
+        count: int,
         execution_id: Optional[int] = None,
         task_id: Optional[int] = None,
 ) -> dict:
@@ -442,11 +443,10 @@ async def main(
     if not execution_id:
         return {"execution_id": None, "final_text": ""}
 
-    final_text = await run_once(execution_id, cluster_name, task_id=task_id)
+    final_text = await run_once(execution_id, cluster_name, count=count, task_id=task_id)
     return {"execution_id": execution_id, "final_text": final_text}
 
 
 if __name__ == "__main__":
-    asyncio.run(main('贪污腐败','piaoquan'))
+    asyncio.run(main('贪污腐败', 'piaoquan', 30))
     # write_demand_items_to_mysql(execution_id=8, merge_level2='贪污腐败')
-
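
The new count parameter threads through the whole chain: main -> run_once -> prompt.build_messages, which fills the 「%count%」 placeholder in demand.md. A usage sketch, assuming the examples directory is importable as a package; the value 50 is illustrative:

import asyncio

from examples.demand.run import main

# Ask the agent for roughly 50 demands instead of the default 30.
result = asyncio.run(main('贪污腐败', 'piaoquan', 50))
print(result["execution_id"])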

+ 66 - 44
examples/demand/web_api.py

@@ -39,14 +39,18 @@ except Exception:  # pragma: no cover
 class DemandStartRequest(BaseModel):
     cluster_name: str
     platform_type: Literal["piaoquan", "changwen"]
+    count: int
 
 
-# Scheduled-task configuration: adjust or extend as needed
-# Note: the platform mapping is determined by platform_type; cluster_name is matched against demand_task.name
-DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST: list[dict] = [{
-    "cluster_name":"贪污腐败",
-    "platform_type":"piaoquan",
-}]
+def _get_demand_schedule_cluster_platform_list() -> list[dict]:
+    """动态获取定时任务列表。"""
+    try:
+        # Lazy import: avoid failing at service startup when the ODPS dependency is missing
+        from examples.demand.data_query_tools import get_demand_merge_level2_names
+        return get_demand_merge_level2_names() or []
+    except Exception as e:  # pragma: no cover
+        print(f"获取定时任务列表失败: {e}")
+        return []
 
 # Whether to enable the scheduled task (optional; override via environment variable)
 DEMAND_SCHEDULER_ENABLED: bool = os.getenv("DEMAND_SCHEDULER_ENABLED", "1").strip() == "1"
@@ -64,39 +68,50 @@ def _get_today_time_window(now: datetime) -> tuple[datetime, datetime]:
     return start_of_today, end_of_today
 
 
-async def demand_start_sync(cluster_name: str, platform_type: Literal["piaoquan", "changwen"]) -> dict:
+async def demand_start_sync(cluster_name: str, platform_type: Literal["piaoquan", "changwen"], count: int) -> dict:
     """
     Same execution chain as /demand/start, but without creating a background task: prepare -> create demand_task -> serially await run_demand.
     """
     # The prepare phase is synchronous in the current example code, so keep serial, synchronous semantics here
     execution_id = None
-    if platform_type == "piaoquan":
-        execution_id = piaoquan_prepare(cluster_name)
-    elif platform_type == "changwen":
-        execution_id = changwen_prepare(cluster_name)
-    elif platform_type == "zengzhang":
-        execution_id = zengzhang_prepare(cluster_name)
-
-    if not execution_id:
-        raise HTTPException(status_code=400, detail="获取 execution_id 失败")
-
-    task_name = cluster_name[:32] if cluster_name else None
-    task_id = _create_demand_task(
-        execution_id=execution_id,
-        name=task_name,
-        platform=platform_type,
-    )
-    if not task_id:
-        raise HTTPException(status_code=500, detail="创建 demand_task 失败")
-
-    # run_once's internal finally block writes the task status back to MySQL
-    result = await run_demand(
-        cluster_name,
-        platform_type,
-        execution_id=execution_id,
-        task_id=task_id,
-    )
-    return {"ok": True, "message": "调用成功", "task_id": task_id, "execution_id": execution_id, "result": result}
+    task_id: Optional[int] = None
+    try:
+        if platform_type == "piaoquan":
+            execution_id = piaoquan_prepare(cluster_name)
+        elif platform_type == "changwen":
+            execution_id = changwen_prepare(cluster_name)
+        elif platform_type == "zengzhang":
+            execution_id = zengzhang_prepare(cluster_name)
+
+        if not execution_id:
+            raise ValueError("获取 execution_id 失败")
+
+        task_name = cluster_name[:32] if cluster_name else None
+        task_id = _create_demand_task(
+            execution_id=execution_id,
+            name=task_name,
+            platform=platform_type,
+        )
+        if not task_id:
+            raise ValueError("创建 demand_task 失败")
+
+        # run_once's internal finally block writes the task status back to MySQL
+        result = await run_demand(
+            cluster_name,
+            platform_type,
+            count,
+            execution_id=execution_id,
+            task_id=task_id,
+        )
+        return {"ok": True, "message": "调用成功", "task_id": task_id, "execution_id": execution_id, "result": result}
+    except Exception as e:
+        return {
+            "ok": False,
+            "message": f"执行失败: {e}",
+            "task_id": task_id,
+            "execution_id": execution_id,
+            "result": None,
+        }
 
 
 def _today_has_status_0_or_1(cluster_name: str, platform_type: str, now: datetime) -> bool:
@@ -134,13 +149,16 @@ async def demand_scheduled_run_once() -> None:
     Iterate over the configured list -> query today's demand_task rows -> match cluster_name/name and platform_type/platform.
     Skip entries that already have a record with status 0 or 1; otherwise run demand_start_sync once.
     """
-    if not DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST:
+    # The ODPS query is a blocking call; run it in a thread to avoid blocking the event loop
+    demand_schedule_cluster_platform_list = await asyncio.to_thread(_get_demand_schedule_cluster_platform_list)
+    if not demand_schedule_cluster_platform_list:
         return
 
     now = datetime.now(BEIJING_TZ)
-    for item in DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST:
+    for item in demand_schedule_cluster_platform_list:
         cluster_name = item.get("cluster_name")
         platform_type = item.get("platform_type")
+        count = item.get("count")
         if not cluster_name or platform_type not in ("piaoquan", "changwen"):
             continue
 
@@ -148,8 +166,8 @@ async def demand_scheduled_run_once() -> None:
             print(f"[scheduler] skip: cluster={cluster_name}, platform={platform_type} (today has status 0/1)")
             continue
 
-        print(f"[scheduler] run: cluster={cluster_name}, platform={platform_type}")
-        await demand_start_sync(cluster_name=cluster_name, platform_type=platform_type)  # run serially
+        print(f"[scheduler] run: cluster={cluster_name}, platform={platform_type}, count={count}")
+        await demand_start_sync(cluster_name=cluster_name, platform_type=platform_type, count=count)  # run serially
 
 
 _demand_scheduler: Optional[Any] = None
@@ -164,6 +182,10 @@ async def _demand_scheduler_job() -> None:
     """
     if _demand_scheduler_lock.locked():
         return
+    # Date-specific skip: do not run the scheduled job on 2026-04-08 (Beijing time)
+    if datetime.now(BEIJING_TZ).date() == datetime(2026, 4, 8).date():
+        print("[scheduler] skip: 2026-04-08")
+        return
     async with _demand_scheduler_lock:
         await demand_scheduled_run_once()
 
@@ -193,6 +215,7 @@ async def demand_start(req: DemandStartRequest):
         await run_demand(
             req.cluster_name,
             req.platform_type,
+            req.count,
             execution_id=execution_id,
             task_id=task_id,
         )
@@ -277,11 +300,11 @@ def demand_task_status(task_id: int, max_log_chars: int = 2000):
 
 @app.get("/demand/tasks")
 def demand_tasks(
-    status: Optional[int] = None,
-    name: Optional[str] = None,
-    platform_type: Optional[str] = None,
-    page: int = 1,
-    page_size: int = 20,
+        status: Optional[int] = None,
+        name: Optional[str] = None,
+        platform_type: Optional[str] = None,
+        page: int = 1,
+        page_size: int = 20,
 ):
     where_parts: list[str] = []
     where_params: list = []
@@ -331,4 +354,3 @@ def run_server():
 
 if __name__ == "__main__":
     run_server()
-
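
With this change, the /demand/start request body (DemandStartRequest) requires a count field alongside cluster_name and platform_type. A client-side sketch; the host and port are assumptions, adjust them to the actual deployment:

import requests

resp = requests.post(
    "http://localhost:8000/demand/start",  # host/port are assumptions
    json={
        "cluster_name": "贪污腐败",
        "platform_type": "piaoquan",
        "count": 30,  # newly required field
    },
)
print(resp.json())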