Pandas高效读取CSV、Excel和SQL数据库的数据

 更新时间:2026年05月22日 09:05:43   作者:小庄-Python办公  
这篇文章主要为大家详细介绍了使用Pandas高效读取CSV、Excel和SQL数据库的数据的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下

本节学习目标

完成本节学习后,你将能够:

  1. 使用 pandas 从 CSV、TSV 等文本文件读取数据
  2. 读取 Excel 文件并处理多工作表
  3. 解决常见的编码问题和数据格式异常
  4. 使用分块读取处理超过内存容量的大文件
  5. 连接 SQLite 和 MySQL 数据库并执行查询
  6. 掌握数据读取的性能优化技巧

为什么学这个

在前面的课程中,我们用 NumPy 生成了各种模拟数据。但现实世界中,数据不会自动出现在你的代码里——它们散落在各处:

  • 业务系统导出的 CSV 文件
  • 同事发来的 Excel 表格
  • 公司数据库中的 SQL 表
  • 网页上的 JSON 数据
  • 传感器记录的 日志文件

数据分析的第一步,就是把这些分散的数据"搬"到你的分析环境中。如果这一步做不好,后面的一切分析都是空中楼阁。

比喻:如果把数据分析比作做菜,那么数据载入就是"采购食材"。食材买错了(格式不对)、买少了(数据不全)、或者买太多搬不动(内存溢出),这道菜就做不成了。

好消息是,pandas 提供了统一且强大的数据读取接口。无论你面对什么格式的数据,几乎都能用 pd.read_xxx() 一行代码搞定。但实际情况往往没那么简单——编码错误、格式异常、文件过大……这些问题才是真正考验你功力的地方。

核心知识点讲解

pandas 简介——数据分析的"瑞士军刀"

pandas 是基于 NumPy 构建的数据分析库,它的名字来源于 Panel Data(面板数据)和 Python Data Science。

pandas 的两个核心数据结构:

  • Series:一维带标签的数组(可以理解为"增强版的一维 NumPy 数组")
  • DataFrame:二维表格(可以理解为"增强版的 Excel 表格")
import pandas as pd
import numpy as np

# Series 示例
s = pd.Series([85, 92, 78, 96],
              index=['张三', '李四', '王五', '赵六'],
              name='数学成绩')
print("Series:")
print(s)

# DataFrame 示例
df = pd.DataFrame({
    '姓名': ['张三', '李四', '王五', '赵六'],
    '数学': [85, 92, 78, 96],
    '英语': [88, 85, 92, 79],
    '语文': [76, 80, 85, 88]
})
print("\nDataFrame:")
print(df)

读取 CSV 文件——最常见数据格式

基本读取

CSV(Comma-Separated Values,逗号分隔值)是最常见的数据交换格式。

import pandas as pd

# 首先,我们创建一个 CSV 文件用于演示
data = {
    '订单ID': range(1, 11),
    '商品': ['手机', '电脑', '平板', '耳机', '键盘',
            '鼠标', '显示器', '音箱', '充电器', '数据线'],
    '价格': [2999, 5999, 1999, 199, 399,
            99, 1599, 499, 79, 29],
    '数量': [2, 1, 3, 5, 2,
            4, 1, 2, 10, 15],
    '日期': pd.date_range('2024-01-01', periods=10)
}
df = pd.DataFrame(data)
df.to_csv('sample_orders.csv', index=False, encoding='utf-8-sig')
print("示例 CSV 文件已创建")

# 读取 CSV 文件
df_read = pd.read_csv('sample_orders.csv')
print("\n读取的 DataFrame:")
print(df_read)
print(f"\n数据形状: {df_read.shape}")
print(f"数据类型:\n{df_read.dtypes}")

read_csv 常用参数详解

# 演示各种参数的用法
# 假设我们有一个稍微复杂一点的 CSV 文件
complex_csv = """ID,Name,Score,Grade,Comment
001,张三,85.5,A,
002,李四,92.0,B,优秀
003,王五,,C,需要努力
004,赵六,78.5,A,进步很大
005,,88.0,B,继续加油"""

# 先写入文件
with open('complex_data.csv', 'w', encoding='utf-8') as f:
    f.write(complex_csv)

