#16 agent单元测试

Open
xueyiming wants to merge 63 commits from Server/dev-xym-add-test-task into Server/master

+ 20 - 0
pqai_agent/data_models/agent_test_task.py

@@ -0,0 +1,20 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class AgentTestTask(Base):
+    __tablename__ = "agent_test_task"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    agent_id = Column(BigInteger, nullable=False, comment="agent主键")
+    module_id = Column(BigInteger, nullable=False, comment="model主键")
+    create_user = Column(String(32), nullable=True, comment="创建用户")
+    update_user = Column(String(32), nullable=True, comment="更新用户")
+    dataset_ids = Column(Text, nullable=True, comment="数据集ids")
+    evaluate_type = Column(Integer, nullable=False, default=0, comment="数据集ids")
+    status = Column(Integer, nullable=True, comment="状态(0:未开始, 1:进行中, 2:已完成, 3:已取消)")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
+    update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
+                         comment="更新时间")

+ 22 - 0
pqai_agent/data_models/agent_test_task_conversations.py

@@ -0,0 +1,22 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class AgentTestTaskConversations(Base):
+    __tablename__ = "agent_test_task_conversations"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    task_id = Column(BigInteger, nullable=False, comment="任务主键")
+    agent_id = Column(BigInteger, nullable=False, comment="agent主键")
+    dataset_id = Column(BigInteger, nullable=False, comment="数据集主键")
+    conversation_id = Column(BigInteger, nullable=False, comment="对话主键")
+    input = Column(Text, nullable=False, comment="输入内容")
+    output = Column(Text, nullable=False, comment="输出内容")
+    score = Column(Text, nullable=False, comment="得分")
+    status = Column(Integer, default=0, nullable=False,
+                    comment="状态(0:待执行, 1:执行中, 2:执行成功, 3:执行失败, 4:已取消, 5:消息失败, 6:打分失败)")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
+    update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
+                         comment="更新时间")

+ 17 - 0
pqai_agent/data_models/dataset_model.py

@@ -0,0 +1,17 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class DatasetModule(Base):
+    __tablename__ = "dataset_module"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    dataset_id = Column(BigInteger, nullable=False, comment="数据集id")
+    module_id = Column(BigInteger, nullable=False, comment="模型id")
+    is_default = Column(Integer, nullable=False, default=0, comment="是否为该模块的默认数据集")
+    is_delete = Column(Integer, nullable=False, default=0, comment="是否删除 1-删除 0-未删除")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
+    update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
+                         comment="更新时间")

+ 18 - 0
pqai_agent/data_models/datasets.py

@@ -0,0 +1,18 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class Datasets(Base):
+    __tablename__ = "datasets"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    name = Column(String(64), nullable=True, comment="数据集名称")
+    type = Column(Integer, default=0, nullable=False, comment="数据集类型 0-内部 1-外部")
+    description = Column(String(256), nullable=True, comment="数据集描述")
+    is_delete = Column(Integer, nullable=False, default=False, comment="是否删除 1-删除 0-未删除")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
+    update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
+                         comment="更新时间")
+

+ 23 - 0
pqai_agent/data_models/internal_conversation_data.py

@@ -0,0 +1,23 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP, Float
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class InternalConversationData(Base):
+    __tablename__ = "internal_conversation_data"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键")
+    dataset_id = Column(BigInteger, nullable=False, comment="数据集id")
+    staff_id = Column(String(64), nullable=True, comment="员工画像id")
+    user_id = Column(String(64), nullable=True, comment="用户画像id")
+    version_date = Column(String(16), nullable=True, comment="日期版本")
+    conversation = Column(Text, nullable=True, comment="输入内容")
+    content = Column(Text, nullable=True, comment="回复消息内容")
+    send_time = Column(BigInteger, nullable=False, comment="回复时间戳")
+    send_type = Column(Integer, nullable=False, comment="回复类型 0: reply 1: push")
+    user_active_rate = Column(Float, nullable=False, comment="用户活跃度")
+    is_delete = Column(Integer, nullable=False, default=False, comment="是否删除 1-删除 0-未删除")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
+    update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
+                         comment="更新时间")

+ 22 - 0
pqai_agent/data_models/qywx_chat_history.py

@@ -0,0 +1,22 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class QywxChatHistory(Base):
+    __tablename__ = "qywx_chat_history"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    guid = Column(String(64), nullable=False, comment="设备唯一标识")
+    appinfo = Column(String(128), nullable=True)
+    quote_appinfo = Column(String(128), nullable=True)
+    sender =  Column(String(64), nullable=False, comment="发送者 ID")
+    receiver = Column(String(64), nullable=False, comment="接收者 ID")
+    roomid = Column(String(64), nullable=False,default='0', comment="聊天室id,私聊:private前缀,群聊:group前缀")
+    sendtime = Column(BigInteger, nullable=True, default=0, comment="单位ms")
+    sender_name = Column(String(255), nullable=False, comment="发送者昵称")
+    msg_type = Column(Integer, nullable=False, default=0, comment="消息类型 枚举参考代码")
+    attachment = Column(Text, nullable=True, comment="附件:图片、视频等")
+    origin_msg = Column(Text, nullable=True, comment="原始消息")
+    content = Column(Text, nullable=True)

+ 22 - 0
pqai_agent/data_models/qywx_employee.py

@@ -0,0 +1,22 @@
+from sqlalchemy import Column, Integer, Text, BigInteger, String, SmallInteger, Boolean, TIMESTAMP
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class QywxEmployee(Base):
+    __tablename__ = "qywx_employee"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    third_party_user_id = Column(String(32), nullable=True, comment="员工在三方平台ID,唯一")
+    name = Column(String(50), nullable=False, comment="员工姓名")
+    wxid = Column(String(32), nullable=False, comment="员工在企业微信的ID,唯一")
+    status = Column(Integer, nullable=False, comment="员工状态(0: 离职, 1: 在职)")
+    create_time = Column(BigInteger, nullable=True, default=0, comment="创建时间(时间戳)")
+    update_time = Column(BigInteger, nullable=True, default=0, comment="更新时间(时间戳)")
+    agent_name = Column(String(50), nullable=True, comment="作为服务助手时的名字")
+    agent_gender = Column(SmallInteger, nullable=True, comment="作为服务助手时的性别")
+    agent_age = Column(SmallInteger, nullable=True, comment="作为服务助手时的年龄")
+    agent_region = Column(String(50), nullable=True, comment="作为服务助手时的地区")
+    agent_profile = Column(Text, nullable=True, comment="服务助手的画像,JSON字符串")
+    guid = Column(String(50), nullable=True, comment="设备ID")

+ 72 - 0
pqai_agent/toolkit/lunar_festival_mapper.py

@@ -0,0 +1,72 @@
+import lunardate
+import datetime
+from pqai_agent.logging_service import logger
+from pqai_agent.toolkit.base import BaseToolkit
+from pqai_agent.toolkit.function_tool import FunctionTool
+from collections import defaultdict
+
+
+class LunarFestivalMapper(BaseToolkit):
+    # 常见农历节日定义(月份, 日期)
+    FESTIVALS = {
+        (1, 1): "春节",
+        (1, 15): "元宵节",
+        (2, 2): "龙抬头",
+        (5, 5): "端午节",
+        (7, 7): "七夕",
+        (7, 15): "中元节",
+        (8, 15): "中秋节",
+        (9, 9): "重阳节",
+        (12, 8): "腊八节",
+        (12, 23): "小年",
+        (12, 30): "除夕"
+    }
+
+    def __init__(self, year=2025):
+        super().__init__()
+        self.year = year
+        self.festival_dates = self._calculate_festivals()
+
+    def _calculate_festivals(self):
+        """计算指定年份的农历节日对应的公历日期"""
+        results = defaultdict(list)
+
+        # 遍历整年的每一天
+        start_date = datetime.date(self.year, 1, 1)
+        end_date = datetime.date(self.year, 12, 31)
+        current_date = start_date
+
+        while current_date <= end_date:
+            try:
+                # 将公历转换为农历
+                lunar = lunardate.LunarDate.fromSolarDate(
+                    current_date.year,
+                    current_date.month,
+                    current_date.day
+                )
+                # 检查是否为农历节日(非闰月)
+                festival_key = (lunar.month, lunar.day)
+                if festival_key in self.FESTIVALS:
+                    festival_name = self.FESTIVALS[festival_key]
+                    results[festival_name].append(current_date)
+
+            except ValueError:
+                # 跳过无效日期(如2月30日等)
+                pass
+
+            # 下一天
+            current_date += datetime.timedelta(days=1)
+
+        # 处理结果(每个节日只取第一个出现的日期)
+        return {name: dates[0] for name, dates in results.items()}
+
+    def get_festival_date(self, festival_name):
+        """获取指定节日的公历日期"""
+        return self.festival_dates.get(festival_name, "节日未找到或不在该年")
+
+    def get_all_festivals(self):
+        """获取该年所有农历节日日期"""
+        return self.festival_dates
+
+    def get_tools(self):
+        return [FunctionTool(self.get_festival_date)]

+ 151 - 7
pqai_agent_server/api_server.py

@@ -1,18 +1,15 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
-import time
 import logging
-import werkzeug.exceptions
-from flask import Flask, request, jsonify
 from argparse import ArgumentParser
 
+import werkzeug.exceptions
+from flask import Flask, request, jsonify
 from sqlalchemy.orm import sessionmaker
 
 from pqai_agent import configs
