#!/usr/bin/env python3
"""Xiaohongshu (小红书) user course-progress full table -- daily refresh.

Schedule: every day at 6:00 AM
Owning agent: 小溪 (xiaoxi)
Data sources: bi_vala_app_account + bi_vala_app_character
    + bi_user_chapter_play_record (8 shards)
    + bi_user_component_play_record (8 shards)
    + bi_vala_order + bi_refund_order + bi_user_course_detail
    + bi_vala_seasonal_ticket
    (NOTE: this script also reads bi_level_unit_lesson and does not currently
    query bi_vala_seasonal_ticket.)
Filter: the uid list is read from Feishu sheet Yzs0sPw2KhZ03gtKmuuctZ5LnJf
    (Sheet 75934f) and the table is produced per uid. Users are no longer
    filtered by order channel / payment status.
Output: Feishu sheet Af1psbiYphO5N0txTkAcnJnInmc (Sheet 630066)
"""

import json
import os
import sys
import traceback
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests
from psycopg2.extras import execute_values

# ── Configuration ──
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"

# Output sheet.
SPREADSHEET_TOKEN = "Af1psbiYphO5N0txTkAcnJnInmc"
SHEET_ID = "630066"
# Input sheet holding the user-maintained uid list.
UID_SHEET_TOKEN = 'Yzs0sPw2KhZ03gtKmuuctZ5LnJf'
UID_SHEET_ID = '75934f'

LOG_FILE = "/var/log/xiaoxi_xhs_table_refresh.log"

# Number of physical tables behind each sharded play-record table.
SHARD_COUNT = 8
# Order statuses that main() treats as "paid" when deriving the pay status.
PAID_STATUSES = (3, 4)

HEADERS = [
    '用户ID(account_id)', '我方脱敏手机号(tel)', '对方脱敏手机号', '销售归属', '进线日期',
    '注册时间', '渠道来源', '角色昵称',
    '课程等级', '课程类型',
    '首课时间', '首课章节',
    '体验L01时间', '体验L02时间', '体验L03时间', '体验L04时间', '体验L05时间',
    '已完体验节数',
    '最近行课时间', '最近行课章节',
    '学习总时长(分钟)',
    '付费状态', '首次付费时间', '总GMV', '总GSV', '付费渠道',
    '最近登录时间'
]


def log(msg):
    """Print a timestamped message and append it to LOG_FILE."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Explicit encoding: the messages are Chinese and the default encoding
    # depends on the locale the job happens to run under.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def get_pg_password():
    """Return PG_ONLINE_PASSWORD from ../secrets.env (relative to this file).

    Raises:
        RuntimeError: if the file has no PG_ONLINE_PASSWORD= line. Previously
            this case returned None and surfaced later as an opaque
            connection failure.
    """
    secrets_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env"
    )
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                return line.strip().split("=", 1)[1].strip("'\"")
    raise RuntimeError(f"PG_ONLINE_PASSWORD not found in {secrets_path}")


def get_fs_token():
    """Request a Feishu tenant access token using the first app in config.json.

    Raises:
        RuntimeError: if the response carries no tenant_access_token.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15
    )
    payload = resp.json()
    token = payload.get("tenant_access_token")
    if not token:
        raise RuntimeError(
            f"Feishu token request failed: code={payload.get('code')} msg={payload.get('msg')}"
        )
    return token


def fmt(val):
    """Render a cell value: None -> '', datetime -> 'YYYY-MM-DD HH:MM:SS', else str()."""
    if val is None:
        return ''
    if isinstance(val, datetime):
        return val.strftime('%Y-%m-%d %H:%M:%S')
    return str(val)


def _parse_uid(cell):
    """Convert one uid cell to int, or return None when it is not an integer."""
    if cell is None or isinstance(cell, bool):
        return None
    if isinstance(cell, float):
        # A numeric cell may arrive as 12345.0; accept it only when integral.
        return int(cell) if cell.is_integer() else None
    try:
        return int(str(cell).strip())
    except ValueError:
        return None


