271 lines
9.0 KiB
Python
271 lines
9.0 KiB
Python
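#!/usr/bin/env python3
"""
Retry failed writes to the 小龙 sheet, with rate-limit handling.

Re-reads the sheet, recomputes from Postgres what each matched row should
contain, and rewrites only the cells that are empty or differ.
"""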
import json
import math
import os
import re
import sys
import time

import psycopg2
import psycopg2.extras
import requests

# phone_encrypt lives in a shared scripts directory outside this file's
# package, so it must be put on sys.path before the import below.
sys.path.insert(0, '/root/.openclaw/workspace/scripts')

from phone_encrypt import encrypt_phone  # noqa: E402
# Connection settings. Every value may be overridden by an environment
# variable with the same name; the literals are fallbacks so existing runs
# keep working unchanged.
# NOTE(security): the Feishu app secret and the Postgres password are
# committed here in plain text. Rotate them, supply them via the environment,
# and then remove the fallbacks.
SPREADSHEET_TOKEN = os.environ.get("SPREADSHEET_TOKEN", "NoZqsFi47hIOHEt9j8WcfRtbnug")
SHEET_ID = os.environ.get("SHEET_ID", "qJF4I")

FEISHU_APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a929ae22e0b8dcc8")
FEISHU_APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "OtFjMy7p3qE3VvLbMdcWidwgHOnGD4FJ")

PG_HOST = os.environ.get("PG_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com")
PG_PORT = int(os.environ.get("PG_PORT", "28591"))
PG_USER = os.environ.get("PG_USER", "ai_member")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "LdfjdjL83h3h3^$&**YGG*")
PG_DB = os.environ.get("PG_DB", "vala_bi")
def get_token():
    """Fetch a Feishu tenant access token for the configured internal app.

    Returns:
        The ``tenant_access_token`` string from the auth endpoint.

    Raises:
        requests.RequestException: on network failure or timeout.
        RuntimeError: if the response is not JSON or carries no token
            (e.g. wrong app id/secret); the API's code and msg are included.
    """
    r = requests.post(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET},
        timeout=15,  # never hang the whole job on the auth call
    )
    try:
        payload = r.json()
    except ValueError as err:
        raise RuntimeError(
            f"Feishu auth returned a non-JSON response (HTTP {r.status_code})"
        ) from err

    token = payload.get('tenant_access_token')
    if not token:
        raise RuntimeError(
            f"Feishu auth failed: code={payload.get('code')} msg={payload.get('msg')}"
        )
    return token
TOKEN = get_token()

# Re-read the sheet (rows 3..2512, columns A..V) to find cells that are empty
# or hold a value different from what the database says they should.
r = requests.get(
    f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{SHEET_ID}!A3:V2512?valueRenderOption=ToString',
    headers={'Authorization': f'Bearer {TOKEN}'},
    timeout=30,
)
read_resp = r.json()
if read_resp.get('code') != 0:
    # Surface the API's own error instead of a bare KeyError on 'data' below.
    raise RuntimeError(f"Sheet read failed: {read_resp.get('code')} {read_resp.get('msg')}")

# Assumption: a range with no data may come back without a "values" key;
# treat that as "no rows" rather than crashing (confirm against the API).
rows = read_resp['data']['valueRange'].get('values') or []
print(f"Re-read {len(rows)} rows")
# Map each sheet row (0-based index into `rows`) to the phone in column E,
# keeping only values that are 11 digits starting with "1".
phone_map = {}
for row_pos, cells in enumerate(rows):
    raw_phone = cells[4] if len(cells) > 4 else None
    if not raw_phone:
        continue
    candidate = str(raw_phone).strip()
    if re.match(r'^1\d{10}$', candidate):
        phone_map[row_pos] = candidate

print(f"Found {len(phone_map)} phones")
# Encrypt every phone so it can be matched against
# bi_vala_app_account.tel_encrypt. The same phone can sit on several sheet
# rows, so each ciphertext maps to a LIST of row indices; a plain
# ciphertext -> index dict kept only the last such row and left the earlier
# duplicates unfilled.
enc_to_idxs = {}
encrypt_errors = []
for idx, phone in phone_map.items():
    try:
        enc = encrypt_phone(phone)
    except Exception as e:  # best-effort: skip the row, but don't hide it
        encrypt_errors.append((idx, e))
        continue
    enc_to_idxs.setdefault(enc, []).append(idx)

print(f"Encrypted {sum(len(idxs) for idxs in enc_to_idxs.values())}")
if encrypt_errors:
    first_idx, first_err = encrypt_errors[0]
    print(f"  WARNING: {len(encrypt_errors)} phones could not be encrypted "
          f"(first: sheet row {first_idx + 3}: {first_err})")

# Match encrypted phones to active (status = 1, not deleted) accounts.
conn = psycopg2.connect(host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASSWORD, dbname=PG_DB, connect_timeout=30)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

enc_list = list(enc_to_idxs)
uid_map = {}  # sheet row index -> {'uid', 'created_at', 'download_channel'}
batch_size = 500
for start in range(0, len(enc_list), batch_size):
    batch = enc_list[start:start + batch_size]
    placeholders = ','.join(['%s'] * len(batch))
    cur.execute(f"""
        SELECT id, tel_encrypt, created_at::date, download_channel
        FROM bi_vala_app_account
        WHERE tel_encrypt IN ({placeholders})
          AND status = 1 AND deleted_at IS NULL
    """, batch)
    for row in cur.fetchall():
        account = {
            'uid': row['id'],
            'created_at': str(row['created_at']) if row['created_at'] else None,
            'download_channel': row['download_channel'],
        }
        # NOTE(review): if several active accounts share one tel_encrypt, the
        # last one returned wins (unchanged behaviour) — confirm acceptable.
        for idx in enc_to_idxs.get(row['tel_encrypt'], ()):
            uid_map[idx] = dict(account)

print(f"Matched {len(uid_map)} UIDs")
# Distinct matched account ids, then per account the number of
# bi_user_course_detail rows with no expire_time (the "trial count" that
# ends up in column D).
uid_list = list({info['uid'] for info in uid_map.values()})

trial_counts = {}
for offset in range(0, len(uid_list), batch_size):
    chunk = uid_list[offset:offset + batch_size]
    marks = ','.join('%s' for _ in chunk)
    cur.execute(f"""
        SELECT account_id, COUNT(*) as cnt FROM bi_user_course_detail
        WHERE account_id IN ({marks}) AND expire_time IS NULL AND deleted_at IS NULL
        GROUP BY account_id
    """, chunk)
    trial_counts.update((rec['account_id'], rec['cnt']) for rec in cur.fetchall())
# Fetch orders with a pay_success_date and order_status 3 or 4 for the
# matched (still active) accounts, grouped per account in payment-date order.
order_data = {}
for offset in range(0, len(uid_list), batch_size):
    chunk = uid_list[offset:offset + batch_size]
    marks = ','.join('%s' for _ in chunk)
    cur.execute(f"""
        SELECT o.account_id, o.trade_no, o.pay_success_date::date as pay_date,
               o.key_from, o.pay_amount_int, o.order_status, o.out_trade_no
        FROM bi_vala_order o
        JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1 AND a.deleted_at IS NULL
        WHERE o.account_id IN ({marks})
          AND o.pay_success_date IS NOT NULL AND o.order_status IN (3, 4)
        ORDER BY o.account_id, o.pay_success_date
    """, chunk)
    for rec in cur.fetchall():
        pay_date = rec['pay_date']
        order_data.setdefault(rec['account_id'], []).append({
            'trade_no': rec['trade_no'],
            'pay_date': str(pay_date) if pay_date else None,
            'key_from': rec['key_from'],
            'pay_amount_int': rec['pay_amount_int'],
            'order_status': rec['order_status'],
            'out_trade_no': rec['out_trade_no'],
        })
# Look up refund rows (refund status = 3, order_status = 4) for every
# collected trade_no, then release the database connection.
all_trade_nos = [order['trade_no'] for account_orders in order_data.values() for order in account_orders]

refund_map = {}
for offset in range(0, len(all_trade_nos), batch_size):
    chunk = all_trade_nos[offset:offset + batch_size]
    marks = ','.join('%s' for _ in chunk)
    cur.execute(f"""
        SELECT r.trade_no, r.refund_amount FROM bi_refund_order r
        JOIN bi_vala_order o ON r.trade_no = o.trade_no AND o.order_status = 4
        WHERE r.trade_no IN ({marks}) AND r.status = 3
    """, chunk)
    for rec in cur.fetchall():
        amount = rec['refund_amount']
        # NOTE(review): int(float(...)) drops any fractional part, and a
        # trade_no returned more than once keeps only its last row (no
        # ORDER BY, so which one is unspecified) — confirm both are intended.
        refund_map[rec['trade_no']] = int(float(amount)) if amount else 0

cur.close()
conn.close()
# Build the value every matched row should hold, keyed by
# (0-based index into `rows`, 0-based column index); the trailing letters
# are the sheet columns.
expected = {}
for idx in sorted(uid_map):
    info = uid_map[idx]
    uid = info['uid']

    expected[(idx, 7)] = str(uid)  # H
    expected[(idx, 3)] = trial_counts.get(uid, 0)  # D

    if info['created_at']:
        expected[(idx, 8)] = info['created_at']  # I
    if info['download_channel']:
        expected[(idx, 9)] = info['download_channel']  # J

    orders = order_data.get(uid, [])
    if not orders:
        continue

    expected[(idx, 10)] = '是'  # K: "yes" — the account has at least one order
    first = orders[0]  # orders come back sorted by pay_success_date
    if first['pay_date']:
        expected[(idx, 11)] = first['pay_date']  # L
    if first['key_from']:
        expected[(idx, 12)] = first['key_from']  # M

    # pay_amount_int is divided by 100 to get yuan. A NULL amount counts as 0
    # instead of making sum() raise TypeError.
    total_gmv = sum(o['pay_amount_int'] or 0 for o in orders)
    gmv_yuan = total_gmv / 100.0
    if gmv_yuan > 0:
        expected[(idx, 14)] = gmv_yuan  # O

    total_refund = sum(refund_map.get(o['trade_no'], 0) for o in orders)
    if total_refund > 0:
        expected[(idx, 15)] = total_refund  # P (already yuan)
    # NOTE(review): original indentation was lost; this reconstruction writes
    # Q (GSV) for every account with orders (GSV == GMV when nothing was
    # refunded). Confirm Q was not meant to be written only when refunded.
    gsv = gmv_yuan - total_refund
    expected[(idx, 16)] = gsv  # Q
# Compare each expected value with what the sheet currently shows and collect
# the cells that are empty or differ.


def _cell_matches(current, val):
    """Return True if the sheet's rendered text ``current`` already holds ``val``.

    Text is compared after stripping. Numbers are additionally compared
    numerically because the sheet is read with ``valueRenderOption=ToString``:
    a float such as ``199.0`` presumably renders back as ``"199"`` and would
    otherwise be reported as wrong (and rewritten) on every run.
    """
    if current == str(val).strip():
        return True
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        try:
            return math.isclose(float(current), float(val), rel_tol=0.0, abs_tol=1e-6)
        except ValueError:  # empty cell or non-numeric text
            return False
    return False


missing = []
for (idx, col), val in expected.items():
    row_data = rows[idx] if idx < len(rows) else []
    cell = row_data[col] if col < len(row_data) else None
    current = '' if cell is None else str(cell).strip()
    if not _cell_matches(current, val):
        missing.append((idx, col, val))

print(f"Found {len(missing)} missing/incorrect cells to retry")
# Column letters of the range this script reads (A..V). Kept for reference;
# col_letter() below computes letters for any column.
col_letters = 'ABCDEFGHIJKLMNOPQRSTUV'


def col_letter(idx):
    """Return the A1-notation column letters for a 0-based column index.

    0 -> 'A', 25 -> 'Z', 26 -> 'AA', 701 -> 'ZZ'. Columns past V are
    converted too, instead of yielding an unusable ``Col{n}`` range.

    Raises:
        ValueError: if ``idx`` is negative (previously it silently wrapped
            around to a letter from the end of ``col_letters``).
    """
    if idx < 0:
        raise ValueError(f"column index must be >= 0, got {idx}")
    letters = ''
    n = idx + 1  # bijective base-26: there is no "zero" letter
    while n:
        n, rem = divmod(n - 1, 26)
        letters = chr(ord('A') + rem) + letters
    return letters
# Collect the cells to rewrite per sheet row, each row's cells in column order.
from collections import defaultdict

by_row = defaultdict(list)
for row_pos, col, val in missing:
    by_row[row_pos].append((col, val))

for pending in by_row.values():
    pending.sort(key=lambda cell: cell[0])

# Per-row outcome counters and the row total used in progress messages.
success, fail = 0, 0
total = len(by_row)
# Rewrite the missing cells row by row.
#
# The write endpoint shares the /open-apis/sheets/v2/ prefix used for the
# read above; the previous /open-apis/spreadsheets/v2/ path does not match
# the documented endpoint.
#
# Each row's cells are split into runs of adjacent columns, and each run is
# written as its own range. A single D..Q range per row would put None into
# the gap columns (e.g. E, F, G between D and H). Depending on how the API
# treats null, that could blank cells that were never meant to change; E
# holds the phone number.
_SHEET_VALUES_URL = f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values'


def _contiguous_runs(cells):
    """Split column-sorted ``(col, val)`` pairs into lists of adjacent columns."""
    runs = []
    for col, val in cells:
        if runs and runs[-1][-1][0] + 1 == col:
            runs[-1].append((col, val))
        else:
            runs.append([(col, val)])
    return runs


def _write_range(range_str, values, max_retries=3):
    """PUT one row of ``values`` into ``range_str``, retrying failures.

    Rate-limit responses (code 90217) back off 2s, 4s, ...; other API errors
    and transport/JSON errors retry after 1s. Returns True on success, or
    prints the final error and returns False.
    """
    body = {"valueRange": {"range": range_str, "values": [values]}}
    for attempt in range(max_retries):
        is_last = attempt == max_retries - 1
        try:
            r = requests.put(
                _SHEET_VALUES_URL,
                headers={'Authorization': f'Bearer {TOKEN}', 'Content-Type': 'application/json'},
                json=body, timeout=15,
            )
            resp = r.json()
        except (requests.RequestException, ValueError) as e:
            if is_last:
                print(f"  EXCEPTION [{range_str}]: {e}")
                return False
            time.sleep(1)
            continue

        if resp.get('code') == 0:
            return True
        if resp.get('code') == 90217:  # rate limit
            if is_last:
                print(f"  FAILED (rate limit) [{range_str}]: {resp.get('msg')}")
                return False
            time.sleep((attempt + 1) * 2)
        else:
            if is_last:
                print(f"  FAILED [{range_str}]: {resp.get('code')} {resp.get('msg')}")
                return False
            time.sleep(1)
    return False


for i, (row_idx, cols) in enumerate(sorted(by_row.items())):
    row_num = row_idx + 3  # rows[0] is sheet row 3 (the read range starts at A3)
    row_ok = True
    for run in _contiguous_runs(cols):
        range_str = (f"{SHEET_ID}!{col_letter(run[0][0])}{row_num}:"
                     f"{col_letter(run[-1][0])}{row_num}")
        if not _write_range(range_str, [val for _, val in run]):
            row_ok = False
        time.sleep(0.1)  # pace requests to stay under the rate limit

    # A row counts as a success only if every one of its ranges was written.
    if row_ok:
        success += 1
    else:
        fail += 1

    if (i + 1) % 50 == 0:
        print(f"  Retry progress: {i+1}/{total} (success={success}, fail={fail})")

print("\n=== Retry Complete ===")
print(f"Success: {success}, Fail: {fail}")
|