resend_lost_message.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from datetime import datetime
  5. import re
  6. from pqai_agent import configs
  7. from pqai_agent.mq_message import MessageChannel, MqMessage, MessageType
  8. from pqai_agent.message_queue_backend import AliyunRocketMQQueueBackend
  9. from pqai_agent.user_manager import MySQLUserRelationManager
  # Project configuration, loaded once at import time; main() reads the
  # 'storage', 'database' and 'mq' sections from it.
  config = configs.get()
  # Shapes of the service-log lines handled below:
  #   2025-05-02 07:17:15,869 - agent _send_response[200] - WARNING - staff[1688857241615085] user[7881299501048462]: response[MessageType.TEXT] <reply text>
  #   2025-05-02 07:17:15,949 - agent _send_response[209] - WARNING - staff[1688857241615085] user[7881299501048462]: skip reply
  # Compiled once here instead of on every loop iteration.
  _STAFF_USER_RE = re.compile(r"staff\[(\d+)\] user\[(\d+)\]")
  _RESPONSE_RE = re.compile(r": response\[(.*?)\] (.*)")
  _TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")


  def _find_preceding_response(logs, start, staff_id, user_id):
      """Return the first response line for the staff/user pair at index >= start.

      ``logs`` is expected newest-first, so the first hit is the response
      logged closest before the line that precedes ``start``. Returns None
      when no such line exists. Scans by index so no copy of the list tail
      is made.
      """
      marker = f"staff[{staff_id}] user[{user_id}]: response["
      for idx in range(start, len(logs)):
          if marker in logs[idx]:
              return logs[idx]
      return None


  def _parse_response_line(line):
      """Split a response log line into (type name, response text, timestamp).

      Returns None when the line lacks either the ``: response[...] text``
      part or a ``YYYY-MM-DD HH:MM:SS`` timestamp.
      """
      response_match = _RESPONSE_RE.search(line)
      if not response_match:
          return None
      timestamp_match = _TIMESTAMP_RE.search(line)
      if not timestamp_match:
          return None
      type_name, response = response_match.groups()
      timestamp = datetime.strptime(timestamp_match.group(1), "%Y-%m-%d %H:%M:%S")
      return type_name, response, timestamp


  def main():
      """Re-queue responses that were logged and then followed by "skip reply".

      The service log is read completely and walked from the newest line to
      the oldest. For every ``skip reply`` line, the closest older
      ``response[...]`` line of the same staff/user pair is looked up and its
      message type, text and timestamp are parsed. The response is rebuilt as
      an MqMessage and produced to the send queue when all of these hold:

      * the user has no newer response line in the log and has not already
        been resent to during this run (at most one message per user);
      * the response type is one of the types in ``message_type_map``;
      * the response timestamp is later than ``cutoff_time``;
      * the user's tags contain at least one of ``target_tags``.

      The message keeps the timestamp of the original log line (converted to
      epoch milliseconds), not the current time.

      NOTE(review): only the first line of a multi-line response is recovered,
      because the log is parsed line by line.
      """
      user_db_config = config['storage']['user']
      agent_db_config = config['database']['ai_agent']
      growth_db_config = config['database']['growth']
      wecom_db_config = config['storage']['user_relation']
      user_relation_manager = MySQLUserRelationManager(
          agent_db_config, growth_db_config,
          config['storage']['staff']['table'],
          user_db_config['table'],
          wecom_db_config['table']['staff'],
          wecom_db_config['table']['relation'],
          wecom_db_config['table']['user']
      )
      send_queue = AliyunRocketMQQueueBackend(
          config['mq']['endpoints'],
          config['mq']['instance_id'],
          config['mq']['send_topic'],
          has_consumer=False, has_producer=True
      )
      # Maps the type name as printed in the log to the enum member.
      message_type_map = {
          'MessageType.TEXT': MessageType.TEXT,
          'MessageType.VOICE': MessageType.VOICE,
      }

      log_name = '/var/log/agent_service/service.log'
      # Users that either have a newer response in the log or were already
      # resent to during this run.
      processed_users = set()
      target_tags = {"04W4-AA-1", "04W4-AA-2", "04W4-AA-3", "04W4-AA-4"}
      cutoff_time = datetime.strptime("2025-05-07 07:35:00", "%Y-%m-%d %H:%M:%S")

      with open(log_name, "r", encoding="utf-8") as log_file:
          # Newest line first, so "preceding" entries sit at higher indexes.
          logs = log_file.readlines()[::-1]

      for i, log in enumerate(logs):
          if ": response[" in log:
              # A response seen while walking backwards is newer than anything
              # still to come, so older skipped replies of this user are moot.
              match = _STAFF_USER_RE.search(log)
              if match:
                  processed_users.add(match.group(2))
              continue
          if "skip reply" not in log:
              continue

          match = _STAFF_USER_RE.search(log)
          if not match:
              continue
          staff_id, user_id = match.groups()

          # Cheap in-memory check first: avoids the log scan and the tag
          # lookup for users that would be rejected anyway.
          if user_id in processed_users:
              continue

          prev_log = _find_preceding_response(logs, i + 1, staff_id, user_id)
          if prev_log is None:
              continue
          parsed = _parse_response_line(prev_log)
          if parsed is None:
              continue
          type_name, response, timestamp = parsed

          message_type = message_type_map.get(type_name)
          if message_type is None:
              # Previously a KeyError that aborted the run half-way through.
              print(f"skip unsupported message type {type_name} for user[{user_id}]")
              continue
          if timestamp <= cutoff_time:
              continue

          # `or ()` guards against a lookup that returns None for unknown
          # users -- TODO confirm get_user_tags' return contract.
          user_tags = set(user_relation_manager.get_user_tags(user_id) or ())
          if not target_tags.intersection(user_tags):
              continue

          message = MqMessage.build(message_type, MessageChannel.CORP_WECHAT,
                                    staff_id, user_id, response,
                                    int(timestamp.timestamp() * 1000))
          print(message)
          send_queue.produce(message)
          processed_users.add(user_id)
  # Run the resend job only when executed as a script, not on import.
  if __name__ == "__main__":
      main()