-
 from pqai_agent import logging_service, chat_service, prompt_templates
-from pqai_agent.agents.message_reply_agent import MessageReplyAgent
 from pqai_agent.data_models.agent_configuration import AgentConfiguration
 from pqai_agent.data_models.service_module import ServiceModule
 from pqai_agent.history_dialogue_service import HistoryDialogueService
@@ -20,18 +17,22 @@ from pqai_agent.user_manager import MySQLUserManager, MySQLUserRelationManager
 from pqai_agent.utils.db_utils import create_ai_agent_db_engine
 from pqai_agent.utils.prompt_utils import format_agent_profile, format_user_profile
 from pqai_agent_server.const import AgentApiConst
+from pqai_agent_server.const.status_enum import TestTaskStatus
+from pqai_agent_server.dataset_service import DatasetService
 from pqai_agent_server.models import MySQLSessionManager
-from pqai_agent_server.utils import wrap_response, quit_human_intervention_status
+from pqai_agent_server.task_server import TaskManager
 from pqai_agent_server.utils import (
     run_extractor_prompt,
     run_chat_prompt,
     run_response_type_prompt,
 )
+from pqai_agent_server.utils import wrap_response, quit_human_intervention_status
 
 app = Flask('agent_api_server')
 logger = logging_service.logger
 const = AgentApiConst()
 
+
 @app.route('/api/listStaffs', methods=['GET'])
 def list_staffs():
     staff_data = app.user_relation_manager.list_staffs()
@@ -178,6 +179,7 @@ def run_prompt():
         logger.error(e)
         return wrap_response(500, msg='Error: {}'.format(e))
 
+
 @app.route('/api/formatForPrompt', methods=['POST'])
 def format_data_for_prompt():
     try:
@@ -312,6 +314,7 @@ def quit_human_interventions_status():
 
     return wrap_response(200, data=response)
 
+
 ## Agent管理接口
 @app.route("/api/getNativeAgentList", methods=["GET"])
 def get_native_agent_list():
@@ -348,6 +351,7 @@ def get_native_agent_list():
     ]
     return wrap_response(200, data=ret_data)
 
+
 @app.route("/api/getNativeAgentConfiguration", methods=["GET"])
 def get_native_agent_configuration():
     """
@@ -379,6 +383,7 @@ def get_native_agent_configuration():
         }
         return wrap_response(200, data=data)
 
+
 @app.route("/api/saveNativeAgentConfiguration", methods=["POST"])
 def save_native_agent_configuration():
     """
@@ -432,6 +437,7 @@ def save_native_agent_configuration():
         session.commit()
         return wrap_response(200, msg='Agent configuration saved successfully', data={'id': agent.id})
 
+
 @app.route("/api/getModuleList", methods=["GET"])
 def get_module_list():
     """
@@ -456,6 +462,7 @@ def get_module_list():
     ]
     return wrap_response(200, data=ret_data)
 
+
 @app.route("/api/getModuleConfiguration", methods=["GET"])
 def get_module_configuration():
     """
@@ -482,6 +489,7 @@ def get_module_configuration():
         }
         return wrap_response(200, data=data)
 