def _cell(row, idx):
    """Return row[idx], or '' when the cell is missing or None."""
    if len(row) > idx and row[idx] is not None:
        return row[idx]
    return ''


def _read_uid_list(token):
    """Read the uid sheet and return (sorted unique uids, per-uid metadata).

    The first row is treated as a header. Columns after the uid are stored as
    their_phone / sales / lead_date. When a uid appears more than once the
    last row wins.

    Raises:
        RuntimeError: if Feishu answers with a non-zero code.
    """
    resp = requests.get(
        f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{UID_SHEET_TOKEN}/values/{UID_SHEET_ID}',
        headers={'Authorization': f'Bearer {token}'},
        timeout=30
    )
    payload = resp.json()
    if payload.get('code') != 0:
        raise RuntimeError(f"读取 uid 清单失败: {payload}")
    rows = payload['data']['valueRange'].get('values') or []

    uid_meta = {}  # aid -> {their_phone, sales, lead_date}
    skipped = 0
    for row in rows[1:]:
        if not row or not row[0]:
            continue
        aid = _parse_uid(row[0])
        if aid is None:
            skipped += 1
            continue
        uid_meta[aid] = {
            'their_phone': _cell(row, 1),
            'sales': _cell(row, 2),
            'lead_date': _cell(row, 3),
        }
    if skipped:
        # Report bad rows instead of silently dropping them.
        log(f" 跳过无法解析的 uid 行: {skipped}")
    return sorted(uid_meta), uid_meta


def _fetch_from_shards(cur, base_table, sql_template):
    """Run sql_template against every shard of base_table and collect the rows.

    A failing shard is skipped (best effort, as before), but it is now logged
    and rolled back to a savepoint. Without that, PostgreSQL leaves the
    transaction aborted and every later statement fails as well.
    """
    rows = []
    for idx in range(SHARD_COUNT):
        table = f"{base_table}_{idx}"
        cur.execute("SAVEPOINT _xhs_shard")
        try:
            cur.execute(sql_template.format(table=table))
            rows.extend(cur.fetchall())
        except psycopg2.Error as e:
            cur.execute("ROLLBACK TO SAVEPOINT _xhs_shard")
            log(f" WARN: {table} 查询失败, 已跳过: {e}")
        else:
            cur.execute("RELEASE SAVEPOINT _xhs_shard")
    return rows


