Python实现批量将CSV按行或按列拆分

 更新时间:2026年03月04日 08:27:20   作者:小庄-Python办公  
这篇文章主要介绍了一个基于Python标准库csv和PyQt5的高性能CSV拆分工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

本文完整拆解一个可用于生产的 CSV 拆分工具,从架构、核心算法、GUI 与多线程到异常与性能边界,全部基于 Python 标准库 csv 与 PyQt5 实现。

效果图

需求与目标

核心目标是实现一个桌面 GUI:

  • 多文件选择与拖拽列表
  • 按行拆分与按列拆分两种模式,参数联动
  • 可配置输出目录、编码、分隔符、命名规则与覆盖策略
  • 支持 1GB / 500 万行规模,内存占用低,GUI 不阻塞
  • 实时进度、日志与异常提示,支持取消与清理

总体设计

项目结构极简,仅一个入口脚本:

  • GUI 与业务逻辑全部在 [main.py]
  • 核心拆分函数 split_csv_file 完成流式拆分与异常捕获
  • Worker 在 QThread 中执行,保证 GUI 响应
  • 日志、进度通过 Qt 信号回传 UI

设计重点是把“耗时 IO 与 CPU”放在后台线程,把“交互与展示”留给主线程。

CSV 拆分核心逻辑

核心函数是 [split_csv_file],仅使用 csv.reader/csv.writer,满足低内存与高兼容性。

1. 统一设置与结果模型

通过 SplitSettingsFileResult 描述输入配置与输出状态:

  • 选择拆分模式、编码、分隔符
  • 命名规则(前缀/后缀/序号格式)
  • 是否保留表头、覆盖策略
  • 统一错误信息与 traceback

2. 按行拆分

按行拆分包含两种子模式:

  • 按固定行数切分
  • 按唯一列值分组输出

按固定行数

关键点:

  • 每达到 rows_per_file 就关闭当前输出文件,启动下一个
  • 表头按需写入
  • 通过 row_index 控制进度刷新,避免频繁更新 UI

示例片段(仅为理解):

if count == 0:
    seq_text = format_sequence(seq, settings.seq_format)
    filename = f"{settings.prefix}{seq_text}{settings.suffix}.csv"
    out_file = os.path.join(settings.output_dir, filename)

按唯一列值

核心策略是使用 WriterCache 复用输出文件句柄,避免每行开关文件:

  • WriterCache 维护有限数量打开文件
  • 超出上限后按 LRU 关闭句柄
  • 已创建文件用 append 追加,避免重复写表头

3. 按列拆分与二次按行

按列拆分本质上也是“按唯一值分组输出”,并可选二次按行拆分:

  • 先按列值生成文件名
  • 若开启二次拆分,按行数再切分出 value_序号.csv

4. 文件名安全与序号格式

为防止非法字符,统一使用 sanitize_filename 处理列值:

  • 替换系统禁止字符
  • 空值统一成 EMPTY
  • 过长截断

序号格式支持两种:

  • 纯数字模板,如 0001
  • format 模式,如 {0:04d}

GUI 设计与交互流

GUI 核心在拆分成多个可复用面板。

1. 文件选择与拖拽

FileListWidget 继承 QListWidget,实现拖拽导入:

  • 过滤 csv 后缀
  • 自动去重

2. 模式切换

按行/按列通过 QStackedWidget 互斥显示:

  • self.mode_rowself.mode_col 绑定
  • _switch_mode 切换索引

3. 参数联动与列名读取

当文件列表变化或编码/分隔符变化时,自动刷新列名:

  • 读取首行作为表头
  • 更新“按行唯一列”和“按列拆分列”下拉框

4. 输出配置与命名规则

统一面板提供:

  • 输出目录选择
  • 分隔符(逗号/分号/制表符/竖线)
  • 编码(UTF-8/UTF-8-sig/GBK)
  • 覆盖或跳过策略
  • 命名规则(前缀/后缀/序号格式)

多线程与任务生命周期

耗时任务在 QThread 中执行:

  • run() 循环处理文件
  • 向 UI 发射总体进度与单文件进度
  • 每个文件独立成功/失败计数,互不阻塞

调用流程:

  • _start() 创建线程与 Worker
  • 绑定信号:log、progress、finished
  • worker_thread.start() 开始后台执行

进度与日志设计

进度条