# 1. 基本读取
print("1. 基本读取:")
print(pd.read_csv('complex_data.csv'))

# 2. 指定列名
print("\n2. 指定列名:")
print(pd.read_csv('complex_data.csv',
                  names=['编号', '姓名', '分数', '等级', '评语'],
                  header=0))

# 3. 只读取指定列
print("\n3. 只读姓名列和分数列:")
print(pd.read_csv('complex_data.csv', usecols=['Name', 'Score']))

# 4. 处理缺失值
print("\n4. 默认缺失值处理:")
df = pd.read_csv('complex_data.csv')
print(df.isna().sum())  # 统计每列缺失值数量

# 5. 指定缺失值标记
print("\n5. 自定义缺失值标记:")
df = pd.read_csv('complex_data.csv', na_values=['', ' '])
print(df.isna().sum())

# 6. 解析日期列
df_with_date = pd.read_csv('sample_orders.csv', parse_dates=['日期'])
print("\n6. 日期列解析:")
print(df_with_date['日期'].dtype)  # 应该是 datetime64

# 7. 指定数据类型
print("\n7. 指定数据类型:")
df_typed = pd.read_csv('sample_orders.csv',
                       dtype={'订单ID': str, '商品': 'category'})
print(df_typed.dtypes)

# 8. 跳过行
print("\n8. 跳过前2行:")
print(pd.read_csv('complex_data.csv', skiprows=2))

# 9. 读取指定行数(用于预览大文件)
print("\n9. 只读前3行:")
print(pd.read_csv('sample_orders.csv', nrows=3))

# 10. 设置索引列
print("\n10. 设置索引列为订单ID:")
df_idx = pd.read_csv('sample_orders.csv', index_col='订单ID')
print(df_idx)

TSV 和其他分隔符

# TSV(制表符分隔)
tsv_data = "ID\tName\tScore\n1\t张三\t85\n2\t李四\t92"
with open('data.tsv', 'w', encoding='utf-8') as f:
    f.write(tsv_data)

print("TSV 文件读取:")
df_tsv = pd.read_csv('data.tsv', sep='\t')  # 或直接用 pd.read_table()
print(df_tsv)

# 其他分隔符示例
pipe_data = "ID|Name|Score\n1|张三|85\n2|李四|92"
with open('data_pipe.txt', 'w', encoding='utf-8') as f:
    f.write(pipe_data)

print("\n管道符分隔读取:")
df_pipe = pd.read_csv('data_pipe.txt', sep='|')
print(df_pipe)

读取 Excel 文件——职场中最常见的格式

基本读取

import pandas as pd

# 创建示例 Excel 文件
df1 = pd.DataFrame({
    '姓名': ['张三', '李四', '王五'],
    '部门': ['技术部', '市场部', '财务部'],
    '工资': [15000, 12000, 13000]
})
df2 = pd.DataFrame({
    '姓名': ['赵六', '钱七'],
    '部门': ['技术部', '人事部'],
    '工资': [16000, 11000]
})

# 写入 Excel(需要 openpyxl 库)
with pd.ExcelWriter('sample_data.xlsx', engine='openpyxl') as writer:
    df1.to_excel(writer, sheet_name='员工信息', index=False)
    df2.to_excel(writer, sheet_name='新员工', index=False)

print("Excel 文件已创建")

# 读取 Excel
# 注意:需要安装 openpyxl: pip install openpyxl
print("\n读取第一个工作表:")
df_excel = pd.read_excel('sample_data.xlsx', sheet_name='员工信息')
print(df_excel)

# 查看所有工作表名
print("\n所有工作表:")
xls = pd.ExcelFile('sample_data.xlsx')
print(xls.sheet_names)

# 读取所有工作表
print("\n读取所有工作表:")
all_sheets = pd.read_excel('sample_data.xlsx', sheet_name=None)
for name, df in all_sheets.items():
    print(f"\n--- {name} ---")
    print(df)

Excel 读取的实用技巧

# 1. 跳过表头行
# 有些 Excel 文件前两行是标题和说明
with pd.ExcelWriter('demo_skip.xlsx', engine='openpyxl') as writer:
    pd.DataFrame({'A': [1, 2, 3]}).to_excel(writer, sheet_name='data')

