#!/usr/bin/env python3
"""Full-overwrite refresh of the sales sheets, process data and order summary.

Every run rewrites the automatic columns of every parsed sales-sheet row; it
does not depend on a "pending query" marker.  Rows without a valid user_id
get those columns blanked.

Scope:
  1. Sales sheets (f975f0 / qJF4I / qJF4J):
     columns D, I, J, K, L, M, N, O, P, Q, R, S, T, U, V.
  2. Process data (3aOvV6): H/L/P/T/X (users who reached lesson 1..5) and
     J/N/R/V/Z (paying users who reached lesson 1..5).
  3. Order summary (2smjwA): rows whose Y column equals 1 are copied from the
     three sales sheets (columns A-X) and fully replace the summary from
     row 3 downwards.
"""

import json
import os
import sys
import time
import traceback
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests

SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone  # noqa: E402,F401

# ── Configuration ──
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"

SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"

# (sheet id, display name used in logs, range to read)
SALES_SHEETS = [
    ("f975f0", "吴迪", "A1:Z700"),
    ("qJF4I", "小龙", "A1:Z1200"),
    ("qJF4J", "成都", "A1:Z2500"),
]
PROCESS_SHEET = "3aOvV6"
ORDER_SHEET = "2smjwA"

# Substring found in column A -> canonical sales name.
CS_MAP = {"吴迪": "吴迪", "小龙": "小龙", "Tom": "Tom", "Bob": "Bob"}

GOODS_NAMES = {
    57: "瓦拉英语level1·单季",
    60: "瓦拉英语level1",
    63: "瓦拉英语level1·单季",
    31: "瓦拉英语年包",
    32: "瓦拉英语单季度包",
    33: "瓦拉英语level2",
    54: "瓦拉英语季度包",
    61: "瓦拉英语level1+2",
}

LOG_FILE = "/var/log/xiaoxi_full_refresh.log"

# Sales-sheet columns written by write_sales_sheets, in the order produced by
# _sales_row_values (column V, the timestamp, is appended separately).
_SALES_DATA_COLUMNS = (
    "D", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U",
)

# Month prefixes recognised by the legacy (delimiter-less) month parser.
_LEGACY_MONTH_PREFIXES = ("3", "4", "5", "6", "7", "8", "9", "10", "11", "12")


# Sales-conversion channel classification (sales-conversion scenario only).
# Confirmed by 王虹茗 and 李承龙, 2026-06-15.
def classify_sales_channel(key_from):
    """Classify ``key_from`` as one of 端内 / 销转 / 达人 / 直购.

    Empty or missing values, and anything that matches no rule, are "直购".
    """
    if not key_from:
        return "直购"
    kf = key_from.strip()
    # In-app channels.
    if kf in ("app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0"):
        return "端内"
    # Sales conversion.
    if kf.startswith("sales-adp-"):
        return "销转"
    # Influencers: newmedia-daren-* plus newmedia-dianpu-wwxx-0-0.
    if kf.startswith("newmedia-daren-") or kf == "newmedia-dianpu-wwxx-0-0":
        return "达人"
    # Direct purchase: newmedia-dianpu-* (except wwxx) and everything else.
    return "直购"


def log(msg):
    """Print a timestamped line and append it to LOG_FILE.

    The file is written as UTF-8 because messages contain non-ASCII text.
    A log file that cannot be written is reported on stderr instead of
    aborting the refresh.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError as exc:
        print(f"[{ts}] log file unavailable: {exc}", file=sys.stderr)


def get_pg_password():
    """Read PG_ONLINE_PASSWORD from ``../secrets.env`` next to this script.

    Surrounding single or double quotes are stripped from the value.

    Raises:
        RuntimeError: the file contains no PG_ONLINE_PASSWORD= line.
        OSError: the file cannot be opened.
    """
    env_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    with open(env_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                return line.strip().split("=", 1)[1].strip("'\"")
    raise RuntimeError("secrets.env 中未找到 PG_ONLINE_PASSWORD")


def get_fs_token():
    """Request a Feishu tenant access token using the first app in config.json.

    Raises:
        RuntimeError: the response carries no ``tenant_access_token``.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if not token:
        raise RuntimeError(f"获取 tenant_access_token 失败: {data}")
    return token