进度分两层:

  • 当前文件进度:读取 file.tell() 与文件大小比例
  • 总体进度:按文件大小加权

日志

日志输出包含时间戳与异常堆栈:

  • timestamp() 统一格式
  • 失败时记录 traceback
  • 支持导出为文本文件

异常处理与容错策略

异常处理贯穿拆分流程:

  • 编码错误 UnicodeDecodeError
  • 列名缺失 ValueError
  • 磁盘空间不足 OSError + errno.ENOSPC

设计要点:

  • 单文件失败不影响其他文件
  • 失败信息写入日志并弹窗提示
  • 失败数在任务结束汇总

性能与内存控制

针对 1GB 大文件设计要点:

  • csv.reader 流式读取,行级处理
  • 输出文件使用 WriterCache 限制句柄数量
  • 不缓存整文件或整列内容

内存占用主要来自:

  • 现有行字符串对象
  • 已打开的输出句柄
  • GUI 层的日志与列表

对于“按唯一列值”的模式,如果唯一值极多:

  • 可适当降低 max_open_files
  • 采用“跳过已存在文件”减少 IO

源代码

'''
作者:小庄-Python办公
公众号:小庄-Python学习
'''
import sys
import os
import csv
import traceback
import errno
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

from PyQt5.QtCore import Qt, QObject, pyqtSignal, QThread
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import (
    QApplication,
    QMainWindow,
    QWidget,
    QFileDialog,
    QListWidget,
    QListWidgetItem,
    QVBoxLayout,
    QHBoxLayout,
    QLabel,
    QPushButton,
    QRadioButton,
    QButtonGroup,
    QStackedWidget,
    QGroupBox,
    QLineEdit,
    QSpinBox,
    QComboBox,
    QCheckBox,
    QTextEdit,
    QMessageBox,
    QProgressBar,
)


def timestamp() -> str:
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def sanitize_filename(value: str) -> str:
    if value is None:
        value = ""
    value = str(value).strip()
    if not value:
        value = "EMPTY"
    for ch in ['\\', '/', ':', '*', '?', '"', '<', '>', '|']:
        value = value.replace(ch, "_")
    value = value.replace("\n", "_").replace("\r", "_").replace("\t", "_")
    value = value.strip(" .")
    if not value:
        value = "EMPTY"
    if len(value) > 120:
        value = value[:120]
    return value


def format_sequence(seq: int, pattern: str) -> str:
    if "{" in pattern and "}" in pattern:
        return pattern.format(seq)
    width = max(1, len(pattern))
    return str(seq).zfill(width)


def ensure_dir(path: str) -> None:
    if not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)


@dataclass
class SplitSettings:
    mode: str
    row_mode: str
    rows_per_file: int
    unique_column: str
    col_column: str
    col_secondary_split: bool
    col_secondary_rows: int
    prefix: str
    suffix: str
    seq_format: str
    keep_header: bool
    delimiter: str
    encoding: str
    output_dir: str
    on_exists: str
    max_open_files: int = 64


@dataclass
class FileResult:
    file_path: str
    success: bool
    created_files: List[str] = field(default_factory=list)
    skipped_files: List[str] = field(default_factory=list)
    error_message: str = ""
    exception: Optional[Exception] = None
    traceback_text: str = ""


def read_header(file_path: str, encoding: str, delimiter: str) -> List[str]:
    with open(file_path, "r", newline="", encoding=encoding) as f:
        reader = csv.reader(f, delimiter=delimiter)
        return next(reader, [])


class WriterCache:
    def __init__(self, max_open: int, created_set: set):
        self.max_open = max_open
        self._order: List[str] = []
        self._writers: Dict[str, Tuple[object, csv.writer]] = {}
        self._created_set = created_set

    def get_writer(self, key: str, file_path: str, encoding: str, delimiter: str, header: Optional[List[str]], on_exists: str, created_files: List[str], skipped_files: List[str]) -> Optional[csv.writer]:
        if key in self._writers:
            if key in self._order:
                self._order.remove(key)
            self._order.append(key)
            return self._writers[key][1]

        file_exists = os.path.exists(file_path)
        if file_exists and file_path not in self._created_set and on_exists == "skip":
            skipped_files.append(file_path)
            return None

        ensure_dir(os.path.dirname(file_path))
        if file_exists and file_path in self._created_set:
            mode = "a"
            header_to_write = None
        else:
            mode = "w"
            header_to_write = header
            self._created_set.add(file_path)
            created_files.append(file_path)
        f = open(file_path, mode, newline="", encoding=encoding)
        writer = csv.writer(f, delimiter=delimiter)
        if header_to_write:
            writer.writerow(header_to_write)
        self._writers[key] = (f, writer)
        self._order.append(key)
        while len(self._order) > self.max_open:
            evict_key = self._order.pop(0)
            handle, _ = self._writers.pop(evict_key)
            try:
                handle.close()
            except Exception:
                pass
        return writer

    def close_all(self) -> None:
        for handle, _ in self._writers.values():
            try:
                handle.close()
            except Exception:
                pass
        self._writers.clear()
        self._order.clear()