def _load_db_data(cur, aids):
    """Run Steps 2-8 on an open cursor and return the lookups as a dict."""
    # Temp table of account ids, joined against by the queries below.
    cur.execute("CREATE TEMP TABLE _tmp_xhs_aids (id int PRIMARY KEY) ON COMMIT DROP")
    execute_values(cur, "INSERT INTO _tmp_xhs_aids (id) VALUES %s", [(a,) for a in aids])

    # Step 2: Account info
    log("Step 2: 账户信息")
    cur.execute("""
        SELECT id, tel, created_at, download_channel
        FROM bi_vala_app_account
        WHERE id IN (SELECT id FROM _tmp_xhs_aids)
          AND status = 1 AND deleted_at IS NULL
    """)
    accounts = {
        aid: {'tel': tel or '', 'reg_time': created, 'channel': channel or ''}
        for aid, tel, created, channel in cur.fetchall()
    }

    # Step 3: Characters
    log("Step 3: 角色信息")
    cur.execute("""
        SELECT account_id, id, nickname, latest_login
        FROM bi_vala_app_character
        WHERE account_id IN (SELECT id FROM _tmp_xhs_aids)
          AND nickname IS NOT NULL AND nickname != ''
          AND deleted_at IS NULL
    """)
    account_chars = defaultdict(list)
    account_nicknames = defaultdict(list)
    latest_login = {}
    all_char_ids = set()
    for aid, cid, nick, login_at in cur.fetchall():
        account_chars[aid].append(cid)
        account_nicknames[aid].append(nick)
        all_char_ids.add(cid)
        if login_at and (aid not in latest_login or login_at > latest_login[aid]):
            latest_login[aid] = login_at

    # Step 4: Chapter plays
    log("Step 4: 课时记录")
    cur.execute("CREATE TEMP TABLE _tmp_xhs_cids (id int PRIMARY KEY) ON COMMIT DROP")
    execute_values(cur, "INSERT INTO _tmp_xhs_cids (id) VALUES %s",
                   [(c,) for c in sorted(all_char_ids)])
    char_plays = defaultdict(list)
    chapter_rows = _fetch_from_shards(cur, "bi_user_chapter_play_record", """
        SELECT cpr.user_id, cpr.chapter_id, cpr.created_at
        FROM {table} cpr
        JOIN _tmp_xhs_cids t ON cpr.user_id = t.id
        WHERE cpr.play_status = 1 AND cpr.deleted_at IS NULL
    """)
    for uid, ch_id, played_at in chapter_rows:
        char_plays[uid].append((ch_id, played_at))
    log(f" 有记录角色: {len(char_plays)}")

    # Step 5: U00 chapter map
    log("Step 5: 课程映射")
    cur.execute("""
        SELECT id, course_level, course_season, course_unit, course_lesson
        FROM bi_level_unit_lesson
        WHERE course_unit LIKE '%U00%'
    """)
    u00_map = {r[0]: (r[1], r[2], r[3], r[4]) for r in cur.fetchall()}

    # Step 6: Study time
    log("Step 6: 学习耗时")
    char_total_ms = defaultdict(int)
    time_rows = _fetch_from_shards(cur, "bi_user_component_play_record", """
        SELECT comp.user_id, SUM(COALESCE(comp.interval_time, 0))
        FROM {table} comp
        JOIN _tmp_xhs_cids t ON comp.user_id = t.id
        WHERE comp.deleted_at IS NULL
        GROUP BY comp.user_id
    """)
    for uid, total_ms in time_rows:
        char_total_ms[uid] += (total_ms or 0)

    # Step 7: Orders & refunds
    # NOTE(review): if bi_refund_order can hold several rows per trade_no, the
    # LEFT JOIN repeats the order and inflates GMV -- confirm against the schema.
    log("Step 7: 订单与退费")
    cur.execute("""
        SELECT o.account_id, o.pay_success_date, o.pay_amount_int, o.order_status,
               CASE WHEN r.status = 3 AND o.order_status = 4 THEN 1 ELSE 0 END as is_refunded
        FROM bi_vala_order o
        LEFT JOIN bi_refund_order r ON o.trade_no = r.trade_no
        WHERE o.account_id IN (SELECT id FROM _tmp_xhs_aids)
          AND o.deleted_at IS NULL
    """)
    orders = defaultdict(list)
    # `status`, not `os`: the old loop variable shadowed the os module in main().
    for aid, pay_date, amount, status, is_refunded in cur.fetchall():
        orders[aid].append({'pay_date': pay_date, 'amount': amount,
                            'status': status, 'refunded': is_refunded})

    # Step 8: Course details
    # The first row returned per account wins; the query has no ORDER BY.
    log("Step 8: 课程分配")
    cur.execute("""
        SELECT account_id, course_level,
               CASE WHEN expire_time IS NULL THEN '体验课' ELSE '正式课' END
        FROM bi_user_course_detail
        WHERE account_id IN (SELECT id FROM _tmp_xhs_aids)
          AND deleted_at IS NULL
    """)
    course_info = {}
    for aid, level, course_type in cur.fetchall():
        course_info.setdefault(aid, (level, course_type))

    return {
        'accounts': accounts,
        'account_chars': account_chars,
        'account_nicknames': account_nicknames,
        'latest_login': latest_login,
        'char_plays': char_plays,
        'u00_map': u00_map,
        'char_total_ms': char_total_ms,
        'orders': orders,
        'course_info': course_info,
    }


def _lesson_stats(char_ids, char_plays, u00_map):
    """Aggregate play records over all characters of one account.

    Returns (first_lesson, first_chapter, recent_lesson, recent_chapter,
    u00_completed, u00_times), where u00_times maps a U00 lesson code to its
    earliest play time.
    """
    first_lesson = first_chapter = None
    recent_lesson = recent_chapter = None
    u00_completed = set()
    u00_times = {}
    for cid in char_ids:
        for ch_id, played_at in char_plays.get(cid, []):
            chapter = u00_map.get(ch_id)
            if chapter is not None:
                lesson = chapter[3]
                u00_completed.add(lesson)
                if lesson not in u00_times or played_at < u00_times[lesson]:
                    u00_times[lesson] = played_at
                if first_lesson is None or played_at < first_lesson:
                    first_lesson = played_at
                    first_chapter = chapter
            elif first_lesson is None:
                # A non-U00 play only seeds first_lesson; it never replaces
                # a value that is already set.
                first_lesson = played_at
            if recent_lesson is None or played_at > recent_lesson:
                recent_lesson = played_at
                # NOTE(review): when the newest play is not a U00 chapter,
                # recent_chapter keeps the previous U00 chapter -- confirm
                # this is the intended display.
                if chapter is not None:
                    recent_chapter = chapter
    return (first_lesson, first_chapter, recent_lesson, recent_chapter,
            u00_completed, u00_times)


def _payment_stats(user_orders):
    """Return (pay_status, first_pay, total_gmv, total_refund) for one account."""
    has_paid = any(o['pay_date'] and o['status'] in PAID_STATUSES for o in user_orders)
    refunded_count = sum(1 for o in user_orders if o['refunded'])
    all_refunded = refunded_count > 0 and all(
        o['refunded'] or not o['pay_date'] or o['status'] not in PAID_STATUSES
        for o in user_orders
    )
    if not has_paid:
        pay_status = '未付费'
    elif all_refunded:
        pay_status = '已退款'
    elif refunded_count > 0:
        pay_status = '已付费(部分退费)'
    else:
        pay_status = '已付费'

    first_pay = None
    total_gmv = 0
    total_refund = 0
    for o in user_orders:
        if not o['pay_date']:
            continue
        if first_pay is None or o['pay_date'] < first_pay:
            first_pay = o['pay_date']
        # The amount is divided by 100 as before; a NULL amount counts as 0
        # instead of raising TypeError.
        amt = (o['amount'] or 0) / 100
        total_gmv += amt
        if o['refunded']:
            total_refund += amt
    return pay_status, first_pay, total_gmv, total_refund


def _chapter_label(chapter):
    """Join the chapter tuple's fields with '-', or return '' for None."""
    if not chapter:
        return ''
    return '-'.join(str(part) for part in chapter)


def _build_row(aid, acc, meta, data):
    """Assemble one output row (columns A..AA, in HEADERS order) for an account."""
    chars = data['account_chars'].get(aid, [])
    (first_lesson, first_chapter, recent_lesson, recent_chapter,
     u00_completed, u00_times) = _lesson_stats(chars, data['char_plays'], data['u00_map'])

    total_ms = sum(data['char_total_ms'].get(cid, 0) for cid in chars)
    total_min = round(total_ms / 60000, 1)

    pay_status, first_pay, total_gmv, total_refund = _payment_stats(
        data['orders'].get(aid, [])
    )

    # TODO(review): the orders query selects no channel column, so there is
    # nothing to fill '付费渠道' with yet. This name was previously undefined
    # and raised NameError on the first row; add the channel column of
    # bi_vala_order to the Step 7 query and collect it here.
    pay_channels = []

    ci = data['course_info'].get(aid)
    return [
        str(aid),                                         # A
        acc['tel'],                                       # B
        meta.get('their_phone', ''),                      # C
        meta.get('sales', ''),                            # D
        meta.get('lead_date', ''),                        # E
        fmt(acc['reg_time']),                             # F
        acc['channel'],                                   # G
        ' / '.join(data['account_nicknames'].get(aid, [])),  # H
        ci[0] if ci else '',                              # I
        ci[1] if ci else '',                              # J
        fmt(first_lesson),                                # K
        _chapter_label(first_chapter),                    # L
        fmt(u00_times.get('L01')),                        # M
        fmt(u00_times.get('L02')),                        # N
        fmt(u00_times.get('L03')),                        # O
        fmt(u00_times.get('L04')),                        # P
        fmt(u00_times.get('L05')),                        # Q
        str(len(u00_completed)),                          # R
        fmt(recent_lesson),                               # S
        _chapter_label(recent_chapter),                   # T
        str(total_min),                                   # U
        pay_status,                                       # V
        fmt(first_pay),                                   # W
        str(round(total_gmv, 2)),                         # X
        str(round(total_gmv - total_refund, 2)),          # Y
        ' / '.join(pay_channels),                         # Z
        fmt(data['latest_login'].get(aid)),               # AA
    ]


