#!/usr/bin/env python3 """ Bot 销转看板 Step2 刷新 — XXTEA 精确匹配版 (v2) E列11位明文手机号 → XXTEA加密 → bi_vala_app_account.tel_encrypt精确匹配 → H列UID S2 规则: ① E→H: phone_encrypt.py XXTEA 精确匹配, 查不到留空 ② H→D/I/J: 只补空, 不覆盖已有值 ③ K=是: 仅当 L(下单日) >= C(线索日期) ④ 全额退清: 所有订单都退费 → K/O/P/Q 全部清空 ⑤ O/P/Q 0留空, P整元 ⑥ G列不动, 订单汇总不动 覆盖列: D/H/I/J + K-V + S/U """ import json, re, time, sys, os, requests, psycopg2 from datetime import datetime from collections import defaultdict SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) WORKSPACE = os.path.dirname(SCRIPTS_DIR) CRED_DIR = "/root/.openclaw/credentials/xiaoxi" # XXTEA 加密 sys.path.insert(0, SCRIPTS_DIR) from phone_encrypt import encrypt_phone SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug" SALES_SHEETS = [ ("qJF4I", "小龙", "A1:V1200"), ("f975f0", "吴迪", "A1:V700"), ("qJF4J", "成都", "A1:V2500"), ] CS_MAP = {"吴迪": "吴迪", "小龙": "小龙", "Tom": "Tom", "Bob": "Bob"} GOODS_NAMES = { 57: "瓦拉英语level1·单季", 60: "瓦拉英语level1", 63: "瓦拉英语level1·单季", 31: "瓦拉英语年包", 32: "瓦拉英语单季度包", 33: "瓦拉英语level2", 54: "瓦拉英语季度包", 61: "瓦拉英语level1+2", } CHANNEL_MAP = { "Apple App Store": "苹果", "科大讯飞学习机": "讯飞", "学而思学习机": "学而思", "华为应用市场": "华为", "小米应用市场": "小米", "应用宝应用市场": "应用宝", "希沃学习机": "希沃", "荣耀应用市场": "荣耀", "小度学习机": "小度", "oppo应用市场": "OPPO", "vivo应用市场": "VIVO", "京东方学习机": "京东方", "步步高学习机": "步步高", "作业帮学习机": "作业帮", "魅族应用市场": "魅族", "官网": "官网", } LOG_FILE = "/var/log/xiaoxi_step2_refresh.log" def log(msg): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{ts}] {msg}" print(line) with open(LOG_FILE, "a") as f: f.write(line + "\n") def get_secret(key): with open(os.path.join(WORKSPACE, "secrets.env")) as f: for line in f: if line.startswith(f"{key}="): return line.strip().split("=", 1)[1].strip("'\"") def get_fs_token(): with open(os.path.join(CRED_DIR, "config.json")) as f: cfg = json.load(f) resp = requests.post( "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", json={"app_id": cfg["apps"][0]["appId"], "app_secret": cfg["apps"][0]["appSecret"]}, timeout=15 ) return resp.json()["tenant_access_token"] def read_sheet(token, sheet_id, range_str=None): url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}" if range_str: url += f"!{range_str}" resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30) data = resp.json() if data.get("code") != 0: raise RuntimeError(f"读取失败 {sheet_id}: {data}") return data["data"]["valueRange"]["values"] def put_values(token, sheet_id, range_str, values): url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values" body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}} resp = requests.put(url, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, json=body, timeout=30) r = resp.json() if r.get("code") != 0: log(f" ❌ {range_str}: {r.get('code')} {r.get('msg')}") return False return True def batch_in(cur, sql_tpl, params, chunk=500): results = [] for i in range(0, len(params), chunk): batch = params[i:i+chunk] ph = ",".join(["%s"] * len(batch)) cur.execute(sql_tpl % ph, batch) results.extend(cur.fetchall()) return results # ── Step 1: 解析销售三表 ── def parse_sales_sheets(token): """返回 {sheet_id: [(row_idx, sales_name, nickname, date_str, phone, existing_uid, g_val, existing_d, existing_i, existing_j), ...]}""" all_data = {} for sid, sname, rng in SALES_SHEETS: rows = read_sheet(token, sid, rng) entries = [] for idx, row in enumerate(rows[2:], start=3): if not row: continue sr = str(row[0]).strip() if len(row) > 0 and row[0] else "" sales = None for k, v in CS_MAP.items(): if k in sr: sales = v break if not sales: continue nickname = str(row[1]).strip() if len(row) > 1 and row[1] else "" date_str = str(row[2]).strip() if len(row) > 2 and row[2] else "" phone = "" if len(row) > 4 and row[4]: try: phone = str(int(float(row[4]))) except: pass uid = "" if len(row) > 7 and row[7]: try: uid = str(int(float(row[7]))) except: pass g_val = str(row[6]).strip() if len(row) > 6 and row[6] else "" # 读取已有 D/I/J 值 (用于只补空判断) d_val = str(row[3]).strip() if len(row) > 3 and row[3] else "" i_val = str(row[8]).strip() if len(row) > 8 and row[8] else "" j_val = str(row[9]).strip() if len(row) > 9 and row[9] else "" entries.append((idx, sales, nickname, date_str, phone, uid, g_val, d_val, i_val, j_val)) all_data[sid] = entries log(f" {sname}: {len(entries)} rows, {sum(1 for e in entries if e[5] and e[5].isdigit() and int(e[5])>0)} with uid") return all_data # ── Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配 ── def phone_to_uid_xxtea(all_entries): """E列11位明文手机号 → XXTEA加密 → bi_vala_app_account.tel_encrypt精确匹配 → UID""" # 收集所有 11 位手机号 phone_rows = [] for sid, entries in all_entries.items(): for idx, sales, nick, date_str, phone, uid, g_val, d_val, i_val, j_val in entries: if re.match(r'^\d{11}$', phone): phone_rows.append((sid, idx, phone)) if not phone_rows: return {} log(f" XXTEA 加密匹配: {len(phone_rows)} 个手机号") # 加密所有手机号 phone_enc_map = {} # {encrypted: phone} for _, _, phone in phone_rows: try: enc = encrypt_phone(phone) phone_enc_map[enc] = phone except Exception as e: log(f" 加密失败 {phone}: {e}") log(f" 加密完成, 唯一密文: {len(phone_enc_map)}") # PG 精确查询 conn = psycopg2.connect( host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591, user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"), dbname="vala_bi", connect_timeout=30 ) cur = conn.cursor() enc_list = list(phone_enc_map.keys()) phone_to_uid = {} for i in range(0, len(enc_list), 500): chunk = enc_list[i:i+500] ph = ",".join(["%s"] * len(chunk)) cur.execute( f"SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL", chunk ) for uid, tel_enc in cur.fetchall(): plain = phone_enc_map.get(tel_enc) if plain: phone_to_uid[plain] = str(uid) time.sleep(0.05) cur.close() conn.close() log(f" 精确匹配到 {len(phone_to_uid)} 个 UID (via XXTEA)") return phone_to_uid # ── Step 3: PostgreSQL 批量查询 ── def query_all_pg(all_entries, phone_map): """查询所有需要的数据""" uid_set = set() for sid, entries in all_entries.items(): for idx, sales, nick, date_str, phone, uid, g_val, d_val, i_val, j_val in entries: if re.match(r'^\d{11}$', phone) and phone in phone_map: uid_set.add(int(phone_map[phone])) if uid and uid.isdigit() and int(uid) > 0: uid_set.add(int(uid)) uid_list = list(uid_set) log(f" 有效 user_id: {len(uid_list)}") conn = psycopg2.connect( host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591, user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"), dbname="vala_bi", connect_timeout=30 ) cur = conn.cursor() info = {uid: { "reg_date": "", "download_channel": "", "trial_count": 0, "has_order": False, "order_date": "", "order_channel": "", "product": "", "gmv": 0, "refund": 0, "gsv": 0, "activation": "", "lesson_progress": "", "lesson_time": "", "lesson_minutes": 0, "max_lesson": 0, } for uid in uid_set} # 3a. 注册信息 log(" 查询注册信息...") reg_info = batch_in(cur, "SELECT id, created_at, download_channel FROM bi_vala_app_account WHERE id IN (%s) AND status=1 AND deleted_at IS NULL", uid_list ) for aid, created_at, dc in reg_info: if aid in info: info[aid]["reg_date"] = created_at.strftime("%Y-%m-%d") if created_at else "" raw_ch = dc or "" info[aid]["download_channel"] = CHANNEL_MAP.get(raw_ch, raw_ch) # 3b. 体验节数 log(" 查询体验节数...") trial_info = batch_in(cur, "SELECT account_id, COUNT(*) FROM bi_user_course_detail WHERE account_id IN (%s) AND expire_time IS NULL AND deleted_at IS NULL GROUP BY account_id", uid_list ) for aid, cnt in trial_info: if aid in info: info[aid]["trial_count"] = cnt # 3c. 订单信息 log(" 查询订单信息...") orders = batch_in(cur, "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL AND order_status IN (3,4) ORDER BY pay_success_date DESC", uid_list ) user_orders = defaultdict(list) for o in orders: user_orders[o[0]].append(o) trade_nos = [o[1] for o in orders if o[1]] refund_map = {} if trade_nos: refunds = batch_in(cur, "SELECT trade_no, refund_amount_int FROM bi_refund_order WHERE trade_no IN (%s) AND status=3", trade_nos ) for tn, amt in refunds: refund_map[tn] = amt for aid, olist in user_orders.items(): if aid not in info: continue info[aid]["has_order"] = True latest = olist[0] info[aid]["order_date"] = latest[2].strftime("%Y-%m-%d") if latest[2] else "" info[aid]["order_channel"] = latest[3] or "" info[aid]["product"] = GOODS_NAMES.get(latest[4], f"商品{latest[4]}") total_gmv = sum(o[5] for o in olist) / 100.0 total_refund = sum(refund_map.get(o[1], 0) for o in olist) / 100.0 info[aid]["gmv"] = total_gmv info[aid]["refund"] = total_refund info[aid]["gsv"] = total_gmv - total_refund # 3d. 激活课程 log(" 查询激活课程...") try: activations = batch_in(cur, "SELECT t.account_id, t.season_package_level FROM bi_vala_seasonal_ticket t WHERE t.account_id IN (%s) AND t.status=1 AND t.deleted_at IS NULL AND t.season_package_level IN ('A1','A2')", uid_list ) for aid, lvl in activations: if aid in info: info[aid]["activation"] = lvl except Exception as e: log(f" 激活查询异常: {e}") # 3e. 角色信息 log(" 查询角色信息...") char_info = batch_in(cur, "SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN (%s) AND deleted_at IS NULL", uid_list ) account_chars = defaultdict(list) char_to_account = {} for aid, cid in char_info: account_chars[aid].append(cid) char_to_account[cid] = aid char_ids = list(char_to_account.keys()) log(f" 角色数: {len(char_ids)}") # 3f. 课程映射 cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson") chapter_map = {} for ch_id, cl, cs, cu, cl2 in cur.fetchall(): chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "") # 3g. 课时完成记录 log(" 查询课时完成记录...") char_plays = defaultdict(lambda: {"latest_time": None, "latest_chapter": None, "max_lesson_idx": 0, "total_ms": 0}) for tbl_idx in range(8): table = f"bi_user_chapter_play_record_{tbl_idx}" try: cur.execute( f"SELECT user_id, chapter_id, created_at FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)", (char_ids,) ) for uid, ch_id, created_at in cur.fetchall(): ch_data = chapter_map.get(ch_id) if not ch_data: continue rec = char_plays[uid] if rec["latest_time"] is None or created_at > rec["latest_time"]: rec["latest_time"] = created_at rec["latest_chapter"] = (ch_id, ch_data) cl, cs, cu, cl2 = ch_data try: u_num = int(cu[1:]) if cu and len(cu) >= 2 else 0 l_num = int(cl2[1:]) if cl2 and len(cl2) >= 2 else 0 lesson_idx = u_num * 5 + l_num if lesson_idx > rec["max_lesson_idx"]: rec["max_lesson_idx"] = lesson_idx except: pass except Exception as e: log(f" 警告 {table}: {e}") # 3h. 学习总耗时 log(" 查询学习耗时...") for tbl_idx in range(8): table = f"bi_user_component_play_record_{tbl_idx}" try: cur.execute( f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id", (char_ids,) ) for uid, total_ms in cur.fetchall(): if uid in char_plays: char_plays[uid]["total_ms"] += (total_ms or 0) except Exception as e: log(f" 警告 {table}: {e}") cur.close() conn.close() # 汇总到 account 级别 for aid in uid_set: chars = account_chars.get(aid, []) best_time = None best_ch = None max_lesson = 0 total_ms = 0 for cid in chars: play = char_plays.get(cid) if not play: continue if play["latest_chapter"]: if best_time is None or play["latest_time"] > best_time: best_time = play["latest_time"] best_ch = play["latest_chapter"] if play["max_lesson_idx"] > max_lesson: max_lesson = play["max_lesson_idx"] total_ms += play["total_ms"] info[aid]["max_lesson"] = max_lesson info[aid]["lesson_minutes"] = round(total_ms / 60000, 1) if info[aid]["lesson_minutes"] == int(info[aid]["lesson_minutes"]): info[aid]["lesson_minutes"] = int(info[aid]["lesson_minutes"]) if best_ch: ch_id, (cl, cs, cu, cl2) = best_ch info[aid]["lesson_progress"] = f"{cl}-{cs}-{cu}-{cl2}" info[aid]["lesson_time"] = best_time.strftime("%Y-%m-%d") if best_time else "" log(f" 数据库查询完成") return info # ── Step 4: 写入销售三表 ── def write_sales_sheets(token, all_entries, phone_map, db_info): """全覆盖写入销售表的自动列""" now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") for sid, sname, _ in SALES_SHEETS: entries = all_entries[sid] log(f" 写入 {sname} ({sid})...") groups = [] cur_grp = [] for idx, sales, nick, date_str, phone, uid, g_val, d_val, i_val, j_val in entries: item = {"row": idx, "phone": phone, "uid": uid, "g_val": g_val, "date": date_str, "d_val": d_val, "i_val": i_val, "j_val": j_val} if not cur_grp or idx == cur_grp[-1]["row"] + 1: cur_grp.append(item) else: groups.append(cur_grp) cur_grp = [item] if cur_grp: groups.append(cur_grp) for g in groups: sr, er = g[0]["row"], g[-1]["row"] d_vals, h_vals, i_vals, j_vals = [], [], [], [] k_vals, l_vals, m_vals, n_vals = [], [], [], [] o_vals, p_vals, q_vals, r_vals = [], [], [], [] s_vals, t_vals, u_vals, v_vals = [], [], [], [] for item in g: phone = item["phone"] existing_uid = item["uid"] existing_d = item.get("d_val", "") existing_i = item.get("i_val", "") existing_j = item.get("j_val", "") clue_date = item.get("date", "") # 确定 UID: XXTEA 精确匹配优先 aid = 0 uid_str = "" if re.match(r'^\d{11}$', phone) and phone in phone_map: uid_str = phone_map[phone] aid = int(uid_str) elif existing_uid and existing_uid.isdigit() and int(existing_uid) > 0: uid_str = existing_uid aid = int(existing_uid) # H: UID — XXTEA匹配到就写,否则留空 if re.match(r'^\d{11}$', phone) and phone in phone_map: h_vals.append([phone_map[phone]]) elif re.match(r'^\d{11}$', phone): h_vals.append([""]) elif existing_uid and existing_uid.isdigit(): h_vals.append([existing_uid]) else: h_vals.append([""]) if aid > 0 and aid in db_info: di = db_info[aid] # D: 体验节数 — 只补空 if existing_d: d_vals.append([existing_d]) else: tc = di["trial_count"] d_vals.append([tc if tc > 0 else ""]) # I: 注册日 — 只补空 if existing_i: i_vals.append([existing_i]) else: i_vals.append([di["reg_date"]]) # J: 下载渠道 — 只补空 if existing_j: j_vals.append([existing_j]) else: j_vals.append([di["download_channel"]]) # 判断全额退清: gmv == refund 且 gmv > 0 gmv_int = int(di["gmv"]) refund_int = int(di["refund"]) gsv_int = int(di["gsv"]) is_full_refund = (gmv_int > 0 and gmv_int == refund_int) # K=是: 有订单且非全额退清 # 注: C列(进线日期)实际存的是手机号, 无法做L≥C日期比较 order_date = di["order_date"] should_k_yes = di["has_order"] and not is_full_refund if is_full_refund: # 全额退清 → K/O/P/Q 全部清空 k_vals.append([""]) o_vals.append([""]) p_vals.append([""]) q_vals.append([""]) else: k_vals.append(["是" if should_k_yes else ""]) o_vals.append([gmv_int if gmv_int > 0 else ""]) p_vals.append([refund_int if refund_int > 0 else ""]) q_vals.append([gsv_int if gsv_int > 0 else ""]) l_vals.append([order_date]) m_vals.append([di["order_channel"]]) n_vals.append([di["product"] if di["has_order"] else ""]) act = di["activation"] if act: r_vals.append([f"{act}体验课" if act in ("A1", "A2") else act]) else: r_vals.append([""]) lp = di["lesson_progress"] s_vals.append([lp if lp else ""]) t_vals.append([di["lesson_time"]]) lm = di["lesson_minutes"] u_vals.append([lm if lm > 0 else ""]) else: for arr in [d_vals, i_vals, j_vals, k_vals, l_vals, m_vals, n_vals, o_vals, p_vals, q_vals, r_vals, s_vals, t_vals, u_vals]: arr.append([""]) v_vals.append([now_str]) cols = [ ("D", d_vals), ("H", h_vals), ("I", i_vals), ("J", j_vals), ("K", k_vals), ("L", l_vals), ("M", m_vals), ("N", n_vals), ("O", o_vals), ("P", p_vals), ("Q", q_vals), ("R", r_vals), ("S", s_vals), ("T", t_vals), ("U", u_vals), ("V", v_vals), ] for col_letter, vals in cols: put_values(token, sid, f"{col_letter}{sr}:{col_letter}{er}", vals) time.sleep(0.1) log(f" {sname}: {len(entries)} rows done") # ── Main ── def main(): log("=" * 50) log("Bot 销转看板 Step2 刷新 (XXTEA精确匹配版) 启动") try: token = get_fs_token() log("Step 1: 解析销售三表") all_entries = parse_sales_sheets(token) log("Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配") phone_map = phone_to_uid_xxtea(all_entries) log("Step 3: PostgreSQL 批量查询") db_info = query_all_pg(all_entries, phone_map) log("Step 4: 写入销售三表") write_sales_sheets(token, all_entries, phone_map, db_info) log("✅ Step2 刷新完成 (XXTEA)") return 0 except Exception as e: log(f"❌ ERROR: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": sys.exit(main())