瀏覽代碼

Merge remote-tracking branch 'origin/dev-xym-add-test-task' into dev-xym-add-test-task

luojunhui 1 天之前
父節點
當前提交
dd1f7ec2cc

+ 6 - 3
pqai_agent/agents/message_push_agent.py

@@ -112,7 +112,7 @@ QUERY_PROMPT_TEMPLATE = """现在,请通过多步思考,以客服的角色
 注意分析客服和用户当前的社交阶段,先确立本次问候的目的。
 注意一定要分析对话信息中的时间,避免和当前时间段不符的内容!注意一定要结合历史的对话情况进行分析和问候方式的选择!
 如有必要,可以使用analyse_image分析用户头像。
-使用message_notify_user发送最终的问候内容,调用时不要传入除了问候内容外的其它任何信息
+使用output_multimodal_message发送最终的消息,如果有多条消息需要发送,可以多次调用output_multimodal_message,请务必保证所有内容都通过output_multimodal_message发出
 如果无需发起问候,可直接结束,无需调用message_notify_user。
 注意每次问候只使用一种话术。
 Now, start to process your task. Please think step by step.
@@ -143,6 +143,9 @@ class DummyMessagePushAgent(MessagePushAgent):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
-    def generate_message(self, context: Dict, dialogue_history: List[Dict]) -> str:
+    def generate_message(self, context: Dict, dialogue_history: List[Dict],
+                         query_prompt_template: Optional[str] = None) -> List[Dict]:
         logger.debug(f"DummyMessagePushAgent.generate_message called, context: {context}")
-        return "测试消息: {agent_name} -> {nickname}".format(**context)
+        result = [{"type": "text", "content": "测试消息: {agent_name} -> {nickname}".format(**context)},
+                  {"type": "image", "content": "https://example.com/test_image.jpg"}]
+        return result

+ 1 - 0
pqai_agent/data_models/agent_test_task.py

@@ -13,6 +13,7 @@ class AgentTestTask(Base):
     create_user = Column(String(32), nullable=True, comment="创建用户")
     update_user = Column(String(32), nullable=True, comment="更新用户")
     dataset_ids = Column(Text, nullable=True, comment="数据集ids")
+    evaluate_type = Column(Integer, nullable=False, default=0, comment="数据集ids")
     status = Column(Integer, nullable=True, comment="状态(0:未开始, 1:进行中, 2:已完成, 3:已取消)")
     create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
     update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",

+ 1 - 1
pqai_agent/data_models/agent_test_task_conversations.py

@@ -16,7 +16,7 @@ class AgentTestTaskConversations(Base):
     output = Column(Text, nullable=False, comment="输出内容")
     score = Column(Text, nullable=False, comment="得分")
     status = Column(Integer, default=0, nullable=False,
-                    comment="状态(0:待执行, 1:执行中, 2:执行成功, 3:执行失败, 4:已取消)")
+                    comment="状态(0:待执行, 1:执行中, 2:执行成功, 3:执行失败, 4:已取消, 5:消息失败, 6:打分失败)")
     create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
     update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
                          comment="更新时间")

+ 2 - 2
pqai_agent/logging_service.py

@@ -36,9 +36,9 @@ def setup_root_logger(level=logging.DEBUG, logfile_name='service.log'):
     root_logger = logging.getLogger()
     root_logger.handlers.clear()
     root_logger.addHandler(console_handler)
-    if configs.get_env() == 'dev':
+    if configs.get_env() == 'prod':
         file_handler = RotatingFileHandler(
-            f'{logfile_name}',
+            f'/var/log/agent_service/{logfile_name}',
             maxBytes=64 * 1024 * 1024,
             backupCount=5,
             encoding='utf-8'

+ 5 - 1
pqai_agent_server/api_server.py

@@ -7,6 +7,7 @@ import werkzeug.exceptions
 from flask import Flask, request, jsonify
 from argparse import ArgumentParser
 
+from pandas.core.computation.expressions import evaluate
 from pyarrow.dataset import dataset
 from sqlalchemy.orm import sessionmaker
 
@@ -579,11 +580,14 @@ def create_test_task():
     req_data = request.json
     agent_id = req_data.get('agentId', None)
     module_id = req_data.get('moduleId', None)
+    evaluate_type = req_data.get('evaluateType', None)
     if not agent_id:
         return wrap_response(404, msg='agent id is required')
     if not module_id:
         return wrap_response(404, msg='module id is required')
-    app.task_manager.create_task(agent_id, module_id)
+    if not evaluate_type:
+        return wrap_response(404, msg='evaluate_type id is required')
+    app.task_manager.create_task(agent_id, module_id, evaluate_type)
     return wrap_response(200)
 
 

+ 9 - 5
pqai_agent_server/const/status_enum.py

@@ -10,7 +10,6 @@ class TestTaskStatus(Enum):
     CREATING = 5
     CREATED_FAIL = 6
 
-
     @property
     def description(self):
         descriptions = {
@@ -20,10 +19,11 @@ class TestTaskStatus(Enum):
             self.CANCELLED: "已取消",
             self.FAILED: "已失败",
             self.CREATING: "生成任务中",
-            self.CREATED_FAIL:"生成任务失败"
+            self.CREATED_FAIL: "生成任务失败"
         }
         return descriptions.get(self)
 
+
 # 使用示例
 def get_test_task_status_desc(status_code):
     try:
@@ -32,6 +32,7 @@ def get_test_task_status_desc(status_code):
     except ValueError:
         return f"未知状态: {status_code}"
 
+
 class TestTaskConversationsStatus(Enum):
     """任务状态枚举类"""
     PENDING = 0  # 待执行
@@ -39,6 +40,8 @@ class TestTaskConversationsStatus(Enum):
     SUCCESS = 2  # 执行成功
     FAILED = 3  # 执行失败
     CANCELLED = 4  # 已取消
+    MESSAGE_FAILED = 5  # 消息失败
+    SCORE_FAILED = 6  # 打分失败
 
     @property
     def description(self):
@@ -47,10 +50,13 @@ class TestTaskConversationsStatus(Enum):
             self.RUNNING: "执行中",
             self.SUCCESS: "执行成功",
             self.FAILED: "执行失败",
-            self.CANCELLED: "已取消"
+            self.CANCELLED: "已取消",
+            self.MESSAGE_FAILED: "消息失败",
+            self.SCORE_FAILED: "打分失败"
         }
         return descriptions.get(self)
 
+
 # 使用示例
 def get_test_task_conversations_status_desc(status_code):
     try:
@@ -58,5 +64,3 @@ def get_test_task_conversations_status_desc(status_code):
         return status.description
     except ValueError:
         return f"未知状态: {status_code}"
-
-

+ 51 - 16
pqai_agent_server/task_server.py

@@ -2,6 +2,7 @@ import json
 import threading
 import concurrent.futures
 import time
+import traceback
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
 from typing import Dict
@@ -15,8 +16,12 @@ from pqai_agent.data_models.agent_configuration import AgentConfiguration
 from pqai_agent.data_models.agent_test_task import AgentTestTask
 from pqai_agent.data_models.agent_test_task_conversations import AgentTestTaskConversations
 from pqai_agent.data_models.service_module import ServiceModule
+from pqai_agent.utils.prompt_utils import format_agent_profile
 from pqai_agent_server.const.status_enum import TestTaskConversationsStatus, TestTaskStatus, get_test_task_status_desc
 from concurrent.futures import ThreadPoolExecutor
+
+from scripts.evaluate_agent import evaluate_agent
+
 logger = logging_service.logger
 
 
@@ -84,7 +89,7 @@ class TaskManager:
                 {
                     "id": agent_test_task_conversation.id,
                     "agentName": agent_configuration.name,
-                    "input":MultiModalChatAgent.compose_dialogue(json.loads(agent_test_task_conversation.input)),
+                    "input": MultiModalChatAgent.compose_dialogue(json.loads(agent_test_task_conversation.input)),
                     "output": agent_test_task_conversation.output,
                     "score": agent_test_task_conversation.score,
                     "statusName": get_test_task_status_desc(agent_test_task_conversation.status),
@@ -101,10 +106,10 @@ class TaskManager:
                 "list": response_data,
             }
 
-    def create_task(self, agent_id: int, module_id: int) -> Dict:
+    def create_task(self, agent_id: int, module_id: int, evaluate_type: int) -> Dict:
         """创建新任务"""
         with self.session_maker() as session:
-            agent_test_task = AgentTestTask(agent_id=agent_id, module_id=module_id,
+            agent_test_task = AgentTestTask(agent_id=agent_id, module_id=module_id, evaluate_type=evaluate_type,
                                             status=TestTaskStatus.CREATING.value)
             session.add(agent_test_task)
             session.commit()  # 显式提交
@@ -126,7 +131,8 @@ class TaskManager:
 
             for dataset_module in dataset_module_list:
                 # 获取对话数据列表
-                conversation_datas = self.dataset_service.get_conversation_data_list_by_dataset(dataset_module.dataset_id)
+                conversation_datas = self.dataset_service.get_conversation_data_list_by_dataset(
+                    dataset_module.dataset_id)
 
                 for conversation_data in conversation_datas:
                     # 创建子任务对象
@@ -337,8 +343,10 @@ class TaskManager:
             agent_configuration = self.get_agent_configuration_by_task_id(task_id)
             query_prompt_template = agent_configuration.task_prompt
 
+            task = self.get_task(task_id)
+
             # 使用线程池执行子任务
-            with ThreadPoolExecutor(max_workers=20) as executor:  # 可根据需要调整并发数
+            with ThreadPoolExecutor(max_workers=8) as executor:  # 可根据需要调整并发数
                 futures = {}
                 for task_conversation in task_conversations:
                     if self.task_events[task_id].is_set():
@@ -348,6 +356,7 @@ class TaskManager:
                     future = executor.submit(
                         self._process_single_conversation,
                         task_id,
+                        task,
                         task_conversation,
                         query_prompt_template,
                         agent_configuration
@@ -375,7 +384,8 @@ class TaskManager:
         finally:
             self._cleanup_task_resources(task_id)
 
-    def _process_single_conversation(self, task_id, task_conversation, query_prompt_template, agent_configuration):
+    def _process_single_conversation(self, task_id, task, task_conversation, query_prompt_template,
+                                     agent_configuration):
         """处理单个对话子任务(线程安全)"""
         # 检查任务是否被取消
         if self.task_events[task_id].is_set():
@@ -404,7 +414,7 @@ class TaskManager:
                 conversation_data.version_date.replace("-", ""))
             user_profile = json.loads(user_profile_data['profile_data_v1'])
             avatar = user_profile_data['iconurl']
-            staff_profile_data = self.dataset_service.get_staff_profile_data(
+            staff_profile = self.dataset_service.get_staff_profile_data(
                 conversation_data.staff_id).agent_profile
             conversations = self.dataset_service.get_chat_conversation_list_by_ids(
                 json.loads(conversation_data.conversation),
@@ -414,11 +424,17 @@ class TaskManager:
 
             # 生成推送消息
             last_timestamp = int(conversations[-1]["timestamp"])
-            push_time = int(last_timestamp / 1000) + 24 * 3600
-            push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
-            push_message = agent._generate_message(
+            match task.evaluate_type:
+                case 0:
+                    send_timestamp = int(last_timestamp / 1000) + 10
+                case 1:
+                    send_timestamp = int(last_timestamp / 1000) + 24 * 3600
+                case _:
+                    raise ValueError("evaluate_type must be 0 or 1")
+            send_time = datetime.fromtimestamp(send_timestamp).strftime('%Y-%m-%d %H:%M:%S')
+            message = agent._generate_message(
                 context={
-                    "formatted_staff_profile": staff_profile_data,
+                    "formatted_staff_profile": staff_profile,
                     "nickname": user_profile['nickname'],
                     "name": user_profile['name'],
                     "avatar": avatar,
@@ -429,22 +445,41 @@ class TaskManager:
                     "health_conditions": user_profile['health_conditions'],
                     "medications": user_profile['medications'],
                     "interests": user_profile['interests'],
-                    "current_datetime": push_dt
+                    "current_datetime": send_time
                 },
                 dialogue_history=conversations,
                 query_prompt_template=query_prompt_template
             )
 
-            # 获取打分(TODO: 实际实现)
-            score = '{"score":0.05}'
+            if not message:
+                self.update_task_conversations_status(
+                    task_conversation.id,
+                    TestTaskConversationsStatus.MESSAGE_FAILED.value
+                )
+                return
+
+            param = {}
+            param["dialogue_history"] = conversations
+            param["message"] = message
+            param["send_time"] = send_time
+            param["agent_profile"] = json.loads(staff_profile)
+            param["user_profile"] = user_profile
+            score = evaluate_agent(param, task.evaluate_type)
+
+            if not score:
+                self.update_task_conversations_status(
+                    task_conversation.id,
+                    TestTaskConversationsStatus.SCORE_FAILED.value
+                )
+                return
 
             # 更新子任务结果
             self.update_task_conversations_res(
                 task_conversation.id,
                 TestTaskConversationsStatus.SUCCESS.value,
                 json.dumps(conversations, ensure_ascii=False),
-                json.dumps(push_message, ensure_ascii=False),
-                score
+                json.dumps(message, ensure_ascii=False),
+                json.dumps(score)
             )
 
         except Exception as e: