#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""Rebuild agent replies that were generated but then skipped.

The script scans the agent service log from newest to oldest.  For every
"skip reply" line it looks up the reply that was logged just before it for
the same staff/user pair and, when the user carries one of the target tags,
rebuilds that reply as a ``Message``.  At most one message is produced per
user, and users that already have a newer (non-skipped) reply in the log
are left alone.
"""

from datetime import datetime
import re

import configs
from message import MessageChannel, Message
from message_queue_backend import AliyunRocketMQQueueBackend
from user_manager import MySQLUserRelationManager

config = configs.get()

# Log line format this script relies on (the reply text follows the closing
# bracket of ``response[...]``):
#   2025-05-02 07:17:15,869 - agent _send_response[200] - WARNING - staff[1688857241615085] user[7881299501048462]: response[MessageType.TEXT] <reply text>
#   2025-05-02 07:17:15,949 - agent _send_response[209] - WARNING - staff[1688857241615085] user[7881299501048462]: skip reply
_LOG_PATH = '/var/log/agent_service/service.log'
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
# Only replies logged strictly after this moment are rebuilt.
_CUTOFF_TIME = datetime.strptime("2025-05-07 07:00:00", _TIMESTAMP_FORMAT)
# A user must carry at least one of these tags to receive the reply.
_TARGET_TAGS = frozenset({"04W4-AA-1", "04W4-AA-2", "04W4-AA-3", "04W4-AA-4"})
# While True the rebuilt messages are only printed, never enqueued.
_DRY_RUN = True

# Every line in the documented format contains "_send_response[<lineno>]",
# so reply and skip lines must be recognised by the text that follows the
# staff/user prefix rather than by a bare "response[" substring.
_RESPONSE_LINE_RE = re.compile(r"staff\[(\d+)\] user\[(\d+)\]: response\[")
_SKIP_LINE_RE = re.compile(r"staff\[(\d+)\] user\[(\d+)\]: skip reply")
# Applied right after "response[": captures the message type and the text.
_RESPONSE_BODY_RE = re.compile(r"(.*?)\] (.*)")
_TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")


def _build_user_relation_manager():
    """Create the MySQL-backed user relation manager from the loaded config."""
    wecom_db_config = config['storage']['user_relation']
    user_db_config = config['storage']['user']
    return MySQLUserRelationManager(
        user_db_config['mysql'],
        wecom_db_config['mysql'],
        config['storage']['staff']['table'],
        user_db_config['table'],
        wecom_db_config['table']['staff'],
        wecom_db_config['table']['relation'],
        wecom_db_config['table']['user'],
    )


def _build_send_queue():
    """Create the producer-only RocketMQ backend for the send topic."""
    return AliyunRocketMQQueueBackend(
        config['mq']['endpoints'],
        config['mq']['instance_id'],
        config['mq']['send_topic'],
        has_consumer=False,
        has_producer=True,
    )


def _find_preceding_response(lines, start, staff_id, user_id):
    """Locate the closest older reply line for one staff/user pair.

    Args:
        lines: Log lines ordered newest first.
        start: Index in ``lines`` at which to start looking.
        staff_id: Staff id as it appears in the log.
        user_id: User id as it appears in the log.

    Returns:
        ``(message_type, response, timestamp)`` taken from the first matching
        line, or ``None`` when there is no such line or it cannot be parsed.
        ``message_type`` and ``response`` are the raw strings from the log;
        ``timestamp`` is a naive ``datetime`` with second precision.
    """
    marker = f"staff[{staff_id}] user[{user_id}]: response["
    # Walk by index so the tail of the list is not copied for every lookup.
    for index in range(start, len(lines)):
        line = lines[index]
        position = line.find(marker)
        if position < 0:
            continue
        body = _RESPONSE_BODY_RE.match(line, position + len(marker))
        stamp = _TIMESTAMP_RE.search(line)
        if not body or not stamp:
            # The closest reply is unreadable: give up instead of falling
            # back to an even older reply.
            return None
        message_type, response = body.groups()
        timestamp = datetime.strptime(stamp.group(1), _TIMESTAMP_FORMAT)
        return message_type, response, timestamp
    return None


def _iter_skipped_replies(lines, cutoff_time, has_target_tag):
    """Yield the skipped replies that should be sent again.

    Args:
        lines: Log lines ordered newest first.
        cutoff_time: Replies logged at or before this time are ignored.
        has_target_tag: Callable taking a user id and returning whether the
            user qualifies for a resend.

    Yields:
        ``(message_type, staff_id, user_id, response, timestamp)`` tuples,
        newest first, at most one per user.
    """
    handled_users = set()
    for index, line in enumerate(lines):
        response_match = _RESPONSE_LINE_RE.search(line)
        if response_match:
            # A reply seen while walking backwards is newer than anything
            # still to come, so older skipped replies for this user are
            # obsolete.
            handled_users.add(response_match.group(2))
            continue

        skip_match = _SKIP_LINE_RE.search(line)
        if not skip_match:
            continue
        staff_id, user_id = skip_match.groups()
        # Cheap in-memory check first; it avoids both the backward scan and
        # the tag lookup for users that are already dealt with.
        if user_id in handled_users:
            continue

        found = _find_preceding_response(lines, index + 1, staff_id, user_id)
        if found is None:
            continue
        message_type, response, timestamp = found
        if timestamp <= cutoff_time:
            continue
        if not has_target_tag(user_id):
            continue

        handled_users.add(user_id)
        yield message_type, staff_id, user_id, response, timestamp


def main():
    """Print (and optionally enqueue) the replies that were skipped.

    Reads the whole service log, selects the newest skipped reply per
    eligible user and builds a ``Message`` for each.  The messages are only
    printed unless ``_DRY_RUN`` is switched off, in which case they are also
    produced to the send queue.
    """
    user_relation_manager = _build_user_relation_manager()
    send_queue = _build_send_queue()

    with open(_LOG_PATH, "r", encoding="utf-8") as log_file:
        lines = log_file.readlines()
    lines.reverse()  # newest first, reversed in place to avoid a second copy

    eligibility = {}

    def has_target_tag(user_id):
        # Remember the answer so each user is looked up at most once.
        if user_id not in eligibility:
            user_tags = set(user_relation_manager.get_user_tags(user_id))
            eligibility[user_id] = not _TARGET_TAGS.isdisjoint(user_tags)
        return eligibility[user_id]

    candidates = _iter_skipped_replies(lines, _CUTOFF_TIME, has_target_tag)
    for message_type, staff_id, user_id, response, timestamp in candidates:
        # NOTE(review): message_type is the text captured from the log
        # (e.g. "MessageType.TEXT"), not an enum member - confirm that
        # Message.build accepts it in this form.
        message = Message.build(
            message_type,
            MessageChannel.CORP_WECHAT,
            staff_id,
            user_id,
            response,
            int(timestamp.timestamp() * 1000),
        )
        print(message)
        if not _DRY_RUN:
            send_queue.produce(message)