#!/usr/bin/env python3 """ 清理 PostgreSQL 数据库锁和阻塞会话 """ import os import sys import psycopg2 from psycopg2.extras import RealDictCursor from dotenv import load_dotenv # 加载环境变量 _script_dir = os.path.dirname(os.path.abspath(__file__)) _project_root = os.path.normpath(os.path.join(_script_dir, '..', '..')) load_dotenv(os.path.join(_project_root, '.env')) def get_connection(): """建立数据库连接""" host = os.getenv('KNOWHUB_DB') port = int(os.getenv('KNOWHUB_PORT', 5432)) user = os.getenv('KNOWHUB_USER') password = os.getenv('KNOWHUB_PASSWORD') dbname = os.getenv('KNOWHUB_DB_NAME') print(f"连接到 {host}:{port}/{dbname} as {user} ...") conn = psycopg2.connect( host=host, port=port, user=user, password=password, database=dbname, connect_timeout=10 ) conn.autocommit = True print("连接成功。\n") return conn def show_locks(cursor): """显示当前的锁信息""" print("=" * 80) print("当前数据库锁信息:") print("=" * 80) cursor.execute(""" SELECT l.pid, l.locktype, l.relation::regclass AS table_name, l.mode, l.granted, a.usename, a.application_name, a.state, a.query_start, a.state_change, LEFT(a.query, 100) AS query FROM pg_locks l LEFT JOIN pg_stat_activity a ON l.pid = a.pid WHERE l.relation IS NOT NULL ORDER BY l.granted, a.query_start; """) locks = cursor.fetchall() if not locks: print("✓ 没有发现表级锁\n") return [] for lock in locks: print(f"\nPID: {lock['pid']}") print(f" 表: {lock['table_name']}") print(f" 锁类型: {lock['locktype']} / {lock['mode']}") print(f" 已授予: {'是' if lock['granted'] else '否(等待中)'}") print(f" 用户: {lock['usename']}") print(f" 应用: {lock['application_name']}") print(f" 状态: {lock['state']}") print(f" 查询开始: {lock['query_start']}") print(f" 查询: {lock['query']}") return locks def show_blocking(cursor): """显示阻塞关系""" print("\n" + "=" * 80) print("阻塞关系:") print("=" * 80) cursor.execute(""" SELECT blocked_locks.pid AS blocked_pid, blocked_activity.usename AS blocked_user, blocking_locks.pid AS blocking_pid, blocking_activity.usename AS blocking_user, blocked_activity.query AS blocked_query, blocking_activity.query AS blocking_query, blocking_activity.state AS blocking_state FROM pg_catalog.pg_locks blocked_locks JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid AND blocking_locks.pid != blocked_locks.pid JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid WHERE NOT blocked_locks.granted; """) blocking = cursor.fetchall() if not blocking: print("✓ 没有发现阻塞关系\n") return [] for block in blocking: print(f"\n被阻塞的会话 PID: {block['blocked_pid']} (用户: {block['blocked_user']})") print(f" 查询: {block['blocked_query'][:100]}") print(f"\n阻塞者 PID: {block['blocking_pid']} (用户: {block['blocking_user']})") print(f" 状态: {block['blocking_state']}") print(f" 查询: {block['blocking_query'][:100]}") return blocking def show_active_connections(cursor): """显示活跃连接""" print("\n" + "=" * 80) print("活跃连接:") print("=" * 80) cursor.execute(""" SELECT pid, usename, application_name, client_addr, state, query_start, state_change, LEFT(query, 100) AS query FROM pg_stat_activity WHERE state != 'idle' AND pid != pg_backend_pid() ORDER BY query_start; """) connections = cursor.fetchall() if not connections: print("✓ 没有其他活跃连接\n") return [] for conn in connections: print(f"\nPID: {conn['pid']}") print(f" 用户: {conn['usename']}") print(f" 应用: {conn['application_name']}") print(f" 客户端: {conn['client_addr']}") print(f" 状态: {conn['state']}") print(f" 查询开始: {conn['query_start']}") print(f" 查询: {conn['query']}") return connections def kill_session(cursor, pid): """终止指定的会话""" try: cursor.execute("SELECT pg_terminate_backend(%s)", (pid,)) result = cursor.fetchone() if result and result[0]: print(f"✓ 成功终止会话 PID: {pid}") return True else: print(f"✗ 无法终止会话 PID: {pid}") return False except Exception as e: print(f"✗ 终止会话失败: {e}") return False def main(): if len(sys.argv) > 1 and sys.argv[1] == '--kill-all': kill_all = True else: kill_all = False conn = get_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) # 显示锁信息 locks = show_locks(cursor) # 显示阻塞关系 blocking = show_blocking(cursor) # 显示活跃连接 connections = show_active_connections(cursor) # 如果有阻塞,询问是否终止 if blocking or kill_all: print("\n" + "=" * 80) if kill_all: print("将终止所有活跃连接...") pids_to_kill = [c['pid'] for c in connections] else: print("发现阻塞关系,建议终止阻塞者会话") pids_to_kill = list(set([b['blocking_pid'] for b in blocking])) if pids_to_kill: print(f"\n准备终止的 PID: {pids_to_kill}") confirm = input("确认终止这些会话?(yes/no): ") if confirm.lower() in ['yes', 'y']: for pid in pids_to_kill: kill_session(cursor, pid) print("\n清理完成!") else: print("\n已取消操作") else: print("\n没有需要终止的会话") cursor.close() conn.close() if __name__ == '__main__': print("PostgreSQL 锁清理工具") print("用法:") print(" python clear_locks.py # 检查锁并选择性终止") print(" python clear_locks.py --kill-all # 终止所有活跃连接") print() main()