#!/usr/bin/env python3 """ 数据库连接测试脚本 仅用于测试连接和读取基本信息,不进行任何写入操作 """ import sys import json import warnings from urllib.parse import quote_plus # 忽略 SSL 警告 warnings.filterwarnings('ignore', message='Unverified HTTPS request') def test_es_connection(host, port, scheme, user, password, description): """测试 Elasticsearch 连接""" try: import requests from requests.auth import HTTPBasicAuth url = f"{scheme}://{host}:{port}" print(f"\n{'='*60}") print(f"测试: {description}") print(f"地址: {url}") print(f"{'='*60}") # 测试基本连接 response = requests.get( url, auth=HTTPBasicAuth(user, password), verify=False, # 忽略 SSL 证书验证(测试环境) timeout=10 ) if response.status_code == 200: info = response.json() print(f"✅ 连接成功!") print(f" 集群名称: {info.get('cluster_name', 'N/A')}") print(f" 版本: {info.get('version', {}).get('number', 'N/A')}") # 尝试获取索引列表 indices_response = requests.get( f"{url}/_cat/indices?format=json", auth=HTTPBasicAuth(user, password), verify=False, timeout=10 ) if indices_response.status_code == 200: indices = indices_response.json() print(f" 索引数量: {len(indices)}") if indices: print(f" 索引示例: {', '.join([idx['index'] for idx in indices[:3]])}") return True else: print(f"❌ 连接失败: HTTP {response.status_code}") print(f" 响应: {response.text[:200]}") return False except ImportError: print(f"\n⚠️ 缺少 requests 库,无法测试 Elasticsearch") print(f" 请运行: pip install requests") return None except Exception as e: print(f"❌ 连接异常: {str(e)[:200]}") return False def test_mysql_connection(host, port, user, password, description, database=None): """测试 MySQL 连接""" try: import pymysql print(f"\n{'='*60}") print(f"测试: {description}") print(f"地址: {host}:{port}") print(f"{'='*60}") # 尝试连接 connection = pymysql.connect( host=host, port=port, user=user, password=password, database=database, connect_timeout=10, read_timeout=10 ) print(f"✅ 连接成功!") # 获取服务器信息 with connection.cursor() as cursor: cursor.execute("SELECT VERSION()") version = cursor.fetchone() print(f" 版本: {version[0] if version else 'N/A'}") # 获取数据库列表 cursor.execute("SHOW DATABASES") databases = cursor.fetchall() print(f" 数据库数量: {len(databases)}") if databases: print(f" 数据库示例: {', '.join([db[0] for db in databases[:5]])}") connection.close() return True except ImportError: print(f"\n⚠️ 缺少 pymysql 库,无法测试 MySQL") print(f" 请运行: pip install pymysql") return None except Exception as e: print(f"❌ 连接异常: {str(e)[:200]}") return False def test_postgresql_connection(host, port, user, password, description, database=None): """测试 PostgreSQL 连接""" try: import psycopg2 print(f"\n{'='*60}") print(f"测试: {description}") print(f"地址: {host}:{port}") print(f"{'='*60}") # 尝试连接 connection = psycopg2.connect( host=host, port=port, user=user, password=password, dbname=database if database else 'postgres', connect_timeout=10 ) print(f"✅ 连接成功!") # 获取服务器信息 with connection.cursor() as cursor: cursor.execute("SELECT version()") version = cursor.fetchone() print(f" 版本: {version[0].split()[0] if version else 'N/A'}") # 获取数据库列表 cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false") databases = cursor.fetchall() print(f" 数据库数量: {len(databases)}") if databases: print(f" 数据库示例: {', '.join([db[0] for db in databases[:5]])}") connection.close() return True except ImportError: print(f"\n⚠️ 缺少 psycopg2-binary 库,无法测试 PostgreSQL") print(f" 请运行: pip install psycopg2-binary") return None except Exception as e: print(f"❌ 连接异常: {str(e)[:200]}") return False def main(): print("="*60) print("数据库连接测试") print("注意: 仅进行连接测试和只读操作") print("="*60) results = {} # ES 配置 es_configs = [ { "description": "Test ES (测试环境服务日志)", "host": "es-o79jsx9i.public.tencentelasticsearch.com", "port": 9200, "scheme": "https", "user": "elastic", "password": "lPLYr2!ap%^4UQb#" }, { "description": "Online ES (正式环境服务日志)", "host": "es-7vd7jcu9.public.tencentelasticsearch.com", "port": 9200, "scheme": "https", "user": "elastic", "password": "F%?QDcWes7N2WTuiYD11" } ] # MySQL 配置 mysql_configs = [ { "description": "Online MySQL (线上版本)", "host": "bj-cdb-dh2fkqa0.sql.tencentcdb.com", "port": 27751, "user": "read_only", "password": "fsdo45ijfmfmuu77$%^&" }, { "description": "Test MySQL (测试环境)", "host": "bj-cdb-8frbdwju.sql.tencentcdb.com", "port": 25413, "user": "read_only", "password": "fdsfiidier^$*hjfdijjd232" } ] # PostgreSQL 配置 pg_configs = [ { "description": "Online PostgreSQL 1 (线上用户行为数据)", "host": "bj-postgres-16pob4sg.sql.tencentcdb.com", "port": 28591, "user": "ai_member", "password": "Jhfdhsfduse&%$*^&6786" }, { "description": "Online PostgreSQL 2 (正式环境用户行为数据)", "host": "bj-postgres-642mcico.sql.tencentcdb.com", "port": 21531, "user": "ai_member", "password": "LdfjdjL83h3h3^$&**YGG*" } ] # 安装必要的库 print("\n正在安装必要的 Python 库...") import subprocess try: subprocess.check_call([sys.executable, "-m", "pip", "install", "--break-system-packages", "pymysql", "psycopg2-binary"]) print("✅ 库安装成功!") except Exception as e: print(f"⚠️ 库安装可能遇到问题: {e}") print(" 继续尝试测试...") # 测试 ES 连接 print("\n" + "="*60) print("测试 Elasticsearch 数据库") print("="*60) for config in es_configs: result = test_es_connection(**config) results[config["description"]] = result # 测试 MySQL 连接 print("\n" + "="*60) print("测试 MySQL 数据库") print("="*60) for config in mysql_configs: result = test_mysql_connection(**config) results[config["description"]] = result # 测试 PostgreSQL 连接 print("\n" + "="*60) print("测试 PostgreSQL 数据库") print("="*60) for config in pg_configs: result = test_postgresql_connection(**config) results[config["description"]] = result # 总结 print("\n" + "="*60) print("测试总结") print("="*60) for name, result in results.items(): status = "✅ 成功" if result else ("❌ 失败" if result is False else "⚠️ 跳过") print(f"{name}: {status}") print("\n📋 备注:") print(" - Test PostgreSQL 配置缺少 host 和 port 信息") print(" - 所有测试仅进行只读操作,未修改任何数据") if __name__ == "__main__": main()