| name | loading-insurance-data |
| description | 加载并预处理保险保单周度数据,支持智能周期检测、多周数据加载、数据验证和清洗。在开始任何保险数据分析任务时使用。 |
保险数据加载器
核心功能
处理保险业务周度CSV数据的完整加载流程:
- ✅ 智能检测可用周次
- ✅ 灵活周期范围设置
- ✅ 多年度数据支持(2024/2025)
- ✅ 数据质量验证
- ✅ 标准化预处理
立即使用
from pathlib import Path
import pandas as pd
# 1. 检测可用周次
def detect_available_weeks(data_folder="2025年保单"):
import re
available = set()
for file in Path(data_folder).glob("*保单第*周*.csv"):
match = re.search(r'第(\d+)周', file.name)
if match:
available.add(int(match.group(1)))
return sorted(available)
# 2. 加载单周数据
def load_week_data(week, data_folder="2025年保单"):
pattern = f"*保单第{week}周*.csv"
files = list(Path(data_folder).glob(pattern))
if not files:
return None
df = pd.read_csv(files[0], encoding='utf-8-sig')
df['week_number'] = week
return df
# 3. 批量加载
weeks = detect_available_weeks()
print(f"可用周次: {weeks}")
data = {}
for week in weeks:
df = load_week_data(week)
if df is not None:
data[week] = df
print(f"✅ 第{week}周: {len(df):,}行")
数据文件结构
文件命名规范
{YEAR}保单第{WEEK}周变动成本明细表.csv
示例:
- 2025保单第28周变动成本明细表.csv
- 2025保单第43周变动成本明细表.csv
关键字段
| 字段名 | 类型 | 说明 |
|---|---|---|
policy_start_year |
int | 保单年度 |
third_level_organization |
str | 三级机构 |
business_type_category |
str | 业务类型 |
is_new_energy_vehicle |
bool | 是否新能源车 |
signed_premium_yuan |
float | 签单保费 |
matured_premium_yuan |
float | 满期保费 |
reported_claim_payment_yuan |
float | 已报告赔款 |
expense_amount_yuan |
float | 费用金额 |
claim_case_count |
int | 赔案件数 |
policy_count |
int | 保单件数 |
数据处理流程
步骤1: 周期检测
# 自动扫描目录
available_weeks = detect_available_weeks()
# 确定分析周期
start_week = 28
end_week = 43
analysis_weeks = list(range(start_week, end_week + 1))
# 检查缺失
missing = [w for w in analysis_weeks if w not in available_weeks]
if missing:
print(f"⚠️ 缺失周次: {missing}")
步骤2: 数据加载
loaded_data = {}
for week in analysis_weeks:
if week in available_weeks:
df = load_week_data(week)
if df is not None:
loaded_data[week] = df
步骤3: 数据清洗
def preprocess_data(df):
"""标准化数据处理"""
# 过滤本部
df = df[df['third_level_organization'] != '本部'].copy()
# 提取保单年度
df['policy_year'] = df['policy_start_year'].astype(str).str.extract(r'(202[45])')[0]
# 数值型字段转换
numeric_cols = [
'signed_premium_yuan',
'matured_premium_yuan',
'reported_claim_payment_yuan',
'expense_amount_yuan',
'claim_case_count',
'policy_count'
]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
return df
# 应用清洗
for week in loaded_data:
loaded_data[week] = preprocess_data(loaded_data[week])
步骤4: 数据验证
def validate_data(df, week):
"""数据质量检查"""
issues = []
# 检查空数据
if len(df) == 0:
issues.append("数据为空")
# 检查关键字段
required = ['signed_premium_yuan', 'third_level_organization']
missing = [col for col in required if col not in df.columns]
if missing:
issues.append(f"缺失字段: {missing}")
# 检查负值
if 'signed_premium_yuan' in df.columns:
if (df['signed_premium_yuan'] < 0).any():
issues.append("存在负保费")
if issues:
print(f"⚠️ 第{week}周问题: {', '.join(issues)}")
return False
return True
数据输出结构
按年度分组
def group_by_year(loaded_data):
"""按保单年度分组"""
grouped = {'2024': {}, '2025': {}}
for week, df in loaded_data.items():
for year in ['2024', '2025']:
year_df = df[df['policy_year'] == year]
if len(year_df) > 0:
grouped[year][week] = year_df
return grouped
# 使用示例
data_by_year = group_by_year(loaded_data)
print(f"2024保单周次: {list(data_by_year['2024'].keys())}")
print(f"2025保单周次: {list(data_by_year['2025'].keys())}")
常见问题
Q1: 文件编码错误
问题: UnicodeDecodeError
解决: 使用 encoding='utf-8-sig'
df = pd.read_csv(file, encoding='utf-8-sig')
Q2: 周次文件缺失
问题: 第32周、第38周等文件不存在
解决: 自动跳过并记录
if week not in available_weeks:
print(f"⚠️ 第{week}周: 文件不存在,跳过")
continue
Q3: 数据类型错误
问题: 保费字段被识别为字符串
解决: 强制数值转换
df['signed_premium_yuan'] = pd.to_numeric(
df['signed_premium_yuan'],
errors='coerce'
).fillna(0)
Q4: 内存占用过大
问题: 加载多周数据内存不足
解决: 按需加载或只读取必要列
# 只读取需要的列
usecols = [
'policy_year',
'third_level_organization',
'signed_premium_yuan',
'matured_premium_yuan',
'reported_claim_payment_yuan'
]
df = pd.read_csv(file, usecols=usecols, encoding='utf-8-sig')
性能优化
批量加载优化
from concurrent.futures import ThreadPoolExecutor
def load_weeks_parallel(weeks, data_folder):
"""并行加载多周数据"""
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(load_week_data, week, data_folder): week
for week in weeks
}
results = {}
for future in futures:
week = futures[future]
try:
df = future.result()
if df is not None:
results[week] = df
except Exception as e:
print(f"❌ 第{week}周加载失败: {e}")
return results
内存管理
import gc
# 加载和处理后释放内存
for week in weeks:
df = load_week_data(week)
processed = preprocess_data(df)
# ... 使用数据 ...
del df, processed
gc.collect()
参考资源
脚本工具:
scripts/quick_load.py- 快速数据加载工具scripts/data_validator.py- 数据质量检查工具
参考文档:
reference/data_schema.md- 完整字段说明reference/data_quality_rules.md- 数据质量标准
更新日志
- v1.0 (2025-11-04): 初始版本
- 基础加载功能
- 智能周期检测
- 数据验证和清洗
- 多年度支持