Claude Code Plugins

Community-maintained marketplace

Feedback

loading-insurance-data

@alongor666/weekly_report
0
0

加载并预处理保险保单周度数据,支持智能周期检测、多周数据加载、数据验证和清洗。在开始任何保险数据分析任务时使用。

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name loading-insurance-data
description 加载并预处理保险保单周度数据,支持智能周期检测、多周数据加载、数据验证和清洗。在开始任何保险数据分析任务时使用。

保险数据加载器

核心功能

处理保险业务周度CSV数据的完整加载流程:

  • ✅ 智能检测可用周次
  • ✅ 灵活周期范围设置
  • ✅ 多年度数据支持(2024/2025)
  • ✅ 数据质量验证
  • ✅ 标准化预处理

立即使用

from pathlib import Path
import pandas as pd

# 1. 检测可用周次
def detect_available_weeks(data_folder="2025年保单"):
    import re
    available = set()

    for file in Path(data_folder).glob("*保单第*周*.csv"):
        match = re.search(r'第(\d+)周', file.name)
        if match:
            available.add(int(match.group(1)))

    return sorted(available)

# 2. 加载单周数据
def load_week_data(week, data_folder="2025年保单"):
    pattern = f"*保单第{week}周*.csv"
    files = list(Path(data_folder).glob(pattern))

    if not files:
        return None

    df = pd.read_csv(files[0], encoding='utf-8-sig')
    df['week_number'] = week
    return df

# 3. 批量加载
weeks = detect_available_weeks()
print(f"可用周次: {weeks}")

data = {}
for week in weeks:
    df = load_week_data(week)
    if df is not None:
        data[week] = df
        print(f"✅ 第{week}周: {len(df):,}行")

数据文件结构

文件命名规范

{YEAR}保单第{WEEK}周变动成本明细表.csv

示例:
- 2025保单第28周变动成本明细表.csv
- 2025保单第43周变动成本明细表.csv

关键字段

字段名 类型 说明
policy_start_year int 保单年度
third_level_organization str 三级机构
business_type_category str 业务类型
is_new_energy_vehicle bool 是否新能源车
signed_premium_yuan float 签单保费
matured_premium_yuan float 满期保费
reported_claim_payment_yuan float 已报告赔款
expense_amount_yuan float 费用金额
claim_case_count int 赔案件数
policy_count int 保单件数

数据处理流程

步骤1: 周期检测

# 自动扫描目录
available_weeks = detect_available_weeks()

# 确定分析周期
start_week = 28
end_week = 43
analysis_weeks = list(range(start_week, end_week + 1))

# 检查缺失
missing = [w for w in analysis_weeks if w not in available_weeks]
if missing:
    print(f"⚠️  缺失周次: {missing}")

步骤2: 数据加载

loaded_data = {}

for week in analysis_weeks:
    if week in available_weeks:
        df = load_week_data(week)
        if df is not None:
            loaded_data[week] = df

步骤3: 数据清洗

def preprocess_data(df):
    """标准化数据处理"""
    # 过滤本部
    df = df[df['third_level_organization'] != '本部'].copy()

    # 提取保单年度
    df['policy_year'] = df['policy_start_year'].astype(str).str.extract(r'(202[45])')[0]

    # 数值型字段转换
    numeric_cols = [
        'signed_premium_yuan',
        'matured_premium_yuan',
        'reported_claim_payment_yuan',
        'expense_amount_yuan',
        'claim_case_count',
        'policy_count'
    ]

    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

    return df

# 应用清洗
for week in loaded_data:
    loaded_data[week] = preprocess_data(loaded_data[week])

步骤4: 数据验证

def validate_data(df, week):
    """数据质量检查"""
    issues = []

    # 检查空数据
    if len(df) == 0:
        issues.append("数据为空")

    # 检查关键字段
    required = ['signed_premium_yuan', 'third_level_organization']
    missing = [col for col in required if col not in df.columns]
    if missing:
        issues.append(f"缺失字段: {missing}")

    # 检查负值
    if 'signed_premium_yuan' in df.columns:
        if (df['signed_premium_yuan'] < 0).any():
            issues.append("存在负保费")

    if issues:
        print(f"⚠️  第{week}周问题: {', '.join(issues)}")
        return False

    return True

数据输出结构

按年度分组

def group_by_year(loaded_data):
    """按保单年度分组"""
    grouped = {'2024': {}, '2025': {}}

    for week, df in loaded_data.items():
        for year in ['2024', '2025']:
            year_df = df[df['policy_year'] == year]
            if len(year_df) > 0:
                grouped[year][week] = year_df

    return grouped

# 使用示例
data_by_year = group_by_year(loaded_data)
print(f"2024保单周次: {list(data_by_year['2024'].keys())}")
print(f"2025保单周次: {list(data_by_year['2025'].keys())}")

常见问题

Q1: 文件编码错误

问题: UnicodeDecodeError

解决: 使用 encoding='utf-8-sig'

df = pd.read_csv(file, encoding='utf-8-sig')

Q2: 周次文件缺失

问题: 第32周、第38周等文件不存在

解决: 自动跳过并记录

if week not in available_weeks:
    print(f"⚠️  第{week}周: 文件不存在,跳过")
    continue

Q3: 数据类型错误

问题: 保费字段被识别为字符串

解决: 强制数值转换

df['signed_premium_yuan'] = pd.to_numeric(
    df['signed_premium_yuan'],
    errors='coerce'
).fillna(0)

Q4: 内存占用过大

问题: 加载多周数据内存不足

解决: 按需加载或只读取必要列

# 只读取需要的列
usecols = [
    'policy_year',
    'third_level_organization',
    'signed_premium_yuan',
    'matured_premium_yuan',
    'reported_claim_payment_yuan'
]
df = pd.read_csv(file, usecols=usecols, encoding='utf-8-sig')

性能优化

批量加载优化

from concurrent.futures import ThreadPoolExecutor

def load_weeks_parallel(weeks, data_folder):
    """并行加载多周数据"""
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(load_week_data, week, data_folder): week
            for week in weeks
        }

        results = {}
        for future in futures:
            week = futures[future]
            try:
                df = future.result()
                if df is not None:
                    results[week] = df
            except Exception as e:
                print(f"❌ 第{week}周加载失败: {e}")

        return results

内存管理

import gc

# 加载和处理后释放内存
for week in weeks:
    df = load_week_data(week)
    processed = preprocess_data(df)
    # ... 使用数据 ...
    del df, processed
    gc.collect()

参考资源

脚本工具:

  • scripts/quick_load.py - 快速数据加载工具
  • scripts/data_validator.py - 数据质量检查工具

参考文档:

  • reference/data_schema.md - 完整字段说明
  • reference/data_quality_rules.md - 数据质量标准

更新日志

  • v1.0 (2025-11-04): 初始版本
    • 基础加载功能
    • 智能周期检测
    • 数据验证和清洗
    • 多年度支持