Parcourir la source

Merge branch 'dev_api_init' of weapp/video_decode into master

jihuaqiang il y a 1 jour
Parent
commit
cd39d2ebdf
5 fichiers modifiés avec 202 ajouts et 3 suppressions
  1. 6 0
      .gitignore
  2. 10 2
      main.py
  3. 8 0
      requirements.txt
  4. 172 0
      tasks/topic_search.py
  5. 6 1
      utils/params.py

+ 6 - 0
.gitignore

@@ -127,3 +127,9 @@ dmypy.json
 .idea/
 .vscode/
 .DS_Store
+
+mysql_work/
+output/
+video_detail_output/
+*.xlsx
+视频分品类解构数据/

+ 10 - 2
main.py

@@ -1,13 +1,14 @@
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse
 from fastapi.middleware.cors import CORSMiddleware
-from utils.params import DecodeContentParam, PatternContentParam
+from utils.params import DecodeContentParam, PatternContentParam, TopicSearchParam
 from dotenv import load_dotenv, find_dotenv
 from typing import List, Dict, Any, Optional
 
 from tasks.decode import begin_decode_task
 from tasks.detail import get_decode_detail_by_task_id
 from tasks.pattern import begin_pattern_task
+from tasks.topic_search import search_topics
 
 from loguru import logger
 import sys
@@ -106,4 +107,11 @@ def pattern_content(param: PatternContentParam):
         code=code,
         data={"task_id": task_id} if task_id else None,
         reason=reason
-    )
+    )
+
+
+@app.post("/api/v1/content/topics/search")
+def search_content_topics(param: TopicSearchParam):
+    """Search video topics by keyword.
+
+    Matches the given keywords against stored decode results via
+    ``search_topics`` and wraps whatever it returns in the standard API
+    response with ``code=0``.
+    """
+    return _build_api_response(code=0, data=search_topics(param))

+ 8 - 0
requirements.txt

@@ -19,3 +19,11 @@ cryptography>=41.0.0
 
 # HTTP 客户端
 requests>=2.31.0
+
+# Excel 读写(mysql_work 等脚本)
+pandas>=2.0.0
+openpyxl>=3.1.0
+
+# 高性能 JSON 解析
+orjson>=3.9.0
+ijson>=3.2.0

+ 172 - 0
tasks/topic_search.py

