123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- from datetime import datetime
- import re
- from pqai_agent import configs
- from pqai_agent.mq_message import MessageChannel, MqMessage, MessageType
- from pqai_agent.message_queue_backend import AliyunRocketMQQueueBackend
- from pqai_agent.user_manager import MySQLUserRelationManager
- config = configs.get()
- def main():
- user_db_config = config['storage']['user']
- agent_db_config = config['database']['ai_agent']
- growth_db_config = config['database']['growth']
- wecom_db_config = config['storage']['user_relation']
- user_relation_manager = MySQLUserRelationManager(
- agent_db_config, growth_db_config,
- config['storage']['staff']['table'],
- user_db_config['table'],
- wecom_db_config['table']['staff'],
- wecom_db_config['table']['relation'],
- wecom_db_config['table']['user']
- )
- send_queue = AliyunRocketMQQueueBackend(
- config['mq']['endpoints'],
- config['mq']['instance_id'],
- config['mq']['send_topic'],
- has_consumer=False, has_producer=True
- )
- message_type_map = {
- 'MessageType.TEXT': MessageType.TEXT,
- 'MessageType.VOICE': MessageType.VOICE,
- }
- """
- log格式
- 2025-05-02 07:17:15,869 - agent _send_response[200] - WARNING - staff[1688857241615085] user[7881299501048462]: response[MessageType.TEXT] 早上好呀!感谢您的祝福~您这发送祝福信息的爱好真不错,您一般都喜欢给哪些人发祝福呀?
- 2025-05-02 07:17:15,949 - agent _send_response[209] - WARNING - staff[1688857241615085] user[7881299501048462]: skip reply
- """
-
-
-
-
-
-
-
-
- log_name = '/var/log/agent_service/service.log'
- processed_users = set()
- target_tags = {"04W4-AA-1", "04W4-AA-2", "04W4-AA-3", "04W4-AA-4"}
- cutoff_time = datetime.strptime("2025-05-07 07:35:00", "%Y-%m-%d %H:%M:%S")
- with open(log_name, "r", encoding="utf-8") as log_file:
- logs = log_file.readlines()[::-1]
- for i, log in enumerate(logs):
- if ": response[" in log:
- match = re.search(r"staff\[(\d+)\] user\[(\d+)\]", log)
- if not match:
- continue
- staff_id, user_id = match.groups()
- processed_users.add(user_id)
- elif "skip reply" in log:
- match = re.search(r"staff\[(\d+)\] user\[(\d+)\]", log)
- if not match:
- continue
- staff_id, user_id = match.groups()
-
- for prev_log in logs[i + 1:]:
- if f"staff[{staff_id}] user[{user_id}]: response[" in prev_log:
- response_match = re.search(
- r": response\[(.*?)\] (.*)", prev_log
- )
- if not response_match:
- break
- message_type, response = response_match.groups()
- message_type = message_type_map[message_type]
- timestamp_match = re.search(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", prev_log)
- if not timestamp_match:
- break
- timestamp = datetime.strptime(timestamp_match.group(1), "%Y-%m-%d %H:%M:%S")
- if timestamp <= cutoff_time:
- break
-
- user_tags = set(user_relation_manager.get_user_tags(user_id))
- if not target_tags.intersection(user_tags):
- break
-
- if user_id in processed_users:
- break
- message = MqMessage.build(message_type, MessageChannel.CORP_WECHAT,
- staff_id, user_id, response, int(timestamp.timestamp() * 1000))
- print(message)
-
- send_queue.produce(message)
- processed_users.add(user_id)
- break
- if __name__ == '__main__':
- main()
|