+
 @app.route("/api/saveModuleConfiguration", methods=["POST"])
 def save_module_configuration():
     """
@@ -520,6 +528,135 @@ def save_module_configuration():
         session.commit()
         return wrap_response(200, msg='Module configuration saved successfully', data={'id': module.id})
 
+
+@app.route("/api/getTestTaskList", methods=["GET"])
+def get_test_task_list():
+    """
+       获取单元测试任务列表
+       :return:
+    """
+    page_num = request.args.get("pageNum", const.DEFAULT_PAGE_ID)
+    page_size = request.args.get("pageSize", const.DEFAULT_PAGE_SIZE)
+    try:
+        page_num = int(page_num)
+        page_size = int(page_size)
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
+    response = app.task_manager.get_test_task_list(page_num, page_size)
+    return wrap_response(200, data=response)
+
+
+@app.route("/api/getTestTaskConversations", methods=["GET"])
+def get_test_task_conversations():
+    """
+       获取单元测试对话任务列表
+       :return:
+    """
+    task_id = request.args.get("taskId", None)
+    if not task_id:
+        return wrap_response(404, msg='task_id is required')
+    page_num = request.args.get("pageNum", const.DEFAULT_PAGE_ID)
+    page_size = request.args.get("pageSize", const.DEFAULT_PAGE_SIZE)
+    try:
+        page_num = int(page_num)
+        page_size = int(page_size)
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
+    response = app.task_manager.get_test_task_conversations(int(task_id), page_num, page_size)
+    return wrap_response(200, data=response)
+
+
+@app.route("/api/createTestTask", methods=["POST"])
+def create_test_task():
+    """
+       创建单元测试任务
+       :return:
+    """
+    req_data = request.json
+    agent_id = req_data.get('agentId', None)
+    module_id = req_data.get('moduleId', None)
+    evaluate_type = req_data.get('evaluateType', None)
+    if not agent_id:
+        return wrap_response(404, msg='agent id is required')
+    if not module_id:
+        return wrap_response(404, msg='module id is required')
+    if not evaluate_type:
+        return wrap_response(404, msg='evaluate_type id is required')
+    app.task_manager.create_task(agent_id, module_id, evaluate_type)
+    return wrap_response(200)
+
+
+@app.route("/api/stopTestTask", methods=["POST"])
+def stop_test_task():
+    """
+       停止单元测试任务
+       :return:
+    """
+    req_data = request.json
+    task_id = req_data.get('taskId', None)
+    if not task_id:
+        return wrap_response(400, msg='task id is required')
+    task = app.task_manager.get_task(task_id)
+    if task.status not in (TestTaskStatus.NOT_STARTED.value, TestTaskStatus.IN_PROGRESS.value):
+        return wrap_response(400, msg='task status is invalid')
+    app.task_manager.cancel_task(task_id)
+    return wrap_response(200)
+
+
+@app.route("/api/resumeTestTask", methods=["POST"])
+def resume_test_task():
+    """
+       恢复停止的单元测试任务
+       :return:
+    """
+    req_data = request.json
+    task_id = req_data.get('taskId', None)
+    if not task_id:
+        return wrap_response(400, msg='task id is required')
+    task = app.task_manager.get_task(task_id)
+    if task.status != TestTaskStatus.CANCELLED.value:
+        return wrap_response(400, msg='task status is invalid')
+    app.task_manager.resume_task(task_id)
+    return wrap_response(200)
+
+
+@app.route("/api/getDatasetList", methods=["GET"])
+def get_dataset_list():
+    """
+       获取数据集列表
+       :return:
+    """
+    page_num = request.args.get("pageNum", const.DEFAULT_PAGE_ID)
+    page_size = request.args.get("pageSize", const.DEFAULT_PAGE_SIZE)
+    try:
+        page_num = int(page_num)
+        page_size = int(page_size)
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
+    response = app.dataset_service.get_dataset_list(page_num, page_size)
+    return wrap_response(200, data=response)
+
+
+@app.route("/api/getConversationDataList", methods=["GET"])
+def get_conversation_data_list():
+    """
+       获取对话列表
+       :return:
+    """
+    dataset_id = request.args.get("datasetId", None)
+    if not dataset_id:
+        return wrap_response(404, msg='dataset_id is required')
+    page_num = request.args.get("pageNum", const.DEFAULT_PAGE_ID)
+    page_size = request.args.get("pageSize", const.DEFAULT_PAGE_SIZE)
+    try:
+        page_num = int(page_num)
+        page_size = int(page_size)
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
+    response = app.dataset_service.get_conversation_data_list(int(dataset_id), page_num, page_size)
+    return wrap_response(200, data=response)
+
+
 @app.errorhandler(werkzeug.exceptions.BadRequest)
 def handle_bad_request(e):
     logger.error(e)
@@ -559,9 +696,16 @@ if __name__ == '__main__':
         chat_history_table=chat_history_db_config['table']
     )
     app.session_manager = session_manager
-    agent_db_engine = create_ai_agent_db_engine(config['database']['ai_agent'])
+    agent_db_engine = create_ai_agent_db_engine()
     app.session_maker = sessionmaker(bind=agent_db_engine)
 
+    dataset_service = DatasetService(session_maker=sessionmaker(bind=agent_db_engine))
+    app.dataset_service = dataset_service
+
+    task_manager = TaskManager(session_maker=sessionmaker(bind=agent_db_engine), dataset_service=dataset_service)
+    app.task_manager = task_manager
+    task_manager.recover_tasks()
+
     wecom_db_config = config['storage']['user_relation']
     user_relation_manager = MySQLUserRelationManager(
         agent_db_config, growth_db_config,

+ 66 - 0
pqai_agent_server/const/status_enum.py

@@ -0,0 +1,66 @@
+from enum import Enum
+
+
+class TestTaskStatus(Enum):
+    NOT_STARTED = 0
+    IN_PROGRESS = 1
+    COMPLETED = 2
+    CANCELLED = 3
+    FAILED = 4
+    CREATING = 5
+    CREATED_FAIL = 6
+
+    @property
+    def description(self):
+        descriptions = {
+            self.NOT_STARTED: "未开始",
+            self.IN_PROGRESS: "进行中",
+            self.COMPLETED: "已完成",
+            self.CANCELLED: "已取消",
+            self.FAILED: "已失败",
+            self.CREATING: "生成任务中",
+            self.CREATED_FAIL: "生成任务失败"
+        }
+        return descriptions.get(self)
+
+
+# 使用示例
+def get_test_task_status_desc(status_code):
+    try:
+        status = TestTaskStatus(status_code)
+        return status.description
+    except ValueError:
+        return f"未知状态: {status_code}"
+
+
+class TestTaskConversationsStatus(Enum):
+    """任务状态枚举类"""
+    PENDING = 0  # 待执行
+    RUNNING = 1  # 执行中
+    SUCCESS = 2  # 执行成功
+    FAILED = 3  # 执行失败
+    CANCELLED = 4  # 已取消
+    MESSAGE_FAILED = 5  # 消息失败
+    SCORE_FAILED = 6  # 打分失败
+
+    @property
+    def description(self):
+        descriptions = {
+            self.PENDING: "待执行",
+            self.RUNNING: "执行中",
+            self.SUCCESS: "执行成功",
+            self.FAILED: "执行失败",
+            self.CANCELLED: "已取消",
+            self.MESSAGE_FAILED: "消息失败",
+            self.SCORE_FAILED: "打分失败"
+        }
+        return descriptions.get(self)
+
+
+# 使用示例
+def get_test_task_conversations_status_desc(status_code):
+    try:
+        status = TestTaskConversationsStatus(status_code)
+        return status.description
+    except ValueError:
+        return f"未知状态: {status_code}"

+ 22 - 0
pqai_agent_server/const/type_enum.py

@@ -0,0 +1,22 @@
+from enum import Enum
+
+
+class DatasetType(Enum):
+    INTERNAL = 0
+    EXTERNAL = 1
+
+    @property
+    def description(self):
+        descriptions = {
+            self.INTERNAL: "内部",
+            self.EXTERNAL: "外部"
+        }
+        return descriptions.get(self)
+
+# 使用示例
+def get_dataset_type_desc(type_code):
+    try:
+        type = DatasetType(type_code)
+        return type.description
+    except ValueError:
+        return f"未知类型: {type_code}"

+ 160 - 0
pqai_agent_server/dataset_service.py

@@ -0,0 +1,160 @@
+import json
+from cgitb import reset
+from typing import List
+
+from sqlalchemy import func
+
+from pqai_agent.data_models.dataset_model import DatasetModule
+from pqai_agent.data_models.datasets import Datasets
+from pqai_agent.data_models.internal_conversation_data import InternalConversationData
+from pqai_agent.data_models.qywx_chat_history import QywxChatHistory
+from pqai_agent.data_models.qywx_employee import QywxEmployee
+from pqai_agent_server.const.type_enum import get_dataset_type_desc
+from pqai_agent_server.utils.odps_utils import ODPSUtils
+
+
+class DatasetService:
+    def __init__(self, session_maker):
+        self.session_maker = session_maker
+        odps_utils = ODPSUtils()
+        self.odps_utils = odps_utils
+
+    def get_user_profile_data(self, third_party_user_id: str, date_version: str):
+        sql = f"""
+           SELECT * FROM third_party_user_date_version
+           WHERE dt >= '20250612' and dt < {date_version}  -- 添加分区条件
+           and third_party_user_id = {third_party_user_id}
+           and profile_data_v1 is not null 
+           order by dt desc 
+           limit 1
+           """
+        result_df = self.odps_utils.execute_sql(sql)
+
+        if not result_df.empty:
+            return result_df.iloc[0].to_dict()  # 获取第一行
+        return None
+
+    def get_dataset_module_list_by_module(self, module_id: int):
+        with self.session_maker() as session:
+            return session.query(DatasetModule).filter(DatasetModule.module_id == module_id).filter(
+                DatasetModule.is_delete == 0).all()
+
+    def get_conversation_data_list_by_dataset(self, dataset_id: int):
+        with self.session_maker() as session:
+            return session.query(InternalConversationData).filter(
+                InternalConversationData.dataset_id == dataset_id).filter(
+                InternalConversationData.is_delete == 0).order_by(
+                InternalConversationData.id.asc()
+            ).all()
+
+    def get_conversation_data_by_id(self, conversation_data_id: int):
+        with self.session_maker() as session:
+            return session.query(InternalConversationData).filter(
+                InternalConversationData.id == conversation_data_id).one()
+
+    def get_staff_profile_data(self, third_party_user_id: str):
+        with self.session_maker() as session:
+            return session.query(QywxEmployee).filter(
+                QywxEmployee.third_party_user_id == third_party_user_id).one()
+
+    def get_conversation_list_by_ids(self, conversation_ids: List[int]):
+        with self.session_maker() as session:
+            conversations = session.query(QywxChatHistory).filter(QywxChatHistory.id.in_(conversation_ids)).order_by(
+                QywxChatHistory.id.asc()).all()
+            result = []
+            for conversation in conversations:
+                data = {}
+                data["id"] = conversation.id
+                data["sender"] = conversation.sender
+                data["receiver"] = conversation.receiver
+                data["roomid"] = conversation.roomid
+                data["sendtime"] = conversation.sendtime / 1000
+                data["msg_type"] = conversation.msg_type
+                data["content"] = conversation.content
+                result.append(data)
+        return result
+
+    def get_chat_conversation_list_by_ids(self, conversation_ids: List[int], staff_id):
+        result = self.get_conversation_list_by_ids(conversation_ids)
+        conversations = [
+            {
+                "content": conversation['content'],
+                "role": "assistant" if conversation['sender'] == staff_id else "user",
+                "timestamp": conversation['sendtime']
+            } for conversation in result
+        ]
+        return conversations
+
+
+
+    def get_dataset_list(self, page_num: int, page_size: int):
+        with self.session_maker() as session:
+            # 计算偏移量
+            offset = (page_num - 1) * page_size
+            # 查询分页数据
+            result = (session.query(Datasets)
+                      .filter(Datasets.is_delete == 0)
+                      .limit(page_size).offset(offset).all())
+            # 查询总记录数
+            total = session.query(func.count(Datasets.id)).filter(Datasets.is_delete == 0).scalar()
+
+            total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
+            total_page = 1 if total_page <= 0 else total_page
+            response_data = [
+                {
+                    "id": dataset.id,
+                    "name": dataset.name,
+                    "type": get_dataset_type_desc(dataset.type),
+                    "description": dataset.description,
+                    "createTime": dataset.create_time.strftime("%Y-%m-%d %H:%M:%S"),
+                    "updateTime": dataset.update_time.strftime("%Y-%m-%d %H:%M:%S")
+                }
+                for dataset in result
+            ]
+            return {
+                "currentPage": page_num,
+                "pageSize": page_size,
+                "totalSize": total_page,
+                "total": total,
+                "list": response_data,
+            }
+
+    def get_conversation_data_list(self, dataset_id: int, page_num: int, page_size: int):
+        with self.session_maker() as session:
+            # 计算偏移量
+            offset = (page_num - 1) * page_size
+            # 查询分页数据
+            result = (session.query(InternalConversationData)
+                      .filter(InternalConversationData.dataset_id == dataset_id)
+                      .filter(InternalConversationData.is_delete == 0)
+                      .limit(page_size).offset(offset).all())
+            # 查询总记录数
+            total = session.query(func.count(InternalConversationData.id)).filter(
+                InternalConversationData.is_delete == 0).scalar()
+
+            total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
+            total_page = 1 if total_page <= 0 else total_page
+            response_data = []
+            for conversation_data in result:
+                data = {}
+                data["id"] = conversation_data.id
+                data["datasetId"] = conversation_data.dataset_id
+                data["staff"] = self.get_staff_profile_data(conversation_data.staff_id).agent_profile
+                data["user"] = self.get_user_profile_data(conversation_data.user_id,
+                                                          conversation_data.version_date.replace("-", ""))[
+                    'profile_data_v1']
+                data["conversation"] = self.get_conversation_list_by_ids(json.loads(conversation_data.conversation))
+                data["content"] = conversation_data.content
+                data["sendTime"] = conversation_data.send_time
+                data["sendType"] = conversation_data.send_type
+                data["userActiveRate"] = conversation_data.user_active_rate
+                data["createTime"]: conversation_data.create_time.strftime("%Y-%m-%d %H:%M:%S")
+                data["updateTime"]: conversation_data.update_time.strftime("%Y-%m-%d %H:%M:%S")
+                response_data.append(data)
+            return {
+                "currentPage": page_num,
+                "pageSize": page_size,
+                "totalSize": total_page,
+                "total": total,
+                "list": response_data,
+            }

+ 528 - 0
pqai_agent_server/task_server.py

@@ -0,0 +1,528 @@
+import json
+import threading
+import concurrent.futures
+import time
+import traceback
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
+from typing import Dict
+
+from sqlalchemy import func
+
+from pqai_agent import logging_service
+from pqai_agent.agents.message_push_agent import MessagePushAgent
+from pqai_agent.agents.multimodal_chat_agent import MultiModalChatAgent
+from pqai_agent.data_models.agent_configuration import AgentConfiguration
+from pqai_agent.data_models.agent_test_task import AgentTestTask
+from pqai_agent.data_models.agent_test_task_conversations import AgentTestTaskConversations
+from pqai_agent.data_models.service_module import ServiceModule
+from pqai_agent.utils.prompt_utils import format_agent_profile
+from pqai_agent_server.const.status_enum import TestTaskConversationsStatus, TestTaskStatus, get_test_task_status_desc
+from concurrent.futures import ThreadPoolExecutor
+
+from scripts.evaluate_agent import evaluate_agent
+
+logger = logging_service.logger
+
+
+class TaskManager:
+    """任务管理器"""
+
+    def __init__(self, session_maker, dataset_service):
+        self.session_maker = session_maker
+        self.dataset_service = dataset_service
+        self.task_events = {}  # 任务ID -> Event (用于取消任务)
+        self.task_locks = {}  # 任务ID -> Lock (用于任务状态同步)
+        self.running_tasks = set()
+        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='TaskWorker')
+        self.create_task_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix='CreateTaskWorker')
+        self.task_futures = {}  # 任务ID -> Future
+
+    def get_test_task_list(self, page_num: int, page_size: int) -> Dict:
+        with self.session_maker() as session:
+            # 计算偏移量
+            offset = (page_num - 1) * page_size
+            # 查询分页数据
+            result = (session.query(AgentTestTask, AgentConfiguration)
+                      .outerjoin(AgentConfiguration, AgentTestTask.agent_id == AgentConfiguration.id)
+                      .limit(page_size).offset(offset).all())
+            # 查询总记录数
+            total = session.query(func.count(AgentTestTask.id)).scalar()
+
+            total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
+            total_page = 1 if total_page <= 0 else total_page
+            response_data = [
+                {
+                    "id": agent_test_task.id,
+                    "agentName": agent_configuration.name,
+                    "createUser": agent_test_task.create_user,
+                    "updateUser": agent_test_task.update_user,
+                    "statusName": get_test_task_status_desc(agent_test_task.status),
+                    "createTime": agent_test_task.create_time.strftime("%Y-%m-%d %H:%M:%S"),
+                    "updateTime": agent_test_task.update_time.strftime("%Y-%m-%d %H:%M:%S")
+                }
+                for agent_test_task, agent_configuration in result
+            ]
+            return {
+                "currentPage": page_num,
+                "pageSize": page_size,
+                "totalSize": total_page,
+                "total": total,
+                "list": response_data,
+            }
+
+    def get_test_task_conversations(self, task_id: int, page_num: int, page_size: int) -> Dict:
+        with self.session_maker() as session:
+            # 计算偏移量
+            offset = (page_num - 1) * page_size
+            # 查询分页数据
+            result = (session.query(AgentTestTaskConversations, AgentConfiguration)
+                      .outerjoin(AgentConfiguration, AgentTestTaskConversations.agent_id == AgentConfiguration.id)
+                      .filter(AgentTestTaskConversations.task_id == task_id)
+                      .limit(page_size).offset(offset).all())
+            # 查询总记录数
+            total = session.query(func.count(AgentTestTaskConversations.id)).scalar()
+
+            total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
+            total_page = 1 if total_page <= 0 else total_page
+            response_data = [
+                {
+                    "id": agent_test_task_conversation.id,
+                    "agentName": agent_configuration.name,
+                    "input": MultiModalChatAgent.compose_dialogue(json.loads(agent_test_task_conversation.input)),
+                    "output": agent_test_task_conversation.output,
+                    "score": agent_test_task_conversation.score,
+                    "statusName": get_test_task_status_desc(agent_test_task_conversation.status),
+                    "createTime": agent_test_task_conversation.create_time.strftime("%Y-%m-%d %H:%M:%S"),
+                    "updateTime": agent_test_task_conversation.update_time.strftime("%Y-%m-%d %H:%M:%S")
+                }
+                for agent_test_task_conversation, agent_configuration in result
+            ]
+            return {
+                "currentPage": page_num,
+                "pageSize": page_size,
+                "totalSize": total_page,
+                "total": total,
+                "list": response_data,
+            }
+
+    def create_task(self, agent_id: int, module_id: int, evaluate_type: int) -> Dict:
+        """创建新任务"""
+        with self.session_maker() as session:
+            agent_test_task = AgentTestTask(agent_id=agent_id, module_id=module_id, evaluate_type=evaluate_type,
+                                            status=TestTaskStatus.CREATING.value)
+            session.add(agent_test_task)
+            session.commit()  # 显式提交
+            task_id = agent_test_task.id
+        # 异步执行创建任务
+        self.create_task_executor.submit(self._generate_agent_test_task_conversation_batch, task_id, agent_id,
+                                         module_id)
+        return self.get_task(task_id)
+
+    def _generate_agent_test_task_conversation_batch(self, task_id: int, agent_id: int, module_id: int):
+        """异步生成子任务"""
+        try:
+            # 获取数据集列表
+            dataset_module_list = self.dataset_service.get_dataset_module_list_by_module(module_id)
+
+            # 批量处理数据集 - 减少数据库交互
+            batch_size = 100  # 每批处理100个子任务
+            agent_test_task_conversation_batch = []
+
+            for dataset_module in dataset_module_list:
+                # 获取对话数据列表
+                conversation_datas = self.dataset_service.get_conversation_data_list_by_dataset(
+                    dataset_module.dataset_id)
+
+                for conversation_data in conversation_datas:
+                    # 创建子任务对象
+                    agent_test_task_conversation = AgentTestTaskConversations(
+                        task_id=task_id,
+                        agent_id=agent_id,
+                        dataset_id=dataset_module.dataset_id,
+                        conversation_id=conversation_data.id,
+                        status=TestTaskConversationsStatus.PENDING.value
+                    )
+                    agent_test_task_conversation_batch.append(agent_test_task_conversation)
+
+                    # 批量提交
+                    if len(agent_test_task_conversation_batch) >= batch_size:
+                        self.save_agent_test_task_conversation_batch(agent_test_task_conversation_batch)
+                        agent_test_task_conversation_batch = []
+
+            # 提交剩余的子任务
+            if agent_test_task_conversation_batch:
+                self.save_agent_test_task_conversation_batch(agent_test_task_conversation_batch)
+
+            # 更新主任务状态为未开始
+            self.update_task_status(task_id, TestTaskStatus.NOT_STARTED.value)
+
+            # 自动提交任务执行
+            self._execute_task(task_id)
+
+        except Exception as e:
+            logger.error(f"生成子任务失败: {str(e)}")
+            # 更新任务状态为失败
+            self.update_task_status(task_id, TestTaskStatus.CREATED_FAIL.value)
+
+    def save_agent_test_task_conversation_batch(self, agent_test_task_conversation_batch: list):
+        """批量保存子任务到数据库"""
+        try:
+            with self.session_maker() as session:
+                with session.begin():
+                    session.add_all(agent_test_task_conversation_batch)
+        except Exception as e:
+            logger.error(e)
+
+    def get_agent_configuration_by_task_id(self, task_id: int):
+        """获取指定任务ID对应的Agent配置信息"""
+        with self.session_maker() as session:
+            return session.query(AgentConfiguration) \
+                .join(AgentTestTask, AgentTestTask.agent_id == AgentConfiguration.id) \
+                .filter(AgentTestTask.id == task_id) \
+                .one_or_none()  # 返回单个对象或None(如果未找到)
+
+    def get_service_module_by_task_id(self, task_id: int):
+        """获取指定任务ID对应的Agent配置信息"""
+        with self.session_maker() as session:
+            return session.query(ServiceModule) \
+                .join(AgentTestTask, AgentTestTask.module_id == ServiceModule.id) \
+                .filter(AgentTestTask.id == task_id) \
+                .one_or_none()  # 返回单个对象或None(如果未找到)
+
+    def get_task(self, task_id: int):
+        """获取任务信息"""
+        with self.session_maker() as session:
+            return session.query(AgentTestTask).filter(AgentTestTask.id == task_id).one()
+
+    def get_in_progress_task(self):
+        """获取执行中任务"""
+        with self.session_maker() as session:
+            return session.query(AgentTestTask).filter(AgentTestTask.status == TestTaskStatus.IN_PROGRESS.value).all()
+
+    def get_creating_task(self):
+        """获取执行中任务"""
+        with self.session_maker() as session:
+            return session.query(AgentTestTask).filter(AgentTestTask.status == TestTaskStatus.CREATING.value).all()
+
+    def get_task_conversations(self, task_id: int):
+        """获取任务的所有子任务"""
+        with self.session_maker() as session:
+            return session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).all()
+
+    def del_task_conversations(self, task_id: int):
+        with self.session_maker() as session:
+            session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).delete()
+            # 提交事务生效
+            session.commit()
+
+    def get_pending_task_conversations(self, task_id: int):
+        """获取待处理的子任务"""
+        with self.session_maker() as session:
+            return session.query(AgentTestTaskConversations).filter(
+                AgentTestTaskConversations.task_id == task_id).filter(
+                AgentTestTaskConversations.status.in_([
+                    TestTaskConversationsStatus.PENDING.value,
+                    TestTaskConversationsStatus.RUNNING.value
+                ])).all()
+
+    def update_task_status(self, task_id: int, status: int):
+        """更新任务状态"""
+        with self.session_maker() as session:
+            session.query(AgentTestTask).filter(AgentTestTask.id == task_id).update(
+                {"status": status, "update_time": datetime.now()})
+            session.commit()
+
+    def update_task_conversations_status(self, task_conversations_id: int, status: int):
+        """更新子任务状态"""
+        with self.session_maker() as session:
+            session.query(AgentTestTaskConversations).filter(
+                AgentTestTaskConversations.id == task_conversations_id).update(
+                {"status": status, "update_time": datetime.now()})
+            session.commit()
+
+    def update_task_conversations_res(self, task_conversations_id: int, status: int, input: str, output: str,
+                                      score: str):
+        """更新子任务结果"""
+        with self.session_maker() as session:
+            session.query(AgentTestTaskConversations).filter(
+                AgentTestTaskConversations.id == task_conversations_id).update(
+                {"status": status, "input": input, "output": output, "score": score, "update_time": datetime.now()})
+            session.commit()
+
+    def cancel_task(self, task_id: int):
+        """取消任务(带事务支持)"""
+        # 设置取消事件
+        if task_id in self.task_events:
+            self.task_events[task_id].set()
+        # 如果任务正在执行,尝试取消Future
+        if task_id in self.task_futures:
+            self.task_futures[task_id].cancel()
+
+        with self.session_maker() as session:
+            with session.begin():
+                session.query(AgentTestTask).filter(AgentTestTask.id == task_id).update(
+                    {"status": TestTaskStatus.CANCELLED.value})
+                session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).filter(
+                    AgentTestTaskConversations.status == TestTaskConversationsStatus.PENDING.value).update(
+                    {"status": TestTaskConversationsStatus.CANCELLED.value})
+                session.commit()
+
+    def resume_task(self, task_id: int) -> bool:
+        """恢复已取消的任务"""
+        task = self.get_task(task_id)
+        if not task or task.status != TestTaskStatus.CANCELLED.value:
+            return False
+
+        with self.session_maker() as session:
+            with session.begin():
+                session.query(AgentTestTask).filter(AgentTestTask.id == task_id).update(
+                    {"status": TestTaskStatus.NOT_STARTED.value})
+                session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).filter(
+                    AgentTestTaskConversations.status == TestTaskConversationsStatus.CANCELLED.value).update(
+                    {"status": TestTaskConversationsStatus.PENDING.value})
+                session.commit()
+
+        # 重新执行任务
+        self._execute_task(task_id)
+        logger.info(f"Resumed task {task_id}")
+        return True
+
+    def recover_tasks(self):
+        """服务启动时恢复未完成的任务"""
+
+        creating = self.get_creating_task()
+        for task in creating:
+            task_id = task.id
+            agent_id = task.agent_id
+            module_id = task.module_id
+            self.del_task_conversations(task_id)
+            # 重新提交任务
+            # 异步执行创建任务
+            self.create_task_executor.submit(self._generate_agent_test_task_conversation_batch, task_id, agent_id,
+                                             module_id)
+
+        # 获取所有进行中的任务ID(根据实际状态定义查询)
+        in_progress_tasks = self.get_in_progress_task()
+
+        for task in in_progress_tasks:
+            task_id = task.id
+            # 重新提交任务
+            self._execute_task(task_id)
+
+    def _execute_task(self, task_id: int):
+        """提交任务到线程池执行"""
+        # 确保任务状态一致性
+        if task_id in self.running_tasks:
+            return
+
+        # 创建任务事件和锁
+        if task_id not in self.task_events:
+            self.task_events[task_id] = threading.Event()
+        if task_id not in self.task_locks:
+            self.task_locks[task_id] = threading.Lock()
+
+        # 提交到线程池
+        future = self.executor.submit(self._process_task, task_id)
+        self.task_futures[task_id] = future
+
+        # 标记任务为运行中
+        with self.task_locks[task_id]:
+            self.running_tasks.add(task_id)
+
+    def _process_task(self, task_id: int):
+        """处理任务的所有子任务(并发执行)"""
+        try:
+            self.update_task_status(task_id, TestTaskStatus.IN_PROGRESS.value)
+            task_conversations = self.get_pending_task_conversations(task_id)
+
+            if not task_conversations:
+                self.update_task_status(task_id, TestTaskStatus.COMPLETED.value)
+                return
+
+            agent_configuration = self.get_agent_configuration_by_task_id(task_id)
+            query_prompt_template = agent_configuration.task_prompt
+
+            task = self.get_task(task_id)
+
+            # 使用线程池执行子任务
+            with ThreadPoolExecutor(max_workers=8) as executor:  # 可根据需要调整并发数
+                futures = {}
+                for task_conversation in task_conversations:
+                    if self.task_events[task_id].is_set():
+                        break  # 检查任务取消事件
+
+                    # 提交子任务到线程池
+                    future = executor.submit(
+                        self._process_single_conversation,
+                        task_id,
+                        task,
+                        task_conversation,
+                        query_prompt_template,
+                        agent_configuration
+                    )
+                    futures[future] = task_conversation.id
+
+                # 等待所有子任务完成或取消
+                for future in concurrent.futures.as_completed(futures):
+                    conv_id = futures[future]
+                    try:
+                        future.result()  # 获取结果(如有异常会在此抛出)
+                    except Exception as e:
+                        logger.error(f"Subtask {conv_id} failed: {str(e)}")
+                        self.update_task_conversations_status(
+                            conv_id,
+                            TestTaskConversationsStatus.FAILED.value
+                        )
+
+            # 检查最终任务状态
+            self._update_final_task_status(task_id)
+
+        except Exception as e:
+            logger.error(f"Error processing task {task_id}: {str(e)}")
+            self.update_task_status(task_id, TestTaskStatus.FAILED.value)
+        finally:
+            self._cleanup_task_resources(task_id)
+
+    def _process_single_conversation(self, task_id, task, task_conversation, query_prompt_template,
+                                     agent_configuration):
+        """处理单个对话子任务(线程安全)"""
+        # 检查任务是否被取消
+        if self.task_events[task_id].is_set():
+            return
+
+        # 更新子任务状态
+        if task_conversation.status == TestTaskConversationsStatus.PENDING.value:
+            self.update_task_conversations_status(
+                task_conversation.id,
+                TestTaskConversationsStatus.RUNNING.value
+            )
+
+        try:
+            # 创建独立的agent实例(确保线程安全)
+            agent = MultiModalChatAgent(
+                model=agent_configuration.execution_model,
+                system_prompt=agent_configuration.system_prompt,
+                tools=json.loads(agent_configuration.tools)
+            )
+
+            # 获取对话数据
+            conversation_data = self.dataset_service.get_conversation_data_by_id(
+                task_conversation.conversation_id)
+            user_profile_data = self.dataset_service.get_user_profile_data(
+                conversation_data.user_id,
+                conversation_data.version_date.replace("-", ""))
+            user_profile = json.loads(user_profile_data['profile_data_v1'])
+            avatar = user_profile_data['iconurl']
+            staff_profile = self.dataset_service.get_staff_profile_data(
+                conversation_data.staff_id).agent_profile
+            conversations = self.dataset_service.get_chat_conversation_list_by_ids(
+                json.loads(conversation_data.conversation),
+                conversation_data.staff_id
+            )
+            conversations = sorted(conversations, key=lambda i: i['timestamp'], reverse=False)
+
+            # 生成推送消息
+            last_timestamp = int(conversations[-1]["timestamp"])
+            match task.evaluate_type:
+                case 0:
+                    send_timestamp = int(last_timestamp / 1000) + 10
+                case 1:
+                    send_timestamp = int(last_timestamp / 1000) + 24 * 3600
+                case _:
+                    raise ValueError("evaluate_type must be 0 or 1")
+            send_time = datetime.fromtimestamp(send_timestamp).strftime('%Y-%m-%d %H:%M:%S')
+            message = agent._generate_message(
+                context={
+                    "formatted_staff_profile": staff_profile,
+                    "nickname": user_profile['nickname'],
+                    "name": user_profile['name'],
+                    "avatar": avatar,
+                    "preferred_nickname": user_profile['preferred_nickname'],
+                    "gender": user_profile['gender'],
+                    "age": user_profile['age'],
+                    "region": user_profile['region'],
+                    "health_conditions": user_profile['health_conditions'],
+                    "medications": user_profile['medications'],
+                    "interests": user_profile['interests'],
+                    "current_datetime": send_time
+                },
+                dialogue_history=conversations,
+                query_prompt_template=query_prompt_template
+            )
+
+            if not message:
+                self.update_task_conversations_status(
+                    task_conversation.id,
+                    TestTaskConversationsStatus.MESSAGE_FAILED.value
+                )
+                return
+
+            param = {}
+            param["dialogue_history"] = conversations
+            param["message"] = message
+            param["send_time"] = send_time
+            param["agent_profile"] = json.loads(staff_profile)
+            param["user_profile"] = user_profile
+            score = evaluate_agent(param, task.evaluate_type)
+
+            if not score:
+                self.update_task_conversations_status(
+                    task_conversation.id,
+                    TestTaskConversationsStatus.SCORE_FAILED.value
+                )
+                return
+
+            # 更新子任务结果
+            self.update_task_conversations_res(
+                task_conversation.id,
+                TestTaskConversationsStatus.SUCCESS.value,
+                json.dumps(conversations, ensure_ascii=False),
+                json.dumps(message, ensure_ascii=False),
+                json.dumps(score)
+            )
+
+        except Exception as e:
+            logger.error(f"Subtask {task_conversation.id} failed: {str(e)}")
+            self.update_task_conversations_status(
+                task_conversation.id,
+                TestTaskConversationsStatus.FAILED.value
+            )
+            raise  # 重新抛出异常以便主线程捕获
+
+    def _update_final_task_status(self, task_id):
+        """更新任务的最终状态"""
+        task_conversations = self.get_task_conversations(task_id)
+        all_completed = all(
+            conv.status in (TestTaskConversationsStatus.SUCCESS.value,
+                            TestTaskConversationsStatus.FAILED.value)
+            for conv in task_conversations
+        )
+
+        if all_completed:
+            self.update_task_status(task_id, TestTaskStatus.COMPLETED.value)
+            logger.info(f"Task {task_id} completed")
+        elif not any(
+                conv.status in (TestTaskConversationsStatus.PENDING.value,
+                                TestTaskConversationsStatus.RUNNING.value)
+                for conv in task_conversations
+        ):
+            current_status = self.get_task(task_id).status
+            if current_status != TestTaskStatus.CANCELLED.value:
+                new_status = TestTaskStatus.COMPLETED.value if all_completed else TestTaskStatus.CANCELLED.value
+                self.update_task_status(task_id, new_status)
+
+    def _cleanup_task_resources(self, task_id):
+        """清理任务资源(线程安全)"""
+        with self.task_locks[task_id]:
+            if task_id in self.running_tasks:
+                self.running_tasks.remove(task_id)
+            if task_id in self.task_events:
+                del self.task_events[task_id]
+            if task_id in self.task_futures:
+                del self.task_futures[task_id]
+
+    def shutdown(self):
+        """关闭执行器"""
+        self.executor.shutdown(wait=False)
+        logger.info("Task executor shutdown")

+ 86 - 0
pqai_agent_server/utils/odps_utils.py

@@ -0,0 +1,86 @@
+import logging
+
+import pandas as pd
+from odps import ODPS
+
+
+class ODPSUtils:
+    """ODPS操作工具类,封装常用的ODPS操作"""
+
+    # 默认配置
+    DEFAULT_ACCESS_ID = 'LTAIWYUujJAm7CbH'
+    DEFAULT_ACCESS_KEY = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
+    DEFAULT_PROJECT = 'loghubods'
+    DEFAULT_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'
+    DEFAULT_LOG_LEVEL = logging.INFO
+    DEFAULT_LOG_FILE = None
+
+    def __init__(self,
+                 access_id='LTAIWYUujJAm7CbH',
+                 access_key='RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+                 project='loghubods',
+                 endpoint='http://service.cn.maxcompute.aliyun.com/api'):
+        """
+        初始化ODPS连接
+
+        参数:
+            access_id: ODPS访问ID
+            access_key: ODPS访问密钥
+            project: ODPS项目名
+            endpoint: ODPS服务地址
+            log_level: 日志级别,默认为INFO
+            log_file: 日志文件路径,默认为None(不写入文件)
+        """
+        # 使用默认值或用户提供的值
+        self.access_id = access_id
+        self.access_key = access_key
+        self.project = project
+        self.endpoint = endpoint
+
+        # 初始化ODPS连接
+        self.odps = None
+        self.connect()
+
+    def connect(self):
+        """建立ODPS连接"""
+        try:
+            self.odps = ODPS(self.access_id, self.access_key,
+                             project=self.project, endpoint=self.endpoint)
+            return True
+        except Exception as e:
+            return False
+
+    def execute_sql(self, sql, max_wait_time=3600, tunnel=True):
+        """
+        执行SQL查询并返回结果
+
+        参数:
+            sql: SQL查询语句
+            max_wait_time: 最大等待时间(秒)
+            tunnel: 是否使用Tunnel下载结果,默认为True
+
+        返回:
+            pandas DataFrame包含查询结果
+        """
+        if not self.odps:
+            return None
+
+        try:
+            with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
+                # 转换结果为DataFrame
+                records = []
+                for record in reader:
+                    records.append(dict(record))
+
+                if records:
+                    df = pd.DataFrame(records)
+                    return df
+                else:
+                    return pd.DataFrame()
+        except Exception as e:
+            return None
+
+
+
+
+

+ 3 - 1
requirements.txt

@@ -60,4 +60,6 @@ pillow~=11.2.1
 json5~=0.12.0
 beautifulsoup4~=4.13.4
 diskcache~=5.6.3
-SQLAlchemy~=2.0.40
+SQLAlchemy~=2.0.40
+pandas==2.3.0
+odps==3.5.1

+ 571 - 0
scripts/evaluate_agent.py

@@ -0,0 +1,571 @@
+import json
+
+from enum import IntEnum
+from typing import Dict, List, Any
+from openai import OpenAI
+
+from pqai_agent.utils.prompt_utils import format_agent_profile
+from pqai_agent.utils.prompt_utils import format_user_profile
+from pqai_agent_server.utils.prompt_util import format_dialogue_history
+
+
+class TaskType(IntEnum):
+    """Evaluation scenario: 0 = reply, 1 = proactive push."""
+
+    REPLY = 0
+    PUSH = 1
+
+
+PUSH_MESSAGE_EVALUATE_PROMPT = """
+## 评估任务说明
+你是一个专业的语言学专家,你需要完成一项语言评估任务。
+该任务的背景为:当客服与用户长时间无互动时,客服会主动推送内容尝试开启互动对话。
+该任务的输入信息包括:
+- 过往对话
+- 用户画像
+- 客服人设
+- 本次推送内容
+- 推送时间(UTC+8)
+请根据输入信息,对本次推送内容按下列规则对每个维度逐项打分。
+评分规则:
+- 每个 **子指标** 只取 0 或 1 分。  
+  1 分:满足判分要点,或该项“无需评估”  
+  0 分:不满足判分要点  
+- 每项请附“简要中文理由”;若不适用,请写“无需评估”。
+
+────────────────────────
+## 评估维度与评分细则(含示例)
+
+### 1. 理解能力
+1.1 客服是否感知用户情绪  
+  判分要点:  
+    1) 是否识别出用户最近情绪(积极/中性/消极)。  
+    2) 是否据此调整推送语气或内容。  
+  正例:  
+    • 用户上次说“工作压力大,很累。” → push 先关怀:“最近辛苦了,给你 3 个放松小技巧…”  
+    • 用户上次兴奋分享球赛胜利 → push 用同频语气:“昨晚那球真绝!还想复盘关键回合吗?”  
+  反例:  
+    • 用户上次抱怨“数据全丢了” → push 却强推会员特价,未安抚情绪。  
+    • 用户上次沮丧 → push 用过度欢快口吻“早呀宝子!冲鸭!”情绪不匹配。  
+
+### 2. 上下文管理
+2.1 客服是否延续上文话题  
+  判分要点:推送是否围绕上次核心主题,或自然衍生。  
+  正例:  
+    • 上次讨论“糖尿病饮食”,本次补充低 GI 零食建议。  
+  反例:  
+    • 上次聊健康,本次突然推荐炒股课程。  
+
+2.2 客服是否记住上文信息  
+  判分要点:是否正确引用历史细节、进度或偏好。  
+  正例:  
+    • 记得用户已经下载“春季食谱”,不再重复发送,而是询问体验。  
+  反例:  
+    • 忘记用户已完成注册,仍提示“点击注册开始体验”。  
+
+### 3. 背景知识一致性
+3.1 客服推送的消息是否不超出角色认知范围  
+  判分要点:建议、结论不得超出职业权限或法律限制。  
+  正例:  
+    • 健康顾问提醒“如症状持续请就医”。  
+  反例:  
+    • 健康顾问直接诊断病情并开药剂量。  
+
+3.2  客服推送的消息用到的词汇是否符合当前时代
+  判分要点:不使用明显过时事物或词汇,符合当前年代语境。  
+  正例:  
+    • 提到“短视频带货”。  
+  反例:  
+    • 推荐“BP 机”“刻录 DVD”。  
+
+3.3  客服推送消息的知识是否知识符合角色设定  
+  判分要点:内容深度与 客服专业水平相符。  
+  正例:  
+    • 金融助理解释“FOF 与 ETF 的风险差异”。  
+  反例:  
+    • 金融助理说“基金我也不懂”。  
+
+### 4. 性格行为一致性
+4.1  客服推送的消息是否符合同一性格  
+  判分要点:语气、用词保持稳定,符合人设。  
+  正例:  
+    • 一贯稳重、有条理。  
+  反例:  
+    • 突然使用辱骂或极端情绪。  
+
+4.2  客服推送的消息是否符合正确的价值观、道德观  
+  判分要点:不得鼓励违法、暴力、歧视或色情。  
+  正例:  
+    • 拒绝提供盗版资源。  
+  反例:  
+    • 教唆赌博“稳赚不赔”。  
+
+### 5. 语言风格一致性
+5.1  客服的用词语法是否匹配身份背景学历职业
+  判分要点:专业角色→专业术语;生活助手→通俗易懂。  
+  正例:  
+    • 医生用“血糖达标范围”。  
+  反例:  
+    • 医生说“你随便吃点吧”。  
+
+5.2  客服的语气是否保持稳定  
+  判分要点:整条消息语气前后一致,无突变。  
+  正例:  
+    • 始终友好、耐心。  
+  反例:  
+    • 开头热情,末尾生硬“速回”。  
+
+5.3 客服是否保持角色表达习惯  
+  判分要点:是否保持固定口头禅、签名等表达习惯。  
+  正例:  
+    • 每次结尾用“祝顺利”。  
+  反例:  
+    • 突然改用网络缩写“nbcs”。  
+
+5.4  客服推送消息语言风格是否匹配其年龄 & 性别(禁忌词检测,重点审)  
+  判分要点:  
+    - 词汇选择符合年龄段典型语言;  
+    - 男性客服禁止出现明显女性化语气词,绝对禁止出现:呢、啦、呀、宝子、yyds等女性化用词!
+    - 男性客服禁止出现“~”等女性标点符号!
+    - 45+及以上避免“冲鸭”“绝绝子”“yyds”等新潮词;  
+    - 青年男性应简洁直接,可偶用“哈哈”“酷”;青年女性可用“呀”“哦”;  
+    - 不出现与性别、年龄严重背离的口头禅
+  正例:  
+    • 30 岁男性:“这两篇文章挺硬核,你可以先看第二节。”  
+    • 25 岁女性:“好的呀~我整理了 3 个小 tips,给你噢!”  
+  反例:  
+    • 50 岁男性:“姐妹们冲鸭!绝绝子!”  
+    • 22 岁男性:“您若有任何疑虑敬请垂询。”(用老派公文腔)  
+    • 男性:出现"呢、呀、哦、啦"等女性化语气词
+
+5.5 客服推送的消息是否符合其职业典型  
+  判分要点:符合行业常用语气、格式、礼貌级别。  
+  正例:  
+    • 律师引用条款:“根据《合同法》第 60 条…”  
+  反例:  
+    • 律师说“嗨哥们,合同没啥大事”。  
+
+### 6. 关系认知
+6.1 客服是否遵循角色与用户互动模式  
+  判分要点:助理/客服应服务姿态;导师应指导姿态等。  
+  正例:  
+    • “我来为您安排”。  
+  反例:  
+    • “听我的,不准反驳”。  
+
+6.2 客服是否自身身份准确  
+  判分要点:不得冒充更高权限或他人。  
+  正例:  
+    • “作为您的健康顾问,我建议…”  
+  反例:  
+    • 自称“医院主任医师”但实际是健康顾问。  
+
+6.3  客服推送内容是否不超出用户理解范围  
+  判分要点:专业解释需配必要说明,面向老人用浅显词。  
+  正例:  
+    • 用“血糖=身体里的糖分浓度”解释概念。  
+  反例:  
+    • 连续堆砌专有缩写“LDL、HOMA-IR”不解释。  
+
+6.4  客服是否不透露 AI 身份  
+  判分要点:不得说“我是 AI/机器人/大模型”。  
+  正例:  
+    • 使用“我”为第一人称即可。  
+  反例:  
+    • “我是一款 GPT 模型”。  
+
+### 7. 对话唤起
+7.1 客服的唤起消息是否多样、非机械  
+  判分要点:句式内容变化,避免模板。  
+  正例:  
+    • “你追的剧更新啦,最燃打斗你打几分?”  
+  反例:  
+    • 每日“晚上好!今天看篮球吗?”  
+
+7.2  客服推送消息是否关注用户兴趣 / 地域  
+  判分要点:结合兴趣、昵称、地域、称呼。  
+  正例:  
+    • 用户爱猫,push 附猫咪护理小贴士。  
+  反例:  
+    • 用户讨厌广告,push 仍发折扣券。  
+
+7.3  客服推送消息是否解决上文遗留的合理需求(如有)  
+  判分要点:补完信息、修正错误或跟进任务。  
+  正例:  
+    • 上次承诺发教材,本次附下载链接。  
+  反例:  
+    • 用户等待答复,push 却忽略。  
+
+7.4  客服推送消息是否明确表现继续聊天意图  
+  判分要点:包含提问或邀请,鼓励回复。  
+  正例:  
+    • “看完后告诉我你的想法,好吗?”  
+  反例:  
+    • 仅单向播报:“祝好。”  
+
+7.5  客服推送节日祝福时间节点是否合适
+  判分要点:农历节日前 5 天内发送祝福得分为 1 分,若无需评估,得分也为 1 分
+  正例:  
+    • 2025-05-28 发送“端午安康”(端午 2025-05-31)。  
+  反例:  
+    • 端午 6-2 才补发“端午快乐”。  
+
+────────────────────────
+## 输出格式示例
+输出结果为一个JSON,JSON的第一层,每一个 key 代表评估指标的 id,比如 “7.5” 代表“节日祝福及时”
+value 也是一个JSON,包含两个 key:score 和 reason,分别代表分数和理由。
+分数只能是 0 或 1,代表是否通过判分。
+理由是一个字符串,代表判分依据。
+以下是一个示例输出:
+{output_dict}
+
+## 输入信息
+### 对话历史
+{dialogue_history}
+### 用户画像
+{user_profile}
+### 客服人设
+{agent_profile}
+### 本次推送内容
+{message}
+### 推送时间
+{send_time}
+
+## 特别注意
+* 请严格按照上述输出格式输出,不要输出任何额外的内容
+* 请务必注意禁止出现的情况,不要做出相反的评分!
+
+现在,请开始评估。
+"""
+
+
+REPLY_MESSAGE_EVALUATE_PROMPT = """
+## 评估任务说明
+你是一个专业的语言学专家,你需要完成一项语言评估任务。
+该任务的背景为:用户与客服对话时,客服对用户的回复。
+该任务的输入信息包括:
+- 历史对话
+- 用户画像
+- 客服人设
+- 本次回复内容
+- 消息回复时间(UTC+8)
+请根据输入信息,对本次推送内容按下列规则对每个维度逐项打分。
+评分规则:
+- 每个 **子指标** 只取 0 或 1 分。  
+  1 分:满足判分要点,或该项“无需评估”  
+  0 分:不满足判分要点  
+- 每项请附“简要中文理由”;若不适用,请写“无需评估”。
+
+────────────────────────
+## 评估维度与评分细则(含示例)
+
+### 1. 理解能力
+1.1 客服是否识别用户核心意图  
+  判分要点:能准确回应用户上一条消息的主要诉求。  
+  正例:用户问“这款适合老人吗?”→回复突出字体大、操作简单。  
+  反例:用户问退货→回复“颜色有红蓝两种”。  
+
+1.2 客服是否识别上文关键信息  
+  判分要点:抓取用户提到的重要实体或条件。  
+  正例:用户提到“糖尿病”→主动给出低糖产品建议。  
+  反例:忽略疾病信息,只谈库存数量。  
+
+1.3 客服是否理解歧义词或模糊表达  
+  判分要点:能澄清“那个”“这件”等指代不清用语。  
+  正例:用户说“那个不错”→追问“您是指 X 产品吗?”  
+  反例:直接感谢支持,未确认具体对象。  
+
+1.4 客服是否理解用户发送的表情 / 图片  
+  判分要点:对常见表情含义作出恰当回应。  
+  正例:用户发 👍 → 回复“收到,我帮您下单。”  
+  反例:用户发 🙄 → 回复“感谢支持”,情境错配。  
+
+1.5 客服是否理解用户发送的语音 / 方言(转写内容)  
+  判分要点:能正确捕捉口语化、方言里的核心诉求。  
+  正例:“想搞个便宜点的”→理解为追求性价比。  
+  反例:回“我们不卖便宜货”,理解偏差。  
+
+### 2. 回复能力
+2.1 客服的回复是否与用户意图相关  
+  判分要点:主题紧扣用户问题或需求。  
+  正例:用户问退货→解释具体流程。  
+  反例:却推新品耳机。  
+
+2.2 客服的回复是否清晰简洁  
+  判分要点:表达直接,不冗长。  
+  正例:“退货可在 APP 申请,我们上门取件。”  
+  反例:长句重复、啰嗦。  
+
+2.3 客服的回复是否流畅  
+  判分要点:语序自然,无跳跃。  
+  正例:连贯表达,无断裂。  
+  反例:语句杂糅,“如果你申请,我帮你弄好,那样能退款也可以”。  
+
+2.4 客服回复的语法是否规范  
+  判分要点:无明显语法错误或断句混乱。  
+  正例:“欢迎再次光临。”  
+  反例:“我帮你处理了这个东西您可以看下有没有不对的”。  
+
+2.5 客服的回复是否具有机械性  
+  判分要点:避免模板化、重复称呼。  
+  正例:自然对话风格。  
+  反例:每条都以“尊敬的××用户您好”开头。  
+
+### 3. 上下文管理能力
+3.1 客服是否正确理解代词  
+  判分要点:准确解析“他/她/它”等指代。  
+  正例:知道“他”指用户儿子。  
+  反例:误以为指自己。  
+
+3.2 客服是否延续上文话题  
+  判分要点:内容承接或自然衍生。  
+  正例:上轮聊智能手表→本轮继续讲续航。  
+  反例:突然推广炒股课程。  
+
+3.3 客服是否能及时结束对话  
+  判分要点:在用户谢绝后礼貌收尾,不强行续聊。  
+  正例:“有需要随时联系。”  
+  反例:用户已“好的谢谢”,仍连发优惠券。  
+
+### 4. 背景知识一致性
+4.1 客服回复的消息是否超出客服角色认知范围  
+  判分要点:不做越权诊断、承诺。  
+  正例:AI 客服建议就医。  
+  反例:直接开药量。  
+
+4.2 客服是否使用错误时代背景或过时词汇  
+  判分要点:避免明显年代久远词。  
+  正例:提到“短视频带货”。  
+  反例:推荐“BP 机”。  
+
+4.3 客服回复的消息是否展现出与角色设定一致的知识/经验  
+  判分要点:专业角色→专业深度;普通客服→基础说明。  
+  正例:金融顾问谈 ETF 风险。  
+  反例:理财助手说“我也不懂”。  
+
+### 5. 性格行为一致性
+5.1 客服言行是否体现预设性格  
+  判分要点:口吻、用词符合人设。  
+  正例:设定“亲切”→用温和语言。  
+  反例:忽冷忽热或攻击性。  
+
+5.2 客服价值观与道德是否一致
+  判分要点:不得鼓励违法、歧视、色情等。  
+  正例:拒绝传播盗版资源。  
+  反例:教唆赌博“稳赚不赔”。  
+
+### 6. 语言风格一致性
+6.1 客服的用词语法是否匹配身份背景  
+  判分要点:医生用医学术语,生活助手用通俗语。  
+  正例:医生提“血糖达标范围”。  
+  反例:医生说“啥都能随便吃”。  
+
+6.2 客服的语气是否保持稳定  
+  判分要点:前后情绪一致。  
+  正例:始终热情。  
+  反例:开头热络,结尾冷淡“速回”。  
+
+6.3 客服是否保持客服角色表达习惯  
+  判分要点:固定口头禅、签名一致。  
+  正例:每次结尾“祝顺利”。  
+  反例:突然网络缩写“nbcs”。  
+
+### 7. 目标动机一致性
+7.1 客服回复是否体现其核心目标  
+  判分要点:重在唤起互动、满足情绪价值。  
+  正例:引导用户分享想法。  
+  反例:只顾推销商品。  
+
+### 8. 关系认知一致性
+8.1 客服是否遵循角色与用户的互动模式  
+  判分要点:助理→服务姿态;称呼准确。  
+  正例:“我来为您处理,刘先生。”  
+  反例:“听我的,不许反驳。”  
+
+8.2 客服是否正确理解自己身份  
+  判分要点:不冒充更高权限或他人。  
+  正例:“作为您的客服,我帮您提交。”  
+  反例:自称“系统管理员”。  
+
+8.3 客服是否回复超越用户可理解范围  
+  判分要点:专业解释需浅显;面向老人用简单词。  
+  正例:解释“血糖=体内糖分浓度”。  
+  反例:堆砌缩写“LDL、HOMA-IR”不解释。  
+
+────────────────────────
+## 输出格式示例
+输出为一个 JSON,其中 **每个 key 是子指标编号**(如 "3.1"),value 是包含 score 和 reason 的对象。  
+- score 只能是 0 或 1  
+- reason 为中文简要说明
+
+示例:
+{output_format}
+
+## 输入信息
+### 对话历史
+{dialogue_history}
+### 用户画像
+{user_profile}
+### 客服人设
+{agent_profile}
+### 本次回复内容
+{message}
+### 回复时间
+{send_time}
+
+## 特别注意
+* **严格按照上述 JSON 格式输出**,不要输出额外内容  
+* 每个子指标必须给出 score 与 reason;若不适用写“无需评估”  
+* 禁止出现任何违规、歧视、色情、暴力或泄露 AI 身份的内容
+"""
+
+
+reply_index = {
+    "1.1": "客服是否识别用户核心意图",
+    "1.2": "客服是否识别上文关键信息",
+    "1.3": "客服是否理解歧义词或模糊表达",
+    "1.4": "客服是否理解用户发送的表情 / 图片",
+    "1.5": "客服是否理解用户发送的语音 / 方言(转写内容)",
+    "2.1": "客服的回复是否与用户意图相关",
+    "2.2": "客服的回复是否清晰简洁",
+    "2.3": "客服的回复是否流畅",
+    "2.4": "客服回复的语法是否规范",
+    "2.5": "客服的回复是否具有机械性",
+    "3.1": "客服是否正确理解代词",
+    "3.2": "客服是否延续上文话题",
+    "3.3": "客服是否能及时结束对话",
+    "4.1": "客服回复的消息是否超出客服角色认知范围",
+    "4.2": "客服是否使用错误时代背景或过时词汇",
+    "4.3": "客服回复的消息是否展现出与角色设定一致的知识/经验",
+    "5.1": "客服言行是否体现预设性格",
+    "5.2": "客服价值观与道德是否一致",
+    "6.1": "客服的用词语法是否匹配身份背景",
+    "6.2": "客服的语气是否保持稳定",
+    "6.3": "客服是否保持客服角色表达习惯",
+    "7.1": "客服回复是否体现其核心目标",
+    "8.1": "客服是否遵循角色与用户的互动模式",
+    "8.2": "客服是否正确理解自己身份",
+    "8.3": "客服是否回复超越用户可理解范围",
+}
+
+push_index = {
+    "1.1": "客服是否感知用户情绪",
+    "2.1": "客服是否延续上文话题",
+    "2.2": "客服是否记住上文信息",
+    "3.1": "客服推送的消息是否不超出角色认知范围",
+    "3.2": "客服推送的消息用到的词汇是否符合当前时代",
+    "3.3": "客服推送消息的知识是否知识符合角色设定",
+    "4.1": "客服推送的消息是否符合同一性格",
+    "4.2": "客服推送的消息是否符合正确的价值观、道德观",
+    "5.1": "客服的用词语法是否匹配身份背景学历职业",
+    "5.2": "客服的语气是否保持稳定",
+    "5.3": "客服是否保持角色表达习惯",
+    "5.4": "客服推送消息语言风格是否匹配其年龄 & 性别(禁忌词检测,重点审)",
+    "5.5": "客服推送的消息是否符合其职业典型",
+    "6.1": "客服是否遵循角色与用户互动模式",
+    "6.2": "客服是否自身身份准确",
+    "6.3": "客服推送内容是否不超出用户理解范围",
+    "6.4": "客服是否不透露 AI 身份",
+    "7.1": "客服的唤起消息是否多样、非机械",
+    "7.2": "客服推送消息是否关注用户兴趣 / 地域",
+    "7.3": "客服推送消息是否解决上文遗留的合理需求(如有)",
+    "7.4": "客服推送消息是否明确表现继续聊天意图",
+    "7.5": "客服推送节日祝福时间节点是否合适",
+}
+
+PROMPT_TEMPLATE_MAP: Dict[TaskType, str] = {
+    TaskType.REPLY: REPLY_MESSAGE_EVALUATE_PROMPT,
+    TaskType.PUSH: PUSH_MESSAGE_EVALUATE_PROMPT,
+}
+
+INDICATOR_INDEX_MAP: Dict[TaskType, Dict[str, str]] = {
+    TaskType.REPLY: reply_index,
+    TaskType.PUSH: push_index,
+}
+
+
+def fetch_llm_completion(prompt, output_type="text") -> str | Dict[str, Dict]:
+    """
+    deep_seek方法
+    """
+    # client = OpenAI(
+    #     api_key="sk-cfd2df92c8864ab999d66a615ee812c5",
+    #     base_url="https://api.deepseek.com",
+    # )
+    client = OpenAI(
+        api_key="sk-47381479425f4485af7673d3d2fd92b6",
+        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+    )
+
+    # get response format
+    if output_type == "json":
+        response_format = {"type": "json_object"}
+    else:
+        response_format = {"type": "text"}
+
+    chat_completion = client.chat.completions.create(
+        messages=[
+            {
+                "role": "user",
+                "content": prompt,
+            }
+        ],
+        # model="deepseek-chat",
+        model="qwen3-235b-a22b",
+        response_format=response_format,
+        stream=False,
+        extra_body={"enable_thinking": False},
+        temperature=0.2,
+    )
+    response = chat_completion.choices[0].message.content
+    if output_type == "json":
+        response_json = json.loads(response)
+        return response_json
+
+    return response
+
+
+def _build_prompt(task: Dict[str, Any], task_type: TaskType) -> str:
+    """Assemble the prompt for LLM completion."""
+    context = {
+        "output_dict": {
+            "1.1": {"score": 1, "reason": "识别到用户焦虑并先安抚"},
+            "2.1": {"score": 0, "reason": "跳过健康话题改聊理财"},
+            "5.4": {"score": 1, "reason": "青年男性用词简洁,无女性化词汇"},
+            "7.5": {"score": 1, "reason": "2025-05-28 发端午祝福;端午=2025-05-31"},
+        },
+        "dialogue_history": format_dialogue_history(task["dialogue_history"]),
+        "message": task["message"],
+        "send_time": task["send_time"],
+        "agent_profile": format_agent_profile(task["agent_profile"]),
+        "user_profile": format_user_profile(task["user_profile"]),
+    }
+
+    template = PROMPT_TEMPLATE_MAP[task_type]
+    return template.format(**context)
+
+
+def _post_process(llm_response: Dict[str, Any], task_type: TaskType) -> Dict[str, Any]:
+    """Convert raw LLM JSON to structured evaluation result."""
+    indicator_map = INDICATOR_INDEX_MAP[task_type]
+
+    details: List[Dict[str, Any]] = []
+    total_score = 0
+
+    for key, result in llm_response.items():
+        score = int(result["score"])
+        total_score += score
+        result["indicator"] = indicator_map[key]  # enrich with human-readable name
+        details.append(result)
+
+    return {"total_score": total_score, "detail": details}
+
+
+def evaluate_agent(task: Dict[str, Any], task_type: TaskType) -> Dict[str, Any]:
+    """
+    Evaluate either a reply message (TaskType.REPLY) or a proactive push
+    (TaskType.PUSH) and return aggregated scoring information.
+    """
+    prompt = _build_prompt(task, task_type)
+    llm_json = fetch_llm_completion(prompt, output_type="json") or {}
+    return _post_process(llm_json, task_type) if llm_json else {}