#!/usr/bin/env python3
"""Full refresh of the Xiaolong sheet (qJF4I).

Pipeline: load the dumped A3:V2512 range -> encrypt phone numbers ->
match account UIDs -> query the per-column data -> write back to Feishu.
"""

import json
import os
import re
import sys
import time

# NOTE: psycopg2, requests and phone_encrypt are imported inside the functions
# that need them, so the pure helpers below can be imported (and unit-tested)
# on a machine without the database driver or the project scripts directory.

# ── Config ──
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
SHEET_ID = "qJF4I"
RAW_DUMP_PATH = '/tmp/xiaolong_raw.json'
SCRIPTS_DIR = '/root/.openclaw/workspace/scripts'

FEISHU_APP_ID = "cli_a929ae22e0b8dcc8"
# SECURITY: credentials are committed to source. They can be overridden from
# the environment; the literals are kept only so existing runs keep working.
# Rotate them and drop the defaults as soon as the environment is provisioned.
FEISHU_APP_SECRET = os.environ.get(
    "FEISHU_APP_SECRET", "OtFjMy7p3qE3VvLbMdcWidwgHOnGD4FJ")

PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_PASSWORD = os.environ.get("PG_PASSWORD", "LdfjdjL83h3h3^$&**YGG*")
PG_DB = "vala_bi"

FIRST_DATA_ROW = 3      # the dump starts at A3, so rows[0] is sheet row 3
PHONE_COL = 4           # column E holds the phone number
BATCH_SIZE = 500        # max number of values per SQL IN (...) list
HTTP_TIMEOUT = 15       # seconds, applied to every Feishu request
COL_LETTERS = 'ABCDEFGHIJKLMNOPQRSTUV'
PHONE_RE = re.compile(r'^1\d{10}$')

# (column letter, English label, Chinese label) in reporting order.
STAT_COLUMNS = [
    ('H', 'UID', 'UID'),
    ('D', 'trial', '体验节数'),
    ('I', 'register', '注册日期'),
    ('J', 'channel', '下载渠道'),
    ('K', 'has order', '有订单'),
    ('L', 'pay date', '下单日'),
    ('M', 'key_from', '成交渠道'),
    ('O', 'GMV', 'GMV'),
    ('P', 'refund', '退款'),
    ('Q', 'GSV', 'GSV'),
]


# ── Feishu token ──
def get_token():
    """Fetch a Feishu tenant access token for the configured app.

    Returns:
        The ``tenant_access_token`` string from the auth response.

    Raises:
        requests.HTTPError: the auth endpoint answered with a non-2xx status.
        RuntimeError: the response body carried no token.
    """
    import requests

    r = requests.post(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET},
        timeout=HTTP_TIMEOUT,
    )
    r.raise_for_status()
    payload = r.json()
    token = payload.get('tenant_access_token')
    if not token:
        # Surface Feishu's own error instead of a bare KeyError.
        raise RuntimeError(
            f"Feishu auth failed: {payload.get('code')} {payload.get('msg')}")
    return token


# ── Load raw data ──
def load_rows(path=RAW_DUMP_PATH):
    """Load the dumped sheet range and return its list of row value lists."""
    with open(path, encoding='utf-8') as f:
        raw = json.load(f)
    return raw['data']['valueRange']['values'] or []


# ── Extract phone numbers (column E, index 4) ──
def extract_phones(rows):
    """Return ``[(row_index, phone), ...]`` for rows with a valid phone.

    A phone is valid when, after ``str()`` and ``strip()``, it is an
    11-digit string starting with ``1``. Rows that are empty, too short, or
    have a falsy phone cell are skipped.
    """
    row_phones = []
    for i, row in enumerate(rows):
        if not row or len(row) <= PHONE_COL or not row[PHONE_COL]:
            continue
        phone = str(row[PHONE_COL]).strip()
        if PHONE_RE.match(phone):
            row_phones.append((i, phone))
    return row_phones