def split_csv_file(
    file_path: str,
    settings: SplitSettings,
    log_cb: Callable[[str], None],
    progress_cb: Callable[[float], None],
    cancel_cb: Callable[[], bool],
) -> FileResult:
    result = FileResult(file_path=file_path, success=True)
    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
    file_size = max(1, file_size)

    def update_progress(file_obj) -> None:
        try:
            ratio = min(1.0, file_obj.tell() / file_size)
            progress_cb(ratio)
        except Exception:
            pass

    try:
        with open(file_path, "r", newline="", encoding=settings.encoding) as f:
            reader = csv.reader(f, delimiter=settings.delimiter)
            header = next(reader, None)
            if header is None:
                return result
            header_row = header if settings.keep_header else None

            created_set: set = set()
            if settings.mode == "row":
                if settings.row_mode == "fixed":
                    seq = 1
                    count = 0
                    row_index = 0
                    writer = None
                    handle = None
                    for row in reader:
                        if cancel_cb():
                            result.success = False
                            result.error_message = "用户取消"
                            break
                        if count == 0:
                            seq_text = format_sequence(seq, settings.seq_format)
                            filename = f"{settings.prefix}{seq_text}{settings.suffix}.csv"
                            out_file = os.path.join(settings.output_dir, filename)
                            if os.path.exists(out_file) and settings.on_exists == "skip":
                                result.skipped_files.append(out_file)
                                writer = None
                                handle = None
                            else:
                                ensure_dir(settings.output_dir)
                                handle = open(out_file, "w", newline="", encoding=settings.encoding)
                                writer = csv.writer(handle, delimiter=settings.delimiter)
                                if header_row:
                                    writer.writerow(header_row)
                                result.created_files.append(out_file)
                            seq += 1
                        if writer:
                            writer.writerow(row)
                        count += 1
                        row_index += 1
                        if count >= settings.rows_per_file:
                            count = 0
                            if handle:
                                try:
                                    handle.close()
                                except Exception:
                                    pass
                            writer = None
                            handle = None
                        if row_index % 2000 == 0:
                            update_progress(f)
                    if handle:
                        try:
                            handle.close()
                        except Exception:
                            pass
                    update_progress(f)
                else:
                    if settings.unique_column not in header:
                        raise ValueError(f"列名不存在: {settings.unique_column}")
                    col_index = header.index(settings.unique_column)
                    cache = WriterCache(settings.max_open_files, created_set)
                    skipped_values: set = set()
                    for row in reader:
                        if cancel_cb():
                            result.success = False
                            result.error_message = "用户取消"
                            break
                        key = sanitize_filename(row[col_index] if col_index < len(row) else "")
                        if key in skipped_values:
                            continue
                        filename = f"{settings.prefix}{key}{settings.suffix}.csv"
                        out_file = os.path.join(settings.output_dir, filename)
                        writer = cache.get_writer(key, out_file, settings.encoding, settings.delimiter, header_row, settings.on_exists, result.created_files, result.skipped_files)
                        if writer is None:
                            skipped_values.add(key)
                            continue
                        writer.writerow(row)
                        if len(result.created_files) % 100 == 0:
                            update_progress(f)
                    cache.close_all()
                    update_progress(f)
            else:
                if settings.col_column not in header:
                    raise ValueError(f"列名不存在: {settings.col_column}")
                col_index = header.index(settings.col_column)
                cache = WriterCache(settings.max_open_files, created_set)
                value_state: Dict[str, Dict[str, int]] = {}
                skipped_values: set = set()
                for row in reader:
                    if cancel_cb():
                        result.success = False
                        result.error_message = "用户取消"
                        break
                    value = sanitize_filename(row[col_index] if col_index < len(row) else "")
                    if value in skipped_values:
                        continue
                    if settings.col_secondary_split:
                        state = value_state.setdefault(value, {"seq": 1, "count": 0})
                        if state["count"] == 0:
                            seq_text = format_sequence(state["seq"], settings.seq_format)
                            filename = f"{settings.prefix}{value}_{seq_text}{settings.suffix}.csv"
                            out_file = os.path.join(settings.output_dir, filename)
                            writer = cache.get_writer(f"{value}:{state['seq']}", out_file, settings.encoding, settings.delimiter, header_row, settings.on_exists, result.created_files, result.skipped_files)
                            if writer is None:
                                skipped_values.add(value)
                                continue
                            state["seq"] += 1
                        else:
                            writer = cache.get_writer(f"{value}:{state['seq']-1}", os.path.join(settings.output_dir, f"{settings.prefix}{value}_{format_sequence(state['seq']-1, settings.seq_format)}{settings.suffix}.csv"), settings.encoding, settings.delimiter, None, settings.on_exists, result.created_files, result.skipped_files)
                        if writer:
                            writer.writerow(row)
                        state["count"] += 1
                        if state["count"] >= settings.col_secondary_rows:
                            state["count"] = 0
                    else:
                        filename = f"{settings.prefix}{value}{settings.suffix}.csv"
                        out_file = os.path.join(settings.output_dir, filename)
                        writer = cache.get_writer(value, out_file, settings.encoding, settings.delimiter, header_row, settings.on_exists, result.created_files, result.skipped_files)
                        if writer is None:
                            skipped_values.add(value)
                            continue
                        writer.writerow(row)
                    if len(result.created_files) % 100 == 0:
                        update_progress(f)
                cache.close_all()
                update_progress(f)
    except UnicodeDecodeError as e:
        result.success = False
        result.error_message = f"编码错误: {e}"
        result.exception = e
        result.traceback_text = traceback.format_exc()
    except Exception as e:
        result.success = False
        result.error_message = str(e)
        result.exception = e
        result.traceback_text = traceback.format_exc()

    if result.exception and isinstance(result.exception, OSError) and getattr(result.exception, "errno", None) == errno.ENOSPC:
        log_cb(f"{timestamp()} 磁盘空间不足: {result.exception}")
    return result


