ai_member_xiaoxi/scripts/refresh_may_course_data.py
2026-05-24 08:00:01 +08:00

398 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
刷新 5 月行课记录:查询学情数据并回填 Sheet1 D 列
"""
import json
import subprocess
import sys
import os
from datetime import datetime
# ========== 1. Load target rows ==========
# Each entry is unpacked later in this script as (row_num, account_id, phone).
# A context manager closes the file handle deterministically; the original
# json.load(open(...)) left it to the garbage collector.
with open('/tmp/target_rows.json', encoding='utf-8') as f:
    target_rows = json.load(f)
print(f"[1/4] 读取目标行: {len(target_rows)}")
# Distinct, non-empty account ids as strings, ordered numerically.
# key=int also rejects most non-numeric values before they are spliced into SQL.
uid_set = sorted({str(r[1]) for r in target_rows if r[1]}, key=int)
print(f" 去重用户数: {len(uid_set)}")
# Comma-separated id list used inside the SQL "IN (...)" clauses below.
uid_csv = ','.join(uid_set)
# ========== 2. Database queries ==========
print(f"[2/4] 查询学情数据...")
# Read KEY=VALUE pairs from secrets.env. Blank lines, '#' comment lines and
# lines without '=' are ignored; surrounding quotes are removed from values.
secrets = {}
with open('/root/.openclaw/workspace/secrets.env') as f:
    for raw_line in f:
        entry = raw_line.strip()
        if not entry or entry.startswith('#') or '=' not in entry:
            continue
        name, _, raw_value = entry.partition('=')
        secrets[name] = raw_value.strip('"').strip("'")
# Connection settings handed to every psql invocation below.
pg_pass = secrets.get('PG_ONLINE_PASSWORD', '')
pg_host = 'bj-postgres-16pob4sg.sql.tencentcdb.com'
pg_port = '28591'
# 2a. Course info + character ids
print(" 查询课程信息...")
# One output row per (account, character, course detail). The LEFT JOIN keeps
# characters without any course detail; their course_level prints as ''.
course_sql = f"""
SELECT
a.id AS account_id,
c.id AS user_id,
d.course_level,
CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END AS course_type
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv})
AND a.status = 1
AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
"""
# ON_ERROR_STOP makes psql exit non-zero on SQL errors; without it a failed
# statement read from stdin still exits 0 and the failure goes unnoticed.
result = subprocess.run(
    ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
     '-v', 'ON_ERROR_STOP=1', '-t', '-A', '-F', '\t'],
    input=course_sql,
    capture_output=True, text=True,
    env={**os.environ, 'PGPASSWORD': pg_pass},
)
# Abort before anything is written to the sheet: continuing with an empty
# result would label every target row as having no course information.
if result.returncode != 0:
    sys.exit(f"psql course query failed (exit {result.returncode}): {result.stderr.strip()}")
# account_id -> [{'user_id': ..., 'level': ..., 'type': ...}, ...]
user_courses = {}
for line in result.stdout.split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 4:
        acc_id, user_id, level, ctype = parts[:4]
        user_courses.setdefault(acc_id, []).append({
            'user_id': user_id,
            'level': level,
            'type': ctype,
        })
print(f" 课程信息: {len(user_courses)} 个账号有角色")
# 2b. Most recent lesson play record per character (sharded tables)
print(" 查询最近行课记录...")
# Every character id (user_id) returned by the course query.
all_user_ids = {c['user_id'] for chars in user_courses.values() for c in chars}
# user_id -> {'recent_date': ..., 'chapter_id': ..., 'chapter_unique_id': ...}
play_records = {}
tables = [f'bi_user_chapter_play_record_{i}' for i in range(8)]
for table in tables:
    # Only ask each table about the ids whose (user_id % 8) equals its suffix.
    uid_subset = [u for u in all_user_ids if int(u) % 8 == int(table[-1])]
    if not uid_subset:
        continue
    uid_csv_sub = ','.join(uid_subset)
    # DISTINCT ON + ORDER BY created_at DESC keeps the newest row per user_id.
    sql = f"""
    SELECT DISTINCT ON (user_id)
        user_id,
        created_at::date AS recent_date,
        chapter_id,
        chapter_unique_id
    FROM {table}
    WHERE user_id IN ({uid_csv_sub})
    AND play_status = 1
    ORDER BY user_id, created_at DESC
    """
    # ON_ERROR_STOP makes psql exit non-zero on SQL errors instead of 0.
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-v', 'ON_ERROR_STOP=1', '-t', '-A', '-F', '\t'],
        input=sql,
        capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass},
    )
    # A failed shard would silently turn its users into "no record"; stop
    # before any of that reaches the sheet.
    if result.returncode != 0:
        sys.exit(f"psql query on {table} failed (exit {result.returncode}): {result.stderr.strip()}")
    # Do not strip() the whole output: a NULL chapter_unique_id on the last
    # row prints as a trailing tab, and stripping it would drop that row.
    for line in result.stdout.split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 4:
            play_records[parts[0]] = {
                'recent_date': parts[1],
                'chapter_id': parts[2],
                'chapter_unique_id': parts[3],
            }
print(f" 行课记录: {len(play_records)} 个角色有行课")
# 2c. Course structure map (chapter_id -> "level-season-unit-lesson" label)
print(" 查询课程结构...")
# psql -A prints NULL as an empty string. A single blank chapter_id would turn
# the IN list into "1,,2", fail the whole query and leave every chapter
# unresolved, so blank ids are dropped here.
chapter_ids = {r['chapter_id'] for r in play_records.values() if r['chapter_id']}
chapter_map = {}
if chapter_ids:
    ch_csv = ','.join(sorted(chapter_ids))
    ch_sql = f"""
    SELECT id, course_level, course_season, course_unit, course_lesson
    FROM bi_level_unit_lesson
    WHERE id IN ({ch_csv})
    """
    # ON_ERROR_STOP makes psql exit non-zero on SQL errors instead of 0.
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-v', 'ON_ERROR_STOP=1', '-t', '-A', '-F', '\t'],
        input=ch_sql,
        capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass},
    )
    if result.returncode != 0:
        sys.exit(f"psql chapter query failed (exit {result.returncode}): {result.stderr.strip()}")
    # Split without strip() so a trailing empty column on the last row
    # (NULL course_lesson) does not make that row look too short.
    for line in result.stdout.split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 5:
            chapter_map[parts[0]] = '-'.join(parts[1:5])
# 2d. Study time per character
print(" 查询学习时长...")
# user_id -> SUM(interval_time) / 60000
# (presumably milliseconds converted to minutes - TODO confirm the unit)
study_times = {}
comp_tables = [f'bi_user_component_play_record_{i}' for i in range(8)]
for table in comp_tables:
    # Only ask each table about the ids whose (user_id % 8) equals its suffix.
    uid_subset = [u for u in all_user_ids if int(u) % 8 == int(table[-1])]
    if not uid_subset:
        continue
    uid_csv_sub = ','.join(uid_subset)
    sql = f"""
    SELECT user_id, COALESCE(SUM(interval_time), 0) / 60000.0 AS total_min
    FROM {table}
    WHERE user_id IN ({uid_csv_sub})
    GROUP BY user_id
    """
    # ON_ERROR_STOP makes psql exit non-zero on SQL errors instead of 0.
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-v', 'ON_ERROR_STOP=1', '-t', '-A', '-F', '\t'],
        input=sql,
        capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass},
    )
    # A failed shard would silently report 0 minutes for its users; stop
    # before that is written to the sheet.
    if result.returncode != 0:
        sys.exit(f"psql query on {table} failed (exit {result.returncode}): {result.stderr.strip()}")
    for line in result.stdout.split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 2:
            study_times[parts[0]] = float(parts[1])
# 2e. Paid status per account
print(" 查询付费状态...")
# Only accounts with at least one matching order come back (WHERE + GROUP BY),
# so every returned row carries '已付费'; unpaid accounts are simply absent.
pay_sql = f"""
SELECT account_id,
CASE WHEN COUNT(*) > 0 THEN '已付费' ELSE '未付费' END AS pay_status
FROM bi_vala_order
WHERE account_id IN ({uid_csv})
AND pay_success_date IS NOT NULL
AND order_status = 3
GROUP BY account_id
"""
# ON_ERROR_STOP makes psql exit non-zero on SQL errors instead of 0.
result = subprocess.run(
    ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
     '-v', 'ON_ERROR_STOP=1', '-t', '-A', '-F', '\t'],
    input=pay_sql,
    capture_output=True, text=True,
    env={**os.environ, 'PGPASSWORD': pg_pass},
)
# A failed query would silently mark every account as unpaid in the sheet.
if result.returncode != 0:
    sys.exit(f"psql pay-status query failed (exit {result.returncode}): {result.stderr.strip()}")
paid_users = set()
for line in result.stdout.split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 2 and parts[1] == '已付费':
        paid_users.add(parts[0])
# Paid accounts restricted to the requested ids (not read again in the
# visible part of this script; kept so the name stays defined).
all_paid = paid_users.intersection(uid_set)
print(f" 付费用户: {len(paid_users)}")
# ========== 3. Assemble learning-status data ==========
print(f"[3/4] 组装学情数据...")


def get_course_info(acc_id, *, courses=None, records=None, chapters=None, minutes=None):
    """Summarise the learning status of one account.

    The character with the most recent play record is used; when none of the
    account's characters has a record, the first listed character is used.

    Args:
        acc_id: Account id as a string (key of the course mapping).
        courses: Optional mapping account_id -> list of
            {'user_id', 'level', 'type'} dicts. Defaults to the module-level
            ``user_courses``.
        records: Optional mapping user_id -> {'recent_date', 'chapter_id', ...}.
            Defaults to the module-level ``play_records``.
        chapters: Optional mapping chapter_id -> course label. Defaults to the
            module-level ``chapter_map``.
        minutes: Optional mapping user_id -> study minutes. Defaults to the
            module-level ``study_times``.

    Returns:
        A ``(current, recent_date, study_min)`` tuple, where ``current`` is
        ``"{level}{type}-{chapter label or '?'}"``, ``recent_date`` is the
        latest play date or ``'无记录'``, and ``study_min`` is the study time
        truncated to an int. Returns ``(None, None, None)`` when the account
        has no characters.
    """
    # Fall back to the script's globals only for mappings that were not passed
    # in, so the function can be exercised with plain dicts.
    courses = user_courses if courses is None else courses
    records = play_records if records is None else records
    chapters = chapter_map if chapters is None else chapters
    minutes = study_times if minutes is None else minutes

    chars = courses.get(acc_id, [])
    if not chars:
        return None, None, None

    played = [c for c in chars if c['user_id'] in records]
    if played:
        # Dates are compared as strings, same as before. max() keeps the first
        # of several equally recent entries, like a strict '>' scan would.
        best_char = max(played, key=lambda c: records[c['user_id']]['recent_date'])
    else:
        best_char = chars[0]  # No play history at all: use the first character.

    uid = best_char['user_id']
    level = best_char['level'] or '?'
    ctype = best_char['type'] or '体验课'

    # Current progress: resolve the last played chapter to its label if known.
    pr = records.get(uid)
    chapter_name = chapters.get(pr['chapter_id'], '?') if pr else '?'
    current = f"{level}{ctype}-{chapter_name}"
    recent_date = pr['recent_date'] if pr else '无记录'
    study_min = minutes.get(uid, 0)
    return current, recent_date, int(study_min)
# Build a first-pass column-D text for every target row.
now_str = datetime.now().strftime('%Y-%m-%d %H:%M')
# Entries are (row_num, d_text, pay_status, recent_date, current).
updates = []
for row_num, acc_id, phone in target_rows:
    acc_id = str(acc_id)
    current, recent_date, study_min = get_course_info(acc_id)
    pay_status = '未付费' if acc_id not in paid_users else '已付费'
    # target_rows only carries (row_num, acc_id, phone) and no sales name, so
    # the text built here omits it. The sales-name column is read from the
    # sheet further down and the text is rebuilt there.
    if current is not None:
        d_text = f"用户:{acc_id} | 当前:{current} | 最近行课:{recent_date} | 学习{study_min}min | {pay_status}"
    else:
        d_text = f"用户:{acc_id} | 未匹配到课程信息 | 最近行课:无记录 | 学习0min | {pay_status}"
    updates.append((row_num, d_text, pay_status, recent_date, current))
print(f" 生成 {len(updates)} 条更新")
# ========== Read the sales names from column A ==========
print(" 补读销售名列...")
import urllib.request

# Seconds to wait on each Feishu HTTP call; without a timeout a stalled
# connection would hang the script indefinitely.
HTTP_TIMEOUT = 30

# Obtain a tenant access token (TAT) using the first configured app.
with open('/root/.openclaw/credentials/xiaoxi/config.json', encoding='utf-8') as f:
    config = json.load(f)
app = config['apps'][0]
tat_data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode()
tat_req = urllib.request.Request(
    'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
    data=tat_data, headers={'Content-Type': 'application/json; charset=utf-8'})
# The context manager closes the HTTP response once the body has been read.
with urllib.request.urlopen(tat_req, timeout=HTTP_TIMEOUT) as tat_resp:
    tat = json.loads(tat_resp.read())['tenant_access_token']

# Spreadsheet token and sheet id that are read here and written back later.
TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac'
SHEET = '55b0eb'

# Updates ordered by sheet row number.
target_rows_sorted = sorted(updates, key=lambda x: x[0])
sales_map = {}  # row_num -> sales_name

# Read column A in batches of target rows. Each request covers the contiguous
# range from the smallest to the largest row number in the batch.
batch_size = 200
for i in range(0, len(target_rows_sorted), batch_size):
    batch = target_rows_sorted[i:i+batch_size]
    batch_rows = [int(r[0]) for r in batch]
    start_row, end_row = min(batch_rows), max(batch_rows)
    range_str = f"{SHEET}!A{start_row}:A{end_row}"
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TOKEN}/values/{range_str}"
    req = urllib.request.Request(url)
    req.add_header('Authorization', f'Bearer {tat}')
    try:
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as http_resp:
            resp = json.loads(http_resp.read())
        values = resp.get('data', {}).get('valueRange', {}).get('values', [])
        # Returned rows are consecutive, starting at start_row.
        for j, v in enumerate(values):
            # Skip empty rows and empty cells so the '?' placeholder is used
            # later instead of the literal text "None".
            if v and v[0] not in (None, ''):
                sales_map[start_row + j] = v[0]
    except Exception as e:
        # Best effort: a failed batch only leaves those rows without a name.
        print(f" Error reading A col: {e}")
print(f" 销售名: {len(sales_map)}")
# Rebuild the column-D text, now prefixed with the sales name.
# Map each sheet row to its account id once (first occurrence wins, like the
# linear search this replaces) instead of rescanning target_rows for every row.
acc_by_row = {}
for r in target_rows:
    acc_by_row.setdefault(r[0], str(r[1]))

final_updates = []
for row_num, _d_text, _pay_status, _recent_date, _current in target_rows_sorted:
    sales = sales_map.get(row_num, '?')
    acc_id = acc_by_row.get(row_num, '?')
    # Recompute the figures for THIS row. The tuples in target_rows_sorted do
    # not carry the study minutes, and reading the leftover global study_min
    # would stamp the last processed row's minutes onto every row.
    current, recent_date, study_min = get_course_info(acc_id)
    pay_status = '已付费' if acc_id in paid_users else '未付费'
    if current:
        d_text = f"销售:{sales} | 用户:{acc_id} | 当前:{current} | 最近行课:{recent_date} | 学习{study_min}min | {pay_status}"
    else:
        d_text = f"销售:{sales} | 用户:{acc_id} | 未匹配到课程信息 | 最近行课:无记录 | 学习0min | {pay_status}"
    final_updates.append((row_num, d_text))
# From here on, updates holds (row_num, d_text) pairs.
updates = final_updates
# ========== 4. Batch write-back ==========
print(f"[4/4] 批量回填 {len(updates)} 条...")
# The Feishu API supports range updates, but the rows to fill in column D are
# not contiguous (many gaps), so cells are written one at a time.


def update_cell(tat, token, sheet, row, col, value, timeout=30):
    """Write a single cell of a Feishu spreadsheet.

    Args:
        tat: Tenant access token sent as the Bearer credential.
        token: Spreadsheet token used in the request URL.
        sheet: Sheet id used as the range prefix.
        row: Row number of the cell.
        col: Column letter of the cell.
        value: Value to store in the cell.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        True when the API answers with ``code == 0``. False for any other
        code, and also when the request fails or the reply is not valid JSON.
    """
    range_str = f"{sheet}!{col}{row}:{col}{row}"
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values"
    body = json.dumps({
        "valueRange": {
            "range": range_str,
            "values": [[value]]
        }
    }).encode()
    req = urllib.request.Request(url, data=body, method='PUT')
    req.add_header('Authorization', f'Bearer {tat}')
    req.add_header('Content-Type', 'application/json; charset=utf-8')
    try:
        # The context manager closes the response; the timeout keeps a stalled
        # connection from hanging the whole run.
        with urllib.request.urlopen(req, timeout=timeout) as http_resp:
            resp = json.loads(http_resp.read())
    except (OSError, ValueError) as e:
        # urllib's HTTP/URL errors and timeouts are OSError subclasses; a
        # non-JSON body raises ValueError. Report the failure through the
        # boolean result so one bad cell does not abort the remaining rows.
        print(f" Error updating {range_str}: {e}")
        return False
    return resp.get('code') == 0
# Write column D (status text) and column C ("已返回") for every row.
# The rows are not contiguous, so each cell is written individually; the
# batch size only controls how often progress is printed.
batch_size = 50
success_d = 0
success_c = 0
total = len(updates)
for start in range(0, total, batch_size):
    for row_num, d_text in updates[start:start + batch_size]:
        # Column D: the assembled status text.
        d_ok = update_cell(tat, TOKEN, SHEET, row_num, 'D', d_text)
        if d_ok:
            success_d += 1
        # Column C: mark the row as "已返回".
        c_ok = update_cell(tat, TOKEN, SHEET, row_num, 'C', '已返回')
        if c_ok:
            success_c += 1
    done = min(start + batch_size, total)
    print(f" 进度: {done}/{total} (D:{success_d}, C:{success_c})")
print(f"\n✅ 完成D列更新: {success_d}/{total}C列更新: {success_c}/{total}")