push_service.py 13 KB


  1. import json
  2. import time
  3. import traceback
  4. import uuid
  5. from datetime import datetime
  6. from enum import Enum
  7. from concurrent.futures import ThreadPoolExecutor
  8. from threading import Thread
  9. from typing import Optional, Dict, List
  10. import rocketmq
  11. from rocketmq import ClientConfiguration, Credentials, SimpleConsumer, FilterExpression
  12. from pqai_agent import configs
  13. from pqai_agent.abtest.utils import get_abtest_info
  14. from pqai_agent.agents.message_push_agent import MessagePushAgent, DummyMessagePushAgent
  15. from pqai_agent.configs import apollo_config
  16. from pqai_agent.data_models.agent_push_record import AgentPushRecord
  17. from pqai_agent.logging import logger
  18. from pqai_agent.mq_message import MessageType
  19. from pqai_agent.toolkit import get_tools
  20. from pqai_agent.utils.agent_abtest_utils import get_agent_abtest_config
  21. class TaskType(Enum):
  22. GENERATE = "generate"
  23. SEND = "send"
  24. def generate_task_rmq_message(topic: str, staff_id: str, user_id: str, task_type: TaskType, content: Optional[str] = None) -> rocketmq.Message:
  25. msg = rocketmq.Message()
  26. msg.topic = topic
  27. msg.body = json.dumps({
  28. 'staff_id': staff_id,
  29. 'user_id': user_id,
  30. 'task_type': task_type.value,
  31. # NOTE:通过传入JSON支持多模态消息
  32. 'content': content or '',
  33. 'timestamp': int(time.time() * 1000),
  34. }, ensure_ascii=False).encode('utf-8')
  35. msg.tag = task_type.value
  36. return msg
  37. class PushScanThread:
  38. # PushScanThread实际可以是AgentService的一个函数,从AgentService中独立的主要考虑因素为Push后续可能有拆分和扩展
  39. def __init__(self, staff_id: str, agent_service: 'AgentService', mq_topic: str, mq_producer: rocketmq.Producer):
  40. self.staff_id = staff_id
  41. # 需要大量使用AgentService内部的成员
  42. self.service = agent_service
  43. self.rmq_topic = mq_topic
  44. self.rmq_producer = mq_producer
  45. def run(self):
  46. white_list_tags = set(apollo_config.get_json_value('agent_initiate_whitelist_tags', []))
  47. first_initiate_tags = set(apollo_config.get_json_value('agent_first_initiate_whitelist_tags', []))
  48. # 合并白名单,减少配置成本
  49. white_list_tags.update(first_initiate_tags)
  50. all_staff_users = self.service.user_relation_manager.list_staff_users(staff_id=self.staff_id)
  51. all_users = list({pair['user_id'] for pair in all_staff_users})
  52. all_user_tags = self.service.user_manager.get_user_tags(all_users)
  53. for staff_user in all_staff_users:
  54. staff_id = staff_user['staff_id']
  55. user_id = staff_user['user_id']
  56. # 通过AB实验配置控制用户组是否启用push
  57. # abtest_params = get_abtest_info(user_id).params
  58. # if abtest_params.get('agent_push_enabled', 'false').lower() != 'true':
  59. # logger.debug(f"User {user_id} not enabled agent push, skipping.")
  60. # continue
  61. user_tags = all_user_tags.get(user_id, list())
  62. if not white_list_tags.intersection(user_tags):
  63. should_initiate = False
  64. else:
  65. agent = self.service.get_agent_instance(staff_id, user_id)
  66. should_initiate = agent.should_initiate_conversation()
  67. if should_initiate:
  68. logger.info(f"user[{user_id}], tags{user_tags}: generate a generation task for conversation initiation")
  69. rmq_msg = generate_task_rmq_message(self.rmq_topic, staff_id, user_id, TaskType.GENERATE)
  70. self.rmq_producer.send(rmq_msg)
  71. else:
  72. logger.debug(f"user[{user_id}], do not initiate conversation")
  73. class PushTaskWorkerPool:
  74. def __init__(self, agent_service: 'AgentService', mq_topic: str,
  75. mq_consumer_generate: rocketmq.SimpleConsumer,
  76. mq_consumer_send: rocketmq.SimpleConsumer,
  77. mq_producer: rocketmq.Producer):
  78. self.agent_service = agent_service
  79. max_workers = configs.get()['system'].get('push_task_workers', 5)
  80. self.max_push_workers = max_workers
  81. self.generate_executor = ThreadPoolExecutor(max_workers=max_workers)
  82. self.send_executors = {}
  83. self.rmq_topic = mq_topic
  84. self.generate_consumer = mq_consumer_generate
  85. self.send_consumer = mq_consumer_send
  86. self.producer = mq_producer
  87. self.generate_loop_thread = None
  88. self.send_loop_thread = None
  89. self.is_generator_running = True
  90. self.generate_send_done = False # set by wait_to_finish,表示所有生成任务均已进入队列
  91. self.no_more_generate_task = False # generate_send_done被设置之后队列中未再收到生成任务时设置
  92. def start(self):
  93. self.send_loop_thread = Thread(target=self.process_send_tasks)
  94. self.send_loop_thread.start()
  95. self.generate_loop_thread = Thread(target=self.process_generate_tasks)
  96. self.generate_loop_thread.start()
  97. def process_send_tasks(self):
  98. time.sleep(1)
  99. while True:
  100. msgs = self.send_consumer.receive(1, 60)
  101. if not msgs:
  102. # 没有生成任务在执行且没有消息,才可退出
  103. if self.no_more_generate_task and not self.is_generator_running:
  104. break
  105. else:
  106. continue
  107. msg = msgs[0]
  108. task = json.loads(msg.body.decode('utf-8'))
  109. msg_time = datetime.fromtimestamp(task['timestamp'] / 1000).strftime("%Y-%m-%d %H:%M:%S")
  110. logger.debug(f"recv message:{msg_time} - {task}")
  111. if task['task_type'] == TaskType.SEND.value:
  112. staff_id = task['staff_id']
  113. if staff_id not in self.send_executors:
  114. self.send_executors[staff_id] = ThreadPoolExecutor(max_workers=1)
  115. self.send_executors[staff_id].submit(self.handle_send_task, task, msg)
  116. else:
  117. logger.error(f"Unknown task type: {task['task_type']}")
  118. self.send_consumer.ack(msg)
  119. logger.info("PushGenerateWorkerPool send thread stopped")
  120. def process_generate_tasks(self):
  121. time.sleep(1)
  122. while True:
  123. if self.generate_executor._work_queue.qsize() > self.max_push_workers * 2:
  124. logger.warning("Too many generate tasks in queue, consume later")
  125. time.sleep(10)
  126. continue
  127. msgs = self.generate_consumer.receive(1, 300)
  128. if not msgs:
  129. # 生成任务已经创建完成 且 未收到新任务,才可退出
  130. if self.generate_send_done:
  131. logger.debug("no message received, there should be no more generate task")
  132. self.no_more_generate_task = True
  133. break
  134. else:
  135. continue
  136. msg = msgs[0]
  137. task = json.loads(msg.body.decode('utf-8'))
  138. msg_time = datetime.fromtimestamp(task['timestamp'] / 1000).strftime("%Y-%m-%d %H:%M:%S")
  139. logger.debug(f"recv message:{msg_time} - {task}")
  140. if task['task_type'] == TaskType.GENERATE.value:
  141. self.generate_executor.submit(self.handle_generate_task, task, msg)
  142. else:
  143. self.generate_consumer.ack(msg)
  144. logger.info("PushGenerateWorkerPool generator thread stopped")
  145. def wait_to_finish(self):
  146. self.generate_send_done = True
  147. while not self.no_more_generate_task:
  148. #FIXME(zhoutian): condition variable should be used to replace time sleep
  149. time.sleep(1)
  150. self.generate_executor.shutdown(wait=True)
  151. self.is_generator_running = False
  152. self.generate_loop_thread.join()
  153. self.send_loop_thread.join()
  154. def handle_send_task(self, task: Dict, msg: rocketmq.Message):
  155. try:
  156. staff_id = task['staff_id']
  157. user_id = task['user_id']
  158. agent = self.agent_service.get_agent_instance(staff_id, user_id)
  159. # 二次校验是否需要发送
  160. if not agent.should_initiate_conversation():
  161. logger.debug(f"user[{user_id}], should not initiate, skip sending task")
  162. self.send_consumer.ack(msg)
  163. return
  164. contents: List[Dict] = json.loads(task['content'])
  165. if not contents:
  166. logger.debug(f"staff[{staff_id}], user[{user_id}]: empty content, do not send")
  167. self.send_consumer.ack(msg)
  168. return
  169. recent_dialogue = agent.dialogue_history[-10:]
  170. agent_voice_whitelist = set(apollo_config.get_json_value("agent_voice_whitelist", []))
  171. messages_to_send = []
  172. current_ts = int(time.time())
  173. for item in contents:
  174. item["timestamp"] = current_ts * 1000
  175. if item["type"] == "text":
  176. if staff_id not in agent_voice_whitelist:
  177. message_type = MessageType.TEXT
  178. else:
  179. message_type = self.agent_service.response_type_detector.detect_type(
  180. recent_dialogue, item, enable_random=True)
  181. response = agent.generate_response(item["content"])
  182. if response:
  183. messages_to_send.append({'type': message_type, 'content': response})
  184. else:
  185. message_type = MessageType.from_str(item["type"])
  186. response = agent.generate_multimodal_response(item)
  187. if response:
  188. item["type"] = message_type
  189. messages_to_send.append(item)
  190. with self.agent_service.agent_db_session_maker() as session:
  191. msg_list = [{"type": msg["type"].value, "content": msg["content"]} for msg in messages_to_send]
  192. record = AgentPushRecord(staff_id=staff_id, user_id=user_id,
  193. content=json.dumps(msg_list, ensure_ascii=False),
  194. timestamp=current_ts)
  195. session.add(record)
  196. session.commit()
  197. if messages_to_send:
  198. for response in messages_to_send:
  199. self.agent_service.send_multimodal_response(staff_id, user_id, response, skip_check=True)
  200. agent.update_last_active_interaction_time(current_ts)
  201. else:
  202. logger.debug(f"staff[{staff_id}], user[{user_id}]: generate empty response")
  203. self.send_consumer.ack(msg)
  204. except Exception as e:
  205. fmt_exc = traceback.format_exc()
  206. logger.error(f"Error processing message sending: {e}, {fmt_exc}")
  207. self.send_consumer.ack(msg)
  208. def handle_generate_task(self, task: Dict, msg: rocketmq.Message):
  209. try:
  210. staff_id = task['staff_id']
  211. user_id = task['user_id']
  212. main_agent = self.agent_service.get_agent_instance(staff_id, user_id)
  213. agent_config = get_agent_abtest_config('push', user_id,
  214. self.agent_service.service_module_manager,
  215. self.agent_service.agent_config_manager)
  216. if agent_config:
  217. try:
  218. tool_names = json.loads(agent_config.tools)
  219. except json.JSONDecodeError:
  220. logger.error(f"Invalid JSON in agent tools: {agent_config.tools}")
  221. tool_names = []
  222. push_agent = MessagePushAgent(model=agent_config.execution_model,
  223. system_prompt=agent_config.system_prompt,
  224. tools=get_tools(tool_names))
  225. query_prompt_template = agent_config.task_prompt
  226. else:
  227. push_agent = MessagePushAgent()
  228. query_prompt_template = None
  229. message_to_user = push_agent.generate_message(
  230. context=main_agent.get_prompt_context(None),
  231. dialogue_history=self.agent_service.history_dialogue_db.get_dialogue_history_backward(
  232. staff_id, user_id, main_agent.last_interaction_time_ms, limit=30
  233. ),
  234. query_prompt_template=query_prompt_template
  235. )
  236. cost = push_agent.get_total_cost()
  237. logger.debug(f"staff[{staff_id}], user[{user_id}]: push message generation cost: {cost}")
  238. if message_to_user:
  239. rmq_message = generate_task_rmq_message(
  240. self.rmq_topic, staff_id, user_id, TaskType.SEND, json.dumps(message_to_user))
  241. self.producer.send(rmq_message)
  242. else:
  243. logger.info(f"staff[{staff_id}], user[{user_id}]: no push message generated")
  244. self.generate_consumer.ack(msg)
  245. except Exception as e:
  246. fmt_exc = traceback.format_exc()
  247. logger.error(f"Error processing message generation: {e}, {fmt_exc}")
  248. # FIXME: 是否需要ACK
  249. self.generate_consumer.ack(msg)