class Worker(QObject):
    overall_progress = pyqtSignal(int)
    file_progress = pyqtSignal(int)
    log = pyqtSignal(str)
    finished = pyqtSignal(dict)
    show_error = pyqtSignal(str, str)

    def __init__(self, files: List[str], settings: SplitSettings):
        super().__init__()
        self.files = files
        self.settings = settings
        self._cancel = False
        self._created_files: List[str] = []

    def cancel(self) -> None:
        self._cancel = True

    def _is_cancelled(self) -> bool:
        return self._cancel

    def run(self) -> None:
        total_size = sum(max(1, os.path.getsize(p)) for p in self.files)
        processed_size = 0
        success_count = 0
        fail_count = 0

        for file_path in self.files:
            if self._cancel:
                break
            file_size = max(1, os.path.getsize(file_path))
            self.log.emit(f"{timestamp()} 开始处理: {file_path}")

            def log_cb(msg: str) -> None:
                self.log.emit(msg)

            def progress_cb(ratio: float) -> None:
                self.file_progress.emit(int(ratio * 100))
                overall_ratio = min(1.0, (processed_size + ratio * file_size) / total_size)
                self.overall_progress.emit(int(overall_ratio * 100))

            result = split_csv_file(file_path, self.settings, log_cb, progress_cb, self._is_cancelled)
            self._created_files.extend(result.created_files)
            if result.success:
                success_count += 1
                self.log.emit(f"{timestamp()} 完成: {file_path} 输出 {len(result.created_files)} 个文件")
            else:
                fail_count += 1
                detail = result.error_message
                if result.traceback_text:
                    detail = f"{result.error_message}\n{result.traceback_text}"
                self.log.emit(f"{timestamp()} 失败: {file_path}\n{detail}")
                if isinstance(result.exception, OSError) and getattr(result.exception, "errno", None) == errno.ENOSPC:
                    self.show_error.emit("磁盘空间不足", detail)
            processed_size += file_size
            self.file_progress.emit(100)
            self.overall_progress.emit(int(min(1.0, processed_size / total_size) * 100))

        summary = {
            "success": success_count,
            "failed": fail_count,
            "cancelled": self._cancel,
        }
        if self._cancel:
            for path in self._created_files:
                try:
                    if os.path.exists(path):
                        os.remove(path)
                except Exception:
                    pass
        self.finished.emit(summary)


