luojunhui hace 4 días
padre
commit
6896b7cfff

+ 24 - 20
evaluate_agent_v2.py

@@ -232,6 +232,7 @@ class PushMessageEvaluator(AgentEvaluator):
                 agent_profile=line["agent_profile"],
                 user_profile=line["user_profile"],
             )
+            print(evaluator_prompt)
             response = fetch_deepseek_completion(evaluator_prompt, output_type='json')
             return {
                 "user_profile": line["user_profile"],
@@ -244,30 +245,33 @@ class PushMessageEvaluator(AgentEvaluator):
         return None
 
     def evaluate(self):
-        with open("push_dataset_0613_48h_v2.json", encoding="utf-8") as f:
+        with open("push_dataset_new_0613_24h.json", encoding="utf-8") as f:
             data = json.load(f)
 
         # data = data[:8]
 
-        from concurrent.futures import ThreadPoolExecutor
-        from tqdm import tqdm
-        # # 多线程处理主逻辑
-        L = []
-        with ThreadPoolExecutor(max_workers=8) as executor:  # 可根据CPU核心数调整worker数量
-            futures = []
-            for line in data:
-                futures.append(executor.submit(self.evaluate_task, line))
-
-            # 使用tqdm显示进度
-            for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
-                result = future.result()
-                if result:
-                    L.append(result)
-        # for line in tqdm(data):
-        #     response = self.evaluate_task(line)
+        # from concurrent.futures import ThreadPoolExecutor
+        # from tqdm import tqdm
+        # # # 多线程处理主逻辑
+        # L = []
+        # with ThreadPoolExecutor(max_workers=8) as executor:  # 可根据CPU核心数调整worker数量
+        #     futures = []
+        #     for line in data:
+        #         futures.append(executor.submit(self.evaluate_task, line))
+        #
+        #     # 使用tqdm显示进度
+        #     for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
+        #         result = future.result()
+        #         if result:
+        #             L.append(result)
+        for line in tqdm(data):
+            response = self.evaluate_task(line)
         #     if response:
         #         L.append(response)
+        #
+        # # 保存结果(与原代码相同)
+        # with open("push_message_evaluation_result_0613_24_v2.json", "w", encoding="utf-8") as f:
+        #     json.dump(L, f, ensure_ascii=False, indent=4)
 
-        # 保存结果(与原代码相同)
-        with open("push_message_evaluation_result_0613_24_v2.json", "w", encoding="utf-8") as f:
-            json.dump(L, f, ensure_ascii=False, indent=4)
+if __name__ == "__main__":
+    PushMessageEvaluator().evaluate()

+ 48 - 24
generate_data_set.py

@@ -4,6 +4,7 @@ import random
 import traceback
 
 from datetime import datetime
+from typing import Dict, List, Tuple
 
 from openai import OpenAI
 from tqdm import tqdm
@@ -12,6 +13,7 @@ from pqai_agent.database import MySQLManager
 from pqai_agent.agents.message_push_agent import MessagePushAgent
 from pqai_agent.logging_service import logger
 from pqai_agent import configs, logging_service
+from pqai_agent.mq_message import MessageType
 
 logging_service.setup_root_logger()
 
@@ -91,12 +93,13 @@ def get_dialogue_history_by_id(staff_id, dialogue_id_tuple):
         from qywx_chat_history
         where id in %s;
     """
+
     conversation_list = mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,))
     history_conversation = [
         {
             "content": i['content'],
             "role": "assistant" if i['sender'] == staff_id else "user",
-            "timestamp": int(i['sendtime'] / 1000)
+            "timestamp": i['sendtime']
         } for i in conversation_list
     ]
     return history_conversation
@@ -195,6 +198,23 @@ def generate_reply_dataset():
     #     f.write(json.dumps(data_set, ensure_ascii=False, indent=4))
 
 
+def compose_dialogue(dialogue: List[Dict], timestamp_type: str='ms') -> str:
+    """Render a dialogue as one ``[role][time][type]content`` line per message.
+
+    Args:
+        dialogue: Message dicts carrying ``content``, ``role`` and ``timestamp``,
+            plus an optional ``type`` (defaults to ``MessageType.TEXT``).
+        timestamp_type: ``'ms'`` when timestamps are epoch milliseconds; any
+            other value means they are already epoch seconds.
+
+    Returns:
+        The rendered lines joined with newlines. Messages with empty or missing
+        content, or with a role other than ``user``/``assistant``, are left out.
+    """
+    role_map = {'user': '用户', 'assistant': '客服'}
+    # The unit is the same for every message, so resolve it once up front.
+    in_millis = timestamp_type == 'ms'
+    messages = []
+    for msg in dialogue:
+        # .get() lets a malformed record be skipped instead of aborting the whole render.
+        content = msg.get('content')
+        role_label = role_map.get(msg.get('role'))
+        if not content or role_label is None:
+            continue
+        seconds = msg['timestamp'] / 1000 if in_millis else msg['timestamp']
+        format_dt = datetime.fromtimestamp(seconds).strftime('%Y-%m-%d %H:%M:%S')
+        msg_type = msg.get('type', MessageType.TEXT).description
+        messages.append('[{}][{}][{}]{}'.format(role_label, format_dt, msg_type, content))
+    return '\n'.join(messages)
+
+
 def generate_push_dataset():
 
     fetch_query = f"""
@@ -204,9 +224,8 @@ def generate_push_dataset():
     """
     data_set = mysql_client.select(fetch_query, cursor_type=DictCursor)
     filter_conversation = [i for i in data_set if len(json.loads(i['conversation'])) >= 20]
-    print(len(filter_conversation))
 
-    samples =random.sample(filter_conversation, 300)
+    samples =random.sample(filter_conversation, 100)
 
     # init message push agent
     agent = MessagePushAgent()
@@ -217,18 +236,20 @@ def generate_push_dataset():
         user_profile = json.loads(user_profile)
         conversation = get_dialogue_history_by_id(
             sample["staff_id"],
-            tuple(sample["conversation"])
+            tuple(json.loads(sample["conversation"]))
         )
         conversation.append(
             {
                 "content": sample["content"],
                 "role": "assistant",
-                "timestamp": sample["send_time"]
+                "timestamp": sample["send_time"] * 1000,
+                # "type": 1
             }
         )
         conversation = sorted(conversation, key=lambda i: i['timestamp'], reverse=False)
+
         last_timestamp = int(conversation[-1]["timestamp"])
-        push_time = last_timestamp + 24 * 3600
+        push_time = int(last_timestamp / 1000) + 24 * 3600
         push_dt =  datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
         try:
             push_message = agent.generate_message(
@@ -246,25 +267,28 @@ def generate_push_dataset():
                     "current_datetime": push_dt,
                     "avatar": None
                 },
-                dialogue_history=conversation,
-                timestamp_type="s"
+                dialogue_history=conversation
             )
-            insert_query = f"""
-                insert into internal_conversation_data
-                (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate)
-                values (%s, %s, %s, %s, %s, %s, %s, %s, %s);
-            """
-            mysql_client.execute(insert_query, args=(
-                '2',
-                sample["staff_id"],
-                sample["user_id"],
-                '2025-06-16',
-                sample["conversation"],
-                push_message,
-                push_time,
-                1,
-                sample["user_active_rate"]
-            ))
+            if not push_message:
+                print("push message error")
+                continue
+            else:
+                insert_query = f"""
+                    insert into internal_conversation_data
+                    (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate)
+                    values (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+                """
+                mysql_client.execute(insert_query, args=(
+                    '2',
+                    sample["staff_id"],
+                    sample["user_id"],
+                    '2025-06-16',
+                    sample["conversation"],
+                    push_message,
+                    push_time,
+                    1,
+                    sample["user_active_rate"]
+                ))
         except Exception as e:
             print("error", e)
             print(traceback.format_exc())

+ 140 - 0
pqai_agent/agents/evaluate_agent.py

@@ -0,0 +1,140 @@
+import datetime
+from typing import Optional, List, Dict
+
+from dev import dialogue
+from pqai_agent.agents.simple_chat_agent import SimpleOpenAICompatibleChatAgent
+from pqai_agent.chat_service import VOLCENGINE_MODEL_DEEPSEEK_V3
+from pqai_agent.logging_service import logger
+from pqai_agent.mq_message import MessageType
+from pqai_agent.toolkit.function_tool import FunctionTool
+from pqai_agent.toolkit.lunar_festival_mapper import LunarFestivalMapper
+
+
+
+# Prompt for scoring a proactive push message. It is rendered with str.format
+# and expects the fields: agent_profile, user_profile, send_time, message,
+# dialogue_history and output_format. Any literal brace added to the text later
+# must be doubled ({{ }}) or str.format will fail.
+# NOTE(review): criterion 16 tells the model to call `lunar_festival_mapper`;
+# confirm that this matches the tool name actually exposed to the model.
+PUSH_QUERY_PROMPT_TEMPLATE = """
+    **评估任务说明**
+    **任务场景**: 客服和用户之间有一段时间没有聊天互动了,客服通过主动推送消息,希望能和用户保持联系
+    **评估任务**: 请基于以下输入信息和评分细则,对客服唤起的消息的质量进行打分
+      **输入信息**
+        1.客服的基本信息: {agent_profile}
+        2.用户的基本信息: {user_profile}
+        3.消息发送的时间: {send_time}
+        4.发送消息内容: {message}
+        5.历史对话信息:{dialogue_history},注意对话信息的格式为: [角色][时间][消息类型]对话内容
+      **评分细则,以下每个指标满分 1 分且只会存在 1 分和 0 分两种结果**
+      **评分指标和示例说明**
+        -1. 能否感知上文中用户的情绪,若上文无明显情绪,则无需评估
+            正例:客服感知到用户对“健康”话题有较为强烈的正向情绪,因此可以主动推送相关健康知识。
+            反例:客服没有感知上文用户的情绪
+            
+        -2. 能否延续上文话题
+            正例:上文聊“健康”话题,唤起消息可以继续聊“健康”、“养生”等相关话题
+            反例:上文聊“健康”话题,唤起消息却聊运动等与上文不相符话题
+            
+        -3. 回复信息是否超出客服人设的认知范围
+            正例:人设为老师的客服,需要回复教育以及教育相关的
+            负例:人设为老师的客服回答和教育方面无关的话题,比如天气预报等等
+            
+        -4. 是否使用错误时代背景或者过时词汇
+            正例:使用当下流行产品/概念
+            负例:讨论 BP 机、DVD 机等
+              
+        -5. 是否展示出与客服人设一致的知识或者经验
+            正例:金融顾问角色能清晰解释基金风险
+            负例:理财助手说“我也不太懂”, 教师职业提供天气预报等不相关话题
+            
+        -6. 语言风格是否符合客服人设
+            正例: 客服唤起消息的语言风格符合其年龄,职业,性格,比如:60岁以上老人的语言风格不应该出现“呀”“呢”等嘻嘻哈哈不符合其年龄特征的话术
+            负例: 客服唤起消息语言风格公式化,无论什么职业,年龄都是一套可爱热情小姑娘风格式的话术
+            
+        -7. 价值观、道德观是否正确
+            正例:拒绝不当请求、拒绝传播敏感信息
+            负例:发表不当政治 / 色情暗示 / 赌博相关
+            
+        -8. 客服是否遵循与用户之间的互动模式
+            正例:设定为“助理”→ 使用“我来为您处理”, 用户是男性,使用“先生”称呼
+            负例:助理语气“我说了算”,过于权威,用户是男性,使用“女士”称呼
+            
+        -9. 客服是否正确理解自己的身份
+            正例:客服理解自己聊天助理的身份,和用户的沟通仅限于线上聊天
+            负例:客服未理解自己的身份,和用户约线下相关
+            
+        -10.客服主动发起的消息是否超出用户的认知上限
+            正例: 若用户是中老年人,客服的唤起方式应该用简洁的语言传播用户感兴趣或者能理解的话题
+            负例: 用户是中老年人,客服唤起时讲AI等话题或者各种技术术语叠加
+            
+        -11.客服在唤起消息过程是否表现为真实人类
+            正例:agent 任何场景都不能告诉用户自己是 AI
+            负例:agent 向用户透露自己是 AI 助理
+            
+        -12.客服唤起消息是否机械感高
+            正例:agent 主动唤起的消息风格在符合用户兴趣爱好,迎合用户人设的前提下,需要有丰富的表达
+            负例:agent 多次唤起的消息风格相似,比如每次都是 称呼+早上/下午/晚上好 + 用户兴趣相关 之类的公式化,机械化表达 
+        
+        -13.客服唤起的消息是否关注了用户的兴趣,健康状态,昵称,地域等信息
+            正例:用户喜欢打篮球,agent 在 push 的时候可以提到篮球相关, 
+            负例:用户喜欢看种花,push 消息提到体育,用户地域在珠海,push 消息提到大连
+            
+        -14.客服唤起消息是否解决上文遗留下的合理问题或者需求
+            正例:对于健康助手agent,如果用户提到了想了解“养生”相关的知识,上文回复不够完全的可以在 push 的时候提出
+            负例:上文遗留的合理问题需求没有参考,或者回复一些不合理需求(参考第七条价值观)
+        -15.客服唤起消息是否明确表现出唤起对话聊天的意图
+            正例:agent 为了保持和用户的联系,主动 push 消息,明确表达出继续聊天的意图
+            负例:agent push 的消息没有体现出继续聊天的意图,只是机械完成推送任务
+            
+        -16.如果客服推送消息包含农历节日,判断节日日期是否在推送消息之后
+            如果客服推送消息包含农历节日祝福,比如说端午节,元宵节等,需要调用lunar_festival_mapper获取当前年份的节日日期,判断节日日期和发送日期的关系
+            正例: 节日日期在发送日期之后
+            负例: 节日日期在发送日期之前
+     **评估规则:**
+        - 每个子项:
+          - 符合要求:1 分
+          - 不符合要求:0 分
+          - 未涉及/不适用:1 分,理由写“无需评估”
+          - 每项后附简要中文评估理由,客观明确
+
+     **输出**
+        请输出一个 JSON 格式的对象,输出格式参考:{output_format}
+     """
+
+
+class EvaluatePushAgent(SimpleOpenAICompatibleChatAgent):
+    """Agent that scores a proactively pushed customer-service message.
+
+    The agent is always given the ``LunarFestivalMapper`` tools in addition to
+    any tools supplied by the caller.
+    """
+    def __init__(self,
+                 model: Optional[str] = VOLCENGINE_MODEL_DEEPSEEK_V3,
+                 system_prompt: Optional[str] = None,
+                 tools: Optional[List[FunctionTool]] = None,
+                 generate_cfg: Optional[dict] = None,
+                 max_run_step: Optional[int] = None
+                 ):
+        """Forward the configuration to the base agent with the festival tools appended.
+
+        Args:
+            model: Model identifier passed to the base agent.
+            system_prompt: Optional system prompt passed to the base agent.
+            tools: Extra tools; the list itself is never modified.
+            generate_cfg: Generation settings passed to the base agent.
+            max_run_step: Step limit passed to the base agent.
+        """
+        # Build a fresh list so the caller's ``tools`` list is never mutated.
+        tools = [*(tools or []), *LunarFestivalMapper().get_tools()]
+        super().__init__(model, system_prompt, tools, generate_cfg, max_run_step)
+
+    @staticmethod
+    def compose_dialogue(dialogue: List[Dict], timestamp_type: str = 'ms') -> str:
+        """Render a dialogue as one ``[role][time][type]content`` line per message.
+
+        Args:
+            dialogue: Message dicts carrying ``content``, ``role`` and
+                ``timestamp``, plus an optional ``type`` (defaults to
+                ``MessageType.TEXT``).
+            timestamp_type: ``'ms'`` (default) when timestamps are epoch
+                milliseconds; any other value means epoch seconds.
+
+        Returns:
+            The rendered lines joined with newlines. Messages with empty
+            content or a role other than ``user``/``assistant`` are left out.
+        """
+        role_map = {'user': '用户', 'assistant': '客服'}
+        in_millis = timestamp_type == 'ms'
+        messages = []
+        for msg in dialogue:
+            if not msg['content']:
+                continue
+            if msg['role'] not in role_map:
+                continue
+            seconds = msg['timestamp'] / 1000 if in_millis else msg['timestamp']
+            format_dt = datetime.datetime.fromtimestamp(seconds).strftime('%Y-%m-%d %H:%M:%S')
+            msg_type = msg.get('type', MessageType.TEXT).description
+            messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
+        return '\n'.join(messages)
+
+    def get_evaluate_result(self, context: Dict, dialogue_history: List) -> Dict:
+        """Run the evaluation prompt and return the scoring result.
+
+        Args:
+            context: Values for the prompt fields other than the dialogue
+                (agent_profile, user_profile, send_time, message, output_format).
+            dialogue_history: Messages with millisecond timestamps, rendered
+                through ``compose_dialogue``.
+
+        Returns:
+            The ``message`` argument of the most recent
+            ``output_multimodal_message`` tool call when there is one; otherwise
+            the model reply parsed as a JSON object; otherwise an empty dict.
+        """
+        # Function-scope import keeps this method self-contained.
+        import json
+
+        formatted_dialogue = self.compose_dialogue(dialogue_history)
+        # Merge into one mapping so a stray 'dialogue_history' key in ``context``
+        # is overridden instead of raising a duplicate-keyword TypeError.
+        query = PUSH_QUERY_PROMPT_TEMPLATE.format(**{**context, 'dialogue_history': formatted_dialogue})
+        response = self.run(query)
+        for tool_call in reversed(self.tool_call_records):
+            if tool_call['name'] == 'output_multimodal_message':
+                return tool_call['arguments']['message']
+        # This class registers only the festival tools, so the tool-call lookup
+        # above finds nothing unless the caller supplied that tool. The prompt
+        # asks for a JSON object, so fall back to the reply itself.
+        # NOTE(review): assumes run() returns the final reply text - confirm.
+        if isinstance(response, str) and response.strip():
+            try:
+                parsed = json.loads(response)
+            except json.JSONDecodeError:
+                logger.warning("evaluation reply is not valid JSON: %s", response)
+            else:
+                if isinstance(parsed, dict):
+                    return parsed
+        return {}

+ 1 - 1
pqai_agent/agents/message_push_agent.py

@@ -4,7 +4,7 @@ from typing import Optional, List, Dict
 from pqai_agent.agents.simple_chat_agent import SimpleOpenAICompatibleChatAgent
 from pqai_agent.chat_service import VOLCENGINE_MODEL_DEEPSEEK_V3
 from pqai_agent.logging_service import logger
-from pqai_agent.message import MessageType
+from pqai_agent.mq_message import MessageType
 from pqai_agent.toolkit.function_tool import FunctionTool
 from pqai_agent.toolkit.image_describer import ImageDescriber
 from pqai_agent.toolkit.message_notifier import MessageNotifier

+ 72 - 0
pqai_agent/toolkit/lunar_festival_mapper.py

@@ -0,0 +1,72 @@
+import lunardate
+import datetime
+from pqai_agent.logging_service import logger
+from pqai_agent.toolkit.base import BaseToolkit
+from pqai_agent.toolkit.function_tool import FunctionTool
+from collections import defaultdict
+
+
+class LunarFestivalMapper(BaseToolkit):
+    """Toolkit that maps common lunar-calendar festivals to Gregorian dates for one year."""
+    # Common lunar festivals keyed by (lunar month, lunar day).
+    # (12, 30) is only a nominal key for New Year's Eve: the 12th lunar month
+    # often has 29 days, so the eve is derived from lunar 1/1 instead.
+    FESTIVALS = {
+        (1, 1): "春节",
+        (1, 15): "元宵节",
+        (2, 2): "龙抬头",
+        (5, 5): "端午节",
+        (7, 7): "七夕",
+        (7, 15): "中元节",
+        (8, 15): "中秋节",
+        (9, 9): "重阳节",
+        (12, 8): "腊八节",
+        (12, 23): "小年",
+        (12, 30): "除夕"
+    }
+
+    def __init__(self, year=2025):
+        """Precompute the festival table for the Gregorian ``year``.
+
+        NOTE(review): the default year is fixed at 2025; callers working with
+        another year must pass ``year`` explicitly.
+        """
+        super().__init__()
+        self.year = year
+        self.festival_dates = self._calculate_festivals()
+
+    def _calculate_festivals(self):
+        """Return ``{festival name: datetime.date}`` for ``self.year``.
+
+        Every Gregorian day of the year is converted to its lunar date and the
+        first day matching each ``FESTIVALS`` key is kept. Days in a leap lunar
+        month are ignored. New Year's Eve is set to the day before lunar 1/1,
+        because a fixed (12, 30) key misses years whose 12th month has 29 days.
+        """
+        results = {}
+        conversion_failed = False
+        one_day = datetime.timedelta(days=1)
+        current_date = datetime.date(self.year, 1, 1)
+        end_date = datetime.date(self.year, 12, 31)
+
+        while current_date <= end_date:
+            try:
+                lunar = lunardate.LunarDate.fromSolarDate(
+                    current_date.year,
+                    current_date.month,
+                    current_date.day
+                )
+            except ValueError as exc:
+                # The Gregorian date is always valid here, so a failure means
+                # the converter rejected it. Report it once instead of hiding it.
+                if not conversion_failed:
+                    logger.warning("lunar conversion failed for %s: %s", current_date, exc)
+                    conversion_failed = True
+            else:
+                festival_name = self.FESTIVALS.get((lunar.month, lunar.day))
+                if festival_name is not None and not lunar.isLeapMonth:
+                    # setdefault keeps the first occurrence within the year.
+                    results.setdefault(festival_name, current_date)
+            current_date += one_day
+
+        # Lunar 1/1 never falls on 1 January, so the day before it is always
+        # inside the same Gregorian year.
+        eve_name = self.FESTIVALS.get((12, 30))
+        new_year_name = self.FESTIVALS.get((1, 1))
+        if eve_name is not None and new_year_name in results:
+            results[eve_name] = results[new_year_name] - one_day
+
+        return results
+
+    # The docstring below is left untouched on purpose: this method is exposed
+    # through FunctionTool, which may read it at runtime - TODO confirm.
+    # Returns the festival's datetime.date, or a fixed "not found" message
+    # string when the name is not in the precomputed table.
+    def get_festival_date(self, festival_name):
+        """获取指定节日的公历日期"""
+        return self.festival_dates.get(festival_name, "节日未找到或不在该年")
+
+    def get_all_festivals(self):
+        """Return the whole ``{festival name: datetime.date}`` table for the year."""
+        return self.festival_dates
+
+    def get_tools(self):
+        """Return the tools exposed to an agent: only the single-festival lookup."""
+        return [FunctionTool(self.get_festival_date)]