123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
import json
import os
import random
import time

from pymysql.cursors import DictCursor
from tqdm import tqdm

from pqai_agent.database import MySQLManager
# Connection settings for the `ai_agent` MySQL database.
#
# Every value can be overridden through an AI_AGENT_MYSQL_* environment
# variable so that credentials do not have to live in source control. The
# literals are kept only as fallbacks so that existing invocations, which set
# no environment variables, behave exactly as before.
#
# NOTE(review): a real password is committed below. Rotate it and remove the
# fallback once every environment provides AI_AGENT_MYSQL_PASSWORD.
config = {
    'host': os.environ.get(
        'AI_AGENT_MYSQL_HOST', 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com'
    ),
    # Environment values are strings; the driver config uses an integer port.
    'port': int(os.environ.get('AI_AGENT_MYSQL_PORT', '3306')),
    'user': os.environ.get('AI_AGENT_MYSQL_USER', 'wqsd'),
    'password': os.environ.get('AI_AGENT_MYSQL_PASSWORD', 'wqsd@2025'),
    'database': os.environ.get('AI_AGENT_MYSQL_DATABASE', 'ai_agent'),
    'charset': os.environ.get('AI_AGENT_MYSQL_CHARSET', 'utf8mb4'),
}

# Shared client used by every query helper in this script.
mysql_client = MySQLManager(config)
def split_dialogue_history(dialogue_history_, timeout=30*60*1000):
    """Split a list of messages into separate dialogues by idle time.

    The messages are ordered by their ``timestamp`` value, then cut into
    consecutive groups wherever the gap between two neighbouring messages is
    strictly greater than ``timeout``.

    :param dialogue_history_: iterable of dict-like messages, each with a
        numeric ``timestamp`` entry.
    :param timeout: largest gap allowed inside one dialogue, in the same unit
        as ``timestamp``. The default is 30 minutes expressed in milliseconds.
    :return: list of dialogues, each a list of messages in ascending
        timestamp order; an empty list when there are no messages.
    """
    ordered = sorted(dialogue_history_, key=lambda m: m['timestamp'])
    if not ordered:
        return []

    # The first message always opens the first dialogue.
    sessions = [[ordered[0]]]
    for earlier, later in zip(ordered, ordered[1:]):
        gap = later["timestamp"] - earlier["timestamp"]
        if gap > timeout:
            # Too long a pause: this message starts a new dialogue.
            sessions.append([later])
        else:
            sessions[-1].append(later)
    return sessions
def get_conversation_info():
    """Fetch the rooms that hold more than 20 messages with ``msg_type = 1``.

    :return: the result of ``mysql_client.select`` run with a ``DictCursor``;
        each row exposes ``roomid`` and ``article_num`` (the number of
        matching messages in that room).
    """
    # No placeholders are interpolated, so a plain string literal is enough.
    query = """
        select roomid, count(id) as 'article_num'
        from qywx_chat_history where msg_type = 1 group by roomid
        having count(id) > 20;
    """
    return mysql_client.select(query, cursor_type=DictCursor)
def get_dialogue_history(room_id_):
    """Fetch the messages of one room in chronological order.

    Only rows with ``msg_type = 1`` are returned. The rows are ordered by
    ``sendtime`` (ties broken by ``id``) because callers slice the result by
    position to obtain "everything said before this message"; without an
    explicit ``ORDER BY`` the database may return rows in any order.

    :param room_id_: value matched against the ``roomid`` column.
    :return: the result of ``mysql_client.select`` run with a ``DictCursor``;
        each row exposes ``sender``, ``receiver``, ``sendtime`` and
        ``content``.
    """
    sql = """
        select sender, receiver, sendtime, content
        from qywx_chat_history
        where roomid = %s and msg_type = %s
        order by sendtime, id;
    """
    return mysql_client.select(sql=sql, cursor_type=DictCursor, args=(room_id_, 1))
