Python使用csv模块进行文件读写的操作详解
一、核心概念解析
1.1 基础定义:CSV到底是什么?
CSV的全称是"Comma-Separated Values"(逗号分隔值),但它有个小秘密:并不总是用逗号。
看看这几个常见的CSV变体:
# 典型的CSV格式
name,age,city
张三,30,北京
李四,25,上海

# 用分号分隔(欧洲常见)
name;age;city
张三;30;北京
李四;25;上海

# 用制表符分隔(TSV文件)
name	age	city
张三	30	北京
李四	25	上海

# 带引号的CSV(包含逗号时)
product,price,description
"苹果,红富士",5.8,"新鲜,甜"
香蕉,3.2,"进口,黄色"
CSV的本质是纯文本表格,由三部分组成:
- 表头行(可选):定义每列的名称
- 数据行:每行一条记录
- 分隔符:通常是逗号,但也可能是其他字符
1.2 基本语法:csv模块的四大金刚
csv模块的核心是四个类/函数:
import csv

# 1. Read a CSV file: each row comes back as a list of strings.
#    (Requires data.csv to exist in the working directory.)
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)  # each row is a list of strings

# 2. Write rows to a CSV file.
data = [
    ['name', 'age', 'city'],
    ['张三', '30', '北京'],
    ['李四', '25', '上海']
]
# newline='' stops csv from emitting blank lines between rows on Windows.
with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

# 3. Read rows as dictionaries keyed by the header row (recommended).
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row['name'], row['age'])  # access fields by column name

# 4. Write dictionaries with an explicit column order.
data = [
    {'name': '张三', 'age': '30', 'city': '北京'},
    {'name': '李四', 'age': '25', 'city': '上海'}
]
with open('output_dict.csv', 'w', encoding='utf-8', newline='') as file:
    fieldnames = ['name', 'age', 'city']
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()  # write the header row first
    writer.writerows(data)
注意那个newline=''参数:在Windows上,如果不加这个,每行后面会多一个空行。这是Python处理文本文件的一个坑,记住就好。
1.3 核心特点:为什么选择csv模块?
- 零依赖:Python自带,不用安装任何东西
- 内存友好:流式处理,再大的文件也不怕
- 灵活配置:分隔符、引号字符、编码全都可以自定义
- 简单易用:几行代码就能完成复杂操作
- 兼容性好:处理各种"奇怪"的CSV文件
二、应用场景详解
2.1 读取和分析数据
假设你有一个销售数据文件sales.csv:
date,product,quantity,price,region
2023-01-15,Widget-A,10,29.99,North
2023-01-15,Gadget-X,5,99.99,South
2023-01-16,Widget-A,8,29.99,East
2023-01-16,Thingy-B,3,149.99,West
让我们用csv模块来分析它:
# analyze_sales.py
import csv
from collections import defaultdict
from datetime import datetime
def analyze_sales_data(filepath):
    """Aggregate sales statistics from a CSV file.

    Args:
        filepath: path to a CSV with columns date, product, quantity,
            price, region.

    Returns:
        dict with 'total_sales', 'total_quantity' and per-product,
        per-region and per-date sales breakdowns (defaultdicts of float).
    """
    stats = {
        'total_sales': 0,
        'total_quantity': 0,
        'by_product': defaultdict(float),
        'by_region': defaultdict(float),
        'by_date': defaultdict(float)
    }
    with open(filepath, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            try:
                # CSV values are strings; convert before arithmetic.
                quantity = int(row['quantity'])
                price = float(row['price'])
                date_str = row['date']
                sales = quantity * price
                stats['total_sales'] += sales
                stats['total_quantity'] += quantity
                stats['by_product'][row['product']] += sales
                stats['by_region'][row['region']] += sales
                stats['by_date'][date_str] += sales
            except (ValueError, KeyError) as e:
                # Report and skip malformed rows instead of aborting the run.
                print(f"数据格式错误: {row}, 错误: {e}")
                continue
    return stats
def print_statistics(stats):
    """Pretty-print the report produced by analyze_sales_data().

    Product and region breakdowns are sorted by sales (descending);
    the date breakdown is sorted chronologically.
    """
    print("=" * 50)
    print("销售数据分析报告")
    print("=" * 50)
    print(f"\n总销售额: ${stats['total_sales']:.2f}")
    print(f"总销售数量: {stats['total_quantity']}")
    print("\n按产品统计:")
    for product, sales in sorted(stats['by_product'].items(),
                                 key=lambda x: x[1], reverse=True):
        print(f" {product}: ${sales:.2f}")
    print("\n按地区统计:")
    for region, sales in sorted(stats['by_region'].items(),
                                key=lambda x: x[1], reverse=True):
        print(f" {region}: ${sales:.2f}")
    print("\n按日期统计:")
    for date, sales in sorted(stats['by_date'].items()):
        print(f" {date}: ${sales:.2f}")
# 生成测试数据
def create_sample_data():
    """Write a small sample sales.csv into the working directory."""
    data = [
        ['date', 'product', 'quantity', 'price', 'region'],
        ['2023-01-15', 'Widget-A', 10, 29.99, 'North'],
        ['2023-01-15', 'Gadget-X', 5, 99.99, 'South'],
        ['2023-01-16', 'Widget-A', 8, 29.99, 'East'],
        ['2023-01-16', 'Thingy-B', 3, 149.99, 'West'],
        ['2023-01-16', 'Gadget-X', 12, 99.99, 'North'],
        ['2023-01-17', 'Widget-A', 15, 27.50, 'South'],
    ]
    # csv.writer stringifies the int/float cells on write.
    with open('sales.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    print("示例数据已创建: sales.csv")
if __name__ == "__main__":
    # Build the demo dataset, then analyze it and print the report.
    create_sample_data()
    stats = analyze_sales_data('sales.csv')
    print_statistics(stats)
2.2 数据清洗和转换
现实中的数据很少是完美的。让我们看看如何处理各种"脏数据":
# clean_data.py
import csv
import re
def clean_csv_file(input_file, output_file):
    """Clean every field of a CSV file and drop invalid rows.

    Runs each value through clean_field(), keeps only rows accepted by
    is_valid_row(), and writes the survivors (same columns) to
    output_file. Rejected rows are reported with their 1-based index.
    """
    cleaned_rows = []
    with open(input_file, 'r', encoding='utf-8') as infile:
        reader = csv.DictReader(infile)
        fieldnames = reader.fieldnames
        for i, row in enumerate(reader, 1):
            cleaned_row = {field: clean_field(field, row.get(field, ''))
                           for field in fieldnames}
            if is_valid_row(cleaned_row):
                cleaned_rows.append(cleaned_row)
            else:
                print(f"跳过第{i}行无效数据: {row}")
    with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(cleaned_rows)
    print(f"数据清洗完成: {len(cleaned_rows)} 行有效数据")
def clean_field(field_name, value):
    """Dispatch a value to a type-specific cleaner based on its column name.

    Empty values pass through unchanged; everything else is stripped
    first, then routed by substring match on the lowercased field name.
    """
    if not value:
        return value
    value = value.strip()
    name = field_name.lower()  # lowercase once, not per comparison
    if 'email' in name:
        return clean_email(value)
    elif 'phone' in name:
        return clean_phone(value)
    elif 'date' in name:
        return clean_date(value)
    elif 'price' in name or 'amount' in name:
        return clean_number(value)
    else:
        return value
def clean_email(email):
    """Lowercase and trim an email; return '' when it looks invalid.

    Validity check is deliberately minimal: an '@' plus a dot somewhere
    in the part after the first '@'.
    """
    email = email.lower().strip()
    if '@' in email and '.' in email.split('@')[1]:
        return email
    return ''
def clean_phone(phone):
    """Normalize a phone number to bare digits.

    Returns the digits when they form an 11-digit Chinese mobile number
    or a 10-digit landline number, otherwise ''.
    """
    digits = re.sub(r'\D', '', phone)  # drop every non-digit character
    if len(digits) in (10, 11):
        return digits
    return ''
def clean_date(date_str):
    """Normalize a date string to ISO 'YYYY-MM-DD'.

    Tries several common formats in order; returns the input unchanged
    when none of them matches.
    """
    # Import once here (was inside the loop in the original) to keep
    # the snippet self-contained without re-running the import per try.
    from datetime import datetime
    formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日', '%d/%m/%Y']
    for fmt in formats:
        try:
            return datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
        except ValueError:
            continue
    return date_str  # unparseable: pass through unchanged
def clean_number(number_str):
    """Strip currency symbols/thousands separators; return '0' on failure.

    The result is float-normalized and returned as a string
    (e.g. '¥1,000.50' -> '1000.5').
    """
    cleaned = re.sub(r'[^\d\.-]', '', number_str)
    try:
        return str(float(cleaned))
    except ValueError:
        return '0'
def is_valid_row(row):
    """Accept a row only when it has a name and a plausible email (if any)."""
    # A name is mandatory.
    if not row.get('name', '').strip():
        return False
    # An email is optional, but when present it must contain '@'.
    email = row.get('email', '')
    if email and '@' not in email:
        return False
    return True
def create_dirty_data():
    """Write dirty_data.csv containing deliberately messy test records."""
    dirty_data = [
        ['name', 'email', 'phone', 'join_date', 'amount'],
        [' 张三 ', 'ZHANGSAN@EXAMPLE.COM', '138-0013-8000', '2023-01-15', '¥1,000.50'],
        ['李四', 'lisi.example.com', '010-62345678', '2023/02/20', '2,500.00'],
        ['', 'wangwu@example.com', '13912345678', '2023年03月15日', '1500'],
        ['赵六', 'zhaoliu@example', '123-4567', '无效日期', 'ABC'],
        ['钱七', 'qianqi@example.com', '15012345678', '20/04/2023', '3,000.00'],
    ]
    with open('dirty_data.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(dirty_data)
    print("脏数据测试文件已创建: dirty_data.csv")
if __name__ == "__main__":
    # Generate messy input, clean it, then show a before/after diff.
    create_dirty_data()
    clean_csv_file('dirty_data.csv', 'cleaned_data.csv')
    print("\n清洗前后对比:")
    print("-" * 50)
    with open('dirty_data.csv', 'r', encoding='utf-8') as f:
        print("原始数据:")
        for line in f:
            print(" ", line.strip())
    print("\n清洗后数据:")
    with open('cleaned_data.csv', 'r', encoding='utf-8') as f:
        for line in f:
            print(" ", line.strip())
2.3 合并和拆分文件
工作中经常需要处理多个CSV文件,比如每月的数据分开存储,但分析时需要合并:
# merge_split_csv.py
import csv
import os
from glob import glob
def merge_csv_files(pattern, output_file):
    """Merge every CSV matching a glob pattern into one file.

    The header of the first file seen becomes the reference; files with
    a different header are skipped with a warning. When nothing matched,
    no output file is written.

    Args:
        pattern: glob pattern such as 'data/*.csv'.
        output_file: path of the merged CSV.
    """
    all_data = []
    headers = None
    for filename in glob(pattern):
        # NOTE: the original article's f-string placeholders were lost
        # in extraction; restored here.
        print(f"处理文件: {filename}")
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            if headers is None:
                headers = reader.fieldnames
            elif headers != reader.fieldnames:
                print(f"警告: {filename} 的表头不一致,跳过")
                continue
            for row in reader:
                all_data.append(row)
    if not all_data:
        print("没有找到可合并的数据")
        return
    with open(output_file, 'w', encoding='utf-8', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        writer.writerows(all_data)
    print(f"合并完成: 共 {len(all_data)} 行数据,保存到 {output_file}")
def split_csv_by_column(input_file, column_name, output_dir):
    """Split a CSV into one file per distinct value of a column.

    Args:
        input_file: source CSV path.
        column_name: column whose values partition the rows; rows
            missing it land in the 'unknown' group.
        output_dir: directory for the per-value CSVs (created if needed).
    """
    os.makedirs(output_dir, exist_ok=True)
    # Group rows by the chosen column's value.
    groups = {}
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        headers = reader.fieldnames
        for row in reader:
            key = row.get(column_name, 'unknown')
            groups.setdefault(key, []).append(row)
    for key, rows in groups.items():
        # Keep only filesystem-safe characters for the file name.
        safe_key = "".join(c for c in str(key)
                           if c.isalnum() or c in (' ', '-', '_')).rstrip()
        output_file = os.path.join(output_dir, f"{safe_key}.csv")
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=headers)
            writer.writeheader()
            writer.writerows(rows)
        print(f"创建文件: {output_file} ({len(rows)} 行)")
    print(f"拆分完成: 共创建 {len(groups)} 个文件")
def create_monthly_data():
    """Write one small sales CSV per month (monthly_data_YYYY-MM.csv)."""
    months = ['2023-01', '2023-02', '2023-03']
    for month in months:
        filename = f"monthly_data_{month}.csv"
        data = [
            ['date', 'product', 'sales', 'region'],
            [f'{month}-10', 'Product-A', '1000', 'North'],
            [f'{month}-15', 'Product-B', '1500', 'South'],
            [f'{month}-20', 'Product-A', '800', 'East'],
        ]
        with open(filename, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file)
            writer.writerows(data)
        # Restored the {filename} placeholder lost in extraction.
        print(f"创建: {filename}")
if __name__ == "__main__":
    print("=== CSV文件合并与拆分演示 ===\n")
    # 1) generate per-month inputs, 2) merge them, 3) split by region.
    print("1. 创建月度测试数据...")
    create_monthly_data()
    print("\n2. 合并月度数据...")
    merge_csv_files('monthly_data_*.csv', 'merged_quarterly.csv')
    print("\n3. 按地区拆分数据...")
    split_csv_by_column('merged_quarterly.csv', 'region', 'split_by_region')
    # List what was produced (file vs. directory of files).
    print("\n" + "=" * 50)
    print("生成的文件:")
    for file in ['merged_quarterly.csv', 'split_by_region']:
        if os.path.exists(file):
            if os.path.isdir(file):
                print(f"目录: {file}/")
                for f in os.listdir(file):
                    print(f" - {f}")
            else:
                print(f"文件: {file}")
三、高级技巧
3.1 处理特殊格式的CSV
不是所有的CSV文件都那么"标准"。让我们看看如何处理各种特殊情况:
# special_csv_formats.py
import csv
def read_excel_csv():
    """Open an Excel-exported CSV by probing likely encodings.

    Excel exports often carry a UTF-8 BOM, or use the Windows locale
    encoding (GBK on Chinese Windows). Stops at the first
    (encoding, filename) pair that opens and decodes cleanly.
    """
    files_to_try = [
        ('utf-8-sig', 'excel_utf8_bom.csv'),  # UTF-8 with BOM
        ('gbk', 'excel_gbk.csv'),             # GBK (Chinese Windows)
        ('utf-8', 'excel_utf8.csv'),          # plain UTF-8
    ]
    for encoding, filename in files_to_try:
        try:
            with open(filename, 'r', encoding=encoding) as file:
                data = list(csv.reader(file))
            # Restored the {filename} placeholder lost in extraction.
            print(f"成功读取 {filename} (编码: {encoding})")
            print(f" 表头: {data[0]}")
            print(f" 行数: {len(data)}")
            break
        except (UnicodeDecodeError, FileNotFoundError):
            continue
def handle_custom_delimiters():
    """Write and read CSVs that use non-comma delimiters."""
    # Semicolon-separated file (common in Europe).
    with open('european_format.csv', 'w', encoding='utf-8') as f:
        f.write("name;age;city\n")
        f.write("张三;30;北京\n")
        f.write("李四;25;上海\n")
    # Pass delimiter= when reading.
    with open('european_format.csv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=';')
        for row in reader:
            print(f"分隔符';': {row}")
    # Tab-separated (TSV) file.
    with open('data.tsv', 'w', encoding='utf-8') as f:
        f.write("name\tage\tcity\n")
        f.write("张三\t30\t北京\n")
        f.write("李四\t25\t上海\n")
    with open('data.tsv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        for row in reader:
            print(f"分隔符'\\t': {row}")
def handle_quoted_data():
    """Write values containing commas/quotes and read them back intact."""
    tricky_data = [
        ['name', 'description', 'price'],
        ['Apple', 'Red, delicious apple', '5.8'],
        ['Banana', 'Yellow "sweet" banana', '3.2'],
        ['Orange Juice', 'Fresh, 100% pure orange "juice"', '12.5'],
    ]
    with open('quoted_data.csv', 'w', encoding='utf-8', newline='') as file:
        # QUOTE_ALL wraps every field, so embedded commas/quotes survive.
        writer = csv.writer(file, quoting=csv.QUOTE_ALL)
        writer.writerows(tricky_data)
    print("\n带引号的数据文件已创建")
    # csv.reader un-quotes transparently on the way back in.
    with open('quoted_data.csv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        for row in reader:
            print(f"原始行: {row}")
def handle_large_files():
    """Demonstrate memory-friendly strategies for large CSV files."""
    print("\n大文件处理策略:")
    print("1. 流式处理,一次一行,不加载到内存")
    print("2. 分批处理,特别是需要排序或聚合时")
    print("3. 使用生成器避免内存累积")

    def process_large_file(filename, chunk_size=1000):
        """Yield lists of at most chunk_size DictReader rows."""
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            chunk = []
            for row in reader:
                chunk.append(row)
                if len(chunk) >= chunk_size:
                    yield chunk
                    chunk = []
            if chunk:  # final partial chunk
                yield chunk

    print("\n模拟处理大文件(分块):")
    try:
        for chunk_num, chunk in enumerate(
                process_large_file('large_data.csv', chunk_size=2), 1):
            print(f" 处理第{chunk_num}块: {len(chunk)} 行")
            # Each chunk could be written to a database, aggregated, etc.
    except FileNotFoundError:
        # Fix: the original crashed when the demo file was absent.
        print("large_data.csv 不存在,跳过演示")
if __name__ == "__main__":
    print("=== 特殊CSV格式处理 ===\n")
    print("1. 自定义分隔符处理:")
    handle_custom_delimiters()
    print("\n2. 带引号数据处理:")
    handle_quoted_data()
    print("\n3. 大文件处理策略:")
    handle_large_files()
3.2 性能优化技巧
# csv_performance.py
import csv
import time
import random
from memory_profiler import profile
def create_large_csv(filename, num_rows=100000):
    """Write a synthetic CSV with num_rows random data rows.

    Columns: id, name, value, category, timestamp.
    """
    # Restored the {filename} placeholder lost in extraction.
    print(f"创建大型测试文件: {filename} ({num_rows} 行)")
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['id', 'name', 'value', 'category', 'timestamp'])
        for i in range(num_rows):
            writer.writerow([
                i + 1,
                f'Item-{random.randint(1, 1000)}',
                random.uniform(1, 1000),
                random.choice(['A', 'B', 'C', 'D']),
                f'2023-{random.randint(1,12):02d}-{random.randint(1,28):02d}'
            ])
    print("文件创建完成")
def naive_processing(filename):
    """Sum the 'value' column by loading the whole file into memory.

    Returns the elapsed wall-clock time in seconds (for comparison with
    the streaming/batched variants).
    """
    print("\n方法1: 一次性加载所有数据")
    start = time.time()
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        all_data = list(reader)  # materializes every row in memory
    total = 0
    for row in all_data:
        total += float(row['value'])
    elapsed = time.time() - start
    print(f" 处理了 {len(all_data)} 行数据")
    print(f" 总和: {total:.2f}")
    print(f" 耗时: {elapsed:.2f} 秒")
    return elapsed
def stream_processing(filename):
    """Sum the 'value' column one row at a time (constant memory).

    Returns the elapsed wall-clock time in seconds.
    """
    print("\n方法2: 流式处理(一次一行)")
    start = time.time()
    total = 0
    count = 0
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:  # only one row resident at a time
            total += float(row['value'])
            count += 1
    elapsed = time.time() - start
    print(f" 处理了 {count} 行数据")
    print(f" 总和: {total:.2f}")
    print(f" 耗时: {elapsed:.2f} 秒")
    return elapsed
def batch_processing(filename, batch_size=10000):
    """Sum the 'value' column in batches of batch_size rows.

    Bounds memory at batch_size values while amortizing per-batch work.
    Returns the elapsed wall-clock time in seconds.
    """
    print(f"\n方法3: 批量处理(每批 {batch_size} 行)")
    start = time.time()
    total = 0
    count = 0
    batch = []
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            batch.append(float(row['value']))
            count += 1
            if len(batch) >= batch_size:
                total += sum(batch)
                batch = []  # reset for the next batch
    if batch:  # final partial batch
        total += sum(batch)
    elapsed = time.time() - start
    print(f" 处理了 {count} 行数据")
    print(f" 总和: {total:.2f}")
    print(f" 耗时: {elapsed:.2f} 秒")
    return elapsed
def optimized_writing(filename, num_rows=10000):
    """Compare row-by-row vs batched CSV writing performance.

    NOTE(review): `filename` is accepted but unused — output goes to
    hard-coded slow_write.csv / fast_write.csv; kept for interface
    compatibility with existing callers.
    """
    print(f"\n写入性能优化 ({num_rows} 行):")
    # Method 1: one writerow() call per record.
    start = time.time()
    with open('slow_write.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['id', 'value'])
        for i in range(num_rows):
            writer.writerow([i, i * 1.5])
    elapsed1 = time.time() - start
    print(f" 逐行写入: {elapsed1:.2f} 秒")
    # Method 2: build all rows first, then a single writerows() call.
    start = time.time()
    with open('fast_write.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['id', 'value'])
        all_data = [[i, i * 1.5] for i in range(num_rows)]
        writer.writerows(all_data)
    elapsed2 = time.time() - start
    print(f" 批量写入: {elapsed2:.2f} 秒")
    # Fix: guard against ZeroDivisionError when elapsed2 rounds to 0
    # on very fast machines / coarse clocks.
    if elapsed2 > 0:
        print(f" 速度提升: {elapsed1/elapsed2:.1f} 倍")
if __name__ == "__main__":
    print("=== CSV性能优化技巧 ===\n")
    # Generate a shared input file, then time each reading strategy.
    test_file = 'large_test_data.csv'
    create_large_csv(test_file, num_rows=50000)
    times = []
    times.append(('一次性加载', naive_processing(test_file)))
    times.append(('流式处理', stream_processing(test_file)))
    times.append(('批量处理', batch_processing(test_file)))
    optimized_writing('test_write.csv', num_rows=10000)
    print("\n" + "=" * 50)
    print("性能总结:")
    for method, t in times:
        print(f" {method}: {t:.2f} 秒")
3.3 实战:构建CSV处理工具
让我们构建一个实用的CSV处理工具,集成各种常用功能:
# csv_toolkit.py
"""
CSV处理工具箱
一个实用的命令行工具,集成各种CSV处理功能
"""
import csv
import argparse
import sys
import os
from pathlib import Path
class CSVToolkit:
    """Command-line toolbox bundling common CSV operations.

    Each sub-command maps to a handler method taking the parsed
    argparse namespace (attributes: file, output, lines, columns, filter).
    """

    def __init__(self):
        # Dispatch table: command name -> bound handler method.
        self.commands = {
            'view': self.view_csv,
            'head': self.head_csv,
            'tail': self.tail_csv,
            'stats': self.csv_stats,
            'filter': self.filter_csv,
            'select': self.select_columns,
            'sample': self.sample_csv,
        }

    def run(self):
        """Parse command-line arguments and dispatch to a handler."""
        parser = argparse.ArgumentParser(description='CSV处理工具箱')
        parser.add_argument('command', choices=self.commands.keys(),
                            help='要执行的操作')
        parser.add_argument('file', help='CSV文件路径')
        parser.add_argument('-o', '--output', help='输出文件路径')
        parser.add_argument('-n', '--lines', type=int, default=10,
                            help='显示的行数(用于head/tail)')
        parser.add_argument('-c', '--columns', help='选择的列,用逗号分隔')
        parser.add_argument('-f', '--filter', help='过滤条件,格式: 列名=值')
        args = parser.parse_args()
        # Fail fast with a clear message when the input file is missing.
        if not os.path.exists(args.file):
            print(f"错误: 文件 '{args.file}' 不存在")
            sys.exit(1)
        self.commands[args.command](args)

    def view_csv(self, args):
        """Print the whole file as a pipe-separated, padded table."""
        with open(args.file, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            for i, row in enumerate(reader):
                formatted = ' | '.join(f'{cell:<20}' for cell in row)
                print(formatted)
                if i == 0:
                    # Rule under the header row.
                    print('-' * len(formatted))

    def head_csv(self, args):
        """Print the header plus the first args.lines data rows."""
        with open(args.file, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            for i, row in enumerate(reader):
                if i >= args.lines + 1:  # +1 so the header doesn't count
                    break
                print(', '.join(row))

    def tail_csv(self, args):
        """Print the last args.lines physical lines of the file."""
        with open(args.file, 'r', encoding='utf-8') as file:
            lines = file.readlines()
        for line in lines[-args.lines:]:
            print(line.rstrip())

    def csv_stats(self, args):
        """Print row/column counts and a sample value per column."""
        with open(args.file, 'r', encoding='utf-8') as file:
            data = list(csv.reader(file))
        if not data:
            print("文件为空")
            return
        headers = data[0]
        rows = data[1:]
        print(f"文件: {args.file}")
        print(f"行数: {len(rows)} 行(不含表头)")
        print(f"列数: {len(headers)} 列")
        print(f"列名: {', '.join(headers)}")
        if rows:
            print("\n列信息:")
            for i, header in enumerate(headers):
                # Guard against ragged rows shorter than the header.
                sample = rows[0][i] if len(rows[0]) > i else ''
                print(f" {header}: 示例='{sample}'")

    def filter_csv(self, args):
        """Keep rows where a column equals a value (args.filter: '列名=值')."""
        if not args.filter:
            print("错误: 需要指定过滤条件(-f 列名=值)")
            return
        # Split only on the first '=' so values may contain '='.
        col_name, value = args.filter.split('=', 1)
        with open(args.file, 'r', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            headers = reader.fieldnames
            matching_rows = [row for row in reader
                             if row.get(col_name) == value]
        if args.output:
            with open(args.output, 'w', encoding='utf-8', newline='') as outfile:
                writer = csv.DictWriter(outfile, fieldnames=headers)
                writer.writeheader()
                writer.writerows(matching_rows)
            print(f"过滤结果已保存到: {args.output} ({len(matching_rows)} 行)")
        else:
            # No output path: write CSV straight to stdout.
            writer = csv.DictWriter(sys.stdout, fieldnames=headers)
            writer.writeheader()
            writer.writerows(matching_rows)

    def select_columns(self, args):
        """Project the file onto the columns named in args.columns."""
        if not args.columns:
            print("错误: 需要指定要选择的列(-c 列1,列2,...)")
            return
        selected = [col.strip() for col in args.columns.split(',')]
        with open(args.file, 'r', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            # Warn about (but still tolerate) unknown column names.
            for col in selected:
                if col not in reader.fieldnames:
                    print(f"警告: 列 '{col}' 不存在")
            filtered_rows = [{col: row[col] for col in selected if col in row}
                             for row in reader]
        if args.output:
            with open(args.output, 'w', encoding='utf-8', newline='') as outfile:
                writer = csv.DictWriter(outfile, fieldnames=selected)
                writer.writeheader()
                writer.writerows(filtered_rows)
            print(f"结果已保存到: {args.output}")
        else:
            writer = csv.DictWriter(sys.stdout, fieldnames=selected)
            writer.writeheader()
            writer.writerows(filtered_rows)

    def sample_csv(self, args):
        """Print (or save) args.lines randomly sampled data rows."""
        import random
        with open(args.file, 'r', encoding='utf-8') as file:
            data = list(csv.reader(file))
        if len(data) <= 1:
            print("文件数据不足")
            return
        headers = data[0]
        rows = data[1:]
        # Never ask for more rows than exist.
        sample_size = min(args.lines, len(rows))
        sampled_rows = random.sample(rows, sample_size)
        output_data = [headers] + sampled_rows
        if args.output:
            with open(args.output, 'w', encoding='utf-8', newline='') as file:
                writer = csv.writer(file)
                writer.writerows(output_data)
            print(f"抽样结果已保存到: {args.output} ({sample_size} 行)")
        else:
            for row in output_data:
                print(', '.join(row))
def create_example_csv():
    """Write employees.csv with a small fixed demo dataset."""
    data = [
        ['id', 'name', 'age', 'city', 'salary'],
        ['1', '张三', '30', '北京', '50000'],
        ['2', '李四', '25', '上海', '45000'],
        ['3', '王五', '35', '广州', '60000'],
        ['4', '赵六', '28', '深圳', '55000'],
        ['5', '钱七', '32', '杭州', '52000'],
        ['6', '孙八', '29', '南京', '48000'],
        ['7', '周九', '31', '成都', '51000'],
        ['8', '吴十', '27', '武汉', '47000'],
    ]
    with open('employees.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    print("示例文件已创建: employees.csv")
def demo_toolkit():
    """Run a scripted tour of the toolkit's main features."""
    print("=== CSV工具箱功能演示 ===\n")
    create_example_csv()
    print("\n1. 查看文件内容:")
    print("-" * 50)
    toolkit = CSVToolkit()

    class Args:
        """Bare namespace standing in for a parsed argparse result."""
        pass

    args = Args()
    args.file = 'employees.csv'
    args.lines = 5
    args.columns = 'name,city,salary'
    args.filter = 'city=北京'
    args.output = None
    print("\n2. 显示前3行:")
    print("-" * 50)
    args.lines = 3
    toolkit.head_csv(args)
    print("\n3. 文件统计:")
    print("-" * 50)
    toolkit.csv_stats(args)
    print("\n4. 选择特定列:")
    print("-" * 50)
    toolkit.select_columns(args)
    print("\n5. 过滤数据:")
    print("-" * 50)
    toolkit.filter_csv(args)
    print("\n6. 随机抽样:")
    print("-" * 50)
    args.lines = 2
    toolkit.sample_csv(args)
if __name__ == "__main__":
    if len(sys.argv) > 1:
        # CLI mode: dispatch through argparse.
        toolkit = CSVToolkit()
        toolkit.run()
    else:
        # No arguments: run the scripted demo, then print usage help.
        demo_toolkit()
        print("\n" + "=" * 50)
        print("使用说明:")
        print(" 命令行使用: python csv_toolkit.py <命令> <文件> [选项]")
        print("\n可用命令:")
        for cmd in CSVToolkit().commands:
            print(f" {cmd}")
四、注意事项
4.1 使用限制
- 不适合嵌套数据:CSV是扁平结构,不适合存储复杂嵌套数据
- 类型信息丢失:所有值都是字符串,需要手动转换类型
- 无模式验证:需要自己验证数据完整性和一致性
- 性能限制:对于数GB的文件,建议使用数据库或专业工具
- 并发访问:CSV文件不适合多进程同时读写
4.2 常见问题
Q: 读取CSV时遇到编码错误怎么办?
A: 按顺序尝试这些编码:utf-8-sig → gbk → utf-8 → latin-1
# Probe encodings in order; stop at the first that decodes cleanly.
encodings = ['utf-8-sig', 'gbk', 'utf-8', 'latin-1']
for enc in encodings:
    try:
        with open('file.csv', 'r', encoding=enc) as f:
            content = f.read()
            print(f"使用编码: {enc}")
            break
    except UnicodeDecodeError:
        continue
Q: 数据中有逗号,导致列错位怎么办?
A: 使用csv.reader而不是手动分割,它会正确处理引号内的逗号。
Q: 文件太大,内存不够怎么办?
A: 使用流式处理,一次处理一行:
with open('large.csv', 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:  # 一次只加载一行到内存
        process(row)
Q: 如何跳过CSV文件中的空行?
A: csv.reader会自动跳过完全空白的行,但对于只有空格的行,需要在代码中处理。
Q: 处理CSV时性能很慢怎么办?
A: 1. 使用csv.writerows()批量写入
2. 考虑使用pandas处理复杂操作
3. 对于超大数据,考虑使用数据库
4.3 替代方案
- pandas:适合复杂的数据分析和处理
- 数据库:适合大量数据和复杂查询
- Excel/Google Sheets:适合需要手动查看和编辑的场景
- JSON/YAML:适合嵌套和结构化数据
- Parquet/Feather:适合大数据和机器学习场景
何时选择替代方案:
- 需要复杂的数据分析和处理 → pandas
- 数据量极大,需要高效查询 → 数据库
- 需要多人协作和手动编辑 → Excel/Google Sheets
- 数据结构复杂,有嵌套 → JSON/YAML
- 大数据和机器学习场景 → Parquet/Feather
五、总结
通过本文的学习,你应该已经掌握了:
- ✅ csv模块基础:如何读写各种格式的CSV文件
- ✅ 数据处理技巧:清洗、转换、合并、拆分
- ✅ 性能优化:流式处理、批量操作
- ✅ 实战工具:构建自己的CSV处理工具箱
- ✅ 最佳实践:编码处理、错误处理、性能考虑
csv模块的核心价值:
- 简单直接:处理表格数据最直接的方式
- 通用性好:几乎所有系统都支持CSV导入导出
- 零成本:Python自带,无需额外依赖
- 透明可控:纯文本格式,出现问题容易排查
- 生态丰富:与其他工具链完美集成
以上就是Python使用csv模块进行文件读写的操作详解的详细内容,更多关于Python csv模块文件读写的资料请关注脚本之家其它相关文章!
相关文章
keras实现theano和tensorflow训练的模型相互转换
这篇文章主要介绍了keras实现theano和tensorflow训练的模型相互转换,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2020-06-06
Python subprocess模块功能与常见用法实例详解
这篇文章主要介绍了Python subprocess模块功能与常见用法,结合实例形式详细分析了subprocess模块功能、常用函数相关使用技巧,需要的朋友可以参考下2018-06-06


最新评论