# 2. 指定列的数据类型
df = pd.read_excel('sample_data.xlsx', sheet_name='员工信息',
                   dtype={'工资': float})

# 3. 读取指定范围(需要引擎支持)
# df = pd.read_excel('file.xlsx', usecols='A:C', nrows=100)

# 4. 处理 Excel 中的空行
df = pd.read_excel('sample_data.xlsx', sheet_name='员工信息',
                   skip_blank_lines=True)

重要提示:读取 Excel 文件需要安装 openpyxl(用于 .xlsx)或 xlrd(用于旧版 .xls)库。安装方法:pip install openpyxl

编码问题处理——数据载入的"头号杀手"

什么是编码问题?

当你尝试读取一个 CSV 文件时,可能会看到这样的错误:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 0

这是因为文件的编码方式与读取时指定的编码不一致。

常见编码方式

编码说明适用场景
UTF-8国际通用编码,支持所有语言现代系统默认
GBK / GB2312中文编码Windows 中文系统
GB18030扩展中文编码生僻字支持
Latin-1西欧编码欧洲语言
ASCII基础编码纯英文

如何判断文件编码

# 方法1:使用 chardet 库自动检测
# pip install chardet
import chardet

with open('sample_orders.csv', 'rb') as f:
    raw_data = f.read(10000)  # 读取前10000字节
    result = chardet.detect(raw_data)
    print(f"检测到的编码: {result['encoding']}")
    print(f"置信度: {result['confidence']:.2%}")

# 方法2:用 pandas 的 errors 参数处理
# ignore:忽略无法解码的字符
# replace:用 ? 替换无法解码的字符
# surrogateescape:特殊处理(Python 3.1+)

# 方法3:尝试多种编码
def read_with_encoding(filepath, encodings=['utf-8', 'gbk', 'gb18030', 'latin-1']):
    for enc in encodings:
        try:
            df = pd.read_csv(filepath, encoding=enc)
            print(f"成功使用 {enc} 编码读取")
            return df
        except UnicodeDecodeError:
            continue
    raise ValueError("无法用任何编码读取文件")

df = read_with_encoding('sample_orders.csv')

实际解决方案

# Windows 系统导出的 CSV 通常使用 GBK 编码
# 解决方案1:指定编码
df = pd.read_csv('data.csv', encoding='gbk')

# 解决方案2:使用 utf-8-sig(处理带 BOM 的 UTF-8)
# BOM (Byte Order Mark) 是 UTF-8 文件开头的特殊标记
df = pd.read_csv('data.csv', encoding='utf-8-sig')

# 解决方案3:用 Notepad++ 或 VS Code 转换文件编码
# 打开文件 -> 转换为 UTF-8 -> 保存

最佳实践:在团队协作中,统一规定所有数据文件使用 UTF-8 编码,可以省去大量麻烦。

大数据分块读取——处理超内存文件

什么时候需要分块读取?

当你的 CSV 文件有 10GB,而你的电脑只有 8GB 内存时,直接 pd.read_csv() 会失败(MemoryError)。

分块读取的策略是:每次只读取一小部分,处理完后释放内存,再读取下一部分。

import pandas as pd
import numpy as np

# 先创建一个大文件用于演示(模拟 100 万行数据)
np.random.seed(42)
n_rows = 1_000_000
large_df = pd.DataFrame({
    'id': range(n_rows),
    'value1': np.random.normal(100, 20, n_rows),
    'value2': np.random.normal(50, 10, n_rows),
    'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    'date': pd.date_range('2020-01-01', periods=n_rows, freq='min')
})
large_df.to_csv('large_data.csv', index=False)
print(f"已创建包含 {n_rows} 行数据的文件")

方式1:使用 chunksize 参数

# 每次读取 10 万行
chunk_size = 100_000
total_rows = 0
total_value1 = 0
total_value2 = 0

for chunk in pd.read_csv('large_data.csv', chunksize=chunk_size):
    total_rows += len(chunk)
    total_value1 += chunk['value1'].sum()
    total_value2 += chunk['value2'].sum()
    print(f"已处理 {total_rows} 行...")

