ai_member_xiaoxi/scripts/xiaolong_retry.py
2026-06-05 08:00:01 +08:00

271 lines
9.0 KiB
Python

#!/usr/bin/env python3
"""
Retry failed writes for 小龙 sheet - with rate limiting handling
"""
import json
import os
import re
import sys
import time
from collections import defaultdict

import psycopg2
import psycopg2.extras
import requests

sys.path.insert(0, '/root/.openclaw/workspace/scripts')
from phone_encrypt import encrypt_phone
# SECURITY: the Feishu app secret and the PostgreSQL password are committed
# here in plain text. Every setting can be overridden with the XIAOLONG_*
# environment variable named below; the literals remain only as fallbacks so
# existing runs keep working. Rotate these credentials and remove the
# fallbacks once the environment is configured.
SPREADSHEET_TOKEN = os.environ.get("XIAOLONG_SPREADSHEET_TOKEN", "NoZqsFi47hIOHEt9j8WcfRtbnug")
SHEET_ID = os.environ.get("XIAOLONG_SHEET_ID", "qJF4I")
FEISHU_APP_ID = os.environ.get("XIAOLONG_FEISHU_APP_ID", "cli_a929ae22e0b8dcc8")
FEISHU_APP_SECRET = os.environ.get("XIAOLONG_FEISHU_APP_SECRET", "OtFjMy7p3qE3VvLbMdcWidwgHOnGD4FJ")
PG_HOST = os.environ.get("XIAOLONG_PG_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com")
# Kept as an int, as before, because it is passed straight to psycopg2.connect.
PG_PORT = int(os.environ.get("XIAOLONG_PG_PORT", "28591"))
PG_USER = os.environ.get("XIAOLONG_PG_USER", "ai_member")
PG_PASSWORD = os.environ.get("XIAOLONG_PG_PASSWORD", "LdfjdjL83h3h3^$&**YGG*")
PG_DB = os.environ.get("XIAOLONG_PG_DB", "vala_bi")
def get_token():
    """Fetch a Feishu tenant access token for the configured app.

    Returns:
        The ``tenant_access_token`` string from the auth response.

    Raises:
        RuntimeError: if the response carries no token (for example bad
            app credentials); the message includes Feishu's code and msg.
        requests.RequestException: on network failure or timeout.
    """
    resp = requests.post(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET},
        timeout=15,  # never hang the whole job on the auth call
    )
    payload = resp.json()
    token = payload.get('tenant_access_token')
    if not token:
        # Surface Feishu's own error instead of an opaque KeyError.
        raise RuntimeError(
            f"Feishu auth failed: {payload.get('code')} {payload.get('msg')}"
        )
    return token
TOKEN = get_token()

# Re-read the sheet to find empty cells that should have data
r = requests.get(
    f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{SHEET_ID}!A3:V2512?valueRenderOption=ToString',
    headers={'Authorization': f'Bearer {TOKEN}'},
    timeout=30,
)
sheet_resp = r.json()
if sheet_resp.get('code') != 0:
    # Fail with Feishu's message rather than a KeyError on 'data' below.
    raise RuntimeError(
        f"Failed to read sheet: {sheet_resp.get('code')} {sheet_resp.get('msg')}"
    )
rows = sheet_resp['data']['valueRange']['values']
print(f"Re-read {len(rows)} rows")

# Extract phones again
# Column E (index 4) holds the phone; only 11-digit values starting with 1
# are kept. Keys are 0-based offsets into `rows` (sheet row = offset + 3,
# because the range read above starts at row 3).
PHONE_PATTERN = re.compile(r'^1\d{10}$')
phone_map = {}
for i, row in enumerate(rows):
    if len(row) > 4 and row[4]:
        phone = str(row[4]).strip()
        if PHONE_PATTERN.match(phone):
            phone_map[i] = phone
print(f"Found {len(phone_map)} phones")

