| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- #!/usr/bin/env python3
- """
- 清理 PostgreSQL 数据库锁和阻塞会话
- """
- import os
- import sys
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from dotenv import load_dotenv
- # 加载环境变量
- _script_dir = os.path.dirname(os.path.abspath(__file__))
- _project_root = os.path.normpath(os.path.join(_script_dir, '..', '..'))
- load_dotenv(os.path.join(_project_root, '.env'))
- def get_connection():
- """建立数据库连接"""
- host = os.getenv('KNOWHUB_DB')
- port = int(os.getenv('KNOWHUB_PORT', 5432))
- user = os.getenv('KNOWHUB_USER')
- password = os.getenv('KNOWHUB_PASSWORD')
- dbname = os.getenv('KNOWHUB_DB_NAME')
- print(f"连接到 {host}:{port}/{dbname} as {user} ...")
- conn = psycopg2.connect(
- host=host,
- port=port,
- user=user,
- password=password,
- database=dbname,
- connect_timeout=10
- )
- conn.autocommit = True
- print("连接成功。\n")
- return conn
- def show_locks(cursor):
- """显示当前的锁信息"""
- print("=" * 80)
- print("当前数据库锁信息:")
- print("=" * 80)
- cursor.execute("""
- SELECT
- l.pid,
- l.locktype,
- l.relation::regclass AS table_name,
- l.mode,
- l.granted,
- a.usename,
- a.application_name,
- a.state,
- a.query_start,
- a.state_change,
- LEFT(a.query, 100) AS query
- FROM pg_locks l
- LEFT JOIN pg_stat_activity a ON l.pid = a.pid
- WHERE l.relation IS NOT NULL
- ORDER BY l.granted, a.query_start;
- """)
- locks = cursor.fetchall()
- if not locks:
- print("✓ 没有发现表级锁\n")
- return []
- for lock in locks:
- print(f"\nPID: {lock['pid']}")
- print(f" 表: {lock['table_name']}")
- print(f" 锁类型: {lock['locktype']} / {lock['mode']}")
- print(f" 已授予: {'是' if lock['granted'] else '否(等待中)'}")
- print(f" 用户: {lock['usename']}")
- print(f" 应用: {lock['application_name']}")
- print(f" 状态: {lock['state']}")
- print(f" 查询开始: {lock['query_start']}")
- print(f" 查询: {lock['query']}")
- return locks
- def show_blocking(cursor):
- """显示阻塞关系"""
- print("\n" + "=" * 80)
- print("阻塞关系:")
- print("=" * 80)
- cursor.execute("""
- SELECT
- blocked_locks.pid AS blocked_pid,
- blocked_activity.usename AS blocked_user,
- blocking_locks.pid AS blocking_pid,
- blocking_activity.usename AS blocking_user,
- blocked_activity.query AS blocked_query,
- blocking_activity.query AS blocking_query,
- blocking_activity.state AS blocking_state
- FROM pg_catalog.pg_locks blocked_locks
- JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
- JOIN pg_catalog.pg_locks blocking_locks
- ON blocking_locks.locktype = blocked_locks.locktype
- AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
- AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
- AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
- AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
- AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
- AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
- AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
- AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
- AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
- AND blocking_locks.pid != blocked_locks.pid
- JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
- WHERE NOT blocked_locks.granted;
- """)
- blocking = cursor.fetchall()
- if not blocking:
- print("✓ 没有发现阻塞关系\n")
- return []
- for block in blocking:
- print(f"\n被阻塞的会话 PID: {block['blocked_pid']} (用户: {block['blocked_user']})")
- print(f" 查询: {block['blocked_query'][:100]}")
- print(f"\n阻塞者 PID: {block['blocking_pid']} (用户: {block['blocking_user']})")
- print(f" 状态: {block['blocking_state']}")
- print(f" 查询: {block['blocking_query'][:100]}")
- return blocking
- def show_active_connections(cursor):
- """显示活跃连接"""
- print("\n" + "=" * 80)
- print("活跃连接:")
- print("=" * 80)
- cursor.execute("""
- SELECT
- pid,
- usename,
- application_name,
- client_addr,
- state,
- query_start,
- state_change,
- LEFT(query, 100) AS query
- FROM pg_stat_activity
- WHERE state != 'idle'
- AND pid != pg_backend_pid()
- ORDER BY query_start;
- """)
- connections = cursor.fetchall()
- if not connections:
- print("✓ 没有其他活跃连接\n")
- return []
- for conn in connections:
- print(f"\nPID: {conn['pid']}")
- print(f" 用户: {conn['usename']}")
- print(f" 应用: {conn['application_name']}")
- print(f" 客户端: {conn['client_addr']}")
- print(f" 状态: {conn['state']}")
- print(f" 查询开始: {conn['query_start']}")
- print(f" 查询: {conn['query']}")
- return connections
- def kill_session(cursor, pid):
- """终止指定的会话"""
- try:
- cursor.execute("SELECT pg_terminate_backend(%s)", (pid,))
- result = cursor.fetchone()
- if result and result[0]:
- print(f"✓ 成功终止会话 PID: {pid}")
- return True
- else:
- print(f"✗ 无法终止会话 PID: {pid}")
- return False
- except Exception as e:
- print(f"✗ 终止会话失败: {e}")
- return False
- def main():
- if len(sys.argv) > 1 and sys.argv[1] == '--kill-all':
- kill_all = True
- else:
- kill_all = False
- conn = get_connection()
- cursor = conn.cursor(cursor_factory=RealDictCursor)
- # 显示锁信息
- locks = show_locks(cursor)
- # 显示阻塞关系
- blocking = show_blocking(cursor)
- # 显示活跃连接
- connections = show_active_connections(cursor)
- # 如果有阻塞,询问是否终止
- if blocking or kill_all:
- print("\n" + "=" * 80)
- if kill_all:
- print("将终止所有活跃连接...")
- pids_to_kill = [c['pid'] for c in connections]
- else:
- print("发现阻塞关系,建议终止阻塞者会话")
- pids_to_kill = list(set([b['blocking_pid'] for b in blocking]))
- if pids_to_kill:
- print(f"\n准备终止的 PID: {pids_to_kill}")
- confirm = input("确认终止这些会话?(yes/no): ")
- if confirm.lower() in ['yes', 'y']:
- for pid in pids_to_kill:
- kill_session(cursor, pid)
- print("\n清理完成!")
- else:
- print("\n已取消操作")
- else:
- print("\n没有需要终止的会话")
- cursor.close()
- conn.close()
- if __name__ == '__main__':
- print("PostgreSQL 锁清理工具")
- print("用法:")
- print(" python clear_locks.py # 检查锁并选择性终止")
- print(" python clear_locks.py --kill-all # 终止所有活跃连接")
- print()
- main()
|