# ── Encrypt phones ──
def encrypt_phones(row_phones, encrypt=None):
    """Encrypt every phone and index the sheet rows by ciphertext.

    Args:
        row_phones: ``[(row_index, phone), ...]`` as built by
            ``extract_phones``.
        encrypt: callable mapping a phone string to its ciphertext. Defaults
            to ``phone_encrypt.encrypt_phone`` from the project scripts dir.

    Returns:
        ``{ciphertext: [row_index, ...]}``. A phone that appears on several
        rows keeps ALL of its rows; mapping each ciphertext to a single row
        would silently drop every occurrence but the last.
    """
    if encrypt is None:
        if SCRIPTS_DIR not in sys.path:
            sys.path.insert(0, SCRIPTS_DIR)
        from phone_encrypt import encrypt_phone as encrypt

    enc_to_rows = {}
    for idx, phone in row_phones:
        try:
            enc = encrypt(phone)
        except Exception as e:  # best effort: one bad phone must not abort the run
            print(f"  Encrypt error for {phone}: {e}")
            continue
        enc_to_rows.setdefault(enc, []).append(idx)
    return enc_to_rows


def chunked(items, size=BATCH_SIZE):
    """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def in_placeholders(batch):
    """Return the ``%s,%s,...`` placeholder list for a SQL ``IN`` clause."""
    return ','.join(['%s'] * len(batch))


# ── PostgreSQL: match UIDs ──
def fetch_accounts(cur, enc_to_rows):
    """Match encrypted phones to active accounts.

    Returns:
        ``{row_index: {'uid', 'created_at', 'download_channel'}}`` for every
        sheet row whose encrypted phone matched an account.
    """
    uid_map = {}
    for batch in chunked(list(enc_to_rows)):
        # ORDER BY id makes the outcome deterministic when several active
        # accounts share one phone: the highest id wins.
        cur.execute(f"""
            SELECT id, tel_encrypt, created_at::date, download_channel
            FROM bi_vala_app_account
            WHERE tel_encrypt IN ({in_placeholders(batch)})
              AND status = 1 AND deleted_at IS NULL
            ORDER BY id
        """, batch)
        for row in cur.fetchall():
            info = {
                'uid': row['id'],
                'created_at': str(row['created_at']) if row['created_at'] else None,
                'download_channel': row['download_channel'],
            }
            for idx in enc_to_rows.get(row['tel_encrypt'], ()):
                uid_map[idx] = info
    return uid_map


# ── Query D column: trial course count ──
def fetch_trial_counts(cur, uid_list):
    """Return ``{account_id: count}`` of course rows with no expire_time."""
    trial_counts = {}
    for batch in chunked(uid_list):
        cur.execute(f"""
            SELECT account_id, COUNT(*) as cnt
            FROM bi_user_course_detail
            WHERE account_id IN ({in_placeholders(batch)})
              AND expire_time IS NULL AND deleted_at IS NULL
            GROUP BY account_id
        """, batch)
        for row in cur.fetchall():
            trial_counts[row['account_id']] = row['cnt']
    return trial_counts


# ── Query K-V: orders ──
def fetch_orders(cur, uid_list):
    """Return ``{account_id: [order, ...]}`` ordered by pay_success_date.

    Only orders with a pay_success_date and order_status 3 or 4 are returned.
    """
    order_data = {}
    for batch in chunked(uid_list):
        cur.execute(f"""
            SELECT o.account_id, o.trade_no, o.pay_success_date::date as pay_date,
                   o.key_from, o.pay_amount_int, o.order_status, o.out_trade_no
            FROM bi_vala_order o
            JOIN bi_vala_app_account a ON o.account_id = a.id
                 AND a.status = 1 AND a.deleted_at IS NULL
            WHERE o.account_id IN ({in_placeholders(batch)})
              AND o.pay_success_date IS NOT NULL
              AND o.order_status IN (3, 4)
            ORDER BY o.account_id, o.pay_success_date
        """, batch)
        for row in cur.fetchall():
            order_data.setdefault(row['account_id'], []).append({
                'trade_no': row['trade_no'],
                'pay_date': str(row['pay_date']) if row['pay_date'] else None,
                'key_from': row['key_from'],
                'pay_amount_int': row['pay_amount_int'],
                'order_status': row['order_status'],
                'out_trade_no': row['out_trade_no'],
            })
    return order_data


# ── Query refunds ──
def fetch_refunds(cur, trade_nos):
    """Return ``{trade_no: refund_amount}`` for refunds on status-4 orders.

    NOTE(review): if one trade_no has several refund rows with status = 3,
    the last row fetched overwrites the earlier ones (behaviour kept from the
    original script). Confirm whether partial refunds should be summed.
    """
    refund_map = {}
    for batch in chunked(trade_nos):
        cur.execute(f"""
            SELECT r.trade_no, r.refund_amount
            FROM bi_refund_order r
            JOIN bi_vala_order o ON r.trade_no = o.trade_no AND o.order_status = 4
            WHERE r.trade_no IN ({in_placeholders(batch)})
              AND r.status = 3
        """, batch)
        for row in cur.fetchall():
            amount = row['refund_amount']
            refund_map[row['trade_no']] = int(float(amount)) if amount else 0
    return refund_map


def query_database(enc_to_rows):
    """Run all PostgreSQL lookups and return their results.

    The connection and cursor are always closed, even when a query fails.

    Returns:
        ``(uid_map, trial_counts, order_data, refund_map)``.
    """
    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect(
        host=PG_HOST, port=PG_PORT, user=PG_USER,
        password=PG_PASSWORD, dbname=PG_DB,
        connect_timeout=30,
    )
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            uid_map = fetch_accounts(cur, enc_to_rows)
            print(f"Matched {len(uid_map)} UIDs from PostgreSQL")

            uid_list = list({info['uid'] for info in uid_map.values()})
            trial_counts = fetch_trial_counts(cur, uid_list)
            print(f"Got trial counts for {len(trial_counts)} users")

            order_data = fetch_orders(cur, uid_list)
            print(f"Got orders for {len(order_data)} users")

            trade_nos = [o['trade_no']
                         for orders in order_data.values() for o in orders]
            refund_map = fetch_refunds(cur, trade_nos)
            print(f"Got refunds for {len(refund_map)} trade_nos")
    finally:
        conn.close()
    return uid_map, trial_counts, order_data, refund_map


# ── Build write data ──
def build_writes(uid_map, trial_counts, order_data, refund_map):
    """Turn the query results into individual cell writes.

    Columns written: H = UID, D = trial count, I = register date,
    J = download channel, K = has order, L = first pay date,
    M = first key_from, O = GMV, P = refund, Q = GSV.

    Returns:
        ``(writes, stats, suspicious_refund_rows)`` where ``writes`` is a
        list of ``(row_index, col_index, value)``, ``stats`` counts the
        writes per column letter, and ``suspicious_refund_rows`` lists the
        sheet row numbers that have a refund on an order with status 3.
    """
    writes = []
    stats = dict.fromkeys('HDIJKLMOPQ', 0)
    suspicious_refund_rows = []

    def put(idx, letter, value):
        writes.append((idx, COL_LETTERS.index(letter), value))
        stats[letter] += 1

    for idx in sorted(uid_map):
        info = uid_map[idx]
        uid = info['uid']

        put(idx, 'H', str(uid))
        put(idx, 'D', trial_counts.get(uid, 0))
        if info['created_at']:
            put(idx, 'I', info['created_at'])
        if info['download_channel']:
            put(idx, 'J', info['download_channel'])

        orders = order_data.get(uid, [])
        if not orders:
            continue

        put(idx, 'K', '是')
        # Orders arrive sorted by pay_success_date, so [0] is the earliest.
        first_order = orders[0]
        if first_order['pay_date']:
            put(idx, 'L', first_order['pay_date'])
        if first_order['key_from']:
            put(idx, 'M', first_order['key_from'])

        # O: GMV in yuan; pay_amount_int is divided by 100. A NULL amount
        # counts as 0 instead of raising TypeError.
        gmv_yuan = sum(o['pay_amount_int'] or 0 for o in orders) / 100.0
        if gmv_yuan > 0:
            put(idx, 'O', gmv_yuan)

        # P: total refund. Per the original author's note, refund_amount is
        # already expressed in yuan.
        total_refund = 0
        for o in orders:
            refund_amt = refund_map.get(o['trade_no'], 0)
            total_refund += refund_amt
            if refund_amt > 0 and o['order_status'] == 3:
                suspicious_refund_rows.append(idx + FIRST_DATA_ROW)
        refund_yuan = int(total_refund)
        if refund_yuan > 0:
            put(idx, 'P', refund_yuan)

        # Q: GSV = O - P. Zero is still written; a negative GSV is not
        # written at all (behaviour kept from the original script).
        gsv = gmv_yuan - refund_yuan
        if gsv > 0:
            put(idx, 'Q', gsv)
        elif gsv == 0:
            put(idx, 'Q', 0)

    return writes, stats, suspicious_refund_rows


# ── Write back to Feishu ──
def group_writes(writes):
    """Group writes into runs of consecutive columns within the same row.

    ``writes`` must already be sorted by ``(row_index, col_index)``.
    Returns a list of groups, each a list of ``(row, col, value)`` tuples.
    """
    if not writes:
        return []
    groups = []
    current = [writes[0]]
    for w in writes[1:]:
        last = current[-1]
        if w[0] == last[0] and w[1] == last[1] + 1:
            current.append(w)
        else:
            groups.append(current)
            current = [w]
    groups.append(current)
    return groups


def col_letter(idx):
    """Return the sheet column letter for a 0-based index (A..V).

    Indices beyond V yield the placeholder ``Col<idx>``.
    """
    return COL_LETTERS[idx] if idx < len(COL_LETTERS) else f'Col{idx}'


def build_ranges(groups):
    """Convert write groups into ``(range_str, [[values...]])`` pairs."""
    batch_writes = []
    for g in groups:
        row_num = g[0][0] + FIRST_DATA_ROW
        start_col, end_col = g[0][1], g[-1][1]
        range_str = (f"{SHEET_ID}!{col_letter(start_col)}{row_num}"
                     f":{col_letter(end_col)}{row_num}")
        values = [None] * (end_col - start_col + 1)
        for _, col, value in g:
            values[col - start_col] = value
        batch_writes.append((range_str, [values]))
    return batch_writes


def write_ranges(batch_writes, token):
    """PUT every range to the Feishu sheet; return ``(success, fail)``.

    Failures are counted and the first five are printed; a failed range
    never aborts the remaining writes.
    """
    import requests

    url = ('https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/'
           f'{SPREADSHEET_TOKEN}/values')
    headers = {'Authorization': f'Bearer {token}',
               'Content-Type': 'application/json'}
    success = 0
    fail = 0
    # One session reuses the TCP/TLS connection across all PUTs.
    with requests.Session() as session:
        for i, (range_str, values) in enumerate(batch_writes, start=1):
            body = {"valueRange": {"range": range_str, "values": values}}
            try:
                resp = session.put(url, headers=headers, json=body,
                                   timeout=HTTP_TIMEOUT).json()
                if resp.get('code') == 0:
                    success += 1
                else:
                    fail += 1
                    if fail <= 5:
                        print(f"  Write error [{range_str}]: "
                              f"{resp.get('code')} {resp.get('msg')}")
            except Exception as e:  # best effort: keep writing other ranges
                fail += 1
                if fail <= 5:
                    print(f"  Exception [{range_str}]: {e}")

            if i % 100 == 0:
                print(f"  Progress: {i}/{len(batch_writes)} "
                      f"(success={success}, fail={fail})")
            time.sleep(0.03)
    return success, fail


def main():
    """Run the whole refresh: load, encrypt, query, build, write, report."""
    token = get_token()

    rows = load_rows()
    print(f"Loaded {len(rows)} rows from sheet")

    row_phones = extract_phones(rows)
    print(f"Found {len(row_phones)} valid phone numbers")

    enc_to_rows = encrypt_phones(row_phones)
    print(f"Encrypted {len(enc_to_rows)} phones")

    uid_map, trial_counts, order_data, refund_map = query_database(enc_to_rows)

    writes, stats, suspicious_refund_rows = build_writes(
        uid_map, trial_counts, order_data, refund_map)

    print("\n=== Stats ===")
    for letter, label, _ in STAT_COLUMNS:
        print(f"{letter} ({label}): {stats[letter]}")
    print(f"Total writes: {len(writes)}")
    if suspicious_refund_rows:
        print(f"Suspicious refund rows: {suspicious_refund_rows}")

    writes.sort(key=lambda w: (w[0], w[1]))
    groups = group_writes(writes)
    print(f"\nGrouped into {len(groups)} write ranges")

    batch_writes = build_ranges(groups)
    print(f"Prepared {len(batch_writes)} batch writes")

    success, fail = write_ranges(batch_writes, token)
    print("\n=== Write Complete ===")
    print(f"Success: {success}, Fail: {fail}")

    # ── Final summary ──
    print("\n=== FINAL SUMMARY ===")
    for letter, _, label_cn in STAT_COLUMNS:
        print(f"{letter}列({label_cn}): {stats[letter]} 行")
    if suspicious_refund_rows:
        print(f"疑似假退款行号: {suspicious_refund_rows}")
    else:
        print("疑似假退款: 无")


if __name__ == "__main__":
    main()