# Encrypt
# NOTE: enc_to_idx maps one ciphertext to one row offset, so when the same
# phone appears on several rows only the last of those rows is kept.
enc_to_idx = {}
encrypt_failures = 0
for idx, phone in phone_map.items():
    try:
        enc = encrypt_phone(phone)
    except Exception as exc:
        # Best-effort: skip this row but report it instead of hiding it.
        # The phone number itself is deliberately not printed.
        encrypt_failures += 1
        print(f"  WARN: could not encrypt phone on sheet row {idx + 3}: {type(exc).__name__}")
        continue
    enc_to_idx[enc] = idx
print(f"Encrypted {len(enc_to_idx)}")
if encrypt_failures:
    print(f"  WARN: {encrypt_failures} phones failed to encrypt and were skipped")
# Match UIDs
# All four lookups share one connection. try/finally guarantees the cursor
# and connection are released even when a query raises.
conn = psycopg2.connect(
    host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASSWORD,
    dbname=PG_DB, connect_timeout=30,
)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
batch_size = 500  # number of values bound into a single IN (...) list
uid_map = {}       # row offset -> {'uid', 'created_at', 'download_channel'}
trial_counts = {}  # account id -> count of matching course-detail rows
order_data = {}    # account id -> list of order dicts, ordered by pay date
refund_map = {}    # trade_no -> refund amount
try:
    enc_list = list(enc_to_idx.keys())
    for start in range(0, len(enc_list), batch_size):
        batch = enc_list[start:start + batch_size]
        placeholders = ','.join(['%s'] * len(batch))
        cur.execute(f"""
            SELECT id, tel_encrypt, created_at::date, download_channel
            FROM bi_vala_app_account
            WHERE tel_encrypt IN ({placeholders})
            AND status = 1 AND deleted_at IS NULL
        """, batch)
        for row in cur.fetchall():
            idx = enc_to_idx.get(row['tel_encrypt'])
            if idx is None:  # offset 0 is valid, so test against None
                continue
            uid_map[idx] = {
                'uid': row['id'],
                'created_at': str(row['created_at']) if row['created_at'] else None,
                'download_channel': row['download_channel'],
            }
    print(f"Matched {len(uid_map)} UIDs")

    # Query trial counts
    uid_list = list({v['uid'] for v in uid_map.values()})
    for start in range(0, len(uid_list), batch_size):
        batch = uid_list[start:start + batch_size]
        placeholders = ','.join(['%s'] * len(batch))
        cur.execute(f"""
            SELECT account_id, COUNT(*) as cnt FROM bi_user_course_detail
            WHERE account_id IN ({placeholders}) AND expire_time IS NULL AND deleted_at IS NULL
            GROUP BY account_id
        """, batch)
        for row in cur.fetchall():
            trial_counts[row['account_id']] = row['cnt']

    # Query orders
    for start in range(0, len(uid_list), batch_size):
        batch = uid_list[start:start + batch_size]
        placeholders = ','.join(['%s'] * len(batch))
        cur.execute(f"""
            SELECT o.account_id, o.trade_no, o.pay_success_date::date as pay_date,
            o.key_from, o.pay_amount_int, o.order_status, o.out_trade_no
            FROM bi_vala_order o
            JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1 AND a.deleted_at IS NULL
            WHERE o.account_id IN ({placeholders})
            AND o.pay_success_date IS NOT NULL AND o.order_status IN (3, 4)
            ORDER BY o.account_id, o.pay_success_date
        """, batch)
        for row in cur.fetchall():
            order_data.setdefault(row['account_id'], []).append({
                'trade_no': row['trade_no'],
                'pay_date': str(row['pay_date']) if row['pay_date'] else None,
                'key_from': row['key_from'],
                'pay_amount_int': row['pay_amount_int'],
                'order_status': row['order_status'],
                'out_trade_no': row['out_trade_no'],
            })

    # Query refunds
    all_trade_nos = [o['trade_no'] for orders in order_data.values() for o in orders]
    for start in range(0, len(all_trade_nos), batch_size):
        batch = all_trade_nos[start:start + batch_size]
        placeholders = ','.join(['%s'] * len(batch))
        cur.execute(f"""
            SELECT r.trade_no, r.refund_amount FROM bi_refund_order r
            JOIN bi_vala_order o ON r.trade_no = o.trade_no AND o.order_status = 4
            WHERE r.trade_no IN ({placeholders}) AND r.status = 3
        """, batch)
        for row in cur.fetchall():
            # NOTE(review): int(float(...)) drops any fractional part of the
            # refund; confirm refund_amount is always a whole number.
            refund_map[row['trade_no']] = int(float(row['refund_amount'])) if row['refund_amount'] else 0
