123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- import json
- import datetime
- import random
- import traceback
- import concurrent.futures
- from threading import Lock
- from tqdm import tqdm
- from openai import OpenAI
- from pymysql.cursors import DictCursor
- from pqai_agent.database import MySQLManager
- evaluation_metrics_dict = {
- "1.2": "是否识别关键信息",
- "1.3": "是否能够理解歧义词/模糊词",
- "1.4": "是否能理解表情包,图片消息",
- "1.5": "是否能理解语音/方言",
- "2.1": "回复是否与用户意图相关",
- "2.2": "回复是否清晰简洁",
- "2.3": "回复是否流畅",
- "2.4": "回复语法是否规范",
- "3.1": "是否能理解代词(他,她, 她, 这个那个)",
- "3.2": "是否能延续上文话题",
- "3.3": "是否记住上文的基础信息",
- "3.4": "是否及时结束聊天",
- "4.1": "是否讨论超出角色认知范围的信息",
- "4.2": "是否讨论了不符合当前时代背景的语言、物品、事件、概念",
- "4.3": "是否表现出与agent 人设相符的专业知识、生活经验或者常识",
- "5.1": "agent 的言行是否反映其预设的核心性格",
- "5.2": "agent 的价值观和道德观是否符合其预设标准",
- "6.1": "agent 使用的词汇、句式、语法复杂度、行话/俚语是否符合其身份、教育背景和时代?",
- "6.2": "agent 语气、语调(恭敬、傲慢、亲切、疏离、热情、冷淡)是否稳定?",
- "6.3": "agent 表达习惯、口头禅是否符合角色预设特点",
- "7.1": "agent 在对话中表现出的目标、关注重心是否与其设定的核心动机一致?",
- "8.1": "agent 是否按照预设的互动模式与用户沟通",
- "8.2": "agent 是否对自身角色有正确理解",
- "8.3": "agent 是否回复超越用户认知的信息"
- }
- def fetch_deepseek_completion(prompt, output_type='text'):
- """
- deep_seek方法
- """
- client = OpenAI(
- api_key='sk-cfd2df92c8864ab999d66a615ee812c5',
- base_url="https://api.deepseek.com"
- )
- # get response format
- if output_type == "json":
- response_format = {"type": "json_object"}
- else:
- response_format = {"type": "text"}
- chat_completion = client.chat.completions.create(
- messages=[
- {
- "role": "user",
- "content": prompt,
- }
- ],
- model="deepseek-reasoner",
- response_format=response_format,
- )
- response = chat_completion.choices[0].message.content
- if output_type == "json":
- response_json = json.loads(response)
- return response_json
- return response
- def get_profile_info(user_id_, user_type):
- match user_type:
- case "user":
- sql = f"""
- select iconurl as 'avatar', profile_data_v1 as 'profile'
- from third_party_user where third_party_user_id = %s;
- """
- case "staff":
- sql = f"""
- select agent_profile as 'profile'
- from qywx_employee where third_party_user_id = %s;
- """
- case _:
- raise ValueError("user_type must be 'user' or 'staff'")
- return mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,))
- def evaluate_reply_agent_prompt(dialogue_history, message, user_profile_, agent_profile, push_time):
- """
- :param dialogue_history:
- :param message:
- :param user_profile_:
- :param agent_profile:
- :return:
- """
- output_format = {
- "1.1": {
- "score": 1,
- "reason": "理由"
- },
- "1.2": {
- "score": 0,
- "reason": "理由"
- }
- }
- prompt_ = f"""
- **评估任务:** 基于给定的对话历史和 Agent 预设信息和用户的预设信息,评估 Agent 在对话中的表现。
- 使用以下维度和指标进行评分。
- **评估指标:**
- 1. 理解能力
- 1.1 是否识别用户核心意图
- 1.2 是否识别关键信息
- 1.3 是否能够理解歧义词/模糊词
- 1.4 是否能理解表情包,图片消息
- 1.5 是否能理解语音/方言
- 2. 回复能力
- 2.1 回复是否与用户意图相关
- 2.2 回复是否清晰简洁
- 2.3 回复是否流畅
- 2.4 回复语法是否规范
- 3. 上下文管理能力
- 3.1 是否能理解代词(他,她, 她, 这个那个)
- 3.2 是否能延续上文话题
- 3.4 是否及时结束聊天
- 4. 背景知识一致性
- 4.1 是否讨论超出角色认知范围的信息
- 4.2 是否讨论了不符合当前时代背景的语言、物品、事件、概念
- 4.3 是否表现出与agent 人设相符的专业知识、生活经验或者常识
- 5. 性格行为一致性
- 5.1 agent 的言行是否反映其预设的核心性格
- 5.2 agent 的价值观和道德观是否符合其预设标准
- 6. 语言风格一致性
- 6.1 agent 使用的词汇、句式、语法复杂度、行话/俚语是否符合其身份、教育背景和时代?
- 6.2 agent 语气、语调(恭敬、傲慢、亲切、疏离、热情、冷淡)是否稳定?
- 6.3 agent 表达习惯、口头禅是否符合角色预设特点
- 7. 目标动机一致性
- 7.1 agent 在对话中表现出的目标、关注重心是否与其设定的核心动机一致?
- 8. 关系认知一致性
- 8.1 agent 是否按照预设的互动模式与用户沟通
- 8.2 agent 是否对自身角色有正确理解
- 8.3 agent 是否回复超越用户认知的信息
-
- **评估规则:**
- - 对于每个指标:
- - 如果符合要求,得 1 分。
- - 如果不符合要求,得 0 分。
- - 如果指标不适用(如对话未涉及相关场景),得 1 分(无需评估。
- - 理由必须基于对话内容,简短且客观,理由需要是中文, 如果是无需评估,则理由写无需评估
-
- **输入:**
- - **对话历史**: {dialogue_history}
- - **Agent 预设信息**: {agent_profile}
- - **用户预设信息**: {user_profile_}
- - **Agent 消息**: {message}
- - **Agent 发送消息时间**:{push_time}
-
- **输出格式要求:JSON 格式**
- 输出格式参考:{output_format}
-
- """
- return prompt_
- config = {
- 'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com',
- 'port': 3306,
- 'user': 'wqsd',
- 'password': 'wqsd@2025',
- 'database': 'ai_agent',
- 'charset': 'utf8mb4'
- }
- mysql_client = MySQLManager(config)
- if __name__ == '__main__':
- import pqai_agent.logging_service
- pqai_agent.logging_service.setup_root_logger()
- with open("reply_data_set_filter.json", "r", encoding="utf-8") as f:
- data = json.load(f)
- # 随机选择100个对话
- dialogues = random.sample(data, 1)
- dialogue_with_profile = []
- for dialogue in dialogues:
- agent_profile = get_profile_info(dialogue['staff_id'], 'staff')
- user_profile = get_profile_info(dialogue['user_id'], 'user')
- dialogue['agent_profile'] = json.loads(agent_profile[0]['profile'])
- dialogue['user_profile'] = json.loads(user_profile[0]['profile'])
- dialogue_with_profile.append(dialogue)
- F = []
- errors = []
- from threading import Lock
- import concurrent.futures
- write_lock = Lock()
- def process_sample(sub_dialogues):
- try:
- message = sub_dialogues["conversation"]
- agent_message = sub_dialogues["reply_msg"]
- push_time = sub_dialogues["reply_time"]
- user_profile = sub_dialogues["user_profile"]
- staff_profile = sub_dialogues["agent_profile"]
- if not agent_message:
- return None
- prompt = evaluate_reply_agent_prompt(
- message, agent_message, user_profile, staff_profile, push_time
- )
- response = fetch_deepseek_completion(prompt, output_type='json')
- return {
- "user_profile": user_profile,
- "agent_profile": staff_profile,
- "dialogue_history": message,
- "push_message": agent_message,
- "push_time": push_time,
- "evaluation_result": response
- }
- except Exception as e:
- # 捕获异常并存储
- error_msg = f"Error processing sample: {e}\n{traceback.format_exc()}"
- with write_lock:
- errors.append(error_msg)
- return None
- # 使用线程池处理
- with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
- # 提交所有任务
- futures = {executor.submit(process_sample, sample): sample for sample in dialogues}
- # 使用tqdm创建进度条
- for future in tqdm(concurrent.futures.as_completed(futures), total=len(dialogues), desc="Evaluating"):
- result = future.result()
- if result:
- with write_lock:
- F.append(result)
- # 打印处理过程中遇到的错误
- if errors:
- print(f"\nEncountered {len(errors)} errors during processing:")
- for error in errors[:5]: # 最多打印前5个错误
- print(error)
- # 保存结果
- with open("push_message_evaluation_result_4.json", "w", encoding="utf-8") as f:
- json.dump(F, f, ensure_ascii=False, indent=4)
|