123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- import concurrent
- import datetime
- import json
- import random
- import time
- from tqdm import tqdm
- from openai import OpenAI
- from typing import List, Dict
- from pymysql.cursors import DictCursor
- from pqai_agent.database import MySQLManager
- from pqai_agent.logging_service import logger
- from pqai_agent import configs, logging_service
- logging_service.setup_root_logger()
- def fetch_deepseek_completion(prompt, output_type="text"):
- """
- deep_seek方法
- """
- client = OpenAI(
- api_key="sk-cfd2df92c8864ab999d66a615ee812c5",
- base_url="https://api.deepseek.com",
- )
- # get response format
- if output_type == "json":
- response_format = {"type": "json_object"}
- else:
- response_format = {"type": "text"}
- chat_completion = client.chat.completions.create(
- messages=[
- {
- "role": "user",
- "content": prompt,
- }
- ],
- model="deepseek-chat",
- response_format=response_format,
- )
- response = chat_completion.choices[0].message.content
- if output_type == "json":
- response_json = json.loads(response)
- return response_json
- return response
- def compose_dialogue(dialogue: List[Dict], timestamp_type: str='ms') -> str:
- role_map = {'user': '用户', 'assistant': '客服'}
- messages = []
- for msg in dialogue:
- if not msg['content']:
- continue
- if msg['role'] not in role_map:
- continue
- if timestamp_type == 'ms':
- format_dt = datetime.datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
- else:
- format_dt = datetime.datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
- msg_type = "文本"
- messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
- return '\n'.join(messages)
- class AgentEvaluator:
- def __init__(self) -> None:
- config = {
- "host": "rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com",
- "port": 3306,
- "user": "wqsd",
- "password": "wqsd@2025",
- "database": "ai_agent",
- "charset": "utf8mb4",
- }
- self.mysql_client = MySQLManager(config)
- self.output_format = {
- "1.1": {
- "score": 1,
- "reason": "理由"
- },
- "1.2": {
- "score": 0,
- "reason": "理由"
- }
- }
- def get_profile_info(self, user_id_, user_type):
- match user_type:
- case "user":
- sql = f"""
- select iconurl as 'avatar', profile_data_v1 as 'profile'
- from third_party_user where third_party_user_id = %s;
- """
- case "staff":
- sql = f"""
- select agent_profile as 'profile'
- from qywx_employee where third_party_user_id = %s;
- """
- case _:
- raise ValueError("user_type must be 'user' or 'staff'")
- return self.mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,))
- class PushMessageEvaluator(AgentEvaluator):
- def get_push_dataset(self):
- sql = f"""
- select staff_id, user_id, conversation, content, send_time
- from internal_conversation_data
- where dataset_id = 2;
- """
- return self.mysql_client.select(sql, cursor_type=DictCursor)
- def get_dialogue_history_by_id(self, staff_id, dialogue_id_tuple):
- sql = f"""
- select sender, sendtime, content
- from qywx_chat_history
- where id in %s;
- """
- conversation_list = self.mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,))
- history_conversation = [
- {
- "content": i['content'],
- "role": "assistant" if i['sender'] == staff_id else "user",
- "timestamp": i['sendtime']
- } for i in conversation_list
- ]
- return history_conversation
- def generate_prompt(self, dialogue_history: str, message: str,
- send_time: str, user_profile: Dict, agent_profile: Dict) -> str:
- """
- 生成评估prompt
- :return: prompt
- """
- prompt = f"""
- **评估任务说明:**
- **任务场景**: agent和用户超过一段时间没有对话,agent 主动推送消息(message),希望能够与用户保持联系。
- **评估任务**:请基于以下输入信息:
- - 历史对话记录:dialogue_history
- - 用户预设信息:user_profile
- - agent 预设信息:agent_profile
- - 消息发送时间:send_time
- 结合以下评估指标对 message 的质量进行打分
- **每个子指标满分 1 分**:
- **评估维度与示例说明:**
- ### 1. 理解能力
- - **1.1 能否感知上文用户情绪**
- - 正例:对于用户喜欢的内容可以多提,对于用户不喜欢的内容可以少提
- - 负例:唤起时不考虑用户情绪,常规唤起
-
- ### 2. 上下文管理能力
-
- - **2.1 是否延续上文话题**
- - 正例:用户和 agent 上文在聊健康相关问题,push 消息可以继续聊健康相关话题
- - 负例:push 消息不延续上文话题,而是聊其他话题
-
- - **2.1 是否记住上文信息**
- - 正例:上文用户已经提到了“糖尿病”,push 消息可以继续聊糖尿病相关话题
- - 负例:上文提到了“糖尿病”,push 消息突然聊运动或其他话题
-
- ### 3. 背景知识一致性
- - **3.1 是否超出角色认知范围**
- - 正例:AI客服:推荐就医 → 建议联系医生
- - 负例:自称能诊断病症
-
- - **3.2 是否使用错误时代背景或过时词汇**
- - 正例:使用当下流行产品/概念
- - 负例:讨论 BP 机、DVD 机等
-
- - **3.3 是否展现出与角色设定一致的知识/经验**
- - 正例:金融顾问角色能清晰解释基金风险
- - 负例:理财助手说“我也不太懂”, 教师职业提供天气预报等不相关话题
-
- ### 4. 性格行为一致性
- - **4.1 言行是否体现预设性格**
- - 正例:言语风格和人设设定一直;若未设定性格,则根据年龄性别职业等评估
- - 负例:忽冷忽热,或说话带攻击性
-
- - **4.2 价值观与道德是否一致**
- - 正例:拒绝不当请求、拒绝传播敏感信息
- - 负例:发表不当政治 / 色情暗示 / 赌博相关
-
- ### 5. 语言风格一致性
- - **5.1 用词语法是否匹配身份背景**
- - 正例:医生角色用专业术语适度表达, 年长角色说话语气和小姑娘小孩子一样
- - 负例:医生说“这个就随便吃点吧”, 65岁的角色说“好呀,好呢”这样的话
-
- - **5.2 语气是否保持稳定**
- - 正例:始终热情/恭敬/冷静
- - 负例:开头热情,后文冷淡
-
- - **5.3 是否保持角色表达习惯**
- - 正例:师傅型角色口头禅“照您说的办”
- - 负例:前后说话风格差异明显
-
- - **5.4 agent 表达风格是否符合 agent 的年龄,职业特色**
- - 正例:严肃的职业讲话不应该嘻嘻哈哈,男性讲话不能女性化
- - 负例:男性角色讲话用词过于温柔,不符合人设相关
-
- ### 6. 关系认知一致性
- - **6.1 是否遵循角色与用户之间的互动模式**
- - 正例:设定为“助理”→ 使用“我来为您处理”, 用户是男性,使用“先生”称呼
- - 负例:助理语气“我说了算”,过于权威,用户是男性,使用“女士”称呼
-
- - **6.2 是否正确理解自己身份**
- - 正例:客服表明“我可以帮您提交”
- - 负例:自称“我是系统管理员”
-
- - **6.3 是否回复超越用户可理解范围**
- - 正例:面向老人用简洁语言解释
- - 负例:一上来即使用技术术语堆叠回复
-
- - **6.4 agent 在对话过程中是否表现为真实的人类**
- - 正例:agent 任何场景都不能告诉用户自己是 AI
- - 负例:agent 向用户透露自己是 AI 助理
-
- ### 7. 对话唤起
- - **7.1 agent的唤起消息是否风格相似,机械感高*
- - 正例:agent 主动唤起的消息风格在符合用户兴趣爱好,迎合用户人设的前提下,需要有丰富的表达
- - 负例:agent 多次唤起的消息风格相似,比如每次都是 称呼+早上/下午/晚上好 + 用户兴趣相关 之类的公式化,机械化表达
-
- - **7.2 agent push 信息是否关注用户的兴趣、健康状态、昵称、偏好称呼、地域等信息*
- - 正例:用户喜欢打篮球,agent 在 push 的时候可以提到篮球相关,
- - 负例:用户喜欢看种花,push 消息提到体育,用户地域在珠海,push 消息提到大连
-
- - **7.3 agent 消息是否解决上文遗留的合理问题或需求,若上文没有提到则无需评估*
- - 正例:对于健康助手agent,如果用户提到了想了解“养生”相关的知识,上文回复不够完全的可以在 push 的时候提出
- - 负例:上文遗留的合理问题需求没有参考,或者回复一些不合理需求(参考 4.2 价值观)
-
- - **7.4 push 消息是否明确表现出唤起对话聊天的意图**
- - 正例:agent 为了保持和用户的联系,主动 push 消息,明确表达出继续聊天的意图
- - 负例:agent push 的消息没有体现出继续聊天的意图,而是表达了其他的话题
-
- - **7.5 push 唤起若提到农历节日祝福,是否是在节日前**
- 通过 Agent 发送消息的时间计算出农历日期,然后判断改农历日期和日期的先后关系
- - 正例:发送日期对应的农历日期 < 农历节日
- - 负例:发送日期对应的农历日期 > 农历节日
-
- **评估规则:**
- - 每个子项:
- - 符合要求:1 分
- - 不符合要求:0 分
- - 未涉及/不适用:1 分,理由写“无需评估”
- - 每项后附简要中文评估理由,客观明确, 如果是节日日期相关,把节日日期也展示。
-
- **输入:**
- - **dialogue_history**: {dialogue_history}
- - **agent_profile**: {agent_profile}
- - **user_profile**: {user_profile}
- - **message**: {message}
- - **send_time**:{send_time}
-
- **输出格式要求:JSON 格式**
- 输出格式参考:{self.output_format}
-
- """
- return prompt
- def evaluate_task(self, line):
- staff_id = line['staff_id']
- user_id = line['user_id']
- conversation_id_list = json.loads(line['conversation'])
- push_message = line['content']
- send_time = line['send_time']
- send_date_str = datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d %H:%M:%S')
- dialogue_list = self.get_dialogue_history_by_id(staff_id, tuple(conversation_id_list))
- format_dialogue = compose_dialogue(dialogue_list)
- agent_profile = self.get_profile_info(staff_id, "staff")[0]['profile']
- agent_profile = json.loads(agent_profile)
- user_profile = self.get_profile_info(user_id, "user")[0]['profile']
- user_profile = json.loads(user_profile)
- evaluator_prompt = self.generate_prompt(
- dialogue_history=format_dialogue,
- message=push_message,
- send_time=send_date_str,
- agent_profile=agent_profile,
- user_profile=user_profile,
- )
- print(evaluator_prompt)
- response = fetch_deepseek_completion(evaluator_prompt, output_type='json')
- return {
- "user_profile": user_profile,
- "agent_profile": agent_profile,
- "dialogue_history": format_dialogue,
- "push_message": push_message,
- "push_time": send_date_str,
- "evaluation_result": response
- }
- def evaluate(self):
- data = self.get_push_dataset()
- samples = random.sample(data, 48)
- from concurrent.futures import ThreadPoolExecutor
- from tqdm import tqdm
- # # 多线程处理主逻辑
- L = []
- with ThreadPoolExecutor(max_workers=8) as executor: # 可根据CPU核心数调整worker数量
- futures = []
- for line in samples:
- futures.append(executor.submit(self.evaluate_task, line))
- # 使用tqdm显示进度
- for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
- result = future.result()
- if result:
- L.append(result)
- # for line in tqdm(samples):
- # response = self.evaluate_task(line)
- # print("\n")
- # print(json.dumps(response, ensure_ascii=False, indent=4))
- # if response:
- # L.append(response)
- #
- # 保存结果(与原代码相同)
- with open("push_message_0617_eva.json", "w", encoding="utf-8") as f:
- json.dump(L, f, ensure_ascii=False, indent=4)
- if __name__ == "__main__":
- PushMessageEvaluator().evaluate()
|