class FileListWidget(QListWidget):
    def __init__(self):
        super().__init__()
        self.setSelectionMode(QListWidget.ExtendedSelection)
        self.setAcceptDrops(True)

    def dragEnterEvent(self, event):
        if event.mimeData().hasUrls():
            event.acceptProposedAction()
        else:
            super().dragEnterEvent(event)

    def dropEvent(self, event):
        if event.mimeData().hasUrls():
            for url in event.mimeData().urls():
                path = url.toLocalFile()
                if path and path.lower().endswith(".csv"):
                    self.add_file(path)
            event.acceptProposedAction()
        else:
            super().dropEvent(event)

    def add_file(self, path: str) -> None:
        for i in range(self.count()):
            if self.item(i).text() == path:
                return
        self.addItem(QListWidgetItem(path))

    def files(self) -> List[str]:
        return [self.item(i).text() for i in range(self.count())]


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("CSV 拆分工具 作者:小庄-Python办公")
        self.resize(1000, 720)

        self.file_list = FileListWidget()
        self.add_btn = QPushButton("添加文件")
        self.remove_btn = QPushButton("移除选中")
        self.clear_btn = QPushButton("清空列表")

        self.mode_row = QRadioButton("按行拆分")
        self.mode_col = QRadioButton("按列拆分")
        self.mode_row.setChecked(True)
        self.mode_group = QButtonGroup()
        self.mode_group.addButton(self.mode_row)
        self.mode_group.addButton(self.mode_col)

        self.stack = QStackedWidget()
        self.row_panel = self._build_row_panel()
        self.col_panel = self._build_col_panel()
        self.stack.addWidget(self.row_panel)
        self.stack.addWidget(self.col_panel)

        self.prefix_input = QLineEdit("part_")
        self.suffix_input = QLineEdit("")
        self.seq_input = QLineEdit("0001")
        self.keep_header = QCheckBox("保留表头")
        self.keep_header.setChecked(True)

        self.output_dir_input = QLineEdit("")
        self.output_browse_btn = QPushButton("选择输出目录")
        self.delimiter_combo = QComboBox()
        self.delimiter_combo.addItems([",", ";", "\\t", "|"])
        self.encoding_combo = QComboBox()
        self.encoding_combo.addItems(["UTF-8", "UTF-8-sig", "GBK"])
        self.exists_overwrite = QRadioButton("覆盖")
        self.exists_skip = QRadioButton("跳过")
        self.exists_overwrite.setChecked(True)

        self.start_btn = QPushButton("开始")
        self.cancel_btn = QPushButton("取消")
        self.cancel_btn.setEnabled(False)
        self.export_log_btn = QPushButton("导出日志")

        self.total_progress = QProgressBar()
        self.file_progress = QProgressBar()
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        self.log_text.setFont(QFont("Consolas", 10))

        layout = QVBoxLayout()
        layout.addWidget(self._build_files_group())
        layout.addWidget(self._build_mode_group())
        layout.addWidget(self._build_naming_group())
        layout.addWidget(self._build_output_group())
        layout.addWidget(self._build_progress_group())
        layout.addWidget(self._build_log_group())

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        self.add_btn.clicked.connect(self._add_files)
        self.remove_btn.clicked.connect(self._remove_selected)
        self.clear_btn.clicked.connect(self.file_list.clear)
        self.mode_row.toggled.connect(self._switch_mode)
        self.mode_col.toggled.connect(self._switch_mode)
        self.output_browse_btn.clicked.connect(self._choose_output_dir)
        self.start_btn.clicked.connect(self._start)
        self.cancel_btn.clicked.connect(self._cancel)
        self.export_log_btn.clicked.connect(self._export_log)
        self.delimiter_combo.currentIndexChanged.connect(self._refresh_headers)
        self.encoding_combo.currentIndexChanged.connect(self._refresh_headers)
        self.file_list.itemSelectionChanged.connect(self._refresh_headers)

        self.worker_thread: Optional[QThread] = None
        self.worker: Optional[Worker] = None

    def _build_files_group(self) -> QGroupBox:
        group = QGroupBox("文件选择")
        layout = QVBoxLayout()
        btn_layout = QHBoxLayout()
        btn_layout.addWidget(self.add_btn)
        btn_layout.addWidget(self.remove_btn)
        btn_layout.addWidget(self.clear_btn)
        layout.addLayout(btn_layout)
        layout.addWidget(self.file_list)
        group.setLayout(layout)
        return group

    def _build_mode_group(self) -> QGroupBox:
        group = QGroupBox("拆分模式")
        layout = QVBoxLayout()
        mode_layout = QHBoxLayout()
        mode_layout.addWidget(self.mode_row)
        mode_layout.addWidget(self.mode_col)
        layout.addLayout(mode_layout)
        layout.addWidget(self.stack)
        group.setLayout(layout)
        return group

    def _build_row_panel(self) -> QWidget:
        panel = QWidget()
        layout = QVBoxLayout()
        self.row_fixed = QRadioButton("按固定行数")
        self.row_unique = QRadioButton("按唯一列值")
        self.row_fixed.setChecked(True)
        self.row_group = QButtonGroup()
        self.row_group.addButton(self.row_fixed)
        self.row_group.addButton(self.row_unique)
        row_mode_layout = QHBoxLayout()
        row_mode_layout.addWidget(self.row_fixed)
        row_mode_layout.addWidget(self.row_unique)
        layout.addLayout(row_mode_layout)

        fixed_layout = QHBoxLayout()
        fixed_layout.addWidget(QLabel("每文件行数"))
        self.rows_per_file = QSpinBox()
        self.rows_per_file.setRange(1, 10_000_000)
        self.rows_per_file.setValue(100000)
        fixed_layout.addWidget(self.rows_per_file)
        layout.addLayout(fixed_layout)

        unique_layout = QHBoxLayout()
        unique_layout.addWidget(QLabel("列名"))
        self.row_unique_column = QComboBox()
        unique_layout.addWidget(self.row_unique_column)
        layout.addLayout(unique_layout)

        panel.setLayout(layout)
        return panel

    def _build_col_panel(self) -> QWidget:
        panel = QWidget()
        layout = QVBoxLayout()
        col_layout = QHBoxLayout()
        col_layout.addWidget(QLabel("列名"))
        self.col_column = QComboBox()
        col_layout.addWidget(self.col_column)
        layout.addLayout(col_layout)

        split_layout = QHBoxLayout()
        self.col_secondary = QCheckBox("每子文件再按行数拆分")
        split_layout.addWidget(self.col_secondary)
        split_layout.addWidget(QLabel("行数"))
        self.col_secondary_rows = QSpinBox()
        self.col_secondary_rows.setRange(1, 10_000_000)
        self.col_secondary_rows.setValue(100000)
        split_layout.addWidget(self.col_secondary_rows)
        layout.addLayout(split_layout)
        panel.setLayout(layout)
        return panel

    def _build_naming_group(self) -> QGroupBox:
        group = QGroupBox("命名规则")
        layout = QVBoxLayout()
        row1 = QHBoxLayout()
        row1.addWidget(QLabel("前缀"))
        row1.addWidget(self.prefix_input)
        row1.addWidget(QLabel("后缀"))
        row1.addWidget(self.suffix_input)
        layout.addLayout(row1)
        row2 = QHBoxLayout()
        row2.addWidget(QLabel("序号格式"))
        row2.addWidget(self.seq_input)
        row2.addWidget(self.keep_header)
        layout.addLayout(row2)
        group.setLayout(layout)
        return group

    def _build_output_group(self) -> QGroupBox:
        group = QGroupBox("输出配置")
        layout = QVBoxLayout()
        row1 = QHBoxLayout()
        row1.addWidget(QLabel("输出目录"))
        row1.addWidget(self.output_dir_input)
        row1.addWidget(self.output_browse_btn)
        layout.addLayout(row1)

        row2 = QHBoxLayout()
        row2.addWidget(QLabel("分隔符"))
        row2.addWidget(self.delimiter_combo)
        row2.addWidget(QLabel("编码"))
        row2.addWidget(self.encoding_combo)
        layout.addLayout(row2)

        row3 = QHBoxLayout()
        row3.addWidget(QLabel("已存在文件"))
        row3.addWidget(self.exists_overwrite)
        row3.addWidget(self.exists_skip)
        layout.addLayout(row3)
        group.setLayout(layout)
        return group

    def _build_progress_group(self) -> QGroupBox:
        group = QGroupBox("进度")
        layout = QVBoxLayout()
        layout.addWidget(QLabel("总体进度"))
        layout.addWidget(self.total_progress)
        layout.addWidget(QLabel("当前文件进度"))
        layout.addWidget(self.file_progress)
        row = QHBoxLayout()
        row.addWidget(self.start_btn)
        row.addWidget(self.cancel_btn)
        row.addWidget(self.export_log_btn)
        layout.addLayout(row)
        group.setLayout(layout)
        return group

    def _build_log_group(self) -> QGroupBox:
        group = QGroupBox("日志")
        layout = QVBoxLayout()
        layout.addWidget(self.log_text)
        group.setLayout(layout)
        return group

    def _switch_mode(self) -> None:
        self.stack.setCurrentIndex(0 if self.mode_row.isChecked() else 1)

    def _add_files(self) -> None:
        files, _ = QFileDialog.getOpenFileNames(self, "选择 CSV 文件", "", "CSV Files (*.csv)")
        for path in files:
            self.file_list.add_file(path)
        self._refresh_headers()

    def _remove_selected(self) -> None:
        for item in self.file_list.selectedItems():
            row = self.file_list.row(item)
            self.file_list.takeItem(row)
        self._refresh_headers()

    def _choose_output_dir(self) -> None:
        path = QFileDialog.getExistingDirectory(self, "选择输出目录")
        if path:
            self.output_dir_input.setText(path)

    def _delimiter_value(self) -> str:
        text = self.delimiter_combo.currentText()
        return "\t" if text == "\\t" else text

    def _refresh_headers(self) -> None:
        files = self.file_list.files()
        if not files:
            self.row_unique_column.clear()
            self.col_column.clear()
            return
        path = files[0]
        try:
            header = read_header(path, self.encoding_combo.currentText(), self._delimiter_value())
            self.row_unique_column.clear()
            self.col_column.clear()
            self.row_unique_column.addItems(header)
            self.col_column.addItems(header)
        except Exception as e:
            self.log_text.append(f"{timestamp()} 读取表头失败: {e}")
            self.row_unique_column.clear()
            self.col_column.clear()

    def _build_settings(self) -> Optional[SplitSettings]:
        files = self.file_list.files()
        if not files:
            QMessageBox.warning(self, "提示", "请先选择 CSV 文件")
            return None
        output_dir = self.output_dir_input.text().strip()
        if not output_dir:
            QMessageBox.warning(self, "提示", "请指定输出目录")
            return None
        delimiter = self._delimiter_value()
        encoding = self.encoding_combo.currentText()
        on_exists = "overwrite" if self.exists_overwrite.isChecked() else "skip"
        mode = "row" if self.mode_row.isChecked() else "col"
        row_mode = "fixed" if self.row_fixed.isChecked() else "unique"
        settings = SplitSettings(
            mode=mode,
            row_mode=row_mode,
            rows_per_file=self.rows_per_file.value(),
            unique_column=self.row_unique_column.currentText(),
            col_column=self.col_column.currentText(),
            col_secondary_split=self.col_secondary.isChecked(),
            col_secondary_rows=self.col_secondary_rows.value(),
            prefix=self.prefix_input.text(),
            suffix=self.suffix_input.text(),
            seq_format=self.seq_input.text() or "0001",
            keep_header=self.keep_header.isChecked(),
            delimiter=delimiter,
            encoding=encoding,
            output_dir=output_dir,
            on_exists=on_exists,
        )
        return settings

    def _start(self) -> None:
        settings = self._build_settings()
        if not settings:
            return
        files = self.file_list.files()
        ensure_dir(settings.output_dir)
        self.total_progress.setValue(0)
        self.file_progress.setValue(0)
        self.start_btn.setEnabled(False)
        self.cancel_btn.setEnabled(True)
        self.log_text.append(f"{timestamp()} 任务开始,文件数: {len(files)}")

        self.worker_thread = QThread()
        self.worker = Worker(files, settings)
        self.worker.moveToThread(self.worker_thread)
        self.worker_thread.started.connect(self.worker.run)
        self.worker.log.connect(self.log_text.append)
        self.worker.file_progress.connect(self.file_progress.setValue)
        self.worker.overall_progress.connect(self.total_progress.setValue)
        self.worker.finished.connect(self._finish)
        self.worker.show_error.connect(self._show_error)
        self.worker_thread.start()

    def _finish(self, summary: dict) -> None:
        if self.worker_thread:
            self.worker_thread.quit()
            self.worker_thread.wait()
        self.start_btn.setEnabled(True)
        self.cancel_btn.setEnabled(False)
        if summary.get("cancelled"):
            QMessageBox.information(self, "已取消", "任务已取消,临时文件已清理")
        else:
            QMessageBox.information(
                self,
                "完成",
                f"成功: {summary.get('success', 0)},失败: {summary.get('failed', 0)}",
            )

    def _cancel(self) -> None:
        if self.worker:
            self.worker.cancel()
            self.log_text.append(f"{timestamp()} 用户取消任务")
            self.cancel_btn.setEnabled(False)

    def _show_error(self, title: str, detail: str) -> None:
        QMessageBox.critical(self, title, detail)

    def _export_log(self) -> None:
        path, _ = QFileDialog.getSaveFileName(self, "导出日志", "log.txt", "Text Files (*.txt)")
        if not path:
            return
        try:
            with open(path, "w", encoding="utf-8") as f:
                f.write(self.log_text.toPlainText())
            QMessageBox.information(self, "成功", "日志已导出")
        except Exception as e:
            QMessageBox.critical(self, "失败", str(e))


