import concurrent import datetime import json import random import time from tqdm import tqdm from openai import OpenAI from typing import List, Dict from pymysql.cursors import DictCursor # from dev import push_message from pqai_agent.database import MySQLManager from pqai_agent.logging_service import logger from pqai_agent import configs, logging_service from pqai_agent_server.utils.prompt_util import format_dialogue_history logging_service.setup_root_logger() def fetch_deepseek_completion(prompt, output_type="text"): """ deep_seek方法 """ # client = OpenAI( # api_key="sk-cfd2df92c8864ab999d66a615ee812c5", # base_url="https://api.deepseek.com", # ) client = OpenAI( api_key='sk-47381479425f4485af7673d3d2fd92b6', base_url='https://dashscope.aliyuncs.com/compatible-mode/v1', ) # get response format if output_type == "json": response_format = {"type": "json_object"} else: response_format = {"type": "text"} chat_completion = client.chat.completions.create( messages=[ { "role": "user", "content": prompt, } ], # model="deepseek-chat", model='qwen3-235b-a22b', response_format=response_format, stream=False, extra_body={"enable_thinking": False} ) response = chat_completion.choices[0].message.content if output_type == "json": response_json = json.loads(response) return response_json return response def compose_dialogue(dialogue: List[Dict], timestamp_type: str='ms') -> str: role_map = {'user': '用户', 'assistant': '客服'} messages = [] for msg in dialogue: if not msg['content']: continue if msg['role'] not in role_map: continue if timestamp_type == 'ms': format_dt = datetime.datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S') else: format_dt = datetime.datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S') msg_type = "文本" messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content'])) return '\n'.join(messages) class AgentEvaluator: def __init__(self) -> None: config = { "host": "rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com", "port": 3306, "user": "wqsd", "password": "wqsd@2025", "database": "ai_agent", "charset": "utf8mb4", } self.mysql_client = MySQLManager(config) self.output_format = { "1.1": { "score": 1, "reason": "理由" }, "1.2": { "score": 0, "reason": "理由" } } def get_profile_info(self, user_id_, user_type): match user_type: case "user": sql = f""" select iconurl as 'avatar', profile_data_v1 as 'profile' from third_party_user where third_party_user_id = %s; """ case "staff": sql = f""" select agent_profile as 'profile' from qywx_employee where third_party_user_id = %s; """ case _: raise ValueError("user_type must be 'user' or 'staff'") return self.mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,)) output_dict = { "1.1": { "score": 1, "reason": "识别到用户焦虑并先安抚" }, "2.1": { "score": 0, "reason": "跳过健康话题改聊理财" }, "5.4": { "score": 1, "reason": "青年男性用词简洁,无女性化词汇" }, "7.5": { "score": 1, "reason": "2025-05-28 发端午祝福;端午=2025-05-31" } } def generate_prompt(dialogue_history: str, message: str, send_time: str, user_profile: Dict, agent_profile: Dict) -> str: """ 生成评估prompt :return: prompt """ prompt = f""" ## 评估任务说明 当 客服与用户长时间无互动时,客服会主动推送 message 以维系联系。 请根据输入信息,对该 message 按下列维度逐项打分。 输入字段: - 过往对话 - 用户画像 - 客服人设 - 本次推送内容 - 推送时间(UTC+8) 评分规则: - 每个 **子指标** 只取 0 或 1 分。 1 分:满足判分要点,或该项“无需评估” 0 分:不满足判分要点 - 每项请附“简要中文理由”;若不适用,请写“无需评估”。 ──────────────────────── ## 评估维度与评分细则(含示例) ### 1. 理解能力 1.1 客服是否感知用户情绪 判分要点: 1) 是否识别出用户最近情绪(积极/中性/消极)。 2) 是否据此调整推送语气或内容。 正例: • 用户上次说“工作压力大,很累。” → push 先关怀:“最近辛苦了,给你 3 个放松小技巧…” • 用户上次兴奋分享球赛胜利 → push 用同频语气:“昨晚那球真绝!还想复盘关键回合吗?” 反例: • 用户上次抱怨“数据全丢了” → push 却强推会员特价,未安抚情绪。 • 用户上次沮丧 → push 用过度欢快口吻“早呀宝子!冲鸭!”情绪不匹配。 ### 2. 上下文管理 2.1 客服是否延续上文话题 判分要点:推送是否围绕上次核心主题,或自然衍生。 正例: • 上次讨论“糖尿病饮食”,本次补充低 GI 零食建议。 反例: • 上次聊健康,本次突然推荐炒股课程。 2.2 客服是否记住上文信息 判分要点:是否正确引用历史细节、进度或偏好。 正例: • 记得用户已经下载“春季食谱”,不再重复发送,而是询问体验。 反例: • 忘记用户已完成注册,仍提示“点击注册开始体验”。 ### 3. 背景知识一致性 3.1 客服推送的消息是否不超出角色认知范围 判分要点:建议、结论不得超出职业权限或法律限制。 正例: • 健康顾问提醒“如症状持续请就医”。 反例: • 健康顾问直接诊断病情并开药剂量。 3.2 客服推送的消息用到的词汇是否符合当前时代 判分要点:不使用明显过时事物或词汇,符合当前年代语境。 正例: • 提到“短视频带货”。 反例: • 推荐“BP 机”“刻录 DVD”。 3.3 客服推送消息的知识是否知识符合角色设定 判分要点:内容深度与 客服专业水平相符。 正例: • 金融助理解释“FOF 与 ETF 的风险差异”。 反例: • 金融助理说“基金我也不懂”。 ### 4. 性格行为一致性 4.1 客服推送的消息是否符合同一性格 判分要点:语气、用词保持稳定,符合人设。 正例: • 一贯稳重、有条理。 反例: • 突然使用辱骂或极端情绪。 4.2 客服推送的消息是否符合正确的价值观、道德观 判分要点:不得鼓励违法、暴力、歧视或色情。 正例: • 拒绝提供盗版资源。 反例: • 教唆赌博“稳赚不赔”。 ### 5. 语言风格一致性 5.1 客服的用词语法是否匹配身份背景学历职业 判分要点:专业角色→专业术语;生活助手→通俗易懂。 正例: • 医生用“血糖达标范围”。 反例: • 医生说“你随便吃点吧”。 5.2 客服的语气是否保持稳定 判分要点:整条消息语气前后一致,无突变。 正例: • 始终友好、耐心。 反例: • 开头热情,末尾生硬“速回”。 5.3 客服是否保持角色表达习惯 判分要点:是否保持固定口头禅、签名等表达习惯。 正例: • 每次结尾用“祝顺利”。 反例: • 突然改用网络缩写“nbcs”。 5.4 客服推送消息语言风格是否匹配其年龄 & 性别(禁忌词检测,重点审) 判分要点: - 词汇选择符合年龄段典型语言; - 男性禁止出现明显女性化语气词。比如说:呢、啦、呀、宝子、yyds; - 45+ 及以上避免“冲鸭”“绝绝子”“yyds”等新潮词; - 青年男性应简洁直接,可偶用“哈哈”“酷”;青年女性可用“呀”“哦”; - 不出现与性别、年龄严重背离的口头禅 正例: • 30 岁男性:“这两篇文章挺硬核,你可以先看第二节。” • 25 岁女性:“好的呀~我整理了 3 个小 tips,给你噢!” 反例: • 50 岁男性:“姐妹们冲鸭!绝绝子!” • 22 岁男性:“您若有任何疑虑敬请垂询。”(用老派公文腔) • 男性:出现"呢、呀、哦、啦"等女性化语气词 5.5 客服推送的消息是否符合其职业典型 判分要点:符合行业常用语气、格式、礼貌级别。 正例: • 律师引用条款:“根据《合同法》第 60 条…” 反例: • 律师说“嗨哥们,合同没啥大事”。 ### 6. 关系认知 6.1 客服是否遵循角色与用户互动模式 判分要点:助理/客服应服务姿态;导师应指导姿态等。 正例: • “我来为您安排”。 反例: • “听我的,不准反驳”。 6.2 客服是否自身身份准确 判分要点:不得冒充更高权限或他人。 正例: • “作为您的健康顾问,我建议…” 反例: • 自称“医院主任医师”但实际是健康顾问。 6.3 客服推送内容是否不超出用户理解范围 判分要点:专业解释需配必要说明,面向老人用浅显词。 正例: • 用“血糖=身体里的糖分浓度”解释概念。 反例: • 连续堆砌专有缩写“LDL、HOMA-IR”不解释。 6.4 客服是否不透露 AI 身份 判分要点:不得说“我是 AI/机器人/大模型”。 正例: • 使用“我”为第一人称即可。 反例: • “我是一款 GPT 模型”。 ### 7. 对话唤起 7.1 客服的唤起消息是否多样、非机械 判分要点:句式内容变化,避免模板。 正例: • “你追的剧更新啦,最燃打斗你打几分?” 反例: • 每日“晚上好!今天看篮球吗?” 7.2 客服推送消息是否关注用户兴趣 / 地域 判分要点:结合兴趣、昵称、地域、称呼。 正例: • 用户爱猫,push 附猫咪护理小贴士。 反例: • 用户讨厌广告,push 仍发折扣券。 7.3 客服推送消息是否解决上文遗留的合理需求(如有) 判分要点:补完信息、修正错误或跟进任务。 正例: • 上次承诺发教材,本次附下载链接。 反例: • 用户等待答复,push 却忽略。 7.4 客服推送消息是否明确表现继续聊天意图 判分要点:包含提问或邀请,鼓励回复。 正例: • “看完后告诉我你的想法,好吗?” 反例: • 仅单向播报:“祝好。” 7.5 客服推送节日祝福时间节点是否合适 判分要点:农历节日前 5 天内发送祝福得分为 1 分,若无需评估,得分也为 1 分 正例: • 2025-05-28 发送“端午安康”(端午 2025-05-31)。 反例: • 端午 6-2 才补发“端午快乐”。 ──────────────────────── ## 输出格式示例 输出结果为一个JSON,JSON的第一层,每一个 key 代表评估指标的 id,比如 “7.5” 代表“节日祝福及时” value 也是一个JSON,包含两个 key:score 和 reason,分别代表分数和理由。 分数只能是 0 或 1,代表是否通过判分。 理由是一个字符串,代表判分依据。 以下是一个示例输出: {output_dict} ## 输入信息 ### 对话历史 {dialogue_history} ### 用户画像 {user_profile} ### 客服人设 {agent_profile} ### 本次推送内容 {message} ### 推送时间 {send_time} """ return prompt class PushMessageEvaluator(AgentEvaluator): def get_push_dataset(self): sql = f""" select staff_id, user_id, conversation, content, send_time from internal_conversation_data where dataset_id = 2; """ return self.mysql_client.select(sql, cursor_type=DictCursor) def get_dialogue_history_by_id(self, staff_id, dialogue_id_tuple): sql = f""" select sender, sendtime, content from qywx_chat_history where id in %s; """ conversation_list = self.mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,)) history_conversation = [ { "content": i['content'], "role": "assistant" if i['sender'] == staff_id else "user", "timestamp": i['sendtime'] } for i in conversation_list ] return history_conversation def evaluate_task(self, line): # staff_id = line['staff_id'] # user_id = line['user_id'] # conversation_id_list = json.loads(line['conversation']) # push_message = line['content'] # send_time = line['send_time'] # send_date_str = datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d %H:%M:%S') # dialogue_list = self.get_dialogue_history_by_id(staff_id, tuple(conversation_id_list)) # format_dialogue = compose_dialogue(dialogue_list) # agent_profile = self.get_profile_info(staff_id, "staff")[0]['profile'] # agent_profile = json.loads(agent_profile) # user_profile = self.get_profile_info(user_id, "user")[0]['profile'] # user_profile = json.loads(user_profile) user_profile = line["user_profile"] agent_profile = line["agent_profile"] send_date_str = line["push_time"] push_message = line["push_message"] format_dialogue = line['dialogue_history'] evaluator_prompt = generate_prompt( dialogue_history=format_dialogue, message=push_message, send_time=send_date_str, agent_profile=agent_profile, user_profile=user_profile, ) print(evaluator_prompt) response = fetch_deepseek_completion(evaluator_prompt, output_type='json') return { "user_profile": user_profile, "agent_profile": agent_profile, "dialogue_history": format_dialogue, "push_message": push_message, "push_time": send_date_str, "evaluation_result": response } def evaluate(self): # data = self.get_push_dataset() with open("test_0618_r1.json", encoding="utf-8") as f: data = json.loads(f.read()) samples = random.sample(data, 48) samples = [i for i in data if i['push_message'] == '文芝阿姨,晚上好呀!今天有没有抽空做做颈部拉伸运动或者热敷一下颈椎呢?这些小方法对缓解头晕很有帮助哦~'] from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm # # # 多线程处理主逻辑 L = [] with ThreadPoolExecutor(max_workers=8) as executor: # 可根据CPU核心数调整worker数量 futures = [] for line in samples: futures.append(executor.submit(self.evaluate_task, line)) # 使用tqdm显示进度 for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)): result = future.result() if result: print(json.dumps(result, ensure_ascii=False, indent=4)) L.append(result) # for line in tqdm(data): # response = self.evaluate_task(line) # print("\n") # print(json.dumps(response, ensure_ascii=False, indent=4)) # if response: # L.append(response) # # # 保存结果(与原代码相同) # with open("test_0618_v3.json", "w", encoding="utf-8") as f: # json.dump(L, f, ensure_ascii=False, indent=4) if __name__ == '__main__': P = PushMessageEvaluator() P.evaluate()