新增技能: studytime-analysis — 角色学习时间分析(一周分布+跨周趋势+寒暑假排除)

This commit is contained in:
xiaoban 2026-05-24 09:36:41 +08:00
parent e279bc78a6
commit aa6b689d52
3 changed files with 496 additions and 0 deletions

View File

@ -7,3 +7,4 @@ lark_wiki_operate_as_bot 2a37701f568849f03eb46dd938baeda171380fe252b698ac8bda69c
vala_git_workspace_backup 4cf352bec88fe84af065ba1ffcbb06647b77df0e01860faaf0bca9fd64b968ec
cron-schedule b1879fa59d60e3d99cea1138674f7abac84a4aecd32743b801d41bfd6ed7181d
study-analysis 33217dc132073ecd47b921800834f6df89494da9e7708fa90f15b3de7742e37f
studytime-analysis f83392be24766898c5cfc206cc0edaf4f44b81ed0cb353c3f31157154964a7c2

View File

@ -0,0 +1,52 @@
---
name: studytime-analysis
description: 分析指定角色的完课时间分布与学习规律包含一周内时间分布、跨周趋势、寒暑假排除。触发词「学习时间分析」「看看学习时间」「学习时间有什么特点」「studytime」。
slug: studytime-analysis
version: 1.0.0
---
# 学习时间分析技能
## 触发规则
用户通过飞书发送以下格式的消息时触发:
- 「学习时间分析 [角色ID]」
- 「[角色ID] 学习时间分析」
- 「看看 [角色ID] 的学习时间有什么特点」
- 「分析 [角色ID] 的完课时间规律」
角色ID 必须是纯数字。
## 工作流程
1. **参数提取** — 从用户消息中提取角色ID
2. **数据采集** — 执行 Python 脚本查询该角色全部完课记录自动排除寒假1-2月、暑假7-8月
3. **分析输出** — 脚本自动完成以下分析并输出格式化报告:
- 一周分布:周一至周日各天完课数量 + 周一至周五上课时段(上午/中午/晚上)
- 跨周趋势:覆盖总周数、周均完课数、连续性、断档、趋势变化
- 完课记录明细表格
## 调用方式
```bash
cd /root/.openclaw/workspace-xiaoban && \
export PG_DB_HOST=bj-postgres-16pob4sg.sql.tencentcdb.com \
PG_DB_PORT=28591 PG_DB_USER=ai_member \
PG_DB_PASSWORD='...' PG_DB_DATABASE=vala && \
python3 skills/studytime-analysis/scripts/studytime_analysis.py <role_id>
```
## 数据库说明
- **数据源**: PostgreSQL Online (`vala` 库)
- **核心表**: `user_chapter_play_record_0~7`完课记录8张分表
- **筛选**: `play_status = 1`(已完成),`EXTRACT(MONTH FROM updated_at) NOT IN (1,2,7,8)`排除寒假1-2月、暑假7-8月
- **注意**: 表名无 `bi_` 前缀;课程通过 `level` + `chapter_id` 字段展示
## 输出说明
脚本输出为 Markdown 格式的分析报告,可直接发送给用户。包含:
- 基本信息角色ID、有效完课总数、排除的寒暑假记录数
- 一周时间分布表 + 规律总结
- 跨周学习趋势表 + 趋势分析
- 完课记录明细表(日期、时间、星期、时段、课程、课时)

View File

