ai_member_xiaoxi/scripts/test_db_connections.py
2026-03-12 08:00:01 +08:00

273 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""
数据库连接测试脚本
仅用于测试连接和读取基本信息,不进行任何写入操作
"""
import sys
import json
import warnings
from urllib.parse import quote_plus
# Silence the "Unverified HTTPS request" warning: the Elasticsearch checks in
# this script deliberately call requests with verify=False.
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
def test_es_connection(host, port, scheme, user, password, description):
"""测试 Elasticsearch 连接"""
try:
import requests
from requests.auth import HTTPBasicAuth
url = f"{scheme}://{host}:{port}"
print(f"\n{'='*60}")
print(f"测试: {description}")
print(f"地址: {url}")
print(f"{'='*60}")
# 测试基本连接
response = requests.get(
url,
auth=HTTPBasicAuth(user, password),
verify=False, # 忽略 SSL 证书验证(测试环境)
timeout=10
)
if response.status_code == 200:
info = response.json()
print(f"✅ 连接成功!")
print(f" 集群名称: {info.get('cluster_name', 'N/A')}")
print(f" 版本: {info.get('version', {}).get('number', 'N/A')}")
# 尝试获取索引列表
indices_response = requests.get(
f"{url}/_cat/indices?format=json",
auth=HTTPBasicAuth(user, password),
verify=False,
timeout=10
)
if indices_response.status_code == 200:
indices = indices_response.json()
print(f" 索引数量: {len(indices)}")
if indices:
print(f" 索引示例: {', '.join([idx['index'] for idx in indices[:3]])}")
return True
else:
print(f"❌ 连接失败: HTTP {response.status_code}")
print(f" 响应: {response.text[:200]}")
return False
except ImportError:
print(f"\n⚠️ 缺少 requests 库,无法测试 Elasticsearch")
print(f" 请运行: pip install requests")
return None
except Exception as e:
print(f"❌ 连接异常: {str(e)[:200]}")
return False
def test_mysql_connection(host, port, user, password, description, database=None):
"""测试 MySQL 连接"""
try:
import pymysql
print(f"\n{'='*60}")
print(f"测试: {description}")
print(f"地址: {host}:{port}")
print(f"{'='*60}")
# 尝试连接
connection = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=database,
connect_timeout=10,
read_timeout=10
)
print(f"✅ 连接成功!")
# 获取服务器信息
with connection.cursor() as cursor:
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()
print(f" 版本: {version[0] if version else 'N/A'}")
# 获取数据库列表
cursor.execute("SHOW DATABASES")
databases = cursor.fetchall()
print(f" 数据库数量: {len(databases)}")
if databases:
print(f" 数据库示例: {', '.join([db[0] for db in databases[:5]])}")
connection.close()
return True
except ImportError:
print(f"\n⚠️ 缺少 pymysql 库,无法测试 MySQL")
print(f" 请运行: pip install pymysql")
return None
except Exception as e:
print(f"❌ 连接异常: {str(e)[:200]}")
return False
def test_postgresql_connection(host, port, user, password, description, database=None):
"""测试 PostgreSQL 连接"""
try:
import psycopg2
print(f"\n{'='*60}")
print(f"测试: {description}")
print(f"地址: {host}:{port}")
print(f"{'='*60}")
# 尝试连接
connection = psycopg2.connect(
host=host,
port=port,
user=user,
password=password,
dbname=database if database else 'postgres',
connect_timeout=10
)
print(f"✅ 连接成功!")
# 获取服务器信息
with connection.cursor() as cursor:
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f" 版本: {version[0].split()[0] if version else 'N/A'}")
# 获取数据库列表
cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false")
databases = cursor.fetchall()
print(f" 数据库数量: {len(databases)}")
if databases:
print(f" 数据库示例: {', '.join([db[0] for db in databases[:5]])}")
connection.close()
return True
except ImportError:
print(f"\n⚠️ 缺少 psycopg2-binary 库,无法测试 PostgreSQL")
print(f" 请运行: pip install psycopg2-binary")
return None
except Exception as e:
print(f"❌ 连接异常: {str(e)[:200]}")
return False
def _resolve_password(config):
    """Return a copy of *config* with ``password_env`` replaced by ``password``.

    The password is read from the environment variable named by
    ``config["password_env"]``. Returns ``None`` when that variable is unset
    or empty.
    """
    import os

    resolved = dict(config)
    password = os.environ.get(resolved.pop("password_env"))
    if not password:
        return None
    resolved["password"] = password
    return resolved


def _install_missing_drivers():
    """Install the MySQL / PostgreSQL driver packages that are not importable."""
    import importlib
    import importlib.util
    import subprocess

    def is_importable(module_name):
        try:
            return importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError):
            return False

    # Import name -> pip package name.
    drivers = {"pymysql": "pymysql", "psycopg2": "psycopg2-binary"}
    missing = [package for module, package in drivers.items() if not is_importable(module)]
    if not missing:
        # Nothing to do: avoid touching the Python environment needlessly.
        return

    print("\n正在安装必要的 Python 库...")
    try:
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "--break-system-packages", *missing]
        )
        print("✅ 库安装成功!")
    except Exception as e:
        print(f"⚠️ 库安装可能遇到问题: {e}")
        print(" 继续尝试测试...")
    finally:
        # Make packages installed after interpreter start-up visible to the
        # import system of this running process.
        importlib.invalidate_caches()


def _run_group(title, configs, tester, results):
    """Run *tester* for every entry of *configs* and record each outcome.

    Outcomes are stored in *results* under the entry's description. An entry
    whose password environment variable is not set is recorded as ``None``
    (skipped) without calling *tester*.
    """
    print("\n" + "="*60)
    print(title)
    print("="*60)
    for config in configs:
        resolved = _resolve_password(config)
        if resolved is None:
            print(f"\n⚠️ 未设置环境变量 {config['password_env']},跳过: {config['description']}")
            results[config["description"]] = None
            continue
        results[config["description"]] = tester(**resolved)


def main():
    """Run every configured connection check and print a summary.

    Passwords are deliberately not stored in this file. Each target names the
    environment variable that must hold its password:

    * ``TEST_ES_PASSWORD`` / ``ONLINE_ES_PASSWORD``
    * ``ONLINE_MYSQL_PASSWORD`` / ``TEST_MYSQL_PASSWORD``
    * ``ONLINE_PG1_PASSWORD`` / ``ONLINE_PG2_PASSWORD``

    A target whose variable is unset is reported as skipped. Before testing,
    the MySQL and PostgreSQL driver packages are installed with pip if they
    are not importable.
    """
    print("="*60)
    print("数据库连接测试")
    print("注意: 仅进行连接测试和只读操作")
    print("="*60)
    results = {}

    # Elasticsearch targets.
    es_configs = [
        {
            "description": "Test ES (测试环境服务日志)",
            "host": "es-o79jsx9i.public.tencentelasticsearch.com",
            "port": 9200,
            "scheme": "https",
            "user": "elastic",
            "password_env": "TEST_ES_PASSWORD",
        },
        {
            "description": "Online ES (正式环境服务日志)",
            "host": "es-7vd7jcu9.public.tencentelasticsearch.com",
            "port": 9200,
            "scheme": "https",
            "user": "elastic",
            "password_env": "ONLINE_ES_PASSWORD",
        },
    ]

    # MySQL targets.
    mysql_configs = [
        {
            "description": "Online MySQL (线上版本)",
            "host": "bj-cdb-dh2fkqa0.sql.tencentcdb.com",
            "port": 27751,
            "user": "read_only",
            "password_env": "ONLINE_MYSQL_PASSWORD",
        },
        {
            "description": "Test MySQL (测试环境)",
            "host": "bj-cdb-8frbdwju.sql.tencentcdb.com",
            "port": 25413,
            "user": "read_only",
            "password_env": "TEST_MYSQL_PASSWORD",
        },
    ]

    # PostgreSQL targets.
    pg_configs = [
        {
            "description": "Online PostgreSQL 1 (线上用户行为数据)",
            "host": "bj-postgres-16pob4sg.sql.tencentcdb.com",
            "port": 28591,
            "user": "ai_member",
            "password_env": "ONLINE_PG1_PASSWORD",
        },
        {
            "description": "Online PostgreSQL 2 (正式环境用户行为数据)",
            "host": "bj-postgres-642mcico.sql.tencentcdb.com",
            "port": 21531,
            "user": "ai_member",
            "password_env": "ONLINE_PG2_PASSWORD",
        },
    ]

    _install_missing_drivers()

    _run_group("测试 Elasticsearch 数据库", es_configs, test_es_connection, results)
    _run_group("测试 MySQL 数据库", mysql_configs, test_mysql_connection, results)
    _run_group("测试 PostgreSQL 数据库", pg_configs, test_postgresql_connection, results)

    # Summary: True -> success, False -> failure, None -> skipped.
    print("\n" + "="*60)
    print("测试总结")
    print("="*60)
    for name, result in results.items():
        status = "✅ 成功" if result else ("❌ 失败" if result is False else "⚠️ 跳过")
        print(f"{name}: {status}")
    print("\n📋 备注:")
    print(" - Test PostgreSQL 配置缺少 host 和 port 信息")
    print(" - 所有测试仅进行只读操作,未修改任何数据")
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()