瀏覽代碼

Fixup: message sender and receiver

StrayWarrior 3 周之前
父節點
當前提交
ce2db4b66a
共有 2 個文件被更改,包括 8 次插入4 次删除
  1. 6 4
      agent_service.py
  2. 2 0
      user_manager.py

+ 6 - 4
agent_service.py

@@ -92,10 +92,10 @@ class AgentService:
         self.user_manager.save_user_profile(user_id, merged_profile)
         return merged_profile
 
-    def _schedule_aggregation_trigger(self, user_id: str, delay_sec: int):
+    def _schedule_aggregation_trigger(self, staff_id: str, user_id: str, delay_sec: int):
         logging.debug("user: {}, schedule trigger message after {} seconds".format(user_id, delay_sec))
         message_ts = int((time.time() + delay_sec) * 1000)
-        message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.SYSTEM, None, user_id, None, message_ts)
+        message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.SYSTEM, user_id, staff_id, None, message_ts)
         message.id = -MessageType.AGGREGATION_TRIGGER.code
         self.scheduler.add_job(lambda: self.receive_queue.produce(message),
                                'date',
@@ -121,7 +121,7 @@ class AgentService:
             if message.type != MessageType.AGGREGATION_TRIGGER:
                 # 产生一个触发器,但是不能由触发器递归产生
                 logging.debug("user: {}, waiting next message for aggregation".format(user_id))
-                self._schedule_aggregation_trigger(user_id, agent.message_aggregation_sec)
+                self._schedule_aggregation_trigger(staff_id, user_id, agent.message_aggregation_sec)
             return
         else:
             # 先更新用户画像再处理回复
@@ -142,6 +142,7 @@ class AgentService:
     def _check_initiative_conversations(self):
         """定时检查主动发起对话"""
         for user_id in self.user_manager.list_all_users():
+            #FIXME(zhoutian): 需要企微账号与用户关系
             agent = self._get_agent_instance('staff_id_0', user_id)
             should_initiate = agent.should_initiate_conversation()
 
@@ -217,6 +218,7 @@ if __name__ == "__main__":
         user_manager=user_manager,
         chat_service_type=ChatServiceType.COZE_CHAT
     )
+    # 只有企微场景需要主动发起
     service.setup_initiative_conversations({'second': '5,35'})
 
     process_thread = threading.Thread(target=service.process_messages)
@@ -230,7 +232,7 @@ if __name__ == "__main__":
             continue
         message_id += 1
         message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
-            'staff_id_1','user_id_1', text, int(time.time() * 1000)
+            'user_id_1','staff_id_0', text, int(time.time() * 1000)
         )
         message.id = message_id
         receive_queue.produce(message)

+ 2 - 0
user_manager.py

@@ -55,6 +55,8 @@ class LocalUserManager(UserManager):
             return default_profile
 
     def save_user_profile(self, user_id, profile: Dict) -> None:
+        if not user_id:
+            raise Exception("Invalid user_id: {}".format(user_id))
         with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f:
             json.dump(profile, f, ensure_ascii=False, indent=2)