finally:
    cur.close()
    conn.close()
# Build expected values
# Keys are (row_idx, col_idx) pairs; values are what that cell should hold.
# NOTE(review): the original indentation was lost. The nesting below assumes
# the order-derived cells (K, L, M, O, P, Q) are filled only for accounts
# that have orders, and that Q is written whether or not a refund exists.
# Confirm against the intended sheet layout.
expected = {}
for idx, _sheet_row in enumerate(rows):
    account = uid_map.get(idx)
    if account is None:
        continue

    uid = account['uid']
    expected[(idx, 7)] = str(uid)  # H
    expected[(idx, 3)] = trial_counts.get(uid, 0)  # D

    created = account['created_at']
    if created:
        expected[(idx, 8)] = created  # I
    channel = account['download_channel']
    if channel:
        expected[(idx, 9)] = channel  # J

    orders = order_data.get(uid, [])
    if not orders:
        continue

    expected[(idx, 10)] = ''  # K
    first_order = orders[0]
    if first_order['pay_date']:
        expected[(idx, 11)] = first_order['pay_date']  # L
    if first_order['key_from']:
        expected[(idx, 12)] = first_order['key_from']  # M

    # pay_amount_int is divided by 100 to express the total in yuan.
    gmv_yuan = sum(order['pay_amount_int'] for order in orders) / 100.0
    if gmv_yuan > 0:
        expected[(idx, 14)] = gmv_yuan  # O

    total_refund = sum(refund_map.get(order['trade_no'], 0) for order in orders)
    if total_refund > 0:
        expected[(idx, 15)] = total_refund  # P (already yuan)

    expected[(idx, 16)] = gmv_yuan - total_refund  # Q
# Now check which cells are empty in the sheet
def _cell_matches(current, val):
    """Return True when the sheet text *current* already represents *val*.

    Numbers are compared numerically: the sheet is read as rendered text, so
    a float such as 199.0 may come back as '199' and would never equal
    str(199.0) == '199.0'. Anything that does not parse as a number falls
    back to the stripped string comparison.
    """
    text = current.strip()
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        try:
            return abs(float(text) - val) < 1e-9
        except ValueError:
            pass  # empty or non-numeric cell: compare as text below
    return text == str(val).strip()


missing = []
for (idx, col), val in expected.items():
    row_data = rows[idx] if idx < len(rows) else []
    # Short rows and null cells both count as empty.
    cell = row_data[col] if col < len(row_data) else None
    current = '' if cell is None else str(cell)
    if not _cell_matches(current, val):
        missing.append((idx, col, val))
print(f"Found {len(missing)} missing/incorrect cells to retry")
# Write missing cells with longer delays
col_letters = 'ABCDEFGHIJKLMNOPQRSTUV'


