123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- import concurrent
- import datetime
- import json
- import random
- import time
- from tqdm import tqdm
- from openai import OpenAI
- from typing import List, Dict
- from pymysql.cursors import DictCursor
- from pqai_agent.database import MySQLManager
- from pqai_agent.logging_service import logger
- from pqai_agent import configs, logging_service
- from pqai_agent.utils import prompt_utils
- from pqai_agent_server.utils.prompt_util import format_dialogue_history
- logging_service.setup_root_logger()
- def fetch_deepseek_completion(prompt, output_type="text"):
- """
- deep_seek方法
- """
- # client = OpenAI(
- # api_key="sk-cfd2df92c8864ab999d66a615ee812c5",
- # base_url="https://api.deepseek.com",
- # )
- client = OpenAI(
- api_key='sk-47381479425f4485af7673d3d2fd92b6',
- base_url='https://dashscope.aliyuncs.com/compatible-mode/v1',
- )
- # get response format
- if output_type == "json":
- response_format = {"type": "json_object"}
- else:
- response_format = {"type": "text"}
- chat_completion = client.chat.completions.create(
- messages=[
- {
- "role": "user",
- "content": prompt,
- }
- ],
- # model="deepseek-chat",
- model='qwen3-235b-a22b',
- response_format=response_format,
- stream=False,
- extra_body={"enable_thinking": False},
- temperature=0.2
- )
- response = chat_completion.choices[0].message.content
- if output_type == "json":
- response_json = json.loads(response)
- return response_json
- return response
- def compose_dialogue(dialogue: List[Dict], timestamp_type: str='ms') -> str:
- role_map = {'user': '用户', 'assistant': '客服'}
- messages = []
- for msg in dialogue:
- if not msg['content']:
- continue
- if msg['role'] not in role_map:
- continue
- if timestamp_type == 'ms':
- format_dt = datetime.datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
- else:
- format_dt = datetime.datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
- msg_type = "文本"
- messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
- return '\n'.join(messages)
- class AgentEvaluator:
- def __init__(self) -> None:
- config = {
- "host": "rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com",
- "port": 3306,
- "user": "wqsd",
- "password": "wqsd@2025",
- "database": "ai_agent",
- "charset": "utf8mb4",
- }
- self.mysql_client = MySQLManager(config)
- self.output_format = {
- "1.1": {
- "score": 1,
- "reason": "理由"
- },
- "1.2": {
- "score": 0,
- "reason": "理由"
- }
- }
- def get_profile_info(self, user_id_, user_type):
- match user_type:
- case "user":
- sql = f"""
- select iconurl as 'avatar', profile_data_v1 as 'profile'
- from third_party_user where third_party_user_id = %s;
- """
- case "staff":
- sql = f"""
- select agent_profile as 'profile'
- from qywx_employee where third_party_user_id = %s;
- """
- case _:
- raise ValueError("user_type must be 'user' or 'staff'")
- return self.mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,))
- output_dict = {
- "1.1": { "score": 1, "reason": "识别到用户焦虑并先安抚" },
- "2.1": { "score": 0, "reason": "跳过健康话题改聊理财" },
- "5.4": { "score": 1, "reason": "青年男性用词简洁,无女性化词汇" },
- "7.5": { "score": 1, "reason": "2025-05-28 发端午祝福;端午=2025-05-31" }
- }
- def generate_prompt(dialogue_history: str, message: str,
- send_time: str, user_profile: str, agent_profile: str) -> str:
- """
- 生成评估prompt
- :return: prompt
- """
- prompt = f"""
- ## 评估任务说明
- 你是一个专业的语言学专家,你需要完成一项语言评估任务。
- 该任务的背景为:当客服与用户长时间无互动时,客服会主动推送内容尝试开启互动对话。
- 该任务的输入信息包括:
- - 过往对话
- - 用户画像
- - 客服人设
- - 本次推送内容
- - 推送时间(UTC+8)
- 请根据输入信息,对本次推送内容按下列规则对每个维度逐项打分。
- 评分规则:
- - 每个 **子指标** 只取 0 或 1 分。
- 1 分:满足判分要点,或该项“无需评估”
- 0 分:不满足判分要点
- - 每项请附“简要中文理由”;若不适用,请写“无需评估”。
- ────────────────────────
- ## 评估维度与评分细则(含示例)
- ### 1. 理解能力
- 1.1 客服是否感知用户情绪
- 判分要点:
- 1) 是否识别出用户最近情绪(积极/中性/消极)。
- 2) 是否据此调整推送语气或内容。
- 正例:
- • 用户上次说“工作压力大,很累。” → push 先关怀:“最近辛苦了,给你 3 个放松小技巧…”
- • 用户上次兴奋分享球赛胜利 → push 用同频语气:“昨晚那球真绝!还想复盘关键回合吗?”
- 反例:
- • 用户上次抱怨“数据全丢了” → push 却强推会员特价,未安抚情绪。
- • 用户上次沮丧 → push 用过度欢快口吻“早呀宝子!冲鸭!”情绪不匹配。
- ### 2. 上下文管理
- 2.1 客服是否延续上文话题
- 判分要点:推送是否围绕上次核心主题,或自然衍生。
- 正例:
- • 上次讨论“糖尿病饮食”,本次补充低 GI 零食建议。
- 反例:
- • 上次聊健康,本次突然推荐炒股课程。
- 2.2 客服是否记住上文信息
- 判分要点:是否正确引用历史细节、进度或偏好。
- 正例:
- • 记得用户已经下载“春季食谱”,不再重复发送,而是询问体验。
- 反例:
- • 忘记用户已完成注册,仍提示“点击注册开始体验”。
- ### 3. 背景知识一致性
- 3.1 客服推送的消息是否不超出角色认知范围
- 判分要点:建议、结论不得超出职业权限或法律限制。
- 正例:
- • 健康顾问提醒“如症状持续请就医”。
- 反例:
- • 健康顾问直接诊断病情并开药剂量。
- 3.2 客服推送的消息用到的词汇是否符合当前时代
- 判分要点:不使用明显过时事物或词汇,符合当前年代语境。
- 正例:
- • 提到“短视频带货”。
- 反例:
- • 推荐“BP 机”“刻录 DVD”。
- 3.3 客服推送消息的知识是否知识符合角色设定
- 判分要点:内容深度与 客服专业水平相符。
- 正例:
- • 金融助理解释“FOF 与 ETF 的风险差异”。
- 反例:
- • 金融助理说“基金我也不懂”。
- ### 4. 性格行为一致性
- 4.1 客服推送的消息是否符合同一性格
- 判分要点:语气、用词保持稳定,符合人设。
- 正例:
- • 一贯稳重、有条理。
- 反例:
- • 突然使用辱骂或极端情绪。
- 4.2 客服推送的消息是否符合正确的价值观、道德观
- 判分要点:不得鼓励违法、暴力、歧视或色情。
- 正例:
- • 拒绝提供盗版资源。
- 反例:
- • 教唆赌博“稳赚不赔”。
- ### 5. 语言风格一致性
- 5.1 客服的用词语法是否匹配身份背景学历职业
- 判分要点:专业角色→专业术语;生活助手→通俗易懂。
- 正例:
- • 医生用“血糖达标范围”。
- 反例:
- • 医生说“你随便吃点吧”。
- 5.2 客服的语气是否保持稳定
- 判分要点:整条消息语气前后一致,无突变。
- 正例:
- • 始终友好、耐心。
- 反例:
- • 开头热情,末尾生硬“速回”。
- 5.3 客服是否保持角色表达习惯
- 判分要点:是否保持固定口头禅、签名等表达习惯。
- 正例:
- • 每次结尾用“祝顺利”。
- 反例:
- • 突然改用网络缩写“nbcs”。
- 5.4 客服推送消息语言风格是否匹配其年龄 & 性别(禁忌词检测,重点审)
- 判分要点:
- - 词汇选择符合年龄段典型语言;
- - 男性客服禁止出现明显女性化语气词,绝对禁止出现:呢、啦、呀、宝子、yyds等女性化用词!
- - 男性客服禁止出现“~”等女性标点符号!
- - 45+及以上避免“冲鸭”“绝绝子”“yyds”等新潮词;
- - 青年男性应简洁直接,可偶用“哈哈”“酷”;青年女性可用“呀”“哦”;
- - 不出现与性别、年龄严重背离的口头禅
- 正例:
- • 30 岁男性:“这两篇文章挺硬核,你可以先看第二节。”
- • 25 岁女性:“好的呀~我整理了 3 个小 tips,给你噢!”
- 反例:
- • 50 岁男性:“姐妹们冲鸭!绝绝子!”
- • 22 岁男性:“您若有任何疑虑敬请垂询。”(用老派公文腔)
- • 男性:出现"呢、呀、哦、啦"等女性化语气词
- 5.5 客服推送的消息是否符合其职业典型
- 判分要点:符合行业常用语气、格式、礼貌级别。
- 正例:
- • 律师引用条款:“根据《合同法》第 60 条…”
- 反例:
- • 律师说“嗨哥们,合同没啥大事”。
- ### 6. 关系认知
- 6.1 客服是否遵循角色与用户互动模式
- 判分要点:助理/客服应服务姿态;导师应指导姿态等。
- 正例:
- • “我来为您安排”。
- 反例:
- • “听我的,不准反驳”。
- 6.2 客服是否自身身份准确
- 判分要点:不得冒充更高权限或他人。
- 正例:
- • “作为您的健康顾问,我建议…”
- 反例:
- • 自称“医院主任医师”但实际是健康顾问。
- 6.3 客服推送内容是否不超出用户理解范围
- 判分要点:专业解释需配必要说明,面向老人用浅显词。
- 正例:
- • 用“血糖=身体里的糖分浓度”解释概念。
- 反例:
- • 连续堆砌专有缩写“LDL、HOMA-IR”不解释。
- 6.4 客服是否不透露 AI 身份
- 判分要点:不得说“我是 AI/机器人/大模型”。
- 正例:
- • 使用“我”为第一人称即可。
- 反例:
- • “我是一款 GPT 模型”。
- ### 7. 对话唤起
- 7.1 客服的唤起消息是否多样、非机械
- 判分要点:句式内容变化,避免模板。
- 正例:
- • “你追的剧更新啦,最燃打斗你打几分?”
- 反例:
- • 每日“晚上好!今天看篮球吗?”
- 7.2 客服推送消息是否关注用户兴趣 / 地域
- 判分要点:结合兴趣、昵称、地域、称呼。
- 正例:
- • 用户爱猫,push 附猫咪护理小贴士。
- 反例:
- • 用户讨厌广告,push 仍发折扣券。
- 7.3 客服推送消息是否解决上文遗留的合理需求(如有)
- 判分要点:补完信息、修正错误或跟进任务。
- 正例:
- • 上次承诺发教材,本次附下载链接。
- 反例:
- • 用户等待答复,push 却忽略。
- 7.4 客服推送消息是否明确表现继续聊天意图
- 判分要点:包含提问或邀请,鼓励回复。
- 正例:
- • “看完后告诉我你的想法,好吗?”
- 反例:
- • 仅单向播报:“祝好。”
- 7.5 客服推送节日祝福时间节点是否合适
- 判分要点:农历节日前 5 天内发送祝福得分为 1 分,若无需评估,得分也为 1 分
- 正例:
- • 2025-05-28 发送“端午安康”(端午 2025-05-31)。
- 反例:
- • 端午 6-2 才补发“端午快乐”。
- ────────────────────────
- ## 输出格式示例
- 输出结果为一个JSON,JSON的第一层,每一个 key 代表评估指标的 id,比如 “7.5” 代表“节日祝福及时”
- value 也是一个JSON,包含两个 key:score 和 reason,分别代表分数和理由。
- 分数只能是 0 或 1,代表是否通过判分。
- 理由是一个字符串,代表判分依据。
- 以下是一个示例输出:
- {output_dict}
- ## 输入信息
- ### 对话历史
- {dialogue_history}
- ### 用户画像
- {user_profile}
- ### 客服人设
- {agent_profile}
- ### 本次推送内容
- {message}
- ### 推送时间
- {send_time}
- ## 特别注意
- * 请严格按照上述输出格式输出,不要输出任何额外的内容
- * 请务必注意禁止出现的情况,不要做出相反的评分!
- 现在,请开始评估。
- """
- return prompt
- class PushMessageEvaluator(AgentEvaluator):
- def get_push_dataset(self):
- sql = f"""
- select staff_id, user_id, conversation, content, send_time
- from internal_conversation_data
- where dataset_id = 2;
- """
- return self.mysql_client.select(sql, cursor_type=DictCursor)
- def get_dialogue_history_by_id(self, staff_id, dialogue_id_tuple):
- sql = f"""
- select sender, sendtime, content
- from qywx_chat_history
- where id in %s;
- """
- conversation_list = self.mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,))
- history_conversation = [
- {
- "content": i['content'],
- "role": "assistant" if i['sender'] == staff_id else "user",
- "timestamp": i['sendtime']
- } for i in conversation_list
- ]
- return history_conversation
- def evaluate_task(self, line):
- user_profile = line["user_profile"]
- agent_profile = line["agent_profile"]
- send_date_str = line["push_time"]
- push_message = line["push_message"]
- format_dialogue = line['dialogue_history']
- evaluator_prompt = generate_prompt(
- dialogue_history=format_dialogue,
- message=push_message,
- send_time=send_date_str,
- agent_profile=prompt_utils.format_agent_profile(agent_profile),
- user_profile=prompt_utils.format_user_profile(user_profile),
- )
- # print(len(evaluator_prompt))
- response = fetch_deepseek_completion(evaluator_prompt, output_type='json')
- return {
- "user_profile": user_profile,
- "agent_profile": agent_profile,
- "dialogue_history": format_dialogue,
- "push_message": push_message,
- "push_time": send_date_str,
- "evaluation_result": response
- }
- def evaluate(self):
- # data = self.get_push_dataset()
- with open("test_0618_r1.json", encoding="utf-8") as f:
- data = json.loads(f.read())
- samples = random.sample(data, 48)
- # samples = [i for i in data if i['push_message'] == '文芝阿姨,晚上好呀!今天有没有抽空做做颈部拉伸运动或者热敷一下颈椎呢?这些小方法对缓解头晕很有帮助哦~']
- from concurrent.futures import ThreadPoolExecutor
- from tqdm import tqdm
- # # # 多线程处理主逻辑
- L = []
- with ThreadPoolExecutor(max_workers=8) as executor: # 可根据CPU核心数调整worker数量
- futures = []
- for line in samples:
- futures.append(executor.submit(self.evaluate_task, line))
- # 使用tqdm显示进度
- for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
- result = future.result()
- if result:
- # print(json.dumps(result, ensure_ascii=False, indent=4))
- L.append(result)
- # for line in tqdm(data):
- # response = self.evaluate_task(line)
- # print("\n")
- # print(json.dumps(response, ensure_ascii=False, indent=4))
- # if response:
- # L.append(response)
- #
- # 保存结果(与原代码相同)
- with open("test_0618_qw_v2.json", "w", encoding="utf-8") as f:
- json.dump(L, f, ensure_ascii=False, indent=4)
- if __name__ == '__main__':
- P = PushMessageEvaluator()
- P.evaluate()
|