def _write_sheet(token, values):
    """PUT the values (header + rows) to the output sheet; return the parsed response."""
    # 'AA' is the 27th column, matching len(HEADERS).
    range_str = f"A1:AA{len(values)}"
    resp = requests.put(
        f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values',
        headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'},
        json={'valueRange': {'range': f'{SHEET_ID}!{range_str}', 'values': values}},
        timeout=120
    )
    return resp.json()


def main():
    """Refresh the output sheet from the uid list; return a process exit code.

    Returns:
        0 on success (including an empty uid list), 1 on any failure.
    """
    log("=" * 50)
    log("小红书用户行课全量表刷新 启动")

    try:
        # The token is needed for the uid read as well as the final write.
        # It used to be fetched only in Step 10, so Step 1 raised
        # UnboundLocalError.
        token = get_fs_token()

        # Step 1: Read uid list from Feishu sheet (user-maintained)
        log("Step 1: 读取 uid 清单")
        aids, uid_meta = _read_uid_list(token)
        log(f" uid 清单: {len(aids)} 个")

        if not aids:
            log("无小红书用户,退出")
            return 0

        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB,
            connect_timeout=60
        )
        # Close the cursor and connection on failure too.
        try:
            with conn.cursor() as cur:
                data = _load_db_data(cur, aids)
        finally:
            conn.close()

        # Step 9: Assemble
        log("Step 9: 组装数据")
        results = []
        missing = 0
        for aid in aids:
            acc = data['accounts'].get(aid)
            if not acc:
                # NOTE(review): uids without an active account row are
                # dropped although the module docstring says every uid is
                # included -- confirm which is intended.
                missing += 1
                continue
            results.append(_build_row(aid, acc, uid_meta.get(aid, {}), data))
        if missing:
            log(f" 无有效账户的 uid: {missing}")

        # Step 10: Write to sheet
        log(f"Step 10: 写入表格 ({len(results)} 行)")
        outcome = _write_sheet(token, [HEADERS] + results)
        if outcome.get('code') == 0:
            log(f"✅ 刷新完成: {len(results)} 行, {len(HEADERS)} 列")
        else:
            log(f"❌ 写入失败: {outcome}")
            return 1

        # Step 11: Log summary
        # Look columns up by header: the old hard-coded 18 and 7 pointed at
        # 最近行课时间 and 角色昵称.
        pay_col = HEADERS.index('付费状态')
        first_lesson_col = HEADERS.index('首课时间')
        paid_count = sum(1 for row in results if row[pay_col] != '未付费')
        first_lesson_count = sum(1 for row in results if row[first_lesson_col])
        log(f"摘要: 总{len(results)} | 有首课{first_lesson_count} | 付费{paid_count}")

        return 0

    except Exception as e:
        # Top-level boundary: log, print the traceback, signal failure.
        log(f"ERROR: {e}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())