@ -0,0 +1,443 @@
#!/usr/bin/env python3
"""
studytime-analysis 角色学习时间分析工具
用法: python3 studytime_analysis.py <role_id>
输出: Markdown 格式的分析报告
数据源: PostgreSQL Online (vala )
核心表: user_chapter_play_record_0~7
"""
import os
import sys
import psycopg2
import psycopg2.extras
from datetime import datetime, timedelta
from collections import defaultdict, OrderedDict
# ── 配置 ──────────────────────────────────────────────
PG_CONFIG = {
"host": os.environ.get("PG_DB_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com"),
"port": int(os.environ.get("PG_DB_PORT", "28591")),
"user": os.environ.get("PG_DB_USER", "ai_member"),
"password": os.environ.get("PG_DB_PASSWORD", ""),
"dbname": os.environ.get("PG_DB_DATABASE", "vala"),
}
EXCLUDED_MONTHS = (1, 2, 7, 8) # 寒假1-2月, 暑假7-8月
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
PERIODS = OrderedDict([
("凌晨", (0, 6)),
("上午", (6, 12)),
("中午", (12, 14)),
("下午", (14, 18)),
("晚上", (18, 24)),
])
# ── 数据库查询 ────────────────────────────────────────
def get_connection():
"""连接 PostgreSQL"""
conn = psycopg2.connect(
host=PG_CONFIG["host"],
port=PG_CONFIG["port"],
user=PG_CONFIG["user"],
password=PG_CONFIG["password"],
dbname=PG_CONFIG["dbname"],
)
return conn
def fetch_completion_records(role_id):
"""查询指定角色全部完课记录(排除寒暑假)"""
params = {}
union_parts = []
for i in range(8):
param_name = f"rid_{i}"
params[param_name] = role_id
union_parts.append(f"""
SELECT user_id, chapter_id, chapter_unique_id, level, updated_at
FROM user_chapter_play_record_{i}
WHERE user_id = %({param_name})s
AND play_status = 1
AND EXTRACT(MONTH FROM updated_at) NOT IN (1, 2, 7, 8)
""")
union_sql = " UNION ALL ".join(union_parts)
sql = f"""
SELECT * FROM (
{union_sql}
) t
ORDER BY updated_at ASC
"""
conn = get_connection()
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql, params)
rows = cur.fetchall()
finally:
conn.close()
return rows
def count_excluded_records(role_id):
"""统计被寒暑假排除的记录数"""
params = {}
union_parts = []
for i in range(8):
param_name = f"rid_{i}"
params[param_name] = role_id
union_parts.append(f"""
SELECT COUNT(*) as cnt
FROM user_chapter_play_record_{i}
WHERE user_id = %({param_name})s
AND play_status = 1
AND EXTRACT(MONTH FROM updated_at) IN (1, 2, 7, 8)
""")
union_sql = " UNION ALL ".join(union_parts)
sql = f"SELECT SUM(cnt) as total FROM ({union_sql}) t"
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(sql, params)
result = cur.fetchone()
finally:
conn.close()
return result[0] if result and result[0] else 0
# ── 分析函数 ──────────────────────────────────────────
def classify_period(hour):
"""根据小时数返回时段名称"""
for name, (lo, hi) in PERIODS.items():
if lo <= hour < hi:
return name
return "未知"
def analyze_weekly_distribution(records):
"""
分析一周内分布: 周一至周日各天完课数 + 周一至周五时段分布
返回: (day_counts, weekday_periods)
"""
day_counts = defaultdict(int)
weekday_periods = defaultdict(lambda: defaultdict(int))
today = datetime.now().date()
for r in records:
dt = r["updated_at"]
if dt is None:
continue
# dt is timezone-aware, convert to local naive for analysis
if hasattr(dt, 'tzinfo') and dt.tzinfo is not None:
# PostgreSQL returns tz-aware, but we just need local time
pass
weekday = dt.weekday() # 0=Mon
hour = dt.hour
period = classify_period(hour)
day_counts[weekday] += 1
if weekday < 5:
weekday_periods[period][weekday] += 1
return day_counts, weekday_periods
def analyze_weekly_trend(records):
"""
按周统计完课趋势
返回: (weeks_data, analysis_dict)
"""
if not records:
return [], {}
week_counts = defaultdict(int)
for r in records:
dt = r["updated_at"]
if dt is None:
continue
iso = dt.isocalendar()
year, week_num = iso[0], iso[1]
week_counts[(year, week_num)] += 1
sorted_weeks = sorted(week_counts.keys())
weeks_data = [(y, w, week_counts[(y, w)]) for y, w in sorted_weeks]
total_weeks = len(weeks_data)
total_lessons = sum(c for _, _, c in weeks_data)
avg_per_week = round(total_lessons / total_weeks, 1) if total_weeks > 0 else 0
# 时间跨度(含空周)
if sorted_weeks:
first = datetime.fromisocalendar(sorted_weeks[0][0], sorted_weeks[0][1], 1)
last = datetime.fromisocalendar(sorted_weeks[-1][0], sorted_weeks[-1][1], 1)
total_span_weeks = ((last - first).days // 7) + 1
all_weeks_in_span = set()
cur = first
while cur <= last:
iso = cur.isocalendar()
all_weeks_in_span.add((iso[0], iso[1]))
cur += timedelta(days=7)
active_weeks = set(sorted_weeks)
empty_weeks = sorted(all_weeks_in_span - active_weeks)
else:
total_span_weeks = 0
empty_weeks = []
consecutive = (len(empty_weeks) == 0)
# 趋势: 前半段 vs 后半段
mid = len(weeks_data) // 2
first_half_data = weeks_data[:mid]
first_half_avg = sum(c for _, _, c in first_half_data) / mid if mid > 0 else 0
second_half_start = mid if len(weeks_data) % 2 == 0 else mid + 1
second_half_data = weeks_data[second_half_start:]
second_half_avg = sum(c for _, _, c in second_half_data) / len(second_half_data) if second_half_data else 0
trend = "持平"
if first_half_avg > 0:
ratio = second_half_avg / first_half_avg
if ratio > 1.15:
trend = "上涨 ↑"
elif ratio < 0.85:
trend = "下降 ↓"
return weeks_data, {
"total_weeks": total_weeks,
"total_span_weeks": total_span_weeks,
"total_lessons": total_lessons,
"avg_per_week": avg_per_week,
"consecutive": consecutive,
"empty_weeks": empty_weeks,
"first_half_avg": round(first_half_avg, 1),
"second_half_avg": round(second_half_avg, 1),
"trend": trend,
}
# ── 输出格式化 ────────────────────────────────────────
def format_report(role_id, records, excluded_count, day_counts, weekday_periods, weeks_data, analysis):
"""生成 Markdown 格式分析报告"""
lines = []
now_str = datetime.now().strftime('%Y-%m-%d %H:%M')
lines.append(f"# 📊 学习时间分析报告 — 角色 {role_id}")
lines.append(f"")
lines.append(f"**分析时间**: {now_str}")
lines.append(f"**有效完课记录**: {len(records)}")
if excluded_count > 0:
lines.append(f"**已排除寒暑假记录**: {excluded_count}寒假1-2月、暑假7-8月不算入分析")
lines.append(f"")
if not records:
lines.append("> ⚠️ 该角色没有非寒暑假期间的完课记录,无法进行分析。")
return "\n".join(lines)
# ═══ 一、一周时间分布 ═══
lines.append(f"---")
lines.append(f"## 一、一周时间分布")
lines.append(f"")
# 日分布表
lines.append(f"### 各天完课数量")
lines.append(f"")
total = sum(day_counts.values())
max_day = max(day_counts.values()) if day_counts else 1
lines.append(f"| 星期 | 完课数 | 占比 |")
lines.append(f"|------|--------|------|")
for i, name in enumerate(WEEKDAY_NAMES):
cnt = day_counts.get(i, 0)
pct = f"{cnt / total * 100:.1f}%" if total > 0 else "0%"
bar = "" * max(1, int(cnt / max_day * 20)) if cnt > 0 else ""
lines.append(f"| {name} | {cnt} {bar} | {pct} |")
lines.append(f"")
# 规律小结
weekday_total = sum(day_counts.get(i, 0) for i in range(5))
weekend_total = sum(day_counts.get(i, 0) for i in range(5, 7))
lines.append(f"### 规律小结")
lines.append(f"")
if weekend_total > 0:
sat = day_counts.get(5, 0)
sun = day_counts.get(6, 0)
lines.append(f"- **周末上课**: ✅ 是 — 周六 {sat} 节,周日 {sun}")
else:
lines.append(f"- **周末上课**: ❌ 否 — 周末无完课记录")
# 时段分布(周一至周五)
lines.append(f"")
lines.append(f"### 周一至周五上课时段分布")
lines.append(f"")
lines.append(f"| 时段 | 周一 | 周二 | 周三 | 周四 | 周五 | 合计 |")
lines.append(f"|------|------|------|------|------|------|------|")
for period in ["上午", "中午", "下午", "晚上", "凌晨"]:
period_data = weekday_periods.get(period, {})
period_total = sum(period_data.values())
if period_total == 0:
continue
row = [period]
for d in range(5):
cnt = period_data.get(d, 0)
row.append(str(cnt) if cnt > 0 else "-")
row.append(str(period_total))
lines.append(f"| {' | '.join(row)} |")
lines.append(f"")
# 时段规律
lines.append(f"**时段规律分析**:")
for period in ["上午", "中午", "下午", "晚上"]:
period_data = weekday_periods.get(period, {})
period_sum = sum(period_data.values())
if period_sum == 0:
continue
pct = period_sum / weekday_total * 100 if weekday_total > 0 else 0
active_days = [WEEKDAY_NAMES[d] for d in range(5) if period_data.get(d, 0) > 0]
if active_days:
lines.append(f"- **{period}**{period_sum}节, {pct:.0f}%)→ 集中在 {''.join(active_days)}")
else:
lines.append(f"- **{period}**{period_sum}节, {pct:.0f}%")
lines.append(f"")
# ═══ 二、跨周学习趋势 ═══
lines.append(f"---")
lines.append(f"## 二、跨周学习趋势")
lines.append(f"")
lines.append(f"### 基本数据")
lines.append(f"- 完课跨越 **{analysis['total_span_weeks']}** 个自然周(含空周),有课周数 **{analysis['total_weeks']}** 周")
lines.append(f"- 有效完课总数 **{analysis['total_lessons']}** 节")
lines.append(f"- 平均每周完课 **{analysis['avg_per_week']}** 节")
lines.append(f"- 连续性: {'✅ 每周连续上课,无中断' if analysis['consecutive'] else '⚠️ 存在中断周(见下方)'}")
lines.append(f"")
if analysis["empty_weeks"]:
lines.append(f"### 中断周明细")
empty_list = []
for y, w in sorted(analysis["empty_weeks"]):
monday = datetime.fromisocalendar(y, w, 1)
empty_list.append(f"{y}年W{w:02d}{monday.strftime('%m/%d')}起)")
lines.append(f"- {', '.join(empty_list)}")
lines.append(f"")
lines.append(f"### 各周完课详情")
lines.append(f"")
lines.append(f"| 周次 | 起止日期 | 完课数 | 趋势 |")
lines.append(f"|------|----------|--------|------|")
max_count = max(c for _, _, c in weeks_data) if weeks_data else 1
for i, (y, w, cnt) in enumerate(weeks_data):
monday = datetime.fromisocalendar(y, w, 1)
sunday = monday + timedelta(days=6)
date_range = f"{monday.strftime('%m/%d')}-{sunday.strftime('%m/%d')}"
marker = ""
if i > 0:
prev_cnt = weeks_data[i - 1][2]
if prev_cnt > 0 and cnt >= prev_cnt * 2:
marker = "📈 突增"
elif cnt > prev_cnt * 1.3:
marker = "📈"
elif prev_cnt > 0 and cnt < prev_cnt * 0.7:
marker = "📉"
bar_len = max(1, int(cnt / max_count * 15)) if cnt > 0 else 0
bar = "" * bar_len if bar_len > 0 else ""
lines.append(f"| {y}W{w:02d} | {date_range} | {cnt} {bar} | {marker} |")
lines.append(f"")
# 趋势总结
lines.append(f"### 趋势分析")
lines.append(f"- **整体趋势**: {analysis['trend']}")
first_half_weeks = len(weeks_data) // 2
second_half_weeks = len(weeks_data) - first_half_weeks
lines.append(f" - 前半段(前 {first_half_weeks} 周)平均: {analysis['first_half_avg']} 节/周")
lines.append(f" - 后半段(后 {second_half_weeks} 周)平均: {analysis['second_half_avg']} 节/周")
lines.append(f"")
# 特殊事件
if len(weeks_data) >= 2:
counts = [c for _, _, c in weeks_data]
events_found = []
for i in range(1, len(counts)):
if counts[i - 1] > 0 and counts[i] >= counts[i - 1] * 2:
y, w, _ = weeks_data[i]
monday = datetime.fromisocalendar(y, w, 1)
events_found.append(f"⚡ **{y}年W{w:02d}周({monday.strftime('%m/%d')}起)完课量突增**{counts[i-1]}{counts[i]}")
break
for i in range(1, len(counts)):
if counts[i - 1] >= 3 and counts[i - 1] > 0 and counts[i] <= 1:
y, w, _ = weeks_data[i]
monday = datetime.fromisocalendar(y, w, 1)
events_found.append(f"🔻 **{y}年W{w:02d}周({monday.strftime('%m/%d')}起)完课量骤降**{counts[i-1]}{counts[i]}")
break
if events_found:
lines.append(f"**值得关注的变化**:")
for ev in events_found:
lines.append(f"- {ev}")
lines.append(f"")
# ═══ 三、完课记录明细 ═══
lines.append(f"---")
lines.append(f"## 三、完课记录明细")
lines.append(f"")
lines.append(f"| 序号 | 日期 | 时间 | 星期 | 时段 | 级别 | 课程ID |")
lines.append(f"|------|------|------|------|------|------|--------|")
for i, r in enumerate(records, 1):
dt = r["updated_at"]
if dt is None:
continue
date_str = dt.strftime("%Y-%m-%d")
time_str = dt.strftime("%H:%M")
weekday = WEEKDAY_NAMES[dt.weekday()]
period = classify_period(dt.hour)
level = r.get("level") or "-"
chapter_id = r.get("chapter_id") or "-"
lines.append(f"| {i} | {date_str} | {time_str} | {weekday} | {period} | {level} | {chapter_id} |")
lines.append(f"")
return "\n".join(lines)
# ── 主函数 ────────────────────────────────────────────
def main():
if len(sys.argv) < 2:
print("用法: python3 studytime_analysis.py <role_id>", file=sys.stderr)
sys.exit(1)
try:
role_id = int(sys.argv[1])
except ValueError:
print(f"错误: 角色ID必须是数字收到: {sys.argv[1]}", file=sys.stderr)
sys.exit(1)
records = fetch_completion_records(role_id)
excluded_count = count_excluded_records(role_id)
day_counts, weekday_periods = analyze_weekly_distribution(records)
weeks_data, analysis = analyze_weekly_trend(records)
report = format_report(role_id, records, excluded_count, day_counts, weekday_periods, weeks_data, analysis)
print(report)
if __name__ == "__main__":
main()