print(f"\n总行数: {total_rows}")
print(f"value1 总和: {total_value1:,.2f}")
print(f"value2 总和: {total_value2:,.2f}")

方式2:分块聚合(计算统计量)

# 计算大文件的整体统计信息
chunk_size = 100_000
stats = {
    'count': 0,
    'sum_v1': 0, 'sum_v2': 0,
    'sum_sq_v1': 0, 'sum_sq_v2': 0,
    'min_v1': float('inf'), 'max_v1': float('-inf'),
    'min_v2': float('inf'), 'max_v2': float('-inf')
}

for chunk in pd.read_csv('large_data.csv',
                         usecols=['value1', 'value2'],  # 只读需要的列
                         chunksize=chunk_size):

    stats['count'] += len(chunk)
    stats['sum_v1'] += chunk['value1'].sum()
    stats['sum_v2'] += chunk['value2'].sum()
    stats['sum_sq_v1'] += (chunk['value1'] ** 2).sum()
    stats['sum_sq_v2'] += (chunk['value2'] ** 2).sum()
    stats['min_v1'] = min(stats['min_v1'], chunk['value1'].min())
    stats['max_v1'] = max(stats['max_v1'], chunk['value1'].max())
    stats['min_v2'] = min(stats['min_v2'], chunk['value2'].min())
    stats['max_v2'] = max(stats['max_v2'], chunk['value2'].max())

# 计算均值和标准差
mean_v1 = stats['sum_v1'] / stats['count']
mean_v2 = stats['sum_v2'] / stats['count']
std_v1 = (stats['sum_sq_v1'] / stats['count'] - mean_v1 ** 2) ** 0.5
std_v2 = (stats['sum_sq_v2'] / stats['count'] - mean_v2 ** 2) ** 0.5

print("大文件统计信息:")
print(f"  value1: 均值={mean_v1:.2f}, 标准差={std_v1:.2f}, "
      f"范围=[{stats['min_v1']:.2f}, {stats['max_v1']:.2f}]")
print(f"  value2: 均值={mean_v2:.2f}, 标准差={std_v2:.2f}, "
      f"范围=[{stats['min_v2']:.2f}, {stats['max_v2']:.2f}]")

方式3:迭代器模式(更灵活的控制)

# 使用 iterator 参数,手动控制迭代
reader = pd.read_csv('large_data.csv', chunksize=50000, iterator=True)

# 可以跳过某些 chunk
for i, chunk in enumerate(reader):
    if i == 0:
        print(f"第 {i} 块,形状: {chunk.shape}")
        # 保存第一块作为参考
        sample = chunk.head()
    # 只处理 category 为 A 的数据
    filtered = chunk[chunk['category'] == 'A']
    if len(filtered) > 0:
        pass  # 处理过滤后的数据

分块读取的注意事项

  1. 只读需要的列:使用 usecols 参数,减少内存占用
  2. 指定数据类型:使用 dtype 参数,避免 pandas 自动推断导致的内存浪费
  3. 跳过不需要的行:使用 skiprows 参数
# 优化版分块读取
optimized_chunks = pd.read_csv(
    'large_data.csv',
    chunksize=100_000,
    usecols=['id', 'value1', 'category'],  # 只读3列
    dtype={'id': 'int32', 'value1': 'float32', 'category': 'category'}  # 节省内存
)

for chunk in optimized_chunks:
    # 处理每个块
    pass

连接数据库——从 SQL 读取数据

SQLite——内置数据库(无需安装服务器)

import pandas as pd
import sqlite3

