Ver Fonte

增加merge_leve2字段

jihuaqiang há 4 dias atrás
pai
commit
112ea8c724
4 ficheiros alterados com 80 adições e 7 exclusões
  1. 66 1
      main.py
  2. 7 0
      models/decode_task_result.py
  3. 6 6
      tasks/decode.py
  4. 1 0
      utils/params.py

+ 66 - 1
main.py

@@ -1,9 +1,10 @@
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse
 from fastapi.middleware.cors import CORSMiddleware
+from starlette.responses import Response, StreamingResponse
 from utils.params import DecodeContentParam, PatternContentParam, TopicSearchParam
 from dotenv import load_dotenv, find_dotenv
-from typing import List, Dict, Any, Optional
+from typing import Any, Dict, List, Optional
 
 from tasks.decode import begin_decode_task
 from tasks.detail import get_decode_detail_by_task_id
@@ -15,6 +16,18 @@ import sys
 
 logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
 
+# 接口访问日志(与仅 ERROR 的 sink 并存,默认 stderr 仍会输出 INFO)
+_MAX_ACCESS_LOG_BYTES = 8192
+
+
+def _preview_bytes(data: bytes) -> str:
+    if not data:
+        return ""
+    text = data.decode("utf-8", errors="replace")
+    if len(text) > _MAX_ACCESS_LOG_BYTES:
+        return text[:_MAX_ACCESS_LOG_BYTES] + f"...<truncated len={len(text)}>"
+    return text
+
 # 响应消息映射
 RESPONSE_MSG_MAP = {
     0: "success",
@@ -35,6 +48,56 @@ app.add_middleware(
     allow_headers=["*"],
 )
 
+
+@app.middleware("http")
+async def api_access_log_middleware(request: Request, call_next):
+    """记录每个接口的请求(路径、查询、body)与响应(状态码、body)。"""
+    body_bytes = await request.body()
+
+    async def receive() -> dict:
+        return {"type": "http.request", "body": body_bytes, "more_body": False}
+
+    wrapped = Request(request.scope, receive)
+
+    req_body_preview = _preview_bytes(body_bytes) if body_bytes else ""
+    logger.info(
+        "api_request method={} path={} query={} body={}",
+        request.method,
+        request.url.path,
+        str(request.query_params),
+        req_body_preview if req_body_preview else "<empty>",
+    )
+
+    response = await call_next(wrapped)
+
+    if isinstance(response, StreamingResponse):
+        logger.info(
+            "api_response status={} path={} body=<streaming skipped>",
+            response.status_code,
+            request.url.path,
+        )
+        return response
+
+    resp_chunks: List[bytes] = []
+    async for chunk in response.body_iterator:
+        resp_chunks.append(chunk)
+    resp_body = b"".join(resp_chunks)
+    resp_preview = _preview_bytes(resp_body) if resp_body else ""
+    logger.info(
+        "api_response status={} path={} body={}",
+        response.status_code,
+        request.url.path,
+        resp_preview if resp_preview else "<empty>",
+    )
+
+    return Response(
+        content=resp_body,
+        status_code=response.status_code,
+        headers=dict(response.headers),
+        media_type=response.media_type,
+    )
+
+
 @app.exception_handler(HTTPException)
 async def http_exception_handler(request: Request, exc: HTTPException):
     """统一处理 HTTPException,保证返回结构与其它接口一致"""
@@ -76,6 +139,8 @@ def decode_content(param: DecodeContentParam):
     code = res.get("code", -1)
     task_id = res.get("task_id")
     reason = res.get("reason", "")
+
+    
     
     return _build_api_response(
         code=code,

+ 7 - 0
models/decode_task_result.py

@@ -23,6 +23,7 @@ class WorkflowDecodeTaskResult(BaseModel):
     channel_account_name: Annotated[Optional[str], Field(description='作者上下文', default=None)]
     body_text: Annotated[Optional[str], Field(description='内容上下文', default=None)]
     video_url: Annotated[Optional[str], Field(description='内容视频地址', default=None)]
+    merge_leve2: Annotated[Optional[str], Field(description='二级品类/标签', default=None)]
 
     def save(self):
         """保存结果到数据库"""
@@ -56,6 +57,11 @@ class WorkflowDecodeTaskResult(BaseModel):
             images_list.append(content.video_url)
         images_str = ','.join(images_list) if images_list else ''
         
+        ml2 = (content.merge_leve2 or "").strip()
+        merge_leve2_val: Optional[str] = None
+        if ml2:
+            merge_leve2_val = ml2[:128] if len(ml2) > 128 else ml2
+
         result = WorkflowDecodeTaskResult(
             task_id=task_id,
             channel_content_id=content.channel_content_id,
@@ -65,6 +71,7 @@ class WorkflowDecodeTaskResult(BaseModel):
             channel_account_name=content.channel_account_name[:64] if content.channel_account_name and len(content.channel_account_name) > 64 else (content.channel_account_name or ''),
             body_text=content.body_text or '',
             video_url=content.video_url[:100] if content.video_url and len(content.video_url) > 100 else (content.video_url or ''),
+            merge_leve2=merge_leve2_val,
             result_payload=None,
             result_size=0
         )

+ 6 - 6
tasks/decode.py

@@ -54,7 +54,7 @@ def _create_workflow_task(scene: SceneEnum, content_type: ContentTypeEnum) -> Op
 
 
 def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
-    """初始化选题解构任务结果"""
+    """初始化选题解构任务结果(`content` 中 title、images、merge_leve2 等字段一并写入 workflow_decode_task_result)。"""
     try:
         result = WorkflowDecodeTaskResult.create_result(
             task_id=task_id,
@@ -68,7 +68,7 @@ def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTas
 
 
 def _initialize_script_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
-    """初始化创作解构任务结果(写入 workflow_script_task_result)"""
+    """初始化创作解构任务结果(`content` 含 merge_leve2 等字段,写入 workflow_script_task_result)"""
     try:
         result = WorkflowDecodeTaskResult.create_result(
             task_id=task_id,
@@ -164,7 +164,7 @@ def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.D
 
 
 def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
-    """选题解构方法"""
+    """选题解构:创建任务并将 `param.content`(含 `merge_leve2`)写入 `workflow_decode_task_result`。"""
     try:
         # 前置配额检查,用于超出每天解构次数时,直接返回错误
         # if not _check_quota(param.scene, CapabilityEnum.DECODE, param.content_type):
@@ -181,7 +181,7 @@ def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
                 "创建解构任务失败"
             )
         
-        # 步骤2: 初始化任务结果
+        # 步骤2: 初始化任务结果(merge_leve2 与 title/images 等同属 content,由 create_result 落库)
         result = _initialize_task_result(task.task_id, param.content)
         if not result or not result.task_id:
             return _build_error_response(
@@ -224,7 +224,7 @@ def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
 
 
 def decode_creation(param: DecodeContentParam) -> Dict[str, Any]:
-    """创作解构方法"""
+    """创作解构:创建任务并将 `param.content`(含 `merge_leve2`)写入 `workflow_script_task_result`。"""
     try:
         # 步骤1: 创建工作流task任务(scene 为 CREATION)
         task = _create_workflow_task(param.scene, param.content_type)
@@ -234,7 +234,7 @@ def decode_creation(param: DecodeContentParam) -> Dict[str, Any]:
                 "创建创作解构任务失败"
             )
 
-        # 步骤2: 初始化创作解构任务结果到 workflow_script_task_result
+        # 步骤2: 初始化创作解构任务结果(merge_leve2 等同 content 字段一并落库)
         result = _initialize_script_task_result(task.task_id, param.content)
         if not result or not result.task_id:
             return _build_error_response(

+ 1 - 0
utils/params.py

@@ -30,6 +30,7 @@ class ContentParam(BaseModel):
     channel_account_id: Optional[str] = None
     channel_account_name: Optional[str] = None
     weight_score: Optional[float] = None  # 表现力分数,聚类必传
+    merge_leve2: Optional[str] = None
 
 
 class DecodeContentParam(BaseModel):