def main() -> None:
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()

可扩展方向

如果继续增强,可考虑:

  • 增加“预览前 N 行”功能
  • 支持保存与加载配置
  • 增加“输出文件压缩”为 zip
  • 提供“拆分后统计报表”

结语

这个工具的核心理念是“稳定、低内存、高可用”。通过标准库 csv 与 PyQt5 的组合,达成了性能与交互的平衡,并保持项目结构简单、易维护。对于日常办公与大规模数据拆分,已具备可直接使用的工程级质量。

以上就是Python实现批量将CSV按行或按列拆分的详细内容,更多关于Python拆分CSV的资料请关注脚本之家其它相关文章!

相关文章

  • Python实现将Unicode转换为ASCII

    Python实现将Unicode转换为ASCII

    这篇文章主要为大家详细介绍了系统编码的不同方法以及如何利用Python实现将Unicode转换为 ASCII,文中的示例代码讲解详细,有需要的小伙伴可以学习一下
    2023-10-10
  • 用python求一重积分和二重积分的例子

    用python求一重积分和二重积分的例子

    今天小编就为大家分享一篇用python求一重积分和二重积分的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-12-12
  • Python爬虫小例子——爬取51job发布的工作职位

    Python爬虫小例子——爬取51job发布的工作职位

    这篇文章主要介绍了Python爬取51job发布的工作职位,文中讲解非常细致,帮助大家更好的理解和学习Python,感兴趣的朋友可以了解下
    2020-07-07
  • Python之requests高级用法详解

    Python之requests高级用法详解

    这篇文章主要介绍了Python之requests高级用法详解,http协议是无状态的, 服务器无法区分多个请求是否来自同一个人,因此需要用cookie来进行标识,一般如果需要登录才能访问的网站就需要发送cookie信息,需要的朋友可以参考下
    2023-10-10
  • python中实现根据坐标点位置求方位角

    python中实现根据坐标点位置求方位角

    这篇文章主要介绍了python中实现根据坐标点位置求方位角方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-08-08
  • Pandas的数据过滤实现

    Pandas的数据过滤实现

    这篇文章主要介绍了Pandas的数据过滤实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • Python获取邮件地址的方法

    Python获取邮件地址的方法

    这篇文章主要介绍了Python获取邮件地址的方法,通过自定义函数分析提取字符串中邮件地址的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • python控制台实现tab补全和清屏的例子

    python控制台实现tab补全和清屏的例子

    今天小编就为大家分享一篇python控制台实现tab补全和清屏的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • Python format字符串格式化函数的使用

    Python format字符串格式化函数的使用

    本文主要介绍了Python format字符串格式化函数的使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-01-01
  • Python seaborn数据可视化绘图(直方图,密度图,散点图)

    Python seaborn数据可视化绘图(直方图,密度图,散点图)

    这篇文章主要介绍了Python seaborn数据可视化绘图(直方图,密度图,散点图),文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下
    2022-07-07

最新评论