ai_member_xiaoxi/scripts/phone_match_userid.py
2026-06-02 08:00:01 +08:00

157 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""手机号匹配账号ID - 快速统计匹配率XXTEA加密匹配 tel_encrypt"""
import os
import re
import sys
from datetime import datetime

import openpyxl
import psycopg2
from openpyxl.styles import Alignment, Font, PatternFill
# --- Filesystem layout -------------------------------------------------------
WORKSPACE_DIR = "/root/.openclaw/workspace"
SECRETS_FILE = os.path.join(WORKSPACE_DIR, "secrets.env")
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))

# The directory containing this script is pushed to the front of sys.path
# before phone_encrypt is imported by its bare module name.
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone

# --- PostgreSQL connection settings (the password is read from SECRETS_FILE) --
DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = "28591"
DB_USER = "ai_member"
DB_NAME = "vala_bi"

# --- Command line: the first positional argument is the input workbook -------
if len(sys.argv) <= 1:
    sys.exit("Usage: python3 phone_match_userid.py <input.xlsx>")
INPUT_FILE = sys.argv[1]
def load_pg_password(secrets_file=None):
    """Read the PostgreSQL password from a ``KEY=value`` style secrets file.

    The first line that starts with ``PG_ONLINE_PASSWORD=`` wins. Everything
    after the first ``=`` is taken as the value, with surrounding whitespace
    and then surrounding single/double quotes removed.

    Args:
        secrets_file: Path of the file to read. Defaults to the module-level
            ``SECRETS_FILE`` when omitted.

    Returns:
        The password string, or ``None`` when the file has no
        ``PG_ONLINE_PASSWORD=`` line.

    Raises:
        OSError: If the file cannot be opened.
    """
    if secrets_file is None:
        secrets_file = SECRETS_FILE
    # Explicit encoding so parsing does not depend on the process locale.
    with open(secrets_file, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                # split(..., 1) keeps any '=' characters inside the password.
                return line.split("=", 1)[1].strip().strip("'\"")
    return None
def extract_phones(file_path):
    """Collect the distinct phone numbers found anywhere in a workbook.

    Every cell of every worksheet is scanned. A cell counts as a phone number
    when its text form is exactly 11 digits starting with ``1``. Whole-valued
    floats are converted through ``int`` first so ``13800138000.0`` is read as
    ``"13800138000"``; any other value is converted with ``str`` and stripped.

    Args:
        file_path: Path of the workbook passed to ``openpyxl.load_workbook``.

    Returns:
        A list of phone-number strings in first-seen order, without
        duplicates.
    """
    phone_pattern = re.compile(r'^1\d{10}$')
    phones = []
    seen = set()
    wb = openpyxl.load_workbook(file_path, read_only=True)
    try:
        for ws in wb:
            for row in ws.iter_rows(values_only=True):
                for cell in row:
                    if cell is None:
                        continue
                    if isinstance(cell, float):
                        # A fractional value is not a phone number, and
                        # is_integer() is also False for NaN/inf, which
                        # int() cannot convert.
                        if not cell.is_integer():
                            continue
                        val = str(int(cell))
                    else:
                        val = str(cell).strip()
                    if val not in seen and phone_pattern.match(val):
                        seen.add(val)
                        phones.append(val)
    finally:
        # Read-only workbooks keep the source file open until closed.
        wb.close()
    return phones
# 1. Extract the phone numbers from the input workbook.
print("📱 提取手机号...")
phones = extract_phones(INPUT_FILE)
print(f" 共提取 {len(phones)} 个不重复手机号")
if not phones:
    # Nothing to look up, and the match-rate percentages computed later
    # divide by len(phones); stop before opening a database connection.
    sys.exit(f"No phone numbers found in {INPUT_FILE}")

# 2. Encrypt each phone once; the ciphertexts are the values used in the
#    SQL IN query against the tel_encrypt column.
encrypted_list = [encrypt_phone(p) for p in phones]
encrypt_to_phone = dict(zip(encrypted_list, phones))  # ciphertext -> plaintext
# 3. Connect to the database and look up the encrypted phone numbers.
print("🔗 连接数据库...")
pw = load_pg_password()
conn = psycopg2.connect(
    host=DB_HOST, port=DB_PORT, user=DB_USER, password=pw, dbname=DB_NAME
)

matched = {}  # plaintext phone -> (account_id, tel_encrypt)
batch_size = 500  # maximum number of ciphertexts per IN (...) query
try:
    # The cursor is closed when the with-block exits; the connection is
    # closed in the finally clause even if a query raises.
    with conn.cursor() as cur:
        for i in range(0, len(encrypted_list), batch_size):
            batch = encrypted_list[i:i + batch_size]
            # Only '%s' placeholders are interpolated into the SQL text; the
            # values themselves are passed to execute() as parameters.
            placeholders = ','.join(['%s'] * len(batch))
            sql = f"""
                SELECT id, tel_encrypt
                FROM bi_vala_app_account
                WHERE tel_encrypt IN ({placeholders})
                  AND status = 1
                  AND deleted_at IS NULL
            """
            cur.execute(sql, batch)
            for account_id, tel_encrypt in cur.fetchall():
                # If several rows share one ciphertext, the last row wins.
                if tel_encrypt in encrypt_to_phone:
                    matched[encrypt_to_phone[tel_encrypt]] = (account_id, tel_encrypt)
finally:
    conn.close()
# 4. Print the match statistics.
n_total = len(phones)
n_matched = len(matched)
n_unmatched = n_total - n_matched
# Guard the percentages so an empty input reports 0.0% instead of raising
# ZeroDivisionError.
matched_pct = n_matched / n_total * 100 if n_total else 0.0
unmatched_pct = n_unmatched / n_total * 100 if n_total else 0.0
print("\n📊 匹配结果:")
print(f" 输入手机号: {n_total}")
print(f" 匹配成功: {n_matched} 个 ({matched_pct:.1f}%)")
print(f" 未匹配: {n_unmatched} 个 ({unmatched_pct:.1f}%)")
# 5. Write the xlsx report.
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = os.path.join(WORKSPACE_DIR, "output", f"phone_match_result_{ts}.xlsx")
# Make sure the output directory exists before the workbook is saved.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

wb = openpyxl.Workbook()
ws = wb.active
ws.title = "匹配结果"
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
header_font_white = Font(bold=True, size=11, color="FFFFFF")
no_match_fill = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid")

# Header row: bold white text on the blue fill, centred.
headers = ["序号", "手机号", "匹配状态", "账号ID(account_id)", "加密手机号(tel_encrypt)"]
for col, h in enumerate(headers, 1):
    cell = ws.cell(row=1, column=col, value=h)
    cell.font = header_font_white
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")

# Split the phones into matched / unmatched, keeping the input order.
matched_phones = [p for p in phones if p in matched]
unmatched_phones = [p for p in phones if p not in matched]
matched_count = len(matched_phones)
unmatched_count = len(unmatched_phones)

row = 2
# Matched phones come first.
for phone in matched_phones:
    # Use the ciphertext stored with this phone's match; do not reuse a
    # variable left over from the database fetch loop.
    aid, tel_m = matched[phone]
    for col, value in enumerate((row - 1, phone, "已匹配", aid, tel_m), 1):
        ws.cell(row=row, column=col, value=value)
    row += 1

# Unmatched phones follow, highlighted across all five columns.
for phone in unmatched_phones:
    for col, value in enumerate((row - 1, phone, "未匹配"), 1):
        ws.cell(row=row, column=col, value=value)
    for col in range(1, 6):
        ws.cell(row=row, column=col).fill = no_match_fill
    row += 1

# Summary section, separated from the data rows by one blank row.
total_count = len(phones)
# Guard the percentages so an empty input does not raise ZeroDivisionError.
matched_rate = matched_count / total_count * 100 if total_count else 0.0
unmatched_rate = unmatched_count / total_count * 100 if total_count else 0.0
row += 1
ws.cell(row=row, column=2, value="统计汇总").font = Font(bold=True)
row += 1
ws.cell(row=row, column=2, value=f"总手机号数: {total_count}")
row += 1
ws.cell(row=row, column=2, value=f"匹配成功: {matched_count} ({matched_rate:.1f}%)")
row += 1
ws.cell(row=row, column=2, value=f"未匹配: {unmatched_count} ({unmatched_rate:.1f}%)")

# Column widths.
for column_letter, width in (('A', 8), ('B', 16), ('C', 12), ('D', 22), ('E', 18)):
    ws.column_dimensions[column_letter].width = width

wb.save(output_path)
print(f"\n✅ 结果已保存: {output_path}")