123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import json
- import time
- import random
- from datetime import datetime
- from openai import OpenAI
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from pqai_agent.database import MySQLManager
- from pqai_agent.agents.message_push_agent import MessagePushAgent
- from pqai_agent.logging_service import logger
- from pqai_agent import configs, logging_service
- logging_service.setup_root_logger()
- config = {
- 'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com',
- 'port': 3306,
- 'user': 'wqsd',
- 'password': 'wqsd@2025',
- 'database': 'ai_agent',
- 'charset': 'utf8mb4'
- }
- mysql_client = MySQLManager(config)
- def split_dialogue_history(dialogue_history_, timeout=30*60*1000):
- """
- :param dialogue_history_:
- :param timeout: 30 minutes
- :return:
- """
- messages_sorted = sorted(dialogue_history_, key=lambda x: x['timestamp'])
- dialogues = []
- current_dialogue = []
- for i, msg in enumerate(messages_sorted):
- if not current_dialogue:
- current_dialogue.append(msg)
- continue
- prev_msg = messages_sorted[i - 1]
- time_diff = msg["timestamp"] - prev_msg["timestamp"]
- # 判断是否为新对话
- is_new_dialogue = False
- if time_diff > timeout:
- is_new_dialogue = True
- if is_new_dialogue:
- dialogues.append(current_dialogue)
- current_dialogue = [msg]
- else:
- current_dialogue.append(msg)
- if current_dialogue:
- dialogues.append(current_dialogue)
- return dialogues
- def get_conversation_info():
- sql = f"""
- select roomid, count(id) as 'article_num'
- from qywx_chat_history where msg_type in (1,2,4) group by roomid
- having count(id) > 20;
- """
- return mysql_client.select(sql, cursor_type=DictCursor)
- def get_dialogue_history(room_id_):
- """
- 获取对话历史
- :param room_id_:
- :return:
- """
- sql = f"""
- select id, sender, receiver, sendtime, content
- from qywx_chat_history
- where roomid = %s and msg_type in %s order by sendtime;
- """
- return mysql_client.select(sql=sql, cursor_type=DictCursor, args=(room_id_, (1, 2, 4)))
- def get_profile_info(user_id_, user_type):
- match user_type:
- case "user":
- sql = f"""
- select iconurl as 'avatar', profile_data_v1 as 'profile'
- from third_party_user where third_party_user_id = %s;
- """
- case "staff":
- sql = f"""
- select agent_profile as 'profile'
- from qywx_employee where third_party_user_id = %s;
- """
- case _:
- raise ValueError("user_type must be 'user' or 'staff'")
- return mysql_client.select(sql, cursor_type=DictCursor, args=(user_id_,))
- def generate_reply_dataset():
- conversation_info_list = get_conversation_info()
- data_set = []
- for conversation_info in tqdm(conversation_info_list):
- room_id = conversation_info["roomid"]
- staff_id = room_id.split(":")[1]
- if staff_id in ('1688854974625870', '1688856125791790', '1688856125791452'):
- user_id = room_id.split(":")[2]
- if staff_id and user_id:
- dialogue_history = get_dialogue_history(room_id)
- for idx, dialogue_info in enumerate(dialogue_history):
- if dialogue_info["sender"] == staff_id:
- conversation = dialogue_history[: idx]
- history_conversation = [
- {
- "id": i['id'],
- "content": i['content'],
- "role": "assistant" if i['sender'] == staff_id else "user",
- "timestamp": int(i['sendtime'] / 1000)
- } for i in conversation
- ]
- # filter history_conversation
- history_conversation = [i for i in history_conversation if i['timestamp'] > int(dialogue_info['sendtime'] / 1000) - 60 * 60 * 24 * 30]
- if len(history_conversation) > 20:
- history_conversation = history_conversation[-20:]
- eva_conversation = history_conversation[-10:]
- if history_conversation:
- user_activate_rate= len([i for i in eva_conversation if i['role'] == 'user']) / len(eva_conversation)
- reply_msg = dialogue_info['content']
- reply_time = int(dialogue_info['sendtime'] / 1000)
- if "早上好" in reply_msg:
- continue
- elif "早安" in reply_msg:
- continue
- elif "早" in reply_msg:
- continue
- elif "下午好" in reply_msg:
- continue
- elif "晚上好" in reply_msg:
- continue
- elif user_activate_rate < 0.3:
- continue
- else:
- # obj = {
- # "staff_id": staff_id,
- # "user_id": user_id,
- # "conversation": history_conversation,
- # "reply_msg": reply_msg,
- # "reply_time": reply_time,
- # "user_active_rate": user_activate_rate
- # }
- conversation_id_list = [i['id'] for i in history_conversation]
- insert_query = f"""
- insert into internal_conversation_data
- (dataset_id, staff_id, user_id, version_date, conversation, content, send_time, send_type, user_active_rate)
- values (%s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- mysql_client.execute(insert_query, args=(
- '1',
- staff_id,
- user_id,
- '2025-06-16',
- json.dumps(conversation_id_list, ensure_ascii=False),
- reply_msg,
- reply_time,
- 0,
- user_activate_rate
- ))
- # print(len(data_set))
- # with open("reply_data_set_filter_2.json", "w", encoding="utf-8") as f:
- # f.write(json.dumps(data_set, ensure_ascii=False, indent=4))
- def generate_push_dataset():
- conversation_info_list = get_conversation_info()
- data_set = []
- for conversation_info in conversation_info_list:
- room_id = conversation_info["roomid"]
- staff_id = room_id.split(":")[1]
- # if staff_id in ('1688854974625870', '1688856125791790', '1688856125791452'):
- user_id = room_id.split(":")[2]
- if staff_id and user_id:
- history_push_message = [] # 所有 push 消息历史(这个变量会保留到每次循环)
- dialogue_history = get_dialogue_history(room_id)
- for idx, dialogue_info in enumerate(dialogue_history):
- if idx == 0:
- continue # 防止访问 idx - 1
- sender = dialogue_info["sender"]
- send_timestamp = int(dialogue_info["sendtime"] / 1000)
- before_message = dialogue_history[idx - 1]
- before_send_timestamp = int(before_message["sendtime"] / 1000)
- if sender == staff_id and (send_timestamp - before_send_timestamp) >= 86400:
- push_msg = dialogue_info['content']
- if push_msg == '早安,新的一天,愿你拥有最好的心情去迎接一切美好!爆款视频抢先观看,点击下方精彩不断~':
- continue
- else:
- print( datetime.fromtimestamp(send_timestamp), push_msg)
- conversation = [
- i for i in dialogue_history[: idx] if i['content'] != '早安,新的一天,愿你拥有最好的心情去迎接一切美好!爆款视频抢先观看,点击下方精彩不断~'
- ]
- history_conversation = [
- {
- "content": i['content'],
- "role": "assistant" if i['sender'] == staff_id else "user",
- "timestamp": int(i['sendtime'] / 1000)
- }
- for i in conversation
- if int(i['sendtime'] / 1000) > send_timestamp - 86400 * 5
- ]
- if len(history_conversation) > 50:
- history_conversation = history_conversation[-50:]
- if history_conversation:
- push_msg = dialogue_info['content']
- obj = {
- "staff_id": staff_id,
- "user_id": user_id,
- "conversation": history_conversation,
- "push_msg": push_msg,
- "push_time": datetime.fromtimestamp(send_timestamp).strftime("%Y-%m-%d %H:%M:%S"),
- "history_push_messages": history_push_message.copy()
- }
- data_set.append(obj)
- history_push_message.append(push_msg)
- with open("push_message_dataset_v4.json", "w", encoding="utf-8") as f:
- f.write(json.dumps(data_set, ensure_ascii=False, indent=4))
- def generate_push_dataset_new():
- import json
- with open("reply_data_set_filter_2.json", "r", encoding="utf-8") as f:
- data_set = json.loads(f.read())
- #
- filter_conversation = [i for i in data_set if len(i['conversation']) >= 20]
- samples =random.sample(filter_conversation, 50)
- # with open("push_dataset_new_0613_24h.json", encoding="utf-8") as f:
- # samples = json.load(f)
- # init message push agent
- agent = MessagePushAgent()
- for sample in tqdm(samples):
- agent_profile = get_profile_info(sample["staff_id"], "staff")[0]['profile']
- agent_profile = json.loads(agent_profile)
- user_profile = get_profile_info(sample["user_id"], "user")[0]['profile']
- user_profile = json.loads(user_profile)
- # agent_profile = sample["agent_profile"]
- # user_profile = sample["user_profile"]
- conversation = sorted(sample["conversation"], key=lambda i: i['timestamp'], reverse=False)
- last_timestamp = int(conversation[-1]["timestamp"])
- push_time = last_timestamp + 48 * 3600
- push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
- try:
- response = agent.generate_message(
- context={
- "formatted_staff_profile": agent_profile,
- "nickname": user_profile.get('nickname'),
- "name": user_profile.get('name'),
- "preferred_nickname": user_profile.get('preferred_nickname'),
- "age": user_profile.get('age'),
- "region": user_profile.get('region'),
- "health_conditions": user_profile.get('health_conditions'),
- "gender": user_profile.get('gender'),
- "medications": user_profile.get('medications'),
- "interests": user_profile.get('interests'),
- "current_datetime": push_dt,
- "avatar": None
- },
- dialogue_history=sample["conversation"],
- timestamp_type="s"
- )
- print("---------push消息----------", response)
- print("\n")
- sample['push_msg'] = response
- sample['user_profile'] = user_profile
- sample['agent_profile'] = agent_profile
- sample['push_time'] = push_time
- data_set.append(sample)
- except Exception as e:
- print("error", e)
- with open("push_dataset_new_0613_24h_v2.json", "w", encoding="utf-8") as f:
- f.write(json.dumps(data_set, ensure_ascii=False, indent=4))
- if __name__ == "__main__":
- generate_reply_dataset()
|