ai_member_xiaoxi/scripts/batch_phone_to_id.py
2026-05-28 08:00:01 +08:00

146 lines
4.6 KiB
Python

#!/usr/bin/env python3
"""
批量手机号→用户ID匹配
输入: /tmp/sheet_id_data.json (飞书sheet原始数据)
输出: /tmp/sheet_id_results.json (回填数据)
"""
import json
import re
import os
import psycopg2
from datetime import datetime
# Connection settings for the PostgreSQL database queried by this script.
# main() passes them to psycopg2.connect(); the password is deliberately not
# stored here and is resolved at runtime by get_password().
DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = 28591
DB_USER = "ai_member"
DB_NAME = "vala_bi"
def get_password():
    """Return the database password.

    Lookup order:

    1. The ``PG_ONLINE_PASSWORD`` environment variable, when non-empty.
    2. A ``PG_ONLINE_PASSWORD=...`` line in ``secrets.env`` located in the
       parent directory of this script. Surrounding whitespace, an optional
       ``export `` prefix and surrounding single/double quotes are tolerated.

    Returns:
        The password string (never empty).

    Raises:
        RuntimeError: If neither source provides a non-empty password.
    """
    env_value = os.environ.get("PG_ONLINE_PASSWORD", "")
    if env_value:
        return env_value

    script_dir = os.path.dirname(os.path.abspath(__file__))
    secrets_path = os.path.join(script_dir, "..", "secrets.env")
    if os.path.exists(secrets_path):
        # Explicit encoding so parsing does not depend on the platform locale.
        with open(secrets_path, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                # Accept shell-style "export KEY=value" entries as well.
                if line.startswith("export "):
                    line = line[len("export "):].lstrip()
                if not line.startswith("PG_ONLINE_PASSWORD="):
                    continue
                value = line.split("=", 1)[1].strip("'\"")
                # An empty entry is treated as "not configured" so the caller
                # gets a clear error instead of a failed login later on.
                if value:
                    return value
    raise RuntimeError("PG_ONLINE_PASSWORD not found")
def mask_phone(phone):
    """Return *phone* with its middle section replaced by ``****``.

    The value is converted to ``str`` and stripped of surrounding whitespace.
    Values of seven or more characters keep their first three and last four
    characters; shorter values are returned unchanged (after stripping).
    """
    text = str(phone).strip()
    if len(text) < 7:
        return text
    prefix, suffix = text[:3], text[-4:]
    return prefix + "****" + suffix
def match_phones_to_accounts(phones, conn):
    """Map phone numbers to account IDs by matching their masked form.

    Each phone is masked with ``mask_phone`` and the masks are looked up in
    ``bi_vala_app_account.tel``, restricted to rows with ``status = 1`` and
    ``deleted_at IS NULL``.

    Args:
        phones: Collection of phone values to look up.
        conn: Open DB-API connection (``cursor()`` is called once per batch).

    Returns:
        A dict mapping each matched input phone to its account id. Phones
        without a match are absent. An empty input yields an empty dict.
    """
    if not phones:
        # Always return a dict so callers can treat the result uniformly.
        return {}

    # Group the inputs by mask. Distinct phones can share one mask; all of
    # them receive whatever account that mask resolves to.
    masked_to_phones = {}
    for phone in phones:
        masked_to_phones.setdefault(mask_phone(phone), []).append(phone)

    masks = list(masked_to_phones)
    phone_to_account = {}
    batch_size = 500  # keep the IN (...) list bounded
    for start in range(0, len(masks), batch_size):
        batch = masks[start:start + batch_size]
        placeholders = ",".join(["%s"] * len(batch))
        cur = conn.cursor()
        try:
            cur.execute(f"""
                SELECT id AS account_id, tel
                FROM bi_vala_app_account
                WHERE tel IN ({placeholders})
                AND status = 1
                AND deleted_at IS NULL
            """, batch)
            rows = cur.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cur.close()

        # Only the masks returned by this batch need updating. When several
        # rows share a tel, the last row returned wins.
        for account_id, tel in rows:
            for phone in masked_to_phones.get(tel, ()):
                phone_to_account[phone] = account_id
    return phone_to_account
def _cell_to_phone(cell):
    """Convert a sheet cell to a stripped phone string ('' when empty)."""
    if not cell:
        return ''
    # Numeric cells may arrive as int/float; avoid a trailing ".0".
    if isinstance(cell, float) and cell.is_integer():
        cell = int(cell)
    return str(cell).strip()


def main():
    """Match pending sheet rows to account IDs and write the backfill file.

    Reads ``/tmp/sheet_id_data.json``, selects the rows whose fifth column is
    ``待查询ID``, resolves their phone numbers with
    ``match_phones_to_accounts`` and writes one result entry per pending row
    (plus summary statistics) to ``/tmp/sheet_id_results.json``.
    """
    # Read the input; explicit UTF-8 because the payload contains Chinese text.
    with open('/tmp/sheet_id_data.json', encoding='utf-8') as f:
        data = json.load(f)
    rows = data['data']['valueRange']['values']

    # Collect rows still waiting for an ID. rows[0] is the header, so the
    # data row at list index i corresponds to sheet row i + 1 (1-based).
    pending_rows = []
    for idx, row in enumerate(rows[1:], start=1):
        if len(row) > 4 and row[4] == '待查询ID':
            pending_rows.append({
                'row_idx': idx + 1,  # 1-based in sheet
                'phone': _cell_to_phone(row[1]),
                'sales': row[0],
                'month': row[2],
                'query_key': row[3],
                'notes': row[8] if len(row) > 8 else '',
            })
    print(f"待查询ID记录: {len(pending_rows)}")

    # Unique, non-empty phone numbers.
    phones = list({r['phone'] for r in pending_rows if r['phone']})
    print(f"唯一手机号: {len(phones)}")

    # Look up accounts; skip the database entirely when there is nothing to
    # query, and always release the connection.
    phone_to_account = {}
    if phones:
        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=get_password(),
            dbname=DB_NAME,
            connect_timeout=30,
        )
        try:
            phone_to_account = match_phones_to_accounts(phones, conn)
        finally:
            conn.close()

    matched = sum(1 for p in phones if p in phone_to_account)
    unmatched = len(phones) - matched
    print(f"匹配成功: {matched}, 未匹配: {unmatched}")

    # Build one result entry per pending row.
    now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    results = []
    stats = {'matched': 0, 'unmatched': 0}
    for r in pending_rows:
        phone = r['phone']
        if phone in phone_to_account:
            results.append({
                'row_idx': r['row_idx'],
                'user_id': str(phone_to_account[phone]),
                'status': '已回填',
                'update_time': now_str,
            })
            stats['matched'] += 1
        else:
            results.append({
                'row_idx': r['row_idx'],
                'user_id': '',
                'status': '未查到',
                'update_time': now_str,
            })
            stats['unmatched'] += 1

    # Save the results; ensure_ascii=False emits raw Chinese, so the file
    # must be written as UTF-8 regardless of the platform locale.
    with open('/tmp/sheet_id_results.json', 'w', encoding='utf-8') as f:
        json.dump(
            {'results': results, 'stats': stats, 'total': len(results)},
            f,
            ensure_ascii=False,
            indent=2,
        )
    print(f"\n结果统计: 匹配 {stats['matched']}, 未查到 {stats['unmatched']}")
    print("结果已保存到 /tmp/sheet_id_results.json")
# Run the batch match only when executed as a script, not when imported.
if __name__ == "__main__":
    main()