@@ -0,0 +1,172 @@
+"""视频选题检索:根据关键词在解构结果中匹配,返回 top5"""
+
+import json
+from typing import List, Dict, Any, Optional, Tuple
+
+from utils.sync_mysql_help import mysql
+from utils.params import TopicSearchParam
+
+# Maximum number of results returned by a search.
+TOP_N = 5
+# Row fields whose text is matched against the keywords.
+SEARCH_FIELDS = ("inspiration_points", "purpose_points", "key_points")
+FALLBACK_LIMIT = 3000  # Maximum number of rows fetched in one query on the fallback path
+
+
+def _to_points_list(val: Any) -> List[str]:
+    """Normalize a points value into a list of strings.
+
+    Args:
+        val: ``None``, a list of items, a comma-separated string, or any
+            other scalar value.
+
+    Returns:
+        ``[]`` for ``None``. For a list, the stripped ``str()`` of every
+        truthy item; for a string, its stripped comma-separated parts. In
+        both cases entries that are empty after stripping are dropped. Any
+        other value yields a one-element list holding ``str(val)``.
+    """
+    if val is None:
+        return []
+    if isinstance(val, list):
+        # Strip first and filter afterwards, so whitespace-only items do not
+        # leak through as empty strings (same rule as the string branch).
+        stripped = (str(item).strip() for item in val if item)
+        return [text for text in stripped if text]
+    if isinstance(val, str):
+        return [part.strip() for part in val.split(",") if part.strip()]
+    return [str(val)]
+
+
+def _extract_search_text(val: Any) -> str:
+    """Flatten a field value into a single searchable string.
+
+    A string is returned stripped. A list is turned into the comma-joined,
+    stripped ``str()`` of its truthy items. ``None`` becomes ``""`` and any
+    other value becomes ``str(val)``.
+    """
+    if isinstance(val, str):
+        return val.strip()
+    if isinstance(val, list):
+        pieces = [str(item).strip() for item in val if item]
+        return ",".join(pieces)
+    return "" if val is None else str(val)
+
+
+def _concat_search_fields(row: Dict[str, Any]) -> str:
+    """Join the text of every search field of *row* into one string.
+
+    Fields are visited in ``SEARCH_FIELDS`` order; fields whose extracted
+    text is empty are skipped, and the rest are joined with commas.
+    """
+    texts = (_extract_search_text(row.get(name)) for name in SEARCH_FIELDS)
+    return ",".join(text for text in texts if text)
+
+
+def _calc_match_score(text: str, keywords: List[str]) -> int:
+    """Count how many keywords occur in *text*, ignoring case.
+
+    Each non-empty keyword contributes at most 1 to the score, no matter how
+    many times it appears in the text. Returns 0 when either the text or the
+    keyword list is empty.
+    """
+    if not (text and keywords):
+        return 0
+    haystack = text.lower()
+    return sum(1 for kw in keywords if kw and kw.lower() in haystack)
+
+
+# Keys picked out of a parsed result_payload: the search fields plus the fusion result.
+PAYLOAD_FIELDS = (*SEARCH_FIELDS, "topic_fusion_result")
+
+
+def _escape_like(kw: str) -> str:
+    """Backslash-escape the characters that are special in a LIKE pattern: % _ \\"""
+    # One pass over the input: each special character gets a backslash prefix.
+    return "".join("\\" + ch if ch in "\\%_" else ch for ch in kw)
+
+
+def _build_keyword_like_conds(keywords: List[str]) -> Tuple[str, list]:
+    """Build the LIKE conditions for the keywords.
+
+    Returns:
+        A ``(sql_fragment, params)`` pair. The fragment ORs together one
+        case-insensitive LIKE test per (keyword, search field) combination,
+        and ``params`` holds the matching ``%keyword%`` patterns in the same
+        order. When no usable keyword remains after stripping, the
+        always-false fragment ``"1=0"`` and an empty parameter list are
+        returned.
+    """
+    if not keywords:
+        return "1=0", []
+
+    # One escaped, lower-cased "%...%" pattern per non-blank keyword.
+    patterns = []
+    for raw in keywords:
+        trimmed = raw.strip()
+        if trimmed:
+            patterns.append(f"%{_escape_like(trimmed.lower())}%")
+    if not patterns:
+        return "1=0", []
+
+    clauses = []
+    params = []
+    for pattern in patterns:
+        for field in SEARCH_FIELDS:
+            clauses.append(f"(LOWER({field}) LIKE LOWER(%s))")
+            params.append(pattern)
+    return "(" + " OR ".join(clauses) + ")", params
+
+
+def _parse_result_payload(payload: Any) -> Dict[str, Any]:
+    """Pick the search fields and topic_fusion_result out of a result_payload.
+
+    *payload* may be a dict or a JSON string. An empty payload, a string
+    that is not valid JSON, or anything that does not end up as a dict
+    yields ``{}``. Otherwise every key in ``PAYLOAD_FIELDS`` is present in
+    the result, with ``None`` for keys missing from the payload.
+    """
+    if not payload:
+        return {}
+    data = payload
+    if isinstance(data, str):
+        try:
+            data = json.loads(data)
+        except json.JSONDecodeError:
+            return {}
+    if isinstance(data, dict):
+        return {name: data.get(name) for name in PAYLOAD_FIELDS}
+    return {}
+
+
+def _fetch_decode_results(keywords: List[str]) -> List[Dict[str, Any]]:
+    """Fetch decode results whose search fields match at least one keyword.
+
+    The primary path queries the dedicated columns with LIKE conditions. If
+    that query raises, the error is logged and the function falls back to
+    reading at most ``FALLBACK_LIMIT`` rows, parsing ``result_payload`` and
+    filtering the rows in Python.
+
+    Args:
+        keywords: Keywords to look for.
+
+    Returns:
+        The matching rows; fallback rows are merged with the fields parsed
+        from their payload. An empty list when nothing matches or when no
+        keyword is usable.
+
+    Raises:
+        Whatever ``mysql.fetchall`` raises on the fallback query; only the
+        primary query's failure is caught.
+    """
+    import logging  # local import: this module's import block has no logger
+
+    kw_cond, kw_params = _build_keyword_like_conds(keywords)
+    if not kw_params:
+        # No usable keyword means the condition is the always-false "1=0";
+        # skip the database round trip.
+        return []
+
+    base_cond = """
+        (inspiration_points IS NOT NULL AND inspiration_points != '')
+        OR (purpose_points IS NOT NULL AND purpose_points != '')
+        OR (key_points IS NOT NULL AND key_points != '')
+    """
+    fields = ", ".join(SEARCH_FIELDS) + ", topic_fusion_result, task_id, channel_content_id, title, images, video_url"
+    sql = f"""
+        SELECT {fields}
+        FROM workflow_decode_task_result
+        WHERE ({base_cond}) AND ({kw_cond})
+    """
+    try:
+        rows = mysql.fetchall(sql, tuple(kw_params))
+    except Exception:
+        # Deliberate best effort: the fallback below still runs, but the
+        # failure is recorded instead of being swallowed silently.
+        logging.getLogger(__name__).exception(
+            "Column-based topic search failed; falling back to result_payload"
+        )
+    else:
+        return list(rows) if rows else []
+
+    # Fallback: parse result_payload, capping the row count to limit the scan.
+    # NOTE(review): there is no ORDER BY, so which rows fall inside the limit
+    # is up to the database - confirm this is acceptable.
+    sql = f"""
+        SELECT task_id, channel_content_id, title, images, video_url, result_payload
+        FROM workflow_decode_task_result
+        WHERE result_payload IS NOT NULL AND result_payload != ''
+        LIMIT {FALLBACK_LIMIT}
+    """
+    rows = mysql.fetchall(sql)
+    if not rows:
+        return []
+
+    matched = []
+    for row in rows:
+        # Rows are used as mappings here (``.get`` and ``**`` unpacking).
+        merged = {**row, **_parse_result_payload(row.get("result_payload"))}
+        text = _concat_search_fields(merged)
+        if text and _calc_match_score(text, keywords) > 0:
+            matched.append(merged)
+    return matched
+
+
+def _build_result_item(row: Dict[str, Any], score: int) -> Dict[str, Any]:
+    """Build one result entry, with every ``*_points`` field as a list.
+
+    The entry holds the three points fields (normalized through
+    ``_to_points_list``), the row's ``topic_fusion_result`` unchanged, and
+    the given ``score``.
+    """
+    item: Dict[str, Any] = {
+        name: _to_points_list(row.get(name))
+        for name in ("inspiration_points", "purpose_points", "key_points")
+    }
+    item["topic_fusion_result"] = row.get("topic_fusion_result")
+    item["score"] = score
+    return item
+
+
+def search_topics(param: TopicSearchParam) -> List[Dict[str, Any]]:
+    """Search video topics by keyword and return the best matches.
+
+    Args:
+        param: Search parameters; ``param.keywords`` is the keyword list.
+            Non-string and blank (empty or whitespace-only) entries are
+            ignored.
+
+    Returns:
+        At most ``TOP_N`` result items ordered by descending score; rows
+        with equal scores keep the order in which they were fetched. An
+        empty list when there is no usable keyword or nothing matches.
+    """
+    # Strip before filtering, so whitespace-only keywords are dropped rather
+    # than kept as empty strings that slip past the guard below.
+    stripped = (k.strip() for k in param.keywords if isinstance(k, str))
+    keywords = [kw for kw in stripped if kw]
+    if not keywords:
+        return []
+
+    scored: List[tuple] = []
+    for row in _fetch_decode_results(keywords):
+        score = _calc_match_score(_concat_search_fields(row), keywords)
+        if score > 0:
+            scored.append((row, score))
+
+    # list.sort is stable, so ties keep their fetch order.
+    scored.sort(key=lambda pair: pair[1], reverse=True)
+    return [_build_result_item(row, score) for row, score in scored[:TOP_N]]

+ 6 - 1
utils/params.py

@@ -40,4 +40,9 @@ class PatternContentParam(BaseModel):
     scene: SceneEnum  # 业务场景:0选题 1创作 2制作
     content_type: ContentTypeEnum  # 1 文本 2图片 3 视频
     pattern_name: str  # 聚类名称
-    contents: List[ContentParam]
+    contents: List[ContentParam]
+
+
+class TopicSearchParam(BaseModel):
+    """Request parameters for video topic search."""
+    keywords: List[str]  # List of search keywords