import json import time import random import traceback from datetime import datetime from typing import Dict, List, Tuple from openai import OpenAI from tqdm import tqdm from pymysql.cursors import DictCursor from pqai_agent.database import MySQLManager from pqai_agent.agents.message_push_agent import MessagePushAgent from pqai_agent.logging_service import logger from pqai_agent import configs, logging_service from pqai_agent.mq_message import MessageType logging_service.setup_root_logger() config = { 'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com', 'port': 3306, 'user': 'wqsd', 'password': 'wqsd@2025', 'database': 'ai_agent', 'charset': 'utf8mb4' } mysql_client = MySQLManager(config) def split_dialogue_history(dialogue_history_, timeout=30*60*1000): """ :param dialogue_history_: :param timeout: 30 minutes :return: """ messages_sorted = sorted(dialogue_history_, key=lambda x: x['timestamp']) dialogues = [] current_dialogue = [] for i, msg in enumerate(messages_sorted): if not current_dialogue: current_dialogue.append(msg) continue prev_msg = messages_sorted[i - 1] time_diff = msg["timestamp"] - prev_msg["timestamp"] # 判断是否为新对话 is_new_dialogue = False if time_diff > timeout: is_new_dialogue = True if is_new_dialogue: dialogues.append(current_dialogue) current_dialogue = [msg] else: current_dialogue.append(msg) if current_dialogue: dialogues.append(current_dialogue) return dialogues def get_conversation_info(): sql = f""" select roomid, count(id) as 'article_num' from qywx_chat_history where msg_type in (1,2,4) group by roomid having count(id) > 20; """ return mysql_client.select(sql, cursor_type=DictCursor) def get_dialogue_history(room_id_): """ 获取对话历史 :param room_id_: :return: """ sql = f""" select id, sender, receiver, sendtime, content from qywx_chat_history where roomid = %s and msg_type in %s order by sendtime; """ return mysql_client.select(sql=sql, cursor_type=DictCursor, args=(room_id_, (1, 2, 4))) def get_dialogue_history_by_id(staff_id, dialogue_id_tuple): sql = f""" select sender, sendtime, content from qywx_chat_history where id in %s; """ conversation_list = mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,)) history_conversation = [ { "content": i['content'], "role": "assistant" if i['sender'] == staff_id else "user", "timestamp": i['sendtime'] } for i in conversation_list ] return history_conversation def get_profile_info(user_id_, user_type): match user_type: case "user": sql = f""" select iconurl as 'avatar', profile_data_v1 as 'profile' from third_party_user where third_party_user_id = %s; """ case "staff": sql = f""" select agent_profile as 'profile' from qywx_employee where third_party_user_id = %s; """ case _: raise ValueError("user_type must be 'user' or 'staff'") return mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,)) def generate_reply_dataset(): conversation_info_list = get_conversation_info() data_set = [] for conversation_info in tqdm(conversation_info_list): room_id = conversation_info["roomid"] staff_id = room_id.split(":")[1] if staff_id in ('1688854974625870', '1688856125791790', '1688856125791452'): user_id = room_id.split(":")[2] if staff_id and user_id: dialogue_history = get_dialogue_history(room_id) for idx, dialogue_info in enumerate(dialogue_history): if dialogue_info["sender"] == staff_id: conversation = dialogue_history[: idx] history_conversation = [ { "id": i['id'], "content": i['content'], "role": "assistant" if i['sender'] == staff_id else "user", "timestamp": int(i['sendtime'] / 1000) } for i in conversation ] # filter history_conversation history_conversation = [i for i in history_conversation if i['timestamp'] > int(dialogue_info['sendtime'] / 1000) - 60 * 60 * 24 * 30] if len(history_conversation) > 20: history_conversation = history_conversation[-20:] eva_conversation = history_conversation[-10:] if history_conversation: user_activate_rate= len([i for i in eva_conversation if i['role'] == 'user']) / len(eva_conversation) reply_msg = dialogue_info['content'] reply_time = int(dialogue_info['sendtime'] / 1000) if "早上好" in reply_msg: continue elif "早安" in reply_msg: continue elif "早" in reply_msg: continue elif "下午好" in reply_msg: continue elif "晚上好" in reply_msg: continue elif user_activate_rate < 0.3: continue else: # obj = { # "staff_id": staff_id, # "user_id": user_id, # "conversation": history_conversation, # "reply_msg": reply_msg, # "reply_time": reply_time, # "user_active_rate": user_activate_rate # } conversation_id_list = [i['id'] for i in history_conversation] insert_query = f""" insert into internal_conversation_data (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ mysql_client.execute(insert_query, args=( '1', staff_id, user_id, '2025-06-16', json.dumps(conversation_id_list, ensure_ascii=False), reply_msg, reply_time, 0, user_activate_rate )) # print(len(data_set)) # with open("reply_data_set_filter_2.json", "w", encoding="utf-8") as f: # f.write(json.dumps(data_set, ensure_ascii=False, indent=4)) def compose_dialogue(dialogue: List[Dict], timestamp_type: str='ms') -> str: role_map = {'user': '用户', 'assistant': '客服'} messages = [] for msg in dialogue: if not msg['content']: continue if msg['role'] not in role_map: continue if timestamp_type == 'ms': format_dt = datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S') else: format_dt = datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S') msg_type = msg.get('type', MessageType.TEXT).description messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content'])) return '\n'.join(messages) def generate_push_dataset(): fetch_query = f""" select staff_id, user_id, conversation, content, send_time, user_active_rate from internal_conversation_data where dataset_id = 1; """ data_set = mysql_client.select(fetch_query, cursor_type=DictCursor) filter_conversation = [i for i in data_set if len(json.loads(i['conversation'])) >= 20] samples =random.sample(filter_conversation, 100) # init message push agent agent = MessagePushAgent() for sample in tqdm(samples): agent_profile = get_profile_info(sample["staff_id"], "staff")[0]['profile'] agent_profile = json.loads(agent_profile) user_profile = get_profile_info(sample["user_id"], "user")[0]['profile'] user_profile = json.loads(user_profile) conversation = get_dialogue_history_by_id( sample["staff_id"], tuple(json.loads(sample["conversation"])) ) conversation.append( { "content": sample["content"], "role": "assistant", "timestamp": sample["send_time"] * 1000, # "type": 1 } ) conversation = sorted(conversation, key=lambda i: i['timestamp'], reverse=False) last_timestamp = int(conversation[-1]["timestamp"]) push_time = int(last_timestamp / 1000) + 24 * 3600 push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S') try: push_message = agent.generate_message( context={ "formatted_staff_profile": agent_profile, "nickname": user_profile.get('nickname'), "name": user_profile.get('name'), "preferred_nickname": user_profile.get('preferred_nickname'), "age": user_profile.get('age'), "region": user_profile.get('region'), "health_conditions": user_profile.get('health_conditions'), "gender": user_profile.get('gender'), "medications": user_profile.get('medications'), "interests": user_profile.get('interests'), "current_datetime": push_dt, "avatar": None }, dialogue_history=conversation ) if not push_message: print("push message error") continue else: print("push message success", push_message) insert_query = f""" insert into internal_conversation_data (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ mysql_client.execute(insert_query, args=( '2', sample["staff_id"], sample["user_id"], '2025-06-16', sample["conversation"], push_message, push_time, 1, sample["user_active_rate"] )) except Exception as e: print("error", e) print(traceback.format_exc()) if __name__ == "__main__": generate_push_dataset()