瀏覽代碼

Add scripts/resend_lost_message

StrayWarrior 5 天之前
父節點
當前提交
e43cb01799
共有 1 個文件被更改,包括 105 次插入0 次删除
  1. 105 0
      scripts/resend_lost_message.py

+ 105 - 0
scripts/resend_lost_message.py

@@ -0,0 +1,105 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+from datetime import datetime
+import re
+
+import configs
+from message import MessageChannel, Message
+from message_queue_backend import AliyunRocketMQQueueBackend
+from user_manager import MySQLUserRelationManager
+
+
+config = configs.get()
+
+def main():
+    wecom_db_config = config['storage']['user_relation']
+    user_db_config = config['storage']['user']
+
+    user_relation_manager = MySQLUserRelationManager(
+        user_db_config['mysql'], wecom_db_config['mysql'],
+        config['storage']['staff']['table'],
+        user_db_config['table'],
+        wecom_db_config['table']['staff'],
+        wecom_db_config['table']['relation'],
+        wecom_db_config['table']['user']
+    )
+
+    send_queue = AliyunRocketMQQueueBackend(
+        config['mq']['endpoints'],
+        config['mq']['instance_id'],
+        config['mq']['send_topic'],
+        has_consumer=False, has_producer=True
+    )
+
+    """
+    log格式
+    2025-05-02 07:17:15,869 - agent _send_response[200] - WARNING - staff[1688857241615085] user[7881299501048462]: response[MessageType.TEXT] 早上好呀!感谢您的祝福~您这发送祝福信息的爱好真不错,您一般都喜欢给哪些人发祝福呀?
+    2025-05-02 07:17:15,949 - agent _send_response[209] - WARNING - staff[1688857241615085] user[7881299501048462]: skip reply
+    """
+
+    # 从后往前读取指定的日志文件在2025-05-07 07:00:00后的日志,解析内容为skip reply的日志,找到其前一条同一staff和user的response日志,解析其MessageType、response内容、timestamp
+    # 查询userid的tags,如果包含"04W4-AA-1", "04W4-AA-2", "04W4-AA-3", "04W4-AA-4"其中之一,则将response组装为Message,放入发送队列
+    # 记录发送的userid,如果同一用户已经发送过,则不再发送
+    # 发送所需代码为:
+    # self.send_queue.produce(
+    #             Message.build(message_type, MessageChannel.CORP_WECHAT,
+    #                           staff_id, user_id, response, current_ts)
+    #         )
+
+    log_name = '/var/log/agent_service/service.log'
+    processed_users = set()
+    target_tags = {"04W4-AA-1", "04W4-AA-2", "04W4-AA-3", "04W4-AA-4"}
+    cutoff_time = datetime.strptime("2025-05-07 07:00:00", "%Y-%m-%d %H:%M:%S")
+
+    with open(log_name, "r", encoding="utf-8") as log_file:
+        logs = log_file.readlines()[::-1]  # Reverse the logs for backward processing
+
+    for i, log in enumerate(logs):
+        if "response[" in log:
+            match = re.search(r"staff\[(\d+)\] user\[(\d+)\]", log)
+            if not match:
+                continue
+            staff_id, user_id = match.groups()
+            processed_users.add(user_id)
+
+        if "skip reply" in log:
+            match = re.search(r"staff\[(\d+)\] user\[(\d+)\]", log)
+            if not match:
+                continue
+
+            staff_id, user_id = match.groups()
+
+            # Find the preceding response log
+            for prev_log in logs[i + 1:]:
+                if f"staff[{staff_id}] user[{user_id}]: response[" in prev_log:
+                    response_match = re.search(
+                        r"response\[(.*?)\] (.*)", prev_log
+                    )
+                    if not response_match:
+                        break
+
+                    message_type, response = response_match.groups()
+                    timestamp_match = re.search(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", prev_log)
+                    if not timestamp_match:
+                        break
+
+                    timestamp = datetime.strptime(timestamp_match.group(1), "%Y-%m-%d %H:%M:%S")
+                    if timestamp <= cutoff_time:
+                        break
+
+                    # Query user tags
+                    user_tags = set(user_relation_manager.get_user_tags(user_id))
+                    if not target_tags.intersection(user_tags):
+                        break
+
+                    # Check if user has already been processed
+                    if user_id in processed_users:
+                        break
+                    message = Message.build(message_type, MessageChannel.CORP_WECHAT,
+                                            staff_id, user_id, response, int(timestamp.timestamp() * 1000))
+                    print(message)
+                    # Send the message
+                    # send_queue.produce(message)
+                    processed_users.add(user_id)
+                    break