Pandas大文件分块读取与内存优化技巧
本节学习目标
- 理解 Pandas 的内存消耗机制
- 掌握
chunksize分块读取大文件 - 学会数据类型降级以减少内存占用
- 了解 Parquet 格式的优势与用法
- 了解 Dask 框架的基本使用
- 学会使用内存分析工具定位瓶颈
为什么学这个?
在前面的课程中,我们处理的都是小型数据集(几十行到几百行)。但在真实工作场景中,数据量往往远超你的想象:
- 电商平台的日交易记录:500 万条
- 日志文件:每天 10GB
- 传感器数据:每秒一条,一个月就有 260 万条
当你尝试用 pd.read_csv('big_data.csv') 读取一个 5GB 的 CSV 文件时,很可能会遇到:
- MemoryError:内存不够,程序崩溃
- 极慢的速度:操作一个 DataFrame 要等几分钟
本节将教你一套完整的"大数据处理工具箱",让你的 Pandas 代码能处理比你电脑内存还大的文件。
打个比方:如果说前面的课程教你怎么"开车",本节教你怎么"给车升级引擎"——让原本只能跑 60km/h 的车,能够承载更重的货物、跑得更快。
核心知识点讲解
Pandas 的内存消耗分析
1. 查看 DataFrame 的内存占用
import pandas as pd
import numpy as np
# 创建一个示例 DataFrame
df = pd.DataFrame({
'整数列': np.random.randint(0, 100, 10000),
'浮点列': np.random.randn(10000),
'字符串列': np.random.choice(['A', 'B', 'C', 'D', 'E'], 10000),
'布尔列': np.random.choice([True, False], 10000)
})
# ===== 查看内存占用 =====
print("各列内存占用:")
print(df.memory_usage(deep=True))
# deep=True 会精确计算字符串的内存(否则只计算指针)
print(f"\n总内存占用:{df.memory_usage(deep=True).sum() / 1024:.2f} KB")
print(f"总行数:{len(df)}")
2. 数据类型对内存的影响
# 不同数据类型的内存对比
df_int64 = pd.DataFrame({'A': np.random.randint(0, 100, 10000)})
df_int32 = pd.DataFrame({'A': np.random.randint(0, 100, 10000)}).astype('int32')
df_int8 = pd.DataFrame({'A': np.random.randint(0, 100, 10000)}).astype('int8')
df_float64 = pd.DataFrame({'A': np.random.randn(10000)})
df_float32 = pd.DataFrame({'A': np.random.randn(10000)}).astype('float32')
print(f"int64:{df_int64.memory_usage(deep=True).sum()} bytes")
print(f"int32:{df_int32.memory_usage(deep=True).sum()} bytes")
print(f"int8:{df_int8.memory_usage(deep=True).sum()} bytes")
print(f"float64:{df_float64.memory_usage(deep=True).sum()} bytes")
print(f"float32:{df_float32.memory_usage(deep=True).sum()} bytes")
关键理解:Pandas 默认使用 64 位数据类型(int64、float64),但很多情况下并不需要这么大的范围。通过降级数据类型,可以大幅减少内存。
数据类型降级(Memory Optimization)
1. 数值列降级
def optimize_numerical(df):
"""
自动优化数值列的数据类型
"""
df_optimized = df.copy()
for col in df.select_dtypes(include=['int64']).columns:
col_min = df[col].min()
col_max = df[col].max()
# int8: -128 到 127
if col_min >= -128 and col_max <= 127:
df_optimized[col] = df[col].astype('int8')
# int16: -32768 到 32767
elif col_min >= -32768 and col_max <= 32767:
df_optimized[col] = df[col].astype('int16')
# int32: -2147483648 到 2147483647
elif col_min >= -2147483648 and col_max <= 2147483647:
df_optimized[col] = df[col].astype('int32')
for col in df.select_dtypes(include=['float64']).columns:
df_optimized[col] = df[col].astype('float32')
return df_optimized
# 测试
df = pd.DataFrame({
'小整数': np.random.randint(0, 100, 10000), # 可以用 int8
'中整数': np.random.randint(0, 50000, 10000), # 可以用 int16
'大整数': np.random.randint(0, 1000000000, 10000), # 需要 int32
'浮点数': np.random.randn(10000) # 可以用 float32
})
print("优化前:")
print(df.memory_usage(deep=True))
df_opt = optimize_numerical(df)
print("\n优化后:")
print(df_opt.memory_usage(deep=True))
reduction = (1 - df_opt.memory_usage(deep=True).sum() / df.memory_usage(deep=True).sum()) * 100
print(f"\n内存减少:{reduction:.1f}%")
2. 字符串列优化(Category 类型)
# 当字符串列的**唯一值数量远少于总行数**时,使用 category 类型
df = pd.DataFrame({
'性别': np.random.choice(['男', '女'], 100000), # 只有 2 个唯一值
'城市': np.random.choice(['北京', '上海', '广州', '深圳'], 100000), # 4 个
'部门': np.random.choice(['技术', '销售', '管理', '人事', '财务'], 100000), # 5 个
'姓名': [f'用户{i}' for i in range(100000)] # 10 万个唯一值,不适合 category
})
print("优化前:")
for col in df.columns:
print(f" {col}: {df[col].memory_usage(deep=True) / 1024:.1f} KB, "
f"唯一值: {df[col].nunique()}")
# 将低唯一值比例的列转为 category
for col in ['性别', '城市', '部门']:
df[col] = df[col].astype('category')
print("\n优化后:")
for col in df.columns:
print(f" {col}: {df[col].memory_usage(deep=True) / 1024:.1f} KB, "
f"唯一值: {df[col].nunique()}")
total_before = df.memory_usage(deep=True).sum()
df_opt = df.copy()
df_opt['姓名'] = df_opt['姓名'].astype('str') # 不变
reduction = (1 - df.memory_usage(deep=True).sum() / total_before) * 100
print(f"\n总内存减少约:{(1 - df.memory_usage(deep=True).sum() / total_before) * 100:.1f}%")
何时使用 category:
- 字符串列的唯一值数量占总行数的比例小于 50%
- 比例越低,效果越好
- 分类变量(如性别、地区、状态)非常适合
chunksize 分块读取大文件
当文件太大无法一次性读入内存时,可以用 chunksize 参数分块读取。
# ===== 模拟大文件 =====
# 先创建一个"大" CSV 文件(100 万行)
np.random.seed(42)
n = 1_000_000
large_df = pd.DataFrame({
'ID': range(n),
'类别': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
'数值': np.random.randn(n),
'金额': np.random.randint(10, 10000, n)
})
# large_df.to_csv('large_data.csv', index=False) # 实际文件约 30MB
# ===== 分块读取 =====
# 方法一:逐块处理
chunk_size = 100_000 # 每次读取 10 万行
results = []
for chunk in pd.read_csv('large_data.csv', chunksize=chunk_size):
# 对每个 chunk 进行处理
# 例如:计算统计量
result = chunk.groupby('类别')['金额'].sum()
results.append(result)
# 合并所有 chunk 的结果
final_result = pd.concat(results).groupby(level=0).sum()
print("分块读取汇总结果:")
print(final_result)
# 方法二:只读取需要的列
chunk = pd.read_csv('large_data.csv', usecols=['类别', '金额'], nrows=1000)
print("\n只读取指定列:")
print(chunk.head())
# 方法三:在读取时指定数据类型(省内存)
dtypes = {
'ID': 'int32',
'类别': 'category',
'数值': 'float32',
'金额': 'int32'
}
chunk = pd.read_csv('large_data.csv', dtype=dtypes, nrows=1000)
print("\n指定数据类型读取:")
print(chunk.memory_usage(deep=True))
分块处理的典型模式
def process_large_file(filepath, chunksize=100000):
"""
分块处理大文件的典型模式
"""
# 场景1:逐步累加统计
total_count = 0
total_sum = 0
for chunk in pd.read_csv(filepath, chunksize=chunksize):
total_count += len(chunk)
total_sum += chunk['金额'].sum()
print(f"总行数:{total_count}")
print(f"总金额:{total_sum}")
# 场景2:逐步构建结果
results = []
for chunk in pd.read_csv(filepath, chunksize=chunksize):
# 对每个块进行 groupby 聚合
agg = chunk.groupby('类别').agg(
总金额=('金额', 'sum'),
平均金额=('金额', 'mean'),
数量=('金额', 'count')
)
results.append(agg)
# 合并所有块的结果
final = pd.concat(results)
final = final.groupby(level=0).agg({
'总金额': 'sum',
'平均金额': 'mean',
'数量': 'sum'
})
return final
Parquet 格式
1. 什么是 Parquet?
Parquet 是一种列式存储的文件格式,相比 CSV 有以下优势:
| 特性 | CSV | Parquet |
|---|---|---|
| 存储方式 | 行式 | 列式 |
| 文件大小 | 大 | 小(通常压缩到 CSV 的 10%-30%) |
| 读取速度 | 慢 | 快(可按列读取) |
| 数据类型 | 全是字符串 | 保留原始类型 |
| 压缩 | 无(需手动) | 内置压缩 |
2. Parquet 的基本操作
# ===== 写入 Parquet =====
# df.to_parquet('data.parquet')
# ===== 读取 Parquet =====
# df = pd.read_parquet('data.parquet')
# ===== 指定列读取(只读需要的列)=====
# df = pd.read_parquet('data.parquet', columns=['类别', '金额'])
# ===== 指定引擎 =====
# pip install pyarrow 或 pip install fastparquet
# df.to_parquet('data.parquet', engine='pyarrow')
3. 格式对比
import os
import time
# 创建示例数据
np.random.seed(42)
n = 500_000
df = pd.DataFrame({
'ID': range(n),
'类别': np.random.choice(list('ABCDEFGHIJ'), n),
'数值': np.random.randn(n),
'金额': np.random.randint(10, 10000, n)
})
# 保存为 CSV
csv_path = 'test_data.csv'
df.to_csv(csv_path, index=False)
csv_size = os.path.getsize(csv_path)
# 保存为 Parquet
parquet_path = 'test_data.parquet'
df.to_parquet(parquet_path, index=False)
parquet_size = os.path.getsize(parquet_path)
print(f"CSV 大小:{csv_size / 1024 / 1024:.2f} MB")
print(f"Parquet 大小:{parquet_size / 1024 / 1024:.2f} MB")
print(f"压缩率:{(1 - parquet_size / csv_size) * 100:.1f}%")
# 读取速度对比
t0 = time.time()
df_csv = pd.read_csv(csv_path)
t1 = time.time()
t0 = time.time()
df_parquet = pd.read_parquet(parquet_path)
t2 = time.time()
print(f"\nCSV 读取时间:{t1 - t0:.3f} 秒")
print(f"Parquet 读取时间:{t2 - t0:.3f} 秒")
# 读取后检查
print(f"\n数据类型保持:")
print(df_parquet.dtypes)
建议:在数据处理流水线中,将清洗后的数据保存为 Parquet 格式,后续分析直接读取 Parquet,速度快、省空间。
Dask 简介
当数据量超过单机内存时,Dask 是一个强大的选择。它可以理解为**“分布式的 Pandas”**。
# pip install dask
import dask.dataframe as dd
# ===== 读取大文件 =====
# dask_df = dd.read_csv('large_data.csv')
# ===== 大部分 Pandas API 在 Dask 中都有 =====
# result = dask_df.groupby('类别')['金额'].sum()
# ===== 触发计算 =====
# result_computed = result.compute()
# ===== Dask 的延迟计算 =====
# Dask 不会立即执行,而是构建一个"计算图"
# 只有调用 .compute() 时才会真正执行
# 这使得 Dask 能优化执行计划
# ===== 与 Pandas 的主要区别 =====
# 1. 不支持 inplace 操作
# 2. 部分 API 不兼容
# 3. 适合"读取→转换→输出"的流水线式处理
# 4. 可以跨多台机器运行
何时使用 Dask:
- 数据量超过 10GB
- Pandas 内存不足
- 需要利用多核 CPU 加速
- 不需要复杂的交互式操作
内存分析工具
1. memory_profiler
# pip install memory_profiler
# 在 Jupyter 中使用:
# %load_ext memory_profiler
# %memit df = pd.read_csv('large_data.csv')
2. 实用内存监控函数
import psutil
import os
def get_memory_usage():
"""获取当前进程的内存使用量"""
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
return mem_info.rss / 1024 / 1024 # MB
print(f"当前内存使用:{get_memory_usage():.2f} MB")
# 对比处理前后的内存
mem_before = get_memory_usage()
df = pd.DataFrame({
'A': np.random.randn(1_000_000),
'B': np.random.choice(list('ABCDE'), 1_000_000)
})
mem_after = get_memory_usage()
print(f"创建 DataFrame 消耗:{mem_after - mem_before:.2f} MB")
3. 减少内存的实用技巧总结
# ===== 技巧汇总 =====
# 1. 读取时只选择需要的列
df = pd.read_csv('data.csv', usecols=['列1', '列2', '列3'])
# 2. 在读取时指定合适的数据类型
df = pd.read_csv('data.csv', dtype={
'ID': 'int32',
'类别': 'category',
'数值': 'float32'
})
# 3. 用更小的数值类型
# int64 → int32/int16/int8
# float64 → float32
# 4. 字符串列转 category(唯一值少时)
df['分类'] = df['分类'].astype('category')
# 5. 删除不需要的列释放内存
# del df['不需要的列']
# 或
df = df.drop(columns=['不需要的列'])
# 6. 及时释放不需要的变量
# import gc
# del big_df
# gc.collect()
# 7. 使用 Parquet 格式存储中间结果
# df.to_parquet('cleaned.parquet')
# 8. 大文件分块读取
# for chunk in pd.read_csv('big.csv', chunksize=100000):
# process(chunk)
实战练习
练习 1:内存优化挑战
题目:对一个"大"DataFrame 进行内存优化,目标是将内存减少 50% 以上。
# 参考答案
import pandas as pd
import numpy as np
# 创建模拟大数据集
np.random.seed(42)
n = 500_000
df = pd.DataFrame({
'ID': range(n),
'性别': np.random.choice(['男', '女'], n),
'省份': np.random.choice([
'北京', '上海', '广东', '浙江', '江苏', '山东', '河南', '四川'
], n),
'年龄段': np.random.choice(['18-25', '26-35', '36-45', '46-55', '56+'], n),
'评分': np.random.randint(1, 6, n),
'消费金额': np.random.uniform(10, 10000, n).round(2),
'订单数': np.random.randint(0, 100, n),
'标签': np.random.choice(['A', 'B', 'C'], n),
'备注': [f'备注{i}' for i in range(n)]
})
# 原始内存
original_mem = df.memory_usage(deep=True).sum()
print(f"原始内存:{original_mem / 1024 / 1024:.2f} MB")
# ===== 优化 =====
df_opt = df.copy()
# 1. 数值列降级
df_opt['ID'] = df_opt['ID'].astype('int32')
df_opt['评分'] = df_opt['评分'].astype('int8')
df_opt['订单数'] = df_opt['订单数'].astype('int16')
df_opt['消费金额'] = df_opt['消费金额'].astype('float32')
# 2. 字符串列转 category
for col in ['性别', '省份', '年龄段', '标签']:
df_opt[col] = df_opt[col].astype('category')
optimized_mem = df_opt.memory_usage(deep=True).sum()
print(f"优化后内存:{optimized_mem / 1024 / 1024:.2f} MB")
reduction = (1 - optimized_mem / original_mem) * 100
print(f"内存减少:{reduction:.1f}%")
# 验证数据一致性
assert (df['ID'] == df_opt['ID']).all()
assert (df['消费金额'].round(2) == df_opt['消费金额'].round(2)).all()
print("数据一致性验证通过!")
练习 2:Parquet 格式对比实验
题目:比较 CSV 和 Parquet 的读写性能。
# 参考答案
import pandas as pd
import numpy as np
import time
import os
# 创建测试数据
np.random.seed(42)
n = 300_000
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=n, freq='H'),
'category': np.random.choice(list('ABCDEFGH'), n),
'value': np.random.randn(n),
'amount': np.random.randint(1, 1000, n)
})
# 写入测试
t0 = time.time()
df.to_csv('test.csv', index=False)
csv_write = time.time() - t0
t0 = time.time()
df.to_parquet('test.parquet', index=False)
parquet_write = time.time() - t0
# 读取测试
t0 = time.time()
df1 = pd.read_csv('test.csv')
csv_read = time.time() - t0
t0 = time.time()
df2 = pd.read_parquet('test.parquet')
parquet_read = time.time() - t0
# 文件大小
csv_size = os.path.getsize('test.csv')
parquet_size = os.path.getsize('test.parquet')
print("=== 性能对比 ===")
print(f"写入时间 - CSV: {csv_write:.3f}s, Parquet: {parquet_write:.3f}s")
print(f"读取时间 - CSV: {csv_read:.3f}s, Parquet: {parquet_read:.3f}s")
print(f"文件大小 - CSV: {csv_size/1024/1024:.2f}MB, Parquet: {parquet_size/1024/1024:.2f}MB")
print(f"压缩率: {(1 - parquet_size/csv_size)*100:.1f}%")
# 数据类型保持
print("\n=== 数据类型 ===")
print(f"CSV 读取后:\n{df1.dtypes}")
print(f"\nParquet 读取后:\n{df2.dtypes}")
本节总结
本节我们学习了 Pandas 性能优化的核心技术:
内存优化:
- 使用
memory_usage(deep=True)查看内存占用 - 数值列降级:
int64 → int32/int16/int8,float64 → float32 - 字符串列转
category(唯一值少时效果显著)
大文件处理:
chunksize分块读取:逐块处理,避免内存溢出usecols只读取需要的列- 读取时指定
dtype减少初始内存
高效存储格式:
- Parquet:列式存储,体积小、读取快、保留数据类型
- 推荐在数据处理流水线中使用 Parquet 存储中间结果
大数据框架:
- Dask:兼容 Pandas API,支持超出内存的数据处理
- 适合数据量超过 10GB 的场景
核心原则:
- 只在需要时加载数据(usecols、chunksize)
- 用最小的数据类型(类型降级、category)
- 用高效的存储格式(Parquet)
- 数据量超过内存限制时考虑 Dask
到此这篇关于Pandas大文件分块读取与内存优化技巧的文章就介绍到这了,更多相关Pandas大文件读取内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
python类别数据数字化LabelEncoder VS OneHotEncoder区别
这篇文章主要为大家介绍了机器学习:数据预处理之将类别数据数字化的方法LabelEncoder VS OneHotEncoder区别详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2022-09-09


最新评论