def get_profile_info(user_id_, user_type):
    """Look up the stored profile of a user or of a staff member.

    :param user_id_: value matched against ``third_party_user_id``.
    :param user_type: ``"user"`` reads ``avatar`` and ``profile`` from
        ``third_party_user``; ``"staff"`` reads ``profile`` from
        ``qywx_employee``.
    :return: the result of ``mysql_client.select`` run with a ``DictCursor``.
    :raises ValueError: if ``user_type`` is neither ``"user"`` nor
        ``"staff"``.
    """
    if user_type == "user":
        query = """
            select iconurl as 'avatar', profile_data_v1 as 'profile'
            from third_party_user where third_party_user_id = %s;
        """
    elif user_type == "staff":
        query = """
            select agent_profile as 'profile'
            from qywx_employee where third_party_user_id = %s;
        """
    else:
        raise ValueError("user_type must be 'user' or 'staff'")

    return mysql_client.select(query, cursor_type=DictCursor, args=(user_id_,))
# Upper bound on the number of prior turns kept in one training sample.
MAX_HISTORY_TURNS = 100
# Prior turns older than this many seconds before the reply are dropped (30 days).
HISTORY_WINDOW_SECONDS = 60 * 60 * 24 * 30


def parse_room_id(room_id):
    """Extract ``(staff_id, user_id)`` from a colon-separated room id.

    The staff id is the second field and the user id the third.

    :param room_id: room identifier string, or ``None``.
    :return: ``(staff_id, user_id)``, or ``None`` when the room id is missing,
        has fewer than three fields, or either id is empty.
    """
    if not room_id:
        return None
    parts = room_id.split(":")
    if len(parts) < 3:
        # Malformed id: skip instead of raising IndexError mid-run.
        return None
    staff_id, user_id = parts[1], parts[2]
    if not (staff_id and user_id):
        return None
    return staff_id, user_id


def build_reply_samples(
    dialogue_history,
    staff_id,
    user_id,
    max_turns=MAX_HISTORY_TURNS,
    window_seconds=HISTORY_WINDOW_SECONDS,
):
    """Turn one room's messages into (history, staff reply) samples.

    One sample is produced for every message sent by ``staff_id``. Its
    ``conversation`` is built from the messages that precede the reply in
    ``dialogue_history``, so the list is expected to be in chronological
    order.

    :param dialogue_history: sequence of rows with ``sender``, ``sendtime``
        (milliseconds) and ``content``.
    :param staff_id: sender id whose messages are treated as replies and
        labelled ``"assistant"``; every other sender is labelled ``"user"``.
    :param user_id: copied into each sample unchanged.
    :param max_turns: keep at most this many of the latest prior turns; must
        be positive.
    :param window_seconds: drop prior turns whose timestamp is not within
        this many seconds before the reply.
    :return: list of dicts with ``staff_id``, ``user_id``, ``conversation``
        and ``reply_msg``.
    """
    samples = []
    for idx, message in enumerate(dialogue_history):
        if message["sender"] != staff_id:
            continue

        # Timestamps in the output are whole seconds; sendtime is in ms.
        cutoff = int(message["sendtime"] / 1000) - window_seconds
        turns = [
            {
                "content": prior["content"],
                "role": "assistant" if prior["sender"] == staff_id else "user",
                "timestamp": int(prior["sendtime"] / 1000),
            }
            for prior in dialogue_history[:idx]
        ]
        # Apply the time window first, then cap to the most recent turns.
        recent = [turn for turn in turns if turn["timestamp"] > cutoff]
        recent = recent[-max_turns:]

        samples.append(
            {
                "staff_id": staff_id,
                "user_id": user_id,
                "conversation": recent,
                "reply_msg": message["content"],
            }
        )
    return samples


def main(output_path="reply_data_set_filter.json"):
    """Build the reply data set for every qualifying room and write it as JSON.

    Rooms whose id cannot be parsed into a staff id and a user id are skipped.

    :param output_path: file the JSON array of samples is written to.
    """
    data_set = []
    for conversation_info in tqdm(get_conversation_info()):
        room_id = conversation_info["roomid"]
        ids = parse_room_id(room_id)
        if ids is None:
            continue
        staff_id, user_id = ids
        # NOTE: the samples are only meaningful if the rows come back in
        # chronological order.
        dialogue_history = get_dialogue_history(room_id)
        data_set.extend(build_reply_samples(dialogue_history, staff_id, user_id))

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data_set, f, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    main()
|