def col_letter(idx):
    """Convert a 0-based column index to A1-style column letters.

    Indexes 0-21 give 'A'-'V' exactly as before. Larger indexes continue
    with 'W' ... 'Z', 'AA', 'AB', ... instead of the previous 'Col<idx>'
    placeholder, which is not a valid column reference in a range string.

    Raises:
        ValueError: if *idx* is negative (this used to wrap around silently
            and return a letter counted from the end of the alphabet).
    """
    if idx < 0:
        raise ValueError(f"column index must be >= 0, got {idx}")
    letters = ''
    n = idx + 1  # bijective base-26 works on 1-based numbers
    while n:
        n, rem = divmod(n - 1, 26)
        letters = chr(ord('A') + rem) + letters
    return letters
# Group by row
# by_row maps a row offset to its list of (col, value) pairs to write.
# defaultdict is imported with the other imports at the top of the file.
by_row = defaultdict(list)
for idx, col, val in missing:
    by_row[idx].append((col, val))

# Sort each row's columns
# Left-to-right order lets the writer derive column ranges from the list.
for cells in by_row.values():
    cells.sort(key=lambda cell: cell[0])
# Feishu Sheets v2 endpoints live under /open-apis/sheets/v2/..., the same
# prefix the sheet read uses. The previous /open-apis/spreadsheets/v2/...
# path is not a Sheets endpoint, so no write ever reached the sheet.
BATCH_WRITE_URL = (
    'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/'
    f'{SPREADSHEET_TOKEN}/values_batch_update'
)
MAX_RETRIES = 3
RATE_LIMIT_CODE = 90217  # response code this script treats as rate limiting


def _row_value_ranges(row_num, cells):
    """Build the Feishu ``valueRanges`` payload for one sheet row.

    *cells* is the row's (col, value) list sorted by column. Adjacent columns
    are merged into one range and a gap starts a new range, so only cells
    that actually need a value are sent. Previously the gaps were padded with
    None, which sent null for columns this script never meant to touch.
    """
    runs = []  # list of (start_col, [values...])
    for col, val in cells:
        if runs and col == runs[-1][0] + len(runs[-1][1]):
            runs[-1][1].append(val)
        else:
            runs.append((col, [val]))
    return [
        {
            'range': (
                f"{SHEET_ID}!{col_letter(start)}{row_num}:"
                f"{col_letter(start + len(vals) - 1)}{row_num}"
            ),
            'values': [vals],
        }
        for start, vals in runs
    ]


success = 0
fail = 0
total = len(by_row)
for i, (row_idx, cols) in enumerate(sorted(by_row.items())):
    row_num = row_idx + 3  # offsets are relative to the A3 start of the read range
    value_ranges = _row_value_ranges(row_num, cols)
    range_str = ','.join(vr['range'] for vr in value_ranges)
    body = {"valueRanges": value_ranges}

    error = None  # description of the last failed attempt; None after success
    for attempt in range(MAX_RETRIES):
        try:
            r = requests.post(
                BATCH_WRITE_URL,
                headers={'Authorization': f'Bearer {TOKEN}', 'Content-Type': 'application/json'},
                json=body, timeout=15
            )
            resp = r.json()
        except Exception as e:
            # Best-effort boundary: one bad row must not abort the whole run.
            error = f"EXCEPTION [{range_str}]: {e}"
            wait = 1
        else:
            code = resp.get('code')
            if code == 0:
                error = None
                break
            if code == RATE_LIMIT_CODE:
                error = f"FAILED (rate limit) [{range_str}]: {resp.get('msg')}"
                wait = (attempt + 1) * 2  # back off longer on each rate-limit hit
            else:
                error = f"FAILED [{range_str}]: {code} {resp.get('msg')}"
                wait = 1
        if attempt < MAX_RETRIES - 1:
            time.sleep(wait)

    if error is None:
        success += 1
    else:
        fail += 1
        print(f" {error}")

    if (i + 1) % 50 == 0:
        print(f" Retry progress: {i+1}/{total} (success={success}, fail={fail})")
    time.sleep(0.1)  # Longer delay for retries

print(f"\n=== Retry Complete ===")
print(f"Success: {success}, Fail: {fail}")