# 创建 SQLite 数据库
conn = sqlite3.connect('sample_database.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''
    CREATE TABLE IF NOT EXISTS employees (
        id INTEGER PRIMARY KEY,
        name TEXT,
        department TEXT,
        salary REAL,
        hire_date TEXT
    )
''')

# 插入数据
employees = [
    (1, '张三', '技术部', 15000, '2020-03-15'),
    (2, '李四', '市场部', 12000, '2021-06-01'),
    (3, '王五', '财务部', 13000, '2019-11-20'),
    (4, '赵六', '技术部', 18000, '2018-08-10'),
    (5, '钱七', '人事部', 11000, '2022-01-05'),
]
cursor.executemany('INSERT OR REPLACE INTO employees VALUES (?, ?, ?, ?, ?)',
                   employees)
conn.commit()

# 方式1:直接读取整个表
print("1. 读取整个表:")
df_sqlite = pd.read_sql('SELECT * FROM employees', conn)
print(df_sqlite)

# 方式2:带条件的查询
print("\n2. 查询技术部员工:")
df_tech = pd.read_sql(
    "SELECT name, salary FROM employees WHERE department = '技术部'",
    conn
)
print(df_tech)

# 方式3:使用 read_sql_query(等价于 read_sql)
print("\n3. 平均工资查询:")
df_avg = pd.read_sql_query(
    "SELECT department, AVG(salary) as avg_salary, COUNT(*) as count "
    "FROM employees GROUP BY department",
    conn
)
print(df_avg)

# 方式4:使用 read_sql_table(读取整张表,SQLite 不支持)
# df = pd.read_sql_table('employees', conn)  # 需要 SQLAlchemy

# 方式5:DataFrame 直接写入数据库
df_new = pd.DataFrame({
    'id': [6, 7],
    'name': ['孙八', '周九'],
    'department': ['技术部', '市场部'],
    'salary': [16000, 13500],
    'hire_date': ['2023-05-10', '2023-08-20']
})
df_new.to_sql('employees', conn, if_exists='append', index=False)
print("\n4. 新增数据后:")
print(pd.read_sql('SELECT * FROM employees', conn))

# 关闭连接
conn.close()

MySQL 连接(需要安装 pymysql)

# 注意:以下代码需要 MySQL 服务器运行
# 安装: pip install pymysql

# 连接 MySQL
# conn_mysql = create_engine('mysql+pymysql://username:password@localhost:3306/database_name')

# 读取数据
# df = pd.read_sql('SELECT * FROM table_name', conn_mysql)

# 示例代码(注释状态,实际运行需要 MySQL)
"""
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://root:password@localhost:3306/mydb')

# 读取数据
df = pd.read_sql('SELECT * FROM users', engine)

# 写入数据
df.to_sql('users_backup', engine, if_exists='replace', index=False)

# 复杂查询
df = pd.read_sql('''
    SELECT d.department_name,
           AVG(e.salary) as avg_salary,
           COUNT(e.id) as emp_count
    FROM employees e
    JOIN departments d ON e.dept_id = d.id
    GROUP BY d.department_name
    ORDER BY avg_salary DESC
''', engine)
"""

提示:如果你使用 PostgreSQL,安装 psycopg2 包,连接字符串改为 postgresql+psycopg2://...

其他数据格式读取

JSON

import json

# 创建示例 JSON
json_data = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"},
    {"name": "王五", "age": 28, "city": "广州"}
]
with open('sample.json', 'w', encoding='utf-8') as f:
    json.dump(json_data, f, ensure_ascii=False)

# 读取 JSON
df_json = pd.read_json('sample.json')
print("JSON 读取:")
print(df_json)

# 复杂 JSON(嵌套结构)
complex_json = '''
{
    "employees": {
        "张三": {"age": 25, "dept": "技术部"},
        "李四": {"age": 30, "dept": "市场部"}
    }
}
'''
with open('complex.json', 'w', encoding='utf-8') as f:
    f.write(complex_json)

# orient 参数处理不同 JSON 格式
df_complex = pd.read_json('complex.json', orient='index')
print("\n复杂 JSON 读取:")
print(df_complex)

HTML 表格

# 读取网页中的表格
# url = 'https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)'
# tables = pd.read_html(url)
# print(f"页面中找到 {len(tables)} 个表格")
# print(tables[0].head())  # 第一个表格

# 这里用本地 HTML 演示
html_table = """
<table>
    <tr><th>姓名</th><th>成绩</th></tr>
    <tr><td>张三</td><td>85</td></tr>
    <tr><td>李四</td><td>92</td></tr>
    <tr><td>王五</td><td>78</td></tr>
</table>
"""
with open('sample.html', 'w', encoding='utf-8') as f:
    f.write(html_table)

df_html = pd.read_html('sample.html')[0]
print("HTML 表格读取:")
print(df_html)

代码示例:综合实战

实战1:多源数据整合分析

import pandas as pd
import numpy as np
import sqlite3

# 场景:整合来自不同数据源的信息

# 数据源1:CSV 文件(员工基本信息)
emp_csv = pd.DataFrame({
    'emp_id': [1, 2, 3, 4, 5],
    'name': ['张三', '李四', '王五', '赵六', '钱七'],
    'gender': ['男', '女', '男', '男', '女'],
    'birth_year': [1990, 1988, 1992, 1985, 1993]
})
emp_csv.to_csv('employees_basic.csv', index=False, encoding='utf-8-sig')

# 数据源2:Excel 文件(绩效数据)
perf_excel = pd.DataFrame({
    'emp_id': [1, 2, 3, 4, 5],
    'q1_score': [85, 92, 78, 96, 88],
    'q2_score': [88, 90, 82, 94, 85],
    'q3_score': [90, 88, 85, 92, 90]
})
with pd.ExcelWriter('employees_perf.xlsx', engine='openpyxl') as writer:
    perf_excel.to_excel(writer, sheet_name='绩效', index=False)

# 数据源3:SQLite 数据库(薪资数据)
conn = sqlite3.connect('salary.db')
pd.DataFrame({
    'emp_id': [1, 2, 3, 4, 5],
    'base_salary': [15000, 12000, 13000, 18000, 11000],
    'bonus': [3000, 5000, 2000, 8000, 2500]
}).to_sql('salary', conn, if_exists='replace', index=False)

# 整合分析
print("=" * 50)
print("多源数据整合分析")
print("=" * 50)

# 读取各数据源
basic = pd.read_csv('employees_basic.csv')
perf = pd.read_excel('employees_perf.xlsx', sheet_name='绩效')
salary = pd.read_sql('SELECT * FROM salary', conn)

# 合并数据
merged = basic.merge(perf, on='emp_id').merge(salary, on='emp_id')
merged['avg_score'] = merged[['q1_score', 'q2_score', 'q3_score']].mean(axis=1)
merged['total_income'] = merged['base_salary'] + merged['bonus']

print("\n整合后的完整数据:")
print(merged.to_string())

print("\n各季度平均绩效:")
print(f"  Q1: {perf['q1_score'].mean():.1f}")
print(f"  Q2: {perf['q2_score'].mean():.1f}")
print(f"  Q3: {perf['q3_score'].mean():.1f}")

print("\n平均薪资:")
print(f"  基本薪资均值: ¥{salary['base_salary'].mean():,.0f}")
print(f"  奖金均值: ¥{salary['bonus'].mean():,.0f}")
print(f"  总收入均值: ¥{merged['total_income'].mean():,.0f}")

conn.close()

实战2:大文件处理流水线

import pandas as pd
import numpy as np

# 创建模拟日志文件(50 万行)
np.random.seed(42)
n = 500_000
log_data = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='s'),
    'user_id': np.random.randint(1, 10000, n),
    'action': np.random.choice(['click', 'view', 'purchase', 'logout'], n,
                               p=[0.5, 0.3, 0.15, 0.05]),
    'page': np.random.choice(['home', 'product', 'cart', 'checkout', 'help'], n),
    'duration_ms': np.random.exponential(2000, n).astype(int)
})
log_data.to_csv('web_logs.csv', index=False)
print(f"已创建 {n} 行日志数据")

# 分块读取并聚合
print("\n分块处理日志数据:")

stats = {}
chunk_size = 50_000

for chunk in pd.read_csv('web_logs.csv',
                         usecols=['action', 'user_id', 'duration_ms'],
                         chunksize=chunk_size):

    # 按 action 分组统计
    action_counts = chunk['action'].value_counts()
    for action, count in action_counts.items():
        stats[action] = stats.get(action, 0) + count

    # 统计总用户数
    stats['unique_users'] = len(chunk['user_id'].unique())

    print(f"处理 {chunk_size} 行...")

print("\n统计结果:")
for action, count in stats.items():
    print(f"  {action}: {count:,}" if action != 'unique_users'
          else f"  独立用户数: {count:,}")

实战练习

练习1:CSV 读取与清洗

题目:创建一个包含脏数据的 CSV 文件,然后读取并清洗:

  • 包含空值
  • 包含重复行
  • 包含错误类型(如价格列出现了文本)

参考答案

import pandas as pd
import numpy as np

# 创建脏数据
dirty_csv = """id,name,price,quantity
1,手机,2999,2
2,电脑,5999,1
3,,1999,3
4,耳机,abc,5
5,键盘,399,2
1,手机,2999,2
6,鼠标,99,
"""
with open('dirty_data.csv', 'w', encoding='utf-8') as f:
    f.write(dirty_csv)

# 读取并清洗
df = pd.read_csv('dirty_data.csv')

print("原始数据:")
print(df)

# 1. 删除完全重复的行
df = df.drop_duplicates()

# 2. 删除 name 为空的行
df = df.dropna(subset=['name'])

# 3. 处理 price 列的文本(转换为数字,无效的变 NaN)
df['price'] = pd.to_numeric(df['price'], errors='coerce')

# 4. 填充 quantity 的缺失值为 1
df['quantity'] = df['quantity'].fillna(1).astype(int)

# 5. 删除 price 仍然为 NaN 的行
df = df.dropna(subset=['price'])

print("\n清洗后数据:")
print(df)

练习2:分块统计

题目:用分块读取的方式,计算大文件中每个 category 的平均值。

参考答案

import pandas as pd
import numpy as np
from collections import defaultdict

# 先创建大文件
np.random.seed(42)
n = 200_000
big = pd.DataFrame({
    'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
    'value': np.random.normal(50, 15, n)
})
big.to_csv('big_categories.csv', index=False)

# 分块统计
cat_stats = defaultdict(lambda: {'sum': 0, 'count': 0})

for chunk in pd.read_csv('big_categories.csv', chunksize=50000):
    for cat, group in chunk.groupby('category'):
        cat_stats[cat]['sum'] += group['value'].sum()
        cat_stats[cat]['count'] += len(group)

print("分类统计:")
for cat, stats in sorted(cat_stats.items()):
    mean = stats['sum'] / stats['count']
    print(f"  {cat}: 均值={mean:.2f}, 数量={stats['count']}")

练习3:数据库查询整合

题目:创建一个 SQLite 数据库,包含订单表和客户表,然后用 JOIN 查询每个客户的总消费金额。

参考答案

import pandas as pd
import sqlite3

conn = sqlite3.connect('shop.db')

# 创建客户表
customers = pd.DataFrame({
    'customer_id': [1, 2, 3, 4, 5],
    'name': ['张三', '李四', '王五', '赵六', '钱七'],
    'city': ['北京', '上海', '广州', '深圳', '杭州']
})
customers.to_sql('customers', conn, if_exists='replace', index=False)

# 创建订单表
orders = pd.DataFrame({
    'order_id': range(1, 11),
    'customer_id': [1, 2, 1, 3, 2, 4, 1, 5, 3, 2],
    'amount': [500, 800, 300, 1200, 600, 400, 700, 900, 350, 550]
})
orders.to_sql('orders', conn, if_exists='replace', index=False)

# JOIN 查询
result = pd.read_sql('''
    SELECT c.name, c.city,
           COUNT(o.order_id) as order_count,
           SUM(o.amount) as total_spent,
           AVG(o.amount) as avg_order
    FROM customers c
    LEFT JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id
    ORDER BY total_spent DESC
''', conn)

print("客户消费分析:")
print(result.to_string())

conn.close()

本节总结

本节我们全面学习了从各种数据源读取数据的方法。关键要点:

CSV 读取是基本功:掌握 pd.read_csv() 的各种参数——sepencodingusecolsdtypeparse_datesnrowschunksize 等。

编码问题是日常痛点:Windows 中文系统导出的文件通常用 GBK 编码。遇到问题时,尝试 encoding='gbk'encoding='utf-8-sig'。使用 chardet 库可以自动检测编码。

分块读取解决大文件:当文件超过内存容量时,使用 chunksize 参数分块处理。配合 usecolsdtype 参数可以进一步优化内存。

数据库读取很灵活pd.read_sql() 可以执行任意 SQL 查询,直接在数据库层面完成过滤和聚合,只读取需要的结果,效率最高。

性能优化三板斧

  • 只读需要的列(usecols
  • 指定数据类型(dtype),特别是 category 类型可以大幅节省内存
  • 在数据库层面完成过滤,减少数据传输量

多种格式统一接口:无论是 CSV、Excel、JSON 还是数据库,pandas 都提供了 read_xxx() 的统一接口,降低了学习成本。

实用建议:在实际工作中,先花 5 分钟检查数据文件的编码、格式和大小,再决定用哪种方式读取。这一步可以避免后面 90% 的问题。

下一节预告

恭喜!你已经完成了第一阶段的全部内容。回顾一下你掌握的技能:

  • 环境配置与 Jupyter Notebook 使用
  • NumPy 数组的创建、运算和广播
  • 高级索引、维度变换和矩阵运算
  • 向量化思维,让代码效率提升百倍
  • 从各种数据源高效读取数据

接下来,我们将进入第二阶段,开始学习 pandas 的核心功能——数据处理与分析。你将掌握:

  • DataFrame 的基本操作和行列选择
  • 数据清洗与预处理
  • 数据分组与聚合(groupby)
  • 数据合并与连接(merge/concat)

pandas 才是数据分析的主力工具,让我们继续前进!

以上就是Pandas高效读取CSV、Excel和SQL数据库的数据的详细内容,更多关于Pandas数据读取的资料请关注脚本之家其它相关文章!

相关文章

  • python 从文件夹抽取图片另存的方法

    python 从文件夹抽取图片另存的方法

    今天小编就为大家分享一篇python 从文件夹抽取图片另存的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • python基础教程之while循环

    python基础教程之while循环

    这篇文章主要给大家介绍了关于python基础教程之while循环的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用python具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-08-08
  • Django框架视图介绍与使用详解

    Django框架视图介绍与使用详解

    这篇文章主要介绍了Django框架视图介绍与使用,结合实例形式分析了Django框架视图的功能、配置、使用方法及相关操作注意事项,需要的朋友可以参考下
    2019-07-07
  • python网络爬虫基本语法详解

    python网络爬虫基本语法详解

    掌握Python网络爬虫基本语法,就是打开数据世界的钥匙,在这份指南中,我们将带你深入浅出,从零开始一步步变成抓取信息的高手,准备好探索无限可能的网络数据世界了吗?让我们一起开启这段精彩旅程吧!
    2024-03-03
  • python从ftp获取文件并下载到本地

    python从ftp获取文件并下载到本地

    这篇文章主要介绍了python从ftp获取文件并下载到本地,帮助大家更好的理解和学习python,感兴趣的朋友可以了解下
    2020-12-12
  • Python+OpenCV进行人脸面部表情识别

    Python+OpenCV进行人脸面部表情识别

    这篇文章主要介绍了通过Python OpenCV实现对人脸面部表情识别,判断人是否为笑脸,文中的示例代码非常详细,需要的朋友可以参考一下
    2021-12-12
  • python实现给微信指定好友定时发送消息

    python实现给微信指定好友定时发送消息

    这篇文章主要为大家详细介绍了python实现给微信指定好友定时发消息,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • 一文详解Python中两大包管理与依赖管理工具(Poetry vs Pipenv)

    一文详解Python中两大包管理与依赖管理工具(Poetry vs Pipenv)

    在现代Python开发中,依赖管理是一个至关重要却又常常被忽视的环节,Poetry和Pipenv这两个现代化的Python依赖管理工具都旨在解决传统工具面临的问题,提供更优雅、更可靠的依赖管理体验,下面小编就为大家简单介绍一下吧
    2025-10-10
  • python+opencv实现动态物体识别

    python+opencv实现动态物体识别

    这篇文章主要为大家详细介绍了python+opencv实现动态物体识别,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • 用python实现英文字母和相应序数转换的方法

    用python实现英文字母和相应序数转换的方法

    这篇文章主要介绍了用python实现英文字母和相应序数转换的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09

最新评论