import json import time import random import traceback from datetime import datetime from openai import OpenAI from tqdm import tqdm from pymysql.cursors import DictCursor from pqai_agent.database import MySQLManager from pqai_agent.agents.message_push_agent import MessagePushAgent from pqai_agent.logging_service import logger from pqai_agent import configs, logging_service logging_service.setup_root_logger() config = { 'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com', 'port': 3306, 'user': 'wqsd', 'password': 'wqsd@2025', 'database': 'ai_agent', 'charset': 'utf8mb4' } mysql_client = MySQLManager(config) def split_dialogue_history(dialogue_history_, timeout=30*60*1000): """ :param dialogue_history_: :param timeout: 30 minutes :return: """ messages_sorted = sorted(dialogue_history_, key=lambda x: x['timestamp']) dialogues = [] current_dialogue = [] for i, msg in enumerate(messages_sorted): if not current_dialogue: current_dialogue.append(msg) continue prev_msg = messages_sorted[i - 1] time_diff = msg["timestamp"] - prev_msg["timestamp"] # 判断是否为新对话 is_new_dialogue = False if time_diff > timeout: is_new_dialogue = True if is_new_dialogue: dialogues.append(current_dialogue) current_dialogue = [msg] else: current_dialogue.append(msg) if current_dialogue: dialogues.append(current_dialogue) return dialogues def get_conversation_info(): sql = f""" select roomid, count(id) as 'article_num' from qywx_chat_history where msg_type in (1,2,4) group by roomid having count(id) > 20; """ return mysql_client.select(sql, cursor_type=DictCursor) def get_dialogue_history(room_id_): """ 获取对话历史 :param room_id_: :return: """ sql = f""" select id, sender, receiver, sendtime, content from qywx_chat_history where roomid = %s and msg_type in %s order by sendtime; """ return mysql_client.select(sql=sql, cursor_type=DictCursor, args=(room_id_, (1, 2, 4))) def get_dialogue_history_by_id(staff_id, dialogue_id_tuple): sql = f""" select sender, sendtime, content from qywx_chat_history where id in %s; """ conversation_list = mysql_client.select(sql=sql, cursor_type=DictCursor, args=(dialogue_id_tuple,)) history_conversation = [ { "content": i['content'], "role": "assistant" if i['sender'] == staff_id else "user", "timestamp": int(i['sendtime'] / 1000) } for i in conversation_list ] return history_conversation def get_profile_info(user_id_, user_type): match user_type: case "user": sql = f""" select iconurl as 'avatar', profile_data_v1 as 'profile' from third_party_user where third_party_user_id = %s; """ case "staff": sql = f""" select agent_profile as 'profile' from qywx_employee where third_party_user_id = %s; """ case _: raise ValueError("user_type must be 'user' or 'staff'") return mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,)) def generate_reply_dataset(): conversation_info_list = get_conversation_info() data_set = [] for conversation_info in tqdm(conversation_info_list): room_id = conversation_info["roomid"] staff_id = room_id.split(":")[1] if staff_id in ('1688854974625870', '1688856125791790', '1688856125791452'): user_id = room_id.split(":")[2] if staff_id and user_id: dialogue_history = get_dialogue_history(room_id) for idx, dialogue_info in enumerate(dialogue_history): if dialogue_info["sender"] == staff_id: conversation = dialogue_history[: idx] history_conversation = [ { "id": i['id'], "content": i['content'], "role": "assistant" if i['sender'] == staff_id else "user", "timestamp": int(i['sendtime'] / 1000) } for i in conversation ] # filter history_conversation history_conversation = [i for i in history_conversation if i['timestamp'] > int(dialogue_info['sendtime'] / 1000) - 60 * 60 * 24 * 30] if len(history_conversation) > 20: history_conversation = history_conversation[-20:] eva_conversation = history_conversation[-10:] if history_conversation: user_activate_rate= len([i for i in eva_conversation if i['role'] == 'user']) / len(eva_conversation) reply_msg = dialogue_info['content'] reply_time = int(dialogue_info['sendtime'] / 1000) if "早上好" in reply_msg: continue elif "早安" in reply_msg: continue elif "早" in reply_msg: continue elif "下午好" in reply_msg: continue elif "晚上好" in reply_msg: continue elif user_activate_rate < 0.3: continue else: # obj = { # "staff_id": staff_id, # "user_id": user_id, # "conversation": history_conversation, # "reply_msg": reply_msg, # "reply_time": reply_time, # "user_active_rate": user_activate_rate # } conversation_id_list = [i['id'] for i in history_conversation] insert_query = f""" insert into internal_conversation_data (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ mysql_client.execute(insert_query, args=( '1', staff_id, user_id, '2025-06-16', json.dumps(conversation_id_list, ensure_ascii=False), reply_msg, reply_time, 0, user_activate_rate )) # print(len(data_set)) # with open("reply_data_set_filter_2.json", "w", encoding="utf-8") as f: # f.write(json.dumps(data_set, ensure_ascii=False, indent=4)) def generate_push_dataset(): fetch_query = f""" select staff_id, user_id, conversation, content, send_time, user_active_rate from internal_conversation_data where dataset_id = 1; """ data_set = mysql_client.select(fetch_query, cursor_type=DictCursor) filter_conversation = [i for i in data_set if len(json.loads(i['conversation'])) >= 20] print(len(filter_conversation)) samples =random.sample(filter_conversation, 100) # init message push agent agent = MessagePushAgent() for sample in tqdm(samples): agent_profile = get_profile_info(sample["staff_id"], "staff")[0]['profile'] agent_profile = json.loads(agent_profile) user_profile = get_profile_info(sample["user_id"], "user")[0]['profile'] user_profile = json.loads(user_profile) conversation = get_dialogue_history_by_id( sample["staff_id"], tuple(sample["conversation"]) ) conversation.append( { "content": sample["content"], "role": "assistant", "timestamp": sample["send_time"] } ) conversation = sorted(conversation, key=lambda i: i['timestamp'], reverse=False) last_timestamp = int(conversation[-1]["timestamp"]) push_time = last_timestamp + 48 * 3600 push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S') try: push_message = agent.generate_message( context={ "formatted_staff_profile": agent_profile, "nickname": user_profile.get('nickname'), "name": user_profile.get('name'), "preferred_nickname": user_profile.get('preferred_nickname'), "age": user_profile.get('age'), "region": user_profile.get('region'), "health_conditions": user_profile.get('health_conditions'), "gender": user_profile.get('gender'), "medications": user_profile.get('medications'), "interests": user_profile.get('interests'), "current_datetime": push_dt, "avatar": None }, dialogue_history=conversation, timestamp_type="s" ) insert_query = f""" insert into internal_conversation_data (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ mysql_client.execute(insert_query, args=( '3', sample["staff_id"], sample["user_id"], '2025-06-16', sample["conversation"], push_message, push_time, 1, sample["user_active_rate"] )) except Exception as e: print("error", e) print(traceback.format_exc()) if __name__ == "__main__": generate_push_dataset()