|
@@ -1,305 +0,0 @@
|
|
|
-import json
|
|
|
-import time
|
|
|
-import random
|
|
|
-import traceback
|
|
|
-
|
|
|
-from datetime import datetime
|
|
|
-from typing import Dict, List, Tuple
|
|
|
-
|
|
|
-from openai import OpenAI
|
|
|
-from tqdm import tqdm
|
|
|
-from pymysql.cursors import DictCursor
|
|
|
-from pqai_agent.database import MySQLManager
|
|
|
-from pqai_agent.agents.message_push_agent import MessagePushAgent
|
|
|
-from pqai_agent.logging_service import logger
|
|
|
-from pqai_agent import configs, logging_service
|
|
|
-from pqai_agent.mq_message import MessageType
|
|
|
-
|
|
|
-logging_service.setup_root_logger()
|
|
|
-
|
|
|
-
|
|
|
-config = {
|
|
|
- 'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com',
|
|
|
- 'port': 3306,
|
|
|
- 'user': 'wqsd',
|
|
|
- 'password': 'wqsd@2025',
|
|
|
- 'database': 'ai_agent',
|
|
|
- 'charset': 'utf8mb4'
|
|
|
-}
|
|
|
-mysql_client = MySQLManager(config)
|
|
|
-
|
|
|
-
|
|
|
-def split_dialogue_history(dialogue_history_, timeout=30*60*1000):
|
|
|
- """
|
|
|
- :param dialogue_history_:
|
|
|
- :param timeout: 30 minutes
|
|
|
- :return:
|
|
|
- """
|
|
|
- messages_sorted = sorted(dialogue_history_, key=lambda x: x['timestamp'])
|
|
|
- dialogues = []
|
|
|
- current_dialogue = []
|
|
|
-
|
|
|
- for i, msg in enumerate(messages_sorted):
|
|
|
- if not current_dialogue:
|
|
|
- current_dialogue.append(msg)
|
|
|
- continue
|
|
|
-
|
|
|
- prev_msg = messages_sorted[i - 1]
|
|
|
- time_diff = msg["timestamp"] - prev_msg["timestamp"]
|
|
|
-
|
|
|
- # 判断是否为新对话
|
|
|
- is_new_dialogue = False
|
|
|
- if time_diff > timeout:
|
|
|
- is_new_dialogue = True
|
|
|
-
|
|
|
- if is_new_dialogue:
|
|
|
- dialogues.append(current_dialogue)
|
|
|
- current_dialogue = [msg]
|
|
|
- else:
|
|
|
- current_dialogue.append(msg)
|
|
|
-
|
|
|
- if current_dialogue:
|
|
|
- dialogues.append(current_dialogue)
|
|
|
-
|
|
|
- return dialogues
|
|
|
-
|
|
|
-
|
|
|
-def get_conversation_info():
|
|
|
- sql = f"""
|
|
|
- select roomid, count(id) as 'article_num'
|
|
|
- from qywx_chat_history where msg_type in (1,2,4) group by roomid
|
|
|
- having count(id) > 20;
|
|
|
- """
|
|
|
- return mysql_client.select(sql, cursor_type=DictCursor)
|
|
|
-
|
|
|
-
|
|
|
-def get_dialogue_history(room_id_):
|
|
|
- """
|
|
|
- 获取对话历史
|
|
|
- :param room_id_:
|
|
|
- :return:
|
|
|
- """
|
|
|
- sql = f"""
|
|
|
- select id, sender, receiver, sendtime, content
|
|
|
- from qywx_chat_history
|
|
|
- where roomid = %s and msg_type in %s order by sendtime;
|
|
|
- """
|
|
|
- return mysql_client.select(sql=sql, cursor_type=DictCursor, args=(room_id_, (1, 2, 4)))
|
|
|
-
|
|
|
-
|
|
|
-def get_dialogue_history_by_id(staff_id, dialogue_id_tuple):
|
|
|
- sql = f"""
|
|
|
- select sender, sendtime, content
|
|
|
- from qywx_chat_history
|
|
|
- where id in %s;
|
|
|
- """
|
|
|
-
|
|
|
- conversation_list = mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,))
|
|
|
- history_conversation = [
|
|
|
- {
|
|
|
- "content": i['content'],
|
|
|
- "role": "assistant" if i['sender'] == staff_id else "user",
|
|
|
- "timestamp": i['sendtime']
|
|
|
- } for i in conversation_list
|
|
|
- ]
|
|
|
- return history_conversation
|
|
|
-
|
|
|
-
|
|
|
-def get_profile_info(user_id_, user_type):
|
|
|
- match user_type:
|
|
|
- case "user":
|
|
|
- sql = f"""
|
|
|
- select iconurl as 'avatar', profile_data_v1 as 'profile'
|
|
|
- from third_party_user where third_party_user_id = %s;
|
|
|
- """
|
|
|
- case "staff":
|
|
|
- sql = f"""
|
|
|
- select agent_profile as 'profile'
|
|
|
- from qywx_employee where third_party_user_id = %s;
|
|
|
- """
|
|
|
- case _:
|
|
|
- raise ValueError("user_type must be 'user' or 'staff'")
|
|
|
-
|
|
|
- return mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,))
|
|
|
-
|
|
|
-
|
|
|
-def generate_reply_dataset():
|
|
|
- conversation_info_list = get_conversation_info()
|
|
|
- data_set = []
|
|
|
- for conversation_info in tqdm(conversation_info_list):
|
|
|
- room_id = conversation_info["roomid"]
|
|
|
- staff_id = room_id.split(":")[1]
|
|
|
- if staff_id in ('1688854974625870', '1688856125791790', '1688856125791452'):
|
|
|
- user_id = room_id.split(":")[2]
|
|
|
- if staff_id and user_id:
|
|
|
- dialogue_history = get_dialogue_history(room_id)
|
|
|
- for idx, dialogue_info in enumerate(dialogue_history):
|
|
|
- if dialogue_info["sender"] == staff_id:
|
|
|
- conversation = dialogue_history[: idx]
|
|
|
- history_conversation = [
|
|
|
- {
|
|
|
- "id": i['id'],
|
|
|
- "content": i['content'],
|
|
|
- "role": "assistant" if i['sender'] == staff_id else "user",
|
|
|
- "timestamp": int(i['sendtime'] / 1000)
|
|
|
- } for i in conversation
|
|
|
- ]
|
|
|
- # filter history_conversation
|
|
|
- history_conversation = [i for i in history_conversation if i['timestamp'] > int(dialogue_info['sendtime'] / 1000) - 60 * 60 * 24 * 30]
|
|
|
-
|
|
|
- if len(history_conversation) > 20:
|
|
|
- history_conversation = history_conversation[-20:]
|
|
|
-
|
|
|
- eva_conversation = history_conversation[-10:]
|
|
|
- if history_conversation:
|
|
|
- user_activate_rate= len([i for i in eva_conversation if i['role'] == 'user']) / len(eva_conversation)
|
|
|
- reply_msg = dialogue_info['content']
|
|
|
- reply_time = int(dialogue_info['sendtime'] / 1000)
|
|
|
- if "早上好" in reply_msg:
|
|
|
- continue
|
|
|
- elif "早安" in reply_msg:
|
|
|
- continue
|
|
|
- elif "早" in reply_msg:
|
|
|
- continue
|
|
|
- elif "下午好" in reply_msg:
|
|
|
- continue
|
|
|
- elif "晚上好" in reply_msg:
|
|
|
- continue
|
|
|
- elif user_activate_rate < 0.3:
|
|
|
- continue
|
|
|
- else:
|
|
|
- # obj = {
|
|
|
- # "staff_id": staff_id,
|
|
|
- # "user_id": user_id,
|
|
|
- # "conversation": history_conversation,
|
|
|
- # "reply_msg": reply_msg,
|
|
|
- # "reply_time": reply_time,
|
|
|
- # "user_active_rate": user_activate_rate
|
|
|
- # }
|
|
|
- conversation_id_list = [i['id'] for i in history_conversation]
|
|
|
- insert_query = f"""
|
|
|
- insert into internal_conversation_data
|
|
|
- (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate)
|
|
|
- values (%s, %s, %s, %s, %s, %s, %s, %s, %s);
|
|
|
- """
|
|
|
- mysql_client.execute(insert_query, args=(
|
|
|
- '1',
|
|
|
- staff_id,
|
|
|
- user_id,
|
|
|
- '2025-06-16',
|
|
|
- json.dumps(conversation_id_list, ensure_ascii=False),
|
|
|
- reply_msg,
|
|
|
- reply_time,
|
|
|
- 0,
|
|
|
- user_activate_rate
|
|
|
- ))
|
|
|
- # print(len(data_set))
|
|
|
- # with open("reply_data_set_filter_2.json", "w", encoding="utf-8") as f:
|
|
|
- # f.write(json.dumps(data_set, ensure_ascii=False, indent=4))
|
|
|
-
|
|
|
-
|
|
|
-def compose_dialogue(dialogue: List[Dict], timestamp_type: str='ms') -> str:
|
|
|
- role_map = {'user': '用户', 'assistant': '客服'}
|
|
|
- messages = []
|
|
|
- for msg in dialogue:
|
|
|
- if not msg['content']:
|
|
|
- continue
|
|
|
- if msg['role'] not in role_map:
|
|
|
- continue
|
|
|
- if timestamp_type == 'ms':
|
|
|
- format_dt = datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
- else:
|
|
|
- format_dt = datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
- msg_type = msg.get('type', MessageType.TEXT).description
|
|
|
- messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
|
|
|
- return '\n'.join(messages)
|
|
|
-
|
|
|
-
|
|
|
-def generate_push_dataset():
|
|
|
-
|
|
|
- fetch_query = f"""
|
|
|
- select staff_id, user_id, conversation, content, send_time, user_active_rate
|
|
|
- from internal_conversation_data
|
|
|
- where dataset_id = 1;
|
|
|
- """
|
|
|
- data_set = mysql_client.select(fetch_query, cursor_type=DictCursor)
|
|
|
- filter_conversation = [i for i in data_set if len(json.loads(i['conversation'])) >= 20]
|
|
|
-
|
|
|
- samples = random.sample(filter_conversation, 300)
|
|
|
-
|
|
|
- # init message push agent
|
|
|
- for sample in tqdm(samples):
|
|
|
- agent = MessagePushAgent()
|
|
|
- agent_profile = get_profile_info(sample["staff_id"], "staff")[0]['profile']
|
|
|
- agent_profile = json.loads(agent_profile)
|
|
|
- user_profile = get_profile_info(sample["user_id"], "user")[0]['profile']
|
|
|
- user_profile = json.loads(user_profile)
|
|
|
- conversation = get_dialogue_history_by_id(
|
|
|
- sample["staff_id"],
|
|
|
- tuple(json.loads(sample["conversation"]))
|
|
|
- )
|
|
|
- conversation.append(
|
|
|
- {
|
|
|
- "content": sample["content"],
|
|
|
- "role": "assistant",
|
|
|
- "timestamp": sample["send_time"] * 1000,
|
|
|
- # "type": 1
|
|
|
- }
|
|
|
- )
|
|
|
- conversation = sorted(conversation, key=lambda i: i['timestamp'], reverse=False)
|
|
|
-
|
|
|
- last_timestamp = int(conversation[-1]["timestamp"])
|
|
|
- push_time = int(last_timestamp / 1000) + 24 * 3600
|
|
|
- push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
- try:
|
|
|
- push_message = agent.generate_message(
|
|
|
- context={
|
|
|
- "formatted_staff_profile": agent_profile,
|
|
|
- "nickname": user_profile.get('nickname'),
|
|
|
- "name": user_profile.get('name'),
|
|
|
- "preferred_nickname": user_profile.get('preferred_nickname'),
|
|
|
- "age": user_profile.get('age'),
|
|
|
- "region": user_profile.get('region'),
|
|
|
- "health_conditions": user_profile.get('health_conditions'),
|
|
|
- "gender": user_profile.get('gender'),
|
|
|
- "medications": user_profile.get('medications'),
|
|
|
- "interests": user_profile.get('interests'),
|
|
|
- "current_datetime": push_dt,
|
|
|
- "avatar": None
|
|
|
- },
|
|
|
- dialogue_history=conversation
|
|
|
- )
|
|
|
-
|
|
|
- if not push_message:
|
|
|
- print("push message error")
|
|
|
- continue
|
|
|
- else:
|
|
|
- print("push message success", push_message)
|
|
|
- insert_query = f"""
|
|
|
- insert into internal_conversation_data
|
|
|
- (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate)
|
|
|
- values (%s, %s, %s, %s, %s, %s, %s, %s, %s);
|
|
|
- """
|
|
|
- mysql_client.execute(insert_query, args=(
|
|
|
- '2',
|
|
|
- sample["staff_id"],
|
|
|
- sample["user_id"],
|
|
|
- '2025-06-16',
|
|
|
- sample["conversation"],
|
|
|
- push_message,
|
|
|
- push_time,
|
|
|
- 1,
|
|
|
- sample["user_active_rate"]
|
|
|
- ))
|
|
|
- except Exception as e:
|
|
|
- print("error", e)
|
|
|
- print(traceback.format_exc())
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- generate_push_dataset()
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|