def read_sheet(token, sheet_id, range_str=None):
    """Return the cell values of ``sheet_id`` (optionally limited to a range).

    Raises:
        RuntimeError: the API response code is not 0.
    """
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    if range_str:
        url += f"!{range_str}"
    resp = requests.get(
        url, headers={"Authorization": f"Bearer {token}"}, timeout=30
    )
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"读取失败: {data}")
    # Normalise a null "values" field to an empty list so callers can slice.
    return data["data"]["valueRange"]["values"] or []


def put_values(token, sheet_id, range_str, values):
    """Write ``values`` (a list of rows) to ``sheet_id!range_str``.

    Returns True on success.  An API error is logged and reported as False
    rather than raised, so one failed range does not stop the refresh.
    """
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values"
    )
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    resp = requests.put(
        url,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        json=body,
        timeout=30,
    )
    r = resp.json()
    if r.get("code") != 0:
        log(f" ❌ {range_str}: {r.get('code')} {r.get('msg')}")
        return False
    return True


def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in chunks and return all fetched rows.

    ``sql_tpl`` must contain exactly one ``%s`` where the placeholder list
    goes; it is expanded with ``%`` formatting, so any other literal ``%`` in
    the template has to be written as ``%%``.  An empty ``params`` runs no
    query and returns an empty list.
    """
    results = []
    for i in range(0, len(params), chunk):
        batch = params[i:i + chunk]
        placeholders = ",".join(["%s"] * len(batch))
        cur.execute(sql_tpl % placeholders, batch)
        results.extend(cur.fetchall())
    return results


def _valid_uid(uid):
    """Return ``uid`` as a positive int, or 0 if it is not a positive integer string."""
    if uid and uid.isdigit() and int(uid) > 0:
        return int(uid)
    return 0


def _int_cell(row, col):
    """Return cell ``col`` of ``row`` as an integer string, or "" if absent/non-numeric."""
    if len(row) > col and row[col]:
        try:
            return str(int(float(row[col])))
        except (TypeError, ValueError, OverflowError):
            return ""
    return ""


def _text_cell(row, col):
    """Return cell ``col`` of ``row`` as stripped text, or "" if absent/empty."""
    if len(row) > col and row[col]:
        return str(row[col]).strip()
    return ""


# ── Step 1: parse the sales sheets ──
def parse_sales_sheets(token):
    """Read every sales sheet and keep the rows that belong to a known salesperson.

    Returns:
        {sheet_id: [(row_idx, sales_name, nickname, date, phone, user_id), ...]}
        where row_idx is the 1-based sheet row and phone / user_id are
        integer strings ("" when the cell is empty or not numeric).
    """
    all_data = {}
    for sid, sname, rng in SALES_SHEETS:
        rows = read_sheet(token, sid, rng)
        entries = []
        # Row 1 is the header and row 2 the sub-header; data starts at row 3.
        for idx, row in enumerate(rows[2:], start=3):
            if not row:
                continue
            owner_cell = str(row[0]).strip() if row[0] else ""
            sales = next((v for k, v in CS_MAP.items() if k in owner_cell), None)
            if not sales:
                continue
            entries.append((
                idx,
                sales,
                _text_cell(row, 1),   # B: nickname
                _text_cell(row, 2),   # C: date
                _int_cell(row, 4),    # E: phone
                _int_cell(row, 7),    # H: user_id
            ))
        all_data[sid] = entries
        with_uid = sum(1 for e in entries if _valid_uid(e[5]))
        log(f" {sname}: {len(entries)} rows, {with_uid} with uid")
    return all_data


# ── Step 2: bulk database queries ──
def query_all_db(conn, all_entries):
    """Query registration, order, activation and learning data for every user.

    Args:
        conn: open psycopg2 connection.
        all_entries: result of parse_sales_sheets.

    Returns:
        {user_id (int): info dict} with one entry per valid user_id found in
        the sheets.  See the dict literal below for the keys.

    The activation query and the sharded play-record queries are
    best-effort: a failure is logged and the transaction rolled back so the
    remaining queries can still run.
    """
    # Collect the valid user ids; when a uid appears on several rows the
    # last row seen supplies sales / nickname / date.
    uid_to_sales = {}
    uid_to_nickname = {}
    uid_to_date = {}
    for entries in all_entries.values():
        for _idx, sales, nick, date_str, _phone, uid in entries:
            aid = _valid_uid(uid)
            if aid:
                uid_to_sales[aid] = sales
                uid_to_nickname[aid] = nick
                uid_to_date[aid] = date_str

    uid_list = list(uid_to_sales)
    log(f" 有效 user_id: {len(uid_list)}")

    info = {
        uid: {
            "sales": uid_to_sales.get(uid, ""),
            "nickname": uid_to_nickname.get(uid, ""),
            "date": uid_to_date.get(uid, ""),
            "reg_date": "",
            "download_channel": "",
            "has_order": "否",
            "order_date": "",
            "order_channel": "",
            "product": "",
            "gmv": 0,
            "refund": 0,
            "gsv": 0,
            "activation": "",
            "lesson_progress": "",
            "lesson_time": "",
            "lesson_minutes": 0,
            "max_lesson": 0,
            "is_paid": False,
        }
        for uid in uid_list
    }

    account_chars = defaultdict(list)
    char_plays = defaultdict(lambda: {
        "latest_time": None,
        "latest_chapter": None,
        "max_lesson_idx": 0,
        "total_ms": 0,
    })

    cur = conn.cursor()
    try:
        # 2a. Registration info.
        log(" 查询注册信息...")
        reg_info = batch_in(
            cur,
            "SELECT id, created_at, download_channel FROM bi_vala_app_account "
            "WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_list,
        )
        for aid, created_at, dc in reg_info:
            if aid in info:
                info[aid]["reg_date"] = (
                    created_at.strftime("%Y-%m-%d") if created_at else ""
                )
                info[aid]["download_channel"] = dc or ""

        # 2b. Orders.  Each account's orders arrive in one chunk, so the
        # ORDER BY keeps them newest-first per account.
        log(" 查询订单信息...")
        orders = batch_in(
            cur,
            "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, "
            "pay_amount_int, order_status FROM bi_vala_order "
            "WHERE account_id IN (%s) AND pay_success_date IS NOT NULL "
            "AND order_status IN (3,4) ORDER BY pay_success_date DESC",
            uid_list,
        )
        user_orders = defaultdict(list)
        for o in orders:
            user_orders[o[0]].append(o)

        # Refunds, summed per trade so several refund rows for one trade
        # are all counted.
        trade_nos = [o[1] for o in orders if o[1]]
        refund_map = defaultdict(int)
        if trade_nos:
            refunds = batch_in(
                cur,
                "SELECT trade_no, refund_amount_int FROM bi_refund_order "
                "WHERE trade_no IN (%s) AND status=3",
                trade_nos,
            )
            for tn, amt in refunds:
                refund_map[tn] += amt or 0

        for aid, olist in user_orders.items():
            if aid not in info:
                continue
            rec = info[aid]
            rec["has_order"] = "是"
            latest = olist[0]  # newest order
            rec["order_date"] = latest[2].strftime("%Y-%m-%d") if latest[2] else ""
            rec["order_channel"] = latest[3] or ""
            rec["product"] = GOODS_NAMES.get(latest[4], f"商品{latest[4]}")
            # Amounts are divided by 100 for the sheet.
            total_gmv = sum(o[5] or 0 for o in olist) / 100.0
            total_refund = sum(refund_map.get(o[1], 0) for o in olist) / 100.0
            rec["gmv"] = total_gmv
            rec["refund"] = total_refund
            rec["gsv"] = total_gmv - total_refund
            rec["is_paid"] = True

        # 2c. Activated course (best-effort).
        log(" 查询激活课程...")
        try:
            activations = batch_in(
                cur,
                "SELECT t.account_id, t.season_package_level "
                "FROM bi_vala_seasonal_ticket t WHERE t.account_id IN (%s) "
                "AND t.status=1 AND t.deleted_at IS NULL "
                "AND t.season_package_level IN ('A1','A2')",
                uid_list,
            )
            for aid, lvl in activations:
                if aid in info:
                    info[aid]["activation"] = lvl
        except Exception as e:
            log(f" 激活查询异常: {e}")
            # A failed statement aborts the transaction; without a rollback
            # every later query on this connection would fail too.
            conn.rollback()

        # 2d. Characters belonging to each account.
        log(" 查询角色信息...")
        char_info = batch_in(
            cur,
            "SELECT account_id, id FROM bi_vala_app_character "
            "WHERE account_id IN (%s) AND deleted_at IS NULL",
            uid_list,
        )
        char_to_account = {}
        for aid, cid in char_info:
            account_chars[aid].append(cid)
            char_to_account[cid] = aid
        char_ids = list(char_to_account.keys())
        log(f" 角色数: {len(char_ids)}")

        # 2e. Chapter id -> (level, season, unit, lesson).
        cur.execute(
            "SELECT id, course_level, course_season, course_unit, course_lesson "
            "FROM bi_level_unit_lesson"
        )
        chapter_map = {
            ch_id: (cl or "", cs or "", cu or "", cl2 or "")
            for ch_id, cl, cs, cu, cl2 in cur.fetchall()
        }

        # 2f. Completed-lesson records, spread over 8 tables.
        log(" 查询课时完成记录...")
        for tbl_idx in range(8):
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id, created_at FROM {table} "
                    "WHERE play_status=1 AND deleted_at IS NULL "
                    "AND user_id = ANY(%s)",
                    (char_ids,),
                )
                for uid, ch_id, created_at in cur.fetchall():
                    ch_data = chapter_map.get(ch_id)
                    if not ch_data:
                        continue
                    rec = char_plays[uid]
                    if rec["latest_time"] is None or created_at > rec["latest_time"]:
                        rec["latest_time"] = created_at
                        rec["latest_chapter"] = (ch_id, ch_data)
                    # Lesson index used for max_lesson: the unit and lesson
                    # codes are parsed by dropping their first character.
                    # NOTE(review): with u_num * 5 + l_num, a unit numbered 1
                    # puts its first lesson at index 6, not 1 — confirm this
                    # matches the ">= 1..5" thresholds used for process data.
                    _cl, _cs, cu, cl2 = ch_data
                    try:
                        u_num = int(cu[1:]) if cu and len(cu) >= 2 else 0
                        l_num = int(cl2[1:]) if cl2 and len(cl2) >= 2 else 0
                    except (ValueError, TypeError):
                        continue
                    lesson_idx = u_num * 5 + l_num
                    if lesson_idx > rec["max_lesson_idx"]:
                        rec["max_lesson_idx"] = lesson_idx
            except Exception as e:
                log(f" 警告 {table}: {e}")
                conn.rollback()  # keep the connection usable for the next table

        # 2g. Total learning time, spread over 8 tables.
        log(" 查询学习耗时...")
        for tbl_idx in range(8):
            table = f"bi_user_component_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} "
                    "WHERE user_id = ANY(%s) AND deleted_at IS NULL "
                    "GROUP BY user_id",
                    (char_ids,),
                )
                for uid, total_ms in cur.fetchall():
                    # Only characters that already have a completed-lesson
                    # record accumulate time.
                    if uid in char_plays:
                        # SUM() may arrive as Decimal, which cannot be
                        # JSON-encoded when written to the sheet.
                        char_plays[uid]["total_ms"] += float(total_ms or 0)
            except Exception as e:
                log(f" 警告 {table}: {e}")
                conn.rollback()
    finally:
        cur.close()

    # Roll the per-character results up to account level.
    for aid in uid_list:
        best_time = None
        best_ch = None
        max_lesson = 0
        total_ms = 0
        for cid in account_chars.get(aid, []):
            play = char_plays.get(cid)
            if not play:
                continue
            if play["latest_chapter"]:
                if best_time is None or play["latest_time"] > best_time:
                    best_time = play["latest_time"]
                    best_ch = play["latest_chapter"]
            if play["max_lesson_idx"] > max_lesson:
                max_lesson = play["max_lesson_idx"]
            total_ms += play["total_ms"]

        rec = info[aid]
        rec["max_lesson"] = max_lesson
        minutes = round(total_ms / 60000, 1)
        # Whole minutes are stored as int so the sheet shows "12", not "12.0".
        rec["lesson_minutes"] = int(minutes) if minutes == int(minutes) else minutes
        if best_ch:
            _ch_id, (cl, cs, cu, cl2) = best_ch
            rec["lesson_progress"] = f"{cl}-{cs}-{cu}-{cl2}"
            rec["lesson_time"] = best_time.strftime("%Y-%m-%d") if best_time else ""

    log(f" 数据库查询完成")
    return info


def _sales_row_values(di):
    """Return the values for columns D, I..U of one sales row.

    ``di`` is the user's info dict, or None for a row without a valid
    user_id, in which case every column is blanked.
    """
    if di is None:
        return [""] * len(_SALES_DATA_COLUMNS)
    act = di["activation"]
    if act:
        activation = f"{act}体验课" if act in ("A1", "A2") else act
    else:
        activation = ""
    return [
        di["max_lesson"] if di["max_lesson"] > 0 else "",   # D: trial lessons
        di["reg_date"],                                      # I: registration date
        di["download_channel"],                              # J: download channel
        di["has_order"],                                     # K: has order
        di["order_date"],                                    # L: order date
        di["order_channel"],                                 # M: order channel
        di["product"] if di["has_order"] == "是" else "",    # N: product
        int(di["gmv"]) if di["gmv"] > 0 else "",             # O: GMV
        int(di["refund"]) if di["refund"] > 0 else "",       # P: refund
        int(di["gsv"]) if di["gsv"] > 0 else "",             # Q: GSV
        activation,                                          # R: activated course
        di["lesson_progress"],                               # S: lesson progress
        di["lesson_time"],                                   # T: latest lesson date
        di["lesson_minutes"],                                # U: learning minutes
    ]


# ── Step 3: write the sales sheets ──
def write_sales_sheets(token, all_entries, db_info):
    """Overwrite the automatic columns (D, I..V) of every parsed sales row.

    Consecutive sheet rows are merged into one range per column to reduce
    the number of API calls.  Column V receives the refresh timestamp.
    """
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    columns = _SALES_DATA_COLUMNS + ("V",)

    for sid, sname, _ in SALES_SHEETS:
        entries = all_entries[sid]
        log(f" 写入 {sname} ({sid})...")

        # Split the entries into runs of consecutive row numbers.
        groups = []
        for idx, _sales, _nick, _date, _phone, uid in entries:
            if groups and idx == groups[-1][-1][0] + 1:
                groups[-1].append((idx, uid))
            else:
                groups.append([(idx, uid)])

        for group in groups:
            sr, er = group[0][0], group[-1][0]
            table = []
            for _row, uid in group:
                aid = _valid_uid(uid)
                di = db_info.get(aid) if aid else None
                table.append(_sales_row_values(di) + [now_str])

            for col_pos, col_letter in enumerate(columns):
                vals = [[row_vals[col_pos]] for row_vals in table]
                put_values(token, sid, f"{col_letter}{sr}:{col_letter}{er}", vals)
                time.sleep(0.1)

        log(f" {sname}: {len(entries)} rows done")


def _extract_month(date_str):
    """Return the month of a sheet date such as "3月15日" as "3月", or "".

    A leading one- or two-digit number followed by 月, "/", "." or "-" is
    read as the month, so "1月2日" is January rather than "12".  Strings
    without such a delimiter fall back to the original prefix matching,
    which only recognises months 3..12.
    """
    text = (date_str or "").strip()
    if not text:
        return ""
    for sep in ("月", "/", ".", "-"):
        head, found, _rest = text.partition(sep)
        head = head.strip()
        if (found and head.isascii() and head.isdigit()
                and len(head) <= 2 and 1 <= int(head) <= 12):
            return f"{int(head)}月"
    compact = text.replace("月", "").replace("日", "").strip()
    for prefix in _LEGACY_MONTH_PREFIXES:
        if compact.startswith(prefix):
            return prefix + "月"
    return ""


def _group_consecutive(row_values):
    """Group sorted (row, value) pairs into [(start_row, [values...]), ...] runs."""
    groups = []
    for row, val in row_values:
        if groups and row == groups[-1][0] + len(groups[-1][1]):
            groups[-1][1].append(val)
        else:
            groups.append((row, [val]))
    return groups


# ── Step 4: process data ──
def write_process_data(token, all_entries, db_info):
    """Recompute the cumulative per-(month, sales) counts and write them.

    For each (month, sales) pair, l1..l5 count users whose max_lesson is at
    least 1..5 and c1..c5 count the paying users among them.  Only the
    (month, sales) rows already present in the process sheet are written.
    """
    log(" 写入过程数据...")

    # Bucket every valid user by (month, sales).
    user_month_sales = defaultdict(list)
    for entries in all_entries.values():
        for _idx, sales, _nick, date_str, _phone, uid in entries:
            aid = _valid_uid(uid)
            if not aid or aid not in db_info:
                continue
            month = _extract_month(date_str)
            if not month:
                # Fall back to the month of the registration date (YYYY-MM-DD).
                reg = db_info[aid].get("reg_date", "")
                if reg and len(reg) >= 7:
                    month = f"{int(reg[5:7])}月"
            if not month:
                month = "未知"
            user_month_sales[(month, sales)].append(aid)

    # Cumulative metrics for every (month, sales) bucket.
    process_rows = {}
    for key, uids in user_month_sales.items():
        metrics = {"total": len(uids)}
        for n in range(1, 6):
            reached = [u for u in uids if db_info[u]["max_lesson"] >= n]
            metrics[f"l{n}"] = len(reached)
            metrics[f"c{n}"] = sum(1 for u in reached if db_info[u]["is_paid"])
        process_rows[key] = metrics

    # Locate the sheet row of every (month, sales) pair.
    rows = read_sheet(token, PROCESS_SHEET, "A1:AA25")
    row_map = {}
    for i, row in enumerate(rows[1:], start=2):
        if not row:
            continue
        m = _text_cell(row, 0)
        s = _text_cell(row, 1)
        if m and s:
            row_map[(m, s)] = i

    # Total rows are skipped (they hold SUM formulas maintained outside this
    # script).  Only the data columns H/J/L/N/P/R/T/V/X/Z are written; the
    # formula columns I/K/M/O/Q/S/U/W/Y are left untouched.
    sales_order = ["小龙", "吴迪", "Bob", "Tom"]
    months_order = ["3月", "4月", "5月", "6月"]
    # (column letter, metrics key)
    col_keys = [
        ("H", "l1"), ("J", "c1"), ("L", "l2"), ("N", "c2"), ("P", "l3"),
        ("R", "c3"), ("T", "l4"), ("V", "c4"), ("X", "l5"), ("Z", "c5"),
    ]

    for col, key in col_keys:
        col_vals = []
        for month in months_order:
            for sales in sales_order:
                mk = (month, sales)
                if mk not in row_map:
                    continue
                val = process_rows.get(mk, {}).get(key, 0)
                col_vals.append((row_map[mk], val))
        if not col_vals:
            continue

        col_vals.sort()
        for sr, vals in _group_consecutive(col_vals):
            er = sr + len(vals) - 1
            put_values(token, PROCESS_SHEET, f"{col}{sr}:{col}{er}",
                       [[v] for v in vals])
            time.sleep(0.15)

    log(f" 过程数据写入完成")


def _is_flag_one(cell):
    """Return True when a sheet cell holds 1 (as "1", 1 or 1.0)."""
    if cell in (None, ""):
        return False
    text = str(cell).strip()
    if text == "1":
        return True
    try:
        return float(text) == 1
    except ValueError:
        return False


# ── Step 5: order summary ──
def write_order_summary(token, all_entries, db_info):
    """Rebuild the order summary from the sales-sheet rows whose Y column is 1.

    Columns A-X of the matching rows replace the summary from row 3
    downwards, sorted by order date (column L, the column write_sales_sheets
    fills with the order date) in descending order.  Rows left over from a
    previous, longer summary are blanked afterwards.

    ``all_entries`` and ``db_info`` are unused; the sheets are re-read so the
    values written in Step 3 are picked up.
    """
    log(" 写入订单汇总(全量替换)...")

    order_rows = []
    for sid, sname, rng in SALES_SHEETS:
        rows = read_sheet(token, sid, rng)
        sheet_count = 0
        for row in rows[2:]:
            if not row:
                continue
            # Column Y (index 24) must be 1.
            if len(row) <= 24 or not _is_flag_one(row[24]):
                continue
            # Copy A-X (indices 0-23), padding short rows.
            order_rows.append([row[ci] if ci < len(row) else "" for ci in range(24)])
            sheet_count += 1
        log(f" {sname}: {sheet_count} 条")

    # Column L (index 11) is the order date; column K (index 10) is the
    # has-order flag and carries no ordering information.
    order_rows.sort(
        key=lambda r: str(r[11]) if len(r) > 11 and r[11] else "",
        reverse=True,
    )

    total = len(order_rows)
    log(f" 共 {total} 条订单,写入订单汇总 r3+")

    # Write A-X from row 3 in batches of 20 rows.
    for batch_start in range(0, total, 20):
        batch = order_rows[batch_start:batch_start + 20]
        sr = 3 + batch_start
        er = sr + len(batch) - 1
        put_values(token, ORDER_SHEET, f"A{sr}:X{er}", batch)
        time.sleep(0.5)

    # Blank any stale rows below the new data (best-effort).
    try:
        existing = read_sheet(token, ORDER_SHEET, "A3:A2000")
        # Offset of the last non-empty row, so gaps in the old data do not
        # leave stale rows behind.
        last_used = 0
        for offset, r in enumerate(existing, start=1):
            if r and any(c for c in r if c):
                last_used = offset
        if last_used > total:
            clear_start = 3 + total
            clear_end = 3 + last_used - 1
            empty_rows = [[""] * 24 for _ in range(clear_end - clear_start + 1)]
            put_values(token, ORDER_SHEET, f"A{clear_start}:X{clear_end}", empty_rows)
            log(f" 清除多余行 A{clear_start}:X{clear_end}")
    except Exception as e:
        log(f" 清除多余行跳过: {e}")

    log(f" 订单汇总写入完成")


# ── Main ──
def main():
    """Run the five refresh steps; return 0 on success and 1 on any error."""
    log("=" * 50)
    log("全覆盖刷新 启动")

    try:
        token = get_fs_token()
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB,
            connect_timeout=30,
        )
        try:
            # Step 1: parse the sales sheets.
            log("Step 1: 解析销售表")
            all_entries = parse_sales_sheets(token)

            # Step 2: database queries.
            log("Step 2: 数据库查询")
            db_info = query_all_db(conn, all_entries)
        finally:
            # Close the connection even when Step 1 or 2 fails.
            conn.close()

        # Step 3: write the sales sheets.
        log("Step 3: 写入销售表")
        write_sales_sheets(token, all_entries, db_info)

        # Step 4: process data.
        log("Step 4: 过程数据")
        write_process_data(token, all_entries, db_info)

        # Step 5: order summary.
        log("Step 5: 订单汇总")
        write_order_summary(token, all_entries, db_info)

        log("✅ 全覆盖刷新完成")
        return 0

    except Exception as e:
        log(f"❌ ERROR: {e}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())