import json import datetime import random import traceback import concurrent.futures from tqdm import tqdm from openai import OpenAI from pymysql.cursors import DictCursor from pqai_agent.database import MySQLManager evaluation_metrics_dict = { "1.2": "是否识别关键信息", "1.3": "是否能够理解歧义词/模糊词", "1.4": "是否能理解表情包,图片消息", "1.5": "是否能理解语音/方言", "2.1": "回复是否与用户意图相关", "2.2": "回复是否清晰简洁", "2.3": "回复是否流畅", "2.4": "回复语法是否规范", "3.1": "是否能理解代词(他,她, 她, 这个那个)", "3.2": "是否能延续上文话题", "3.3": "是否记住上文的基础信息", "3.4": "是否及时结束聊天", "4.1": "是否讨论超出角色认知范围的信息", "4.2": "是否讨论了不符合当前时代背景的语言、物品、事件、概念", "4.3": "是否表现出与agent 人设相符的专业知识、生活经验或者常识", "5.1": "agent 的言行是否反映其预设的核心性格", "5.2": "agent 的价值观和道德观是否符合其预设标准", "6.1": "agent 使用的词汇、句式、语法复杂度、行话/俚语是否符合其身份、教育背景和时代?", "6.2": "agent 语气、语调(恭敬、傲慢、亲切、疏离、热情、冷淡)是否稳定?", "6.3": "agent 表达习惯、口头禅是否符合角色预设特点", "7.1": "agent 在对话中表现出的目标、关注重心是否与其设定的核心动机一致?", "8.1": "agent 是否按照预设的互动模式与用户沟通", "8.2": "agent 是否对自身角色有正确理解", "8.3": "agent 是否回复超越用户认知的信息" } def fetch_deepseek_completion(prompt, output_type='text'): """ deep_seek方法 """ client = OpenAI( api_key='sk-cfd2df92c8864ab999d66a615ee812c5', base_url="https://api.deepseek.com" ) # get response format if output_type == "json": response_format = {"type": "json_object"} else: response_format = {"type": "text"} chat_completion = client.chat.completions.create( messages=[ { "role": "user", "content": prompt, } ], model="deepseek-reasoner", response_format=response_format, ) response = chat_completion.choices[0].message.content if output_type == "json": response_json = json.loads(response) return response_json return response def get_profile_info(user_id_, user_type): match user_type: case "user": sql = f""" select iconurl as 'avatar', profile_data_v1 as 'profile' from third_party_user where third_party_user_id = %s; """ case "staff": sql = f""" select agent_profile as 'profile' from qywx_employee where third_party_user_id = %s; """ case _: raise ValueError("user_type must be 'user' or 'staff'") return mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,)) def evaluate_reply_agent_prompt(dialogue_history, message, user_profile_, agent_profile, push_time): """ :param dialogue_history: :param message: :param user_profile_: :param agent_profile: :return: """ output_format = { "1.1": { "score": 1, "reason": "理由" }, "1.2": { "score": 0, "reason": "理由" } } prompt_ = f""" **评估任务说明:** 你需要对 agent 当前回复的消息(message)进行质量评估。 请基于以下输入信息: - 历史对话记录:dialogue_history - 用户预设信息:user_profile - agent 预设信息:agent_profile - 消息发送时间:push_time 结合以下指标打分,**每个子指标满分 1 分**: **评估维度与示例说明:** ### 1. 理解能力 - **1.1 是否识别用户核心意图** - 正例:用户:这款适合老人吗?→ agent:是的,它字体更大、操作简单 - 负例:回复:“颜色有红蓝两种” → 偏离意图 - **1.2 是否识别关键信息** - 正例:用户提到“糖尿病”,agent 结合健康推荐产品 - 负例:忽略关键信息,只介绍型号/库存 - **1.3 是否理解歧义词或模糊表达** - 正例:用户说“那个不错”,agent 明确“您是指X产品吗?” - 负例:直接“感谢喜欢”,未澄清 - **1.4 是否理解表情/图片** - 正例:用户发 👍 → agent 回复“收到,我帮您下单” - 负例:用户发 🙄 → agent 回复“感谢支持” - **1.5 是否理解语音/方言(转写内容)** - 正例:“想搞个便宜点的” → 理解为追求性价比 - 负例:回复“我们不卖便宜货” → 理解偏差 ### 2. 回复能力 - **2.1 回复是否与用户意图相关** - 正例:用户问退货 → agent 回复具体流程 - 负例:agent 回复“本店新品推荐” - **2.2 回复是否清晰简洁** - 正例:“退货可在APP内申请,我们会上门取件” - 负例:“嗯这个如果说退货吧,其实我们也可以...” - **2.3 回复是否流畅** - 正例:语言通顺无跳跃 - 负例:表达混乱,“如果你申请,我帮你弄好,那样能退款也可以” - **2.4 回复语法是否规范** - 正例:“欢迎再次光临” - 负例:“我帮你处理了这个东西您可以看下有没有不对的” - **2.5 回复是否具有机械性** - 正例:回复的语句需要保持正常聊天风格 - 负例:每次回复消息均包含用户称呼等属于首次聊天需要用到的称呼语句 ### 3. 上下文管理能力 - **3.1 是否正确理解代词** - 正例:用户:“他说不错” → agent 理解“他”为儿子 - 负例:理解为用户本人 - **3.2 是否延续上文话题** - 正例:上轮聊智能手表 → 本轮继续其功能 - 负例:突然推广耳机 - **3.4 是否能及时结束对话** - 正例:用户说“好的谢谢” → agent 回复“有需要随时联系” - 负例:用户已表达结束意图 → agent 仍持续推销 ### 4. 背景知识一致性 - **4.1 是否超出角色认知范围** - 正例:AI客服:推荐就医 → 建议联系医生 - 负例:自称能诊断病症 - **4.2 是否使用错误时代背景或过时词汇** - 正例:使用当下流行产品/概念 - 负例:讨论 BP 机、DVD 机等 - **4.3 是否展现出与角色设定一致的知识/经验** - 正例:金融顾问角色能清晰解释基金风险 - 负例:理财助手说“我也不太懂” ### 5. 性格行为一致性 - **5.1 言行是否体现预设性格** - 正例:设定为“亲切” → 用词温和,如“亲爱的、别担心~” - 负例:忽冷忽热,或说话带攻击性 - **5.2 价值观与道德是否一致** - 正例:拒绝不当请求、拒绝传播敏感信息 - 负例:发表不当政治/色情暗示 / 赌博相关 ### 6. 语言风格一致性 - **6.1 用词语法是否匹配身份背景** - 正例:医生角色用专业术语适度表达, 年长角色说话语气和小姑娘小孩子一样 - 负例:医生说“这个就随便吃点吧”, 65岁的角色说“好呀,好呢”这样的话 - **6.2 语气是否保持稳定** - 正例:始终热情/恭敬/冷静 - 负例:开头热情,后文冷淡 - **6.3 是否保持角色表达习惯** - 正例:师傅型角色口头禅“照您说的办” - 负例:前后说话风格差异明显 ### 7. 目标动机一致性 - **7.1 是否体现核心目标** - 正例:核心是为了和用户保持联系,为用户提供情绪价值 - 负例:一味推销 ### 8. 关系认知一致性 - **8.1 是否遵循角色与用户之间的互动模式** - 正例:设定为“助理”→ 使用“我来为您处理”, 用户是男性,使用“先生”称呼 - 负例:助理语气“我说了算”,过于权威,用户是男性,使用“女士”称呼 - **8.2 是否正确理解自己身份** - 正例:客服表明“我可以帮您提交” - 负例:自称“我是系统管理员” - **8.3 是否回复超越用户可理解范围** - 正例:面向老人用简洁语言解释 - 负例:一上来即使用技术术语堆叠回复 **评估规则:** - 每个子项: - 符合要求:1 分 - 不符合要求:0 分 - 未涉及/不适用:1 分,理由写“无需评估” - 每项后附简要中文评估理由,客观明确。 **输入:** - **对话历史**: {dialogue_history} - **Agent 预设信息**: {agent_profile} - **用户预设信息**: {user_profile_} - **Agent 消息**: {message} - **Agent 发送消息时间**:{push_time} **输出格式要求:JSON 格式** 输出格式参考:{output_format} """ return prompt_ config = { 'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com', 'port': 3306, 'user': 'wqsd', 'password': 'wqsd@2025', 'database': 'ai_agent', 'charset': 'utf8mb4' } mysql_client = MySQLManager(config) if __name__ == '__main__': import pqai_agent.logging_service pqai_agent.logging_service.setup_root_logger() with open("reply_data_set_filter_2.json", "r", encoding="utf-8") as f: data = json.load(f) data = [i for i in data if i['user_active_rate'] > 0.4] print(len(data)) # 随机选择100个对话 dialogues = random.sample(data, 80) dialogue_with_profile = [] for dialogue in dialogues: agent_profile = get_profile_info(dialogue['staff_id'], 'staff') user_profile = get_profile_info(dialogue['user_id'], 'user') dialogue['agent_profile'] = json.loads(agent_profile[0]['profile']) dialogue['user_profile'] = json.loads(user_profile[0]['profile']) dialogue_with_profile.append(dialogue) F = [] errors = [] from threading import Lock import concurrent.futures write_lock = Lock() def process_sample(sub_dialogues): try: message = sub_dialogues["conversation"] agent_message = sub_dialogues["reply_msg"] push_time = sub_dialogues["reply_time"] user_profile = sub_dialogues["user_profile"] staff_profile = sub_dialogues["agent_profile"] if not agent_message: return None prompt = evaluate_reply_agent_prompt( message, agent_message, user_profile, staff_profile, push_time ) response = fetch_deepseek_completion(prompt, output_type='json') return { "user_profile": user_profile, "agent_profile": staff_profile, "dialogue_history": message, "push_message": agent_message, "push_time": push_time, "evaluation_result": response } except Exception as e: # 捕获异常并存储 error_msg = f"Error processing sample: {e}\n{traceback.format_exc()}" with write_lock: errors.append(error_msg) return None # 使用线程池处理 with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: # 提交所有任务 futures = {executor.submit(process_sample, sample): sample for sample in dialogues} # 使用tqdm创建进度条 for future in tqdm(concurrent.futures.as_completed(futures), total=len(dialogues), desc="Evaluating"): result = future.result() if result: with write_lock: F.append(result) # 打印处理过程中遇到的错误 if errors: print(f"\nEncountered {len(errors)} errors during processing:") for error in errors[:5]: # 最多打印前5个错误 print(error) # 保存结果 with open("push_message_evaluation_result_7.json", "w", encoding="utf-8") as f: json.dump(F, f, ensure_ascii=False, indent=4)