ai_member_xiaoxi/scripts/xiaolong_cleanup.py
2026-06-05 08:00:01 +08:00

253 lines
8.6 KiB
Python

#!/usr/bin/env python3
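"""
Final cleanup of the Feishu sheet: for every row whose phone matches an account,
recompute the tracked columns (D, H-M, O-Q) and rewrite each cell that differs
from the expected value, so that stale data is cleared.
"""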
import json
import os
import re
import sys
import time

import psycopg2
import psycopg2.extras
import requests

sys.path.insert(0, '/root/.openclaw/workspace/scripts')
from phone_encrypt import encrypt_phone
# Feishu spreadsheet coordinates, Feishu app credentials and PostgreSQL
# connection settings. Each value can be overridden with an environment variable
# of the same name; the literals are only fallbacks so existing invocations keep
# working unchanged.
# SECURITY: the fallback secrets below are committed in plain text. They should
# be rotated and the fallbacks deleted once the environment supplies them.
SPREADSHEET_TOKEN = os.environ.get("SPREADSHEET_TOKEN", "NoZqsFi47hIOHEt9j8WcfRtbnug")
SHEET_ID = os.environ.get("SHEET_ID", "qJF4I")
FEISHU_APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a929ae22e0b8dcc8")
FEISHU_APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "OtFjMy7p3qE3VvLbMdcWidwgHOnGD4FJ")
PG_HOST = os.environ.get("PG_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com")
# The port is passed to psycopg2 as an int, so the override is converted here.
PG_PORT = int(os.environ.get("PG_PORT", "28591"))
PG_USER = os.environ.get("PG_USER", "ai_member")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "LdfjdjL83h3h3^$&**YGG*")
PG_DB = os.environ.get("PG_DB", "vala_bi")
def get_token():
    """Fetch a Feishu tenant access token for the configured app.

    Returns:
        The ``tenant_access_token`` string from the auth response.

    Raises:
        requests.RequestException: on a transport failure or timeout.
        RuntimeError: if the response carries no token; the message includes
            the ``code`` and ``msg`` fields returned by the API.
    """
    resp = requests.post(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET},
        # Without a timeout a stalled connection would hang the script forever.
        timeout=15,
    )
    payload = resp.json()
    token = payload.get('tenant_access_token')
    if not token:
        raise RuntimeError(
            f"Feishu auth failed: {payload.get('code')} {payload.get('msg')}"
        )
    return token
TOKEN = get_token()
# Read sheet
# The requested range starts at sheet row 3, so rows[i] is sheet row i + 3.
r = requests.get(
    f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{SHEET_ID}!A3:V2512?valueRenderOption=ToString',
    headers={'Authorization': f'Bearer {TOKEN}'},
    # Without a timeout a stalled connection would hang the script forever.
    timeout=30,
)
sheet_resp = r.json()
# Fail with the API's own code/msg instead of a KeyError on the missing 'data'.
if sheet_resp.get('code') != 0:
    raise RuntimeError(
        f"Reading sheet failed: {sheet_resp.get('code')} {sheet_resp.get('msg')}"
    )
rows = sheet_resp['data']['valueRange']['values']
# Extract phones
# The phone sits in the fifth cell (index 4) of a row. Only values that are
# exactly 11 digits starting with "1" are kept, keyed by the row's position
# inside `rows`.
_phone_pattern = re.compile(r'^1\d{10}$')
phone_map = {}
for i, row in enumerate(rows):
    cell = row[4] if len(row) > 4 else None
    if not cell:
        continue
    phone = str(cell).strip()
    if _phone_pattern.match(phone):
        phone_map[i] = phone
# Encrypt
# Maps each encrypted phone to the index of the row it came from. Encryption is
# best-effort: a phone that cannot be encrypted is skipped, but the skip is
# reported instead of being silently swallowed.
enc_to_idx = {}
encrypt_failures = 0
for idx, phone in phone_map.items():
    try:
        enc = encrypt_phone(phone)
        enc_to_idx[enc] = idx
    except Exception as e:
        # `except Exception` (not a bare except) lets Ctrl-C / SystemExit through.
        encrypt_failures += 1
        # Report the sheet row (idx + 3) rather than the phone number itself.
        print(f"  WARN: could not encrypt phone in sheet row {idx + 3}: {e}")
if encrypt_failures:
    print(f"Phones skipped (encryption failed): {encrypt_failures}")
# Match UIDs
# Everything below runs on one PostgreSQL connection and fills four lookups:
#   uid_map      row index  -> {'uid', 'created_at', 'download_channel'}
#   trial_counts account id -> number of course rows with no expire_time
#   order_data   account id -> paid orders, oldest first
#   refund_map   trade_no   -> refund amount
batch_size = 500  # upper bound on the values bound into one IN (...) list


def _batches(items, size):
    """Yield consecutive slices of *items* holding at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _placeholders(count):
    """Return *count* comma-separated ``%s`` markers for an IN (...) list."""
    return ','.join(['%s'] * count)


uid_map = {}
trial_counts = {}
order_data = {}
refund_map = {}

# enc_to_idx keeps a single row per encrypted phone, so when several rows hold
# the same phone only the last one would be matched. Group the rows by phone so
# every one of them receives the account data.
rows_by_phone = {}
for idx, phone in phone_map.items():
    rows_by_phone.setdefault(phone, []).append(idx)

conn = psycopg2.connect(host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASSWORD, dbname=PG_DB, connect_timeout=30)
try:
    # The cursor is closed by the `with`; the connection by the `finally`, so
    # neither leaks when a query raises.
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        enc_list = list(enc_to_idx.keys())
        for batch in _batches(enc_list, batch_size):
            cur.execute(f"""
                SELECT id, tel_encrypt, created_at::date, download_channel
                FROM bi_vala_app_account
                WHERE tel_encrypt IN ({_placeholders(len(batch))})
                AND status = 1 AND deleted_at IS NULL
            """, batch)
            for row in cur.fetchall():
                enc = row['tel_encrypt']
                if enc not in enc_to_idx:
                    continue
                info = {
                    'uid': row['id'],
                    'created_at': str(row['created_at']) if row['created_at'] else None,
                    'download_channel': row['download_channel'],
                }
                # The dict is shared between duplicate rows; it is only read later.
                for idx in rows_by_phone[phone_map[enc_to_idx[enc]]]:
                    uid_map[idx] = info

        # Trial counts
        uid_list = list(set(v['uid'] for v in uid_map.values()))
        for batch in _batches(uid_list, batch_size):
            cur.execute(f"""
                SELECT account_id, COUNT(*) as cnt FROM bi_user_course_detail
                WHERE account_id IN ({_placeholders(len(batch))}) AND expire_time IS NULL AND deleted_at IS NULL
                GROUP BY account_id
            """, batch)
            for row in cur.fetchall():
                trial_counts[row['account_id']] = row['cnt']

        # Orders
        # ORDER BY pay_success_date makes the first list entry of each account
        # its earliest paid order.
        for batch in _batches(uid_list, batch_size):
            cur.execute(f"""
                SELECT o.account_id, o.trade_no, o.pay_success_date::date as pay_date,
                o.key_from, o.pay_amount_int, o.order_status, o.out_trade_no
                FROM bi_vala_order o
                JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1 AND a.deleted_at IS NULL
                WHERE o.account_id IN ({_placeholders(len(batch))})
                AND o.pay_success_date IS NOT NULL AND o.order_status IN (3, 4)
                ORDER BY o.account_id, o.pay_success_date
            """, batch)
            for row in cur.fetchall():
                order_data.setdefault(row['account_id'], []).append({
                    'trade_no': row['trade_no'],
                    'pay_date': str(row['pay_date']) if row['pay_date'] else None,
                    'key_from': row['key_from'],
                    'pay_amount_int': row['pay_amount_int'],
                    'order_status': row['order_status'],
                    'out_trade_no': row['out_trade_no'],
                })

        # Refunds
        all_trade_nos = [o['trade_no'] for orders in order_data.values() for o in orders]
        for batch in _batches(all_trade_nos, batch_size):
            cur.execute(f"""
                SELECT r.trade_no, r.refund_amount FROM bi_refund_order r
                JOIN bi_vala_order o ON r.trade_no = o.trade_no AND o.order_status = 4
                WHERE r.trade_no IN ({_placeholders(len(batch))}) AND r.status = 3
            """, batch)
            for row in cur.fetchall():
                # NOTE(review): a later refund row for the same trade_no replaces
                # the earlier one instead of adding to it, and int() drops any
                # fractional part - confirm both are intended.
                refund_map[row['trade_no']] = int(float(row['refund_amount'])) if row['refund_amount'] else 0
finally:
    conn.close()
# Build complete row data for ALL matched rows
# For each row, compute ALL columns D(3), H(7), I(8), J(9), K(10), L(11), M(12), O(14), P(15), Q(16)
# Use '' for empty cells to clear stale data
col_letters = 'ABCDEFGHIJKLMNOPQRSTUV'


def _cell_matches(current, expected):
    """Return True when the sheet cell already holds the expected value.

    Values are compared as stripped text first. When *expected* is a number the
    comparison falls back to numeric equality, because ``str(199.0)`` is
    ``'199.0'`` and would otherwise never equal a cell read back as ``199``,
    making the cell look stale on every run.
    """
    current_str = '' if current is None else str(current).strip()
    if current_str == str(expected).strip():
        return True
    if isinstance(expected, (int, float)) and not isinstance(expected, bool):
        try:
            return float(current_str) == float(expected)
        except ValueError:
            return False
    return False


# Find rows where we need to write (any column differs from current)
# Each entry is (row index, column index, value to write).
writes_needed = []
for idx in sorted(uid_map):
    info = uid_map[idx]
    uid = info['uid']
    orders = order_data.get(uid, [])

    # Expected values keyed by column index; the order-related columns start out
    # blank and are filled in below when the account has paid orders.
    # NOTE(review): K(10) is labelled "has order" but is blanked even when
    # orders exist - confirm that this is intended.
    row_expected = {
        3: trial_counts.get(uid, 0),            # D: trial count
        7: str(uid),                            # H: UID
        8: info['created_at'] or '',            # I: register date
        9: info['download_channel'] or '',      # J: download channel
        10: '',                                 # K: has order
        11: '',                                 # L: first pay date
        12: '',                                 # M: key_from of the first order
        14: '',                                 # O: total paid amount
        15: '',                                 # P: total refund
        16: '',                                 # Q: paid amount minus refund
    }
    if orders:
        first = orders[0]
        row_expected[11] = first['pay_date'] or ''
        row_expected[12] = first['key_from'] or ''
        # A NULL amount counts as 0 instead of crashing sum().
        total_gmv = sum(o['pay_amount_int'] or 0 for o in orders)
        gmv_yuan = total_gmv / 100.0
        row_expected[14] = gmv_yuan if gmv_yuan > 0 else ''
        total_refund = sum(refund_map.get(o['trade_no'], 0) for o in orders)
        if total_refund > 0:
            # NOTE(review): gmv_yuan is pay_amount_int / 100 while total_refund
            # is used as stored - confirm both are in the same unit.
            row_expected[15] = total_refund
            row_expected[16] = gmv_yuan - total_refund

    # Check which columns actually need updating
    row_data = rows[idx] if idx < len(rows) else []
    for col, val in row_expected.items():
        # Rows can be shorter than the column index; treat that as an empty cell.
        current = row_data[col] if col < len(row_data) else None
        if not _cell_matches(current, val):
            writes_needed.append((idx, col, val))
print(f"Cells needing update: {len(writes_needed)}")
# Write single cells
# Every pending cell is PUT on its own, with up to `max_attempts` tries. A cell
# counts as a success when the API answers code 0 and as a failure when all
# tries are used up, so success + fail always equals len(writes_needed).
success = 0
fail = 0
max_attempts = 3
for i, (row_idx, col_idx, val) in enumerate(writes_needed):
    row_num = row_idx + 3  # rows[] starts at sheet row 3
    col = col_letters[col_idx]
    range_str = f"{SHEET_ID}!{col}{row_num}:{col}{row_num}"
    # For empty values, write empty string
    body = {"valueRange": {"range": range_str, "values": [[val]]}}
    failure_note = None
    for attempt in range(max_attempts):
        has_retry_left = attempt < max_attempts - 1
        try:
            r = requests.put(
                f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values',
                headers={'Authorization': f'Bearer {TOKEN}', 'Content-Type': 'application/json'},
                json=body, timeout=15
            )
            resp = r.json()
            code = resp.get('code')
            if code == 0:
                success += 1
                break
            failure_note = f"  FAIL [{range_str}]: {code} {resp.get('msg')}"
            if code == 90217:
                # Back off longer on each consecutive occurrence of this code.
                time.sleep((attempt + 1) * 3)
            elif code == 99991663:
                # This code is answered by fetching a fresh token before retrying.
                TOKEN = get_token()
                time.sleep(0.5)
            elif has_retry_left:
                time.sleep(1)
        except Exception as e:
            failure_note = f"  EXCEPTION [{range_str}]: {e}"
            if has_retry_left:
                time.sleep(1)
    else:
        # Reached only when no attempt hit `break`. This also covers a final
        # attempt that ended in 90217 / 99991663, which previously was counted
        # as neither success nor failure.
        fail += 1
        print(failure_note)
    if (i + 1) % 200 == 0:
        print(f"  Progress: {i+1}/{len(writes_needed)} (success={success}, fail={fail})")
    time.sleep(0.15)
print(f"\n=== Cleanup Complete ===")
print(f"Success: {success}, Fail: {fail}")