
Add zengzhang (growth) related demand generation; update the demand prompt

xueyiming, 1 month ago
Commit 2e5616a4e5

+ 79 - 1
examples/demand/data_query_tools.py

@@ -4,7 +4,6 @@ from datetime import date, timedelta
 import json
 from pathlib import Path
 
-from agent import tool
 from examples.demand.mysql import mysql_db
 
 
@@ -373,6 +372,85 @@ ORDER BY 推荐曝光数 DESC
     return result_list
 
 
+def get_zengzhang_weight(account_name):
+    bizdatemax_date = date.today() - timedelta(days=1)
+    bizdatemin_date = bizdatemax_date - timedelta(days=30)
+    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
+    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
+
+    sql_query = f'''
+SELECT  合作方名
+        ,合作方简称
+        ,videoid
+        ,一级品类
+        ,二级品类
+        ,SUM(头部曝光) as 头部曝光
+        ,SUM(头部曝光uv) as 头部曝光uv
+        ,SUM(头部realplay) as  头部realplay
+        ,SUM(头部realplay_uv) as 头部realplay_uv
+        ,SUM(头部分享) as 头部分享
+        ,SUM(头部分享uv) as 头部分享uv
+        ,SUM(头部回流数)  as 头部回流数
+        ,SUM(推荐曝光数) as 推荐曝光数
+        ,SUM(当日分发曝光uv) as 当日分发曝光uv
+        ,SUM(推荐realplay) as 推荐realplay
+        ,SUM(分发realplay_uv) as 分发realplay_uv
+        ,SUM(推荐分享数) as 推荐分享数
+        ,SUM(当日分发分享uv) as 当日分发分享uv
+        ,SUM(推荐回流数) as 推荐回流数
+        ,SUM(vov分子) as vov分子
+FROM    loghubods.dws_growth_partner_vid_data
+WHERE   dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
+AND     合作方名 = '{account_name}'
+GROUP BY 合作方名
+         ,合作方简称
+         ,videoid
+         ,一级品类
+         ,二级品类
+ORDER BY SUM(推荐曝光数)
+;
+    '''
+    result_list = []
+    data = get_odps_data(sql_query)
+    if data:
+        for r in data:
+            result_list.append(
+                {
+                    "account_name": r[0],
+                    "合作方简称": r[1],
+                    "videoid": r[2],
+                    "一级品类": r[3],
+                    "二级品类": r[4],
+                    "ext_data": {
+                        "头部曝光": r[5],
+                        "头部曝光uv": r[6],
+                        "头部realplay": r[7],
+                        "头部realplay_uv": r[8],
+                        "头部分享": r[9],
+                        "头部分享uv": r[10],
+                        "头部回流数": r[11],
+                        "推荐曝光数": r[12],
+                        "当日分发曝光uv": r[13],
+                        "推荐realplay": r[14],
+                        "分发realplay_uv": r[15],
+                        "推荐分享数": r[16],
+                        "当日分发分享uv": r[17],
+                        "推荐回流数": r[18],
+                        "vov分子": r[19],
+                    },
+                }
+            )
+
+    # Write the results to examples/demand/data/zengzhang_data/
+    output_dir = Path(__file__).parent / "data" / "zengzhang_data"
+    output_dir.mkdir(parents=True, exist_ok=True)
+    output_file = output_dir / f"{account_name}.json"
+    with output_file.open("w", encoding="utf-8") as f:
+        json.dump(result_list, f, ensure_ascii=False, indent=2)
+
+    return result_list
+
+
 def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
     result = {}
     if not video_ids:

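The new `get_zengzhang_weight` helper both returns the aggregated rows and writes them to `examples/demand/data/zengzhang_data/<account_name>.json`. A minimal usage sketch, assuming ODPS access is configured for `get_odps_data` and the partner exists in `loghubods.dws_growth_partner_vid_data` (the account name below is hypothetical):

    from examples.demand.data_query_tools import get_zengzhang_weight

    # Pull the last 30 days of per-video growth metrics for one partner.
    rows = get_zengzhang_weight("示例合作方")  # hypothetical account name
    if rows:
        print(len(rows))                         # one entry per (videoid, category) group
        print(rows[0]["ext_data"]["推荐曝光数"])   # aggregated metrics live under ext_data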
+ 4 - 3
examples/demand/demand.md

@@ -96,16 +96,17 @@ $user$
 
 ## Task
 
-For 「%merge_level2%」, complete demand generation starting from the "high-weight leaf elements"
+For 「%merge_level2%」, complete demand generation starting from the "high-weight leaf elements" and "high-weight category nodes"
 
-1. For single-element demand generation, first use the `get_weight_score_topn` tool to look up high-weight elements, judge whether each can serve as a demand, and give a reason. Qualifying elements enter the demand pool; for the rest, give the reason for discarding them.
+1. For single-element demand generation, first use the `get_weight_score_topn` tool to look up high-weight elements/high-weight categories, judge whether each can serve as a demand, and give a reason. Qualifying elements enter the demand pool; for the rest, give the reason for discarding them.
 2. For combined demands, the category starting point must first be obtained as a high-weight category via the `get_weight_score_topn` tool; use co-occurrence queries to find suitable combinations, compute each combination's average weight score and post count, and judge overall whether to keep or remove it
 3. For combined demands, the category starting point must first go through the `get_frequent_itemsets` tool to search for frequently occurring category combinations, removing or keeping them according to support
+4. Generated demands must have real meaning: they must reflect 「%merge_level2%」, and 「%merge_level2%」 must be recognizable from the demand itself; terms that fail this requirement are filtered out directly, with the reason given
 
 ## Requirements
 
 1. The starting point of a co-occurrence query must come from a high-weight category; do not pick categories directly from the tree
 2. Category co-occurrence combinations must start from categories returned by `get_weight_score_topn`
 3. Keeping a result in the final output must be backed by a weight score or a support value
-4. Generate as many demands as possible, aiming for roughly 50 final demands
+4. Generate as many demands as possible, aiming for roughly 30 final demands
 

+ 5 - 0
examples/demand/pattern_builds/pattern_service.py

@@ -514,6 +514,11 @@ def run_mining(
                 post_ids=post_ids,
                 post_limit=post_limit
             )
+        elif platform == 'zengzhang':
+            result = export_post_elements(
+                post_ids=post_ids,
+                post_limit=post_limit
+            )
         elif platform == 'piaoquan':
             result = get_merge_level2_crawler_data(cluster_name)
 

+ 3 - 0
examples/demand/run.py

@@ -20,6 +20,7 @@ from examples.demand.models import TopicPatternExecution
 from examples.demand.piaoquan_prepare import prepare, piaoquan_prepare
 from examples.demand.demand_agent_context import TopicBuildAgentContext
 from examples.demand.mysql import mysql_db
+from examples.demand.zengzhang_prepare import zengzhang_prepare
 
 # Clash Verge TUN mode compatibility: forbid httpx/urllib from auto-detecting the system HTTP proxy
 os.environ.setdefault("no_proxy", "*")
@@ -424,6 +425,8 @@ async def main(
             execution_id = piaoquan_prepare(cluster_name)
         elif platform_type == "changwen":
             execution_id = changwen_prepare(cluster_name)
+        elif platform_type == "zengzhang":
+            execution_id = zengzhang_prepare(cluster_name)
         else:
             execution_id = None
     if not execution_id:

+ 5 - 1
examples/demand/web_api.py

@@ -17,6 +17,7 @@ from pydantic import BaseModel
 # Add the project root to the Python path (kept consistent with run.py)
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
+from examples.demand.zengzhang_prepare import zengzhang_prepare
 from examples.demand.changwen_prepare import changwen_prepare
 from examples.demand.mysql import mysql_db
 from examples.demand.piaoquan_prepare import piaoquan_prepare
@@ -68,10 +69,13 @@ async def demand_start_sync(cluster_name: str, platform_type: Literal["piaoquan"
     Same execution chain as /demand/start, but without creating a background task: prepare -> create demand_task -> serially await run_demand.
     """
     # The prepare stage is synchronous (the current example code is sync); keep synchronous, serial semantics here
+    execution_id = None
     if platform_type == "piaoquan":
         execution_id = piaoquan_prepare(cluster_name)
-    else:
+    elif platform_type == "changwen":
         execution_id = changwen_prepare(cluster_name)
+    elif platform_type == "zengzhang":
+        execution_id = zengzhang_prepare(cluster_name)
 
     if not execution_id:
         raise HTTPException(status_code=400, detail="获取 execution_id 失败")
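`demand_start_sync` now dispatches all three platforms explicitly and initializes `execution_id` to None, so an unknown `platform_type` falls through to the 400 response instead of raising a NameError. A hedged client-side sketch; the route path, HTTP method, and port are assumptions, since the decorator sits outside this hunk:

    import httpx

    # Path, method, and port are assumptions, not confirmed by this diff.
    resp = httpx.post(
        "http://localhost:8000/demand/start_sync",
        params={"cluster_name": "新余", "platform_type": "zengzhang"},
        timeout=None,  # prepare + run_demand run serially, so the call can take long
    )
    print(resp.status_code, resp.json())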

+ 293 - 0
examples/demand/zengzhang_prepare.py

@@ -0,0 +1,293 @@
+import json
+from collections import defaultdict
+from pathlib import Path
+
+from examples.demand.data_query_tools import get_zengzhang_weight
+from examples.demand.db_manager import DatabaseManager
+from examples.demand.models import TopicPatternElement, TopicPatternExecution
+from examples.demand.pattern_builds.pattern_service import run_mining
+
+db = DatabaseManager()
+
+ZENGZHANG_DATA_DIR = Path(__file__).parent / "data" / "zengzhang_data"
+
+
+def _safe_float(value):
+    if value is None:
+        return 0.0
+    try:
+        return float(value)
+    except (TypeError, ValueError):
+        return 0.0
+
+
+def _build_category_scores(name_scores, name_paths, name_post_ids):
+    node_scores = defaultdict(float)
+    node_post_ids = defaultdict(set)
+    for name, score in name_scores.items():
+        paths = name_paths.get(name, set())
+        post_ids = name_post_ids.get(name, set())
+        for category_path in paths:
+            if not category_path:
+                continue
+            nodes = [segment.strip() for segment in category_path.split(">") if segment.strip()]
+            for idx in range(len(nodes)):
+                prefix = ">".join(nodes[: idx + 1])
+                node_scores[prefix] += score
+                if post_ids:
+                    node_post_ids[prefix].update(post_ids)
+    return node_scores, node_post_ids
+
+
+def _write_json(path, payload):
+    with open(path, "w", encoding="utf-8") as f:
+        json.dump(payload, f, ensure_ascii=False, indent=2)
+
+
+def _normalize_post_id(value):
+    """
+    Normalize the post_id/videoid format to reduce match failures caused by formatting differences in the source data.
+    """
+    if value is None:
+        return ""
+    s = str(value).strip()
+    if not s:
+        return ""
+    if s.endswith(".0"):
+        s = s[:-2]
+    return s
+
+
+def _extract_digits(value: str) -> str:
+    if not value:
+        return ""
+    return "".join(ch for ch in value if ch.isdigit())
+
+
+def _build_score_by_videoid(cluster_name: str):
+    json_path = ZENGZHANG_DATA_DIR / f"{cluster_name}.json"
+    with open(json_path, "r", encoding="utf-8") as f:
+        payload = json.load(f)
+
+    if not isinstance(payload, list):
+        raise ValueError(f"数据格式错误,期望数组: {json_path}")
+
+    score_map = {}
+    for item in payload:
+        if not isinstance(item, dict):
+            continue
+        videoid = item.get("videoid")
+        if videoid is None:
+            continue
+        ext_data = item.get("ext_data") or {}
+        if not isinstance(ext_data, dict):
+            continue
+        realplay_uv = _safe_float(ext_data.get("分发realplay_uv"))
+        exposure_uv = _safe_float(ext_data.get("当日分发曝光uv"))
+        norm_videoid = _normalize_post_id(videoid)
+        if not norm_videoid:
+            continue
+        score_map[norm_videoid] = (realplay_uv / exposure_uv) if exposure_uv > 0 else 0.0
+    return score_map
+
+
+def filter_low_exposure_records(
+        cluster_name: str,
+        min_exposure: float = 1000,
+):
+    """
+    Filter records whose 推荐曝光数 is below the threshold out of the JSON file, then write the result back to the original file.
+    Default threshold: 1000
+    """
+    json_path = ZENGZHANG_DATA_DIR / f"{cluster_name}.json"
+    with open(json_path, "r", encoding="utf-8") as f:
+        payload = json.load(f)
+
+    if not isinstance(payload, list):
+        raise ValueError(f"数据格式错误,期望数组: {json_path}")
+
+    filtered = []
+    for item in payload:
+        if not isinstance(item, dict):
+            continue
+        ext_data = item.get("ext_data") or {}
+        exposure = _safe_float(ext_data.get("推荐曝光数")) if isinstance(ext_data, dict) else 0.0
+        if exposure >= float(min_exposure):
+            filtered.append(item)
+
+    with open(json_path, "w", encoding="utf-8") as f:
+        json.dump(filtered, f, ensure_ascii=False, indent=2)
+
+    return {
+        "file": str(json_path),
+        "before_count": len(payload),
+        "after_count": len(filtered),
+        "removed_count": len(payload) - len(filtered),
+        "min_exposure": float(min_exposure),
+    }
+
+
+def zengzhang_data_prepare(cluster_name) -> int:
+    json_path = ZENGZHANG_DATA_DIR / f"{cluster_name}.json"
+    with open(json_path, "r", encoding="utf-8") as f:
+        payload = json.load(f)
+
+    if not isinstance(payload, list):
+        raise ValueError(f"数据格式错误,期望数组: {json_path}")
+
+    video_ids = []
+    for item in payload:
+        if not isinstance(item, dict):
+            continue
+        video_id = item.get("videoid")
+        if video_id is None:
+            continue
+        video_id_str = str(video_id).strip()
+        if video_id_str:
+            video_ids.append(video_id_str)
+
+    # Deduplicate while preserving the original order, so the same video is not mined twice
+    video_ids = list(dict.fromkeys(video_ids))
+    if not video_ids:
+        raise ValueError(f"未在文件中解析到有效 videoid: {json_path}")
+
+    execution_id = run_mining(post_ids=video_ids, cluster_name=cluster_name, platform='zengzhang')
+    return execution_id
+
+
+def prepare_by_json_score(execution_id: int, cluster_name: str = ""):
+    """
+    Keep the output structure consistent with prepare.py, but derive the score as:
+    score = 分发realplay_uv / 当日分发曝光uv
+    """
+    session = db.get_session()
+    try:
+        execution = session.query(TopicPatternExecution).filter(
+            TopicPatternExecution.id == execution_id
+        ).first()
+        if not execution:
+            raise ValueError(f"execution_id 不存在: {execution_id}")
+
+        score_by_post_id = _build_score_by_videoid(cluster_name)
+        rows = session.query(TopicPatternElement).filter(
+            TopicPatternElement.execution_id == execution_id
+        ).all()
+        if not rows:
+            return {"message": "没有可处理的数据", "execution_id": execution_id}
+
+        grouped = {
+            "实质": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
+            "形式": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
+            "意图": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
+        }
+
+        for r in rows:
+            element_type = (r.element_type or "").strip()
+            if element_type not in grouped:
+                continue
+            name = (r.name or "").strip()
+            if not name:
+                continue
+            if r.post_id:
+                grouped[element_type]["name_post_ids"][name].add(str(r.post_id))
+            if r.category_path:
+                grouped[element_type]["name_paths"][name].add(r.category_path.strip())
+
+        output_dir = Path(__file__).parent / "data" / str(execution_id)
+        output_dir.mkdir(parents=True, exist_ok=True)
+        match_stats = {
+            "post_ids_total": 0,
+            "post_ids_scored_direct": 0,
+            "post_ids_scored_by_digits": 0,
+            "post_ids_missing_score": 0,
+        }
+        summary = {
+            "execution_id": execution_id,
+            "merge_leve2": execution.merge_leve2,
+            "files": {},
+            "score_match_stats": match_stats,
+        }
+
+        for element_type, data in grouped.items():
+            name_post_ids = data["name_post_ids"]
+            name_paths = data["name_paths"]
+
+            name_scores = {}
+            for name, post_ids in name_post_ids.items():
+                scores = []
+                for raw_pid in post_ids:
+                    match_stats["post_ids_total"] += 1
+                    pid = _normalize_post_id(raw_pid)
+                    score = score_by_post_id.get(pid)
+                    if score is not None:
+                        match_stats["post_ids_scored_direct"] += 1
+                        scores.append(_safe_float(score))
+                        continue
+
+                    # Fallback: when the post_id has a prefix/suffix, try matching the videoid by its digits only
+                    digits_pid = _extract_digits(pid)
+                    if digits_pid and digits_pid in score_by_post_id:
+                        match_stats["post_ids_scored_by_digits"] += 1
+                        scores.append(_safe_float(score_by_post_id[digits_pid]))
+                    else:
+                        match_stats["post_ids_missing_score"] += 1
+                        scores.append(0.0)
+                name_scores[name] = (sum(scores) / len(scores)) if scores else 0.0
+
+            raw_elements = []
+            for name, score in name_scores.items():
+                post_ids_set = name_post_ids.get(name, set())
+                raw_elements.append(
+                    {
+                        "name": name,
+                        "score": round(score, 6),
+                        "post_ids_count": len(post_ids_set),
+                        "category_paths": sorted(list(name_paths.get(name, set()))),
+                    }
+                )
+
+            element_payload = sorted(raw_elements, key=lambda x: (-x["score"], x["name"]))
+            category_scores, category_post_ids = _build_category_scores(
+                name_scores, name_paths, name_post_ids
+            )
+            category_payload = sorted(
+                [
+                    {
+                        "category_path": path,
+                        "category": path.split(">")[-1].strip() if path else "",
+                        "score": round(score, 6),
+                        "post_ids_count": len(category_post_ids.get(path, set())),
+                    }
+                    for path, score in category_scores.items()
+                ],
+                key=lambda x: x["score"],
+                reverse=True,
+            )
+
+            element_file = output_dir / f"{element_type}_元素.json"
+            category_file = output_dir / f"{element_type}_分类.json"
+            _write_json(element_file, element_payload)
+            _write_json(category_file, category_payload)
+            summary["files"][f"{element_type}_元素"] = str(element_file)
+            summary["files"][f"{element_type}_分类"] = str(category_file)
+
+        return summary
+    finally:
+        session.close()
+
+
+def zengzhang_prepare(cluster_name):
+    get_zengzhang_weight(cluster_name)
+    filter_low_exposure_records(cluster_name=cluster_name)
+    execution_id = zengzhang_data_prepare(cluster_name)
+    if execution_id:
+        print(f"execution_id={execution_id}")
+        print(prepare_by_json_score(execution_id, cluster_name))
+        return execution_id
+
+
+if __name__ == "__main__":
+    # Example cluster names: 奇观妙技有乾坤, 青史铁事漫谈
+    cluster_name = '新余'
+    execution_id = zengzhang_prepare(cluster_name=cluster_name)
+    print(execution_id)
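For reference, `prepare_by_json_score` scores each video as 分发realplay_uv / 当日分发曝光uv and averages those scores per element name; database post_ids are matched to videoids first verbatim (after `_normalize_post_id`) and then via a digits-only fallback, with misses contributing 0.0. A self-contained sketch of that lookup behavior on toy data (re-implemented here for illustration, not imported from the module):

    # Toy map: videoid -> 分发realplay_uv / 当日分发曝光uv
    score_by_videoid = {"12345": 0.25}

    def normalize(pid: str) -> str:
        s = str(pid).strip()
        return s[:-2] if s.endswith(".0") else s  # mirrors _normalize_post_id

    def lookup(pid: str) -> float:
        pid = normalize(pid)
        if pid in score_by_videoid:                # direct match
            return score_by_videoid[pid]
        digits = "".join(c for c in pid if c.isdigit())
        return score_by_videoid.get(digits, 0.0)   # digits-only fallback, else 0.0

    print(lookup("12345.0"))    # 0.25: the float-formatted id normalizes to "12345"
    print(lookup("vid_12345"))  # 0.25: the prefix is stripped by the digits fallback
    print(lookup("99999"))      # 0.0: unmatched ids count as zero in the element average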