C#实现千万数据秒级导入的代码

 更新时间:2025年09月12日 09:12:46   作者:李袁明  
在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可以参考下

前言

在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,这里我就给大家分享一下千万数据秒级导入怎么实现

一、数据存储

这里使用到的数据库是clickhouse,因为像mysql、sqlserver这类关系型数据库对于大数据的支持不怎么好所以这里我使用的是clickhouse,它是由俄罗斯的yindex公司开发的列式存储数据库。查询语句这些和mysql差不多上手容易学起来简单。需要注意的是clickhouse对数据的删除比较严格一般都不会去删除它里面的数据库,如果要删除数据,我们一般都是采用用一个字段标记数据是否被删除,然后过滤掉这些被删除的数据,clickhouse的四种引擎也可以去了解一下

二、处理逻辑

优化前导入耗时两分钟以上优化后导入耗时30秒以内

优化前代码处理逻辑

优化后导入一千万数据大约需要两分钟完成处理。处理逻辑是将数据循环装入List中,当数据量达到预设的chunkSize阈值(150万)时开始批量插入。值得注意的是,1000万数据需要循环处理7次。这里存在一个常见误区:认为提高150万的阈值能加快处理速度。实际上,处理速度慢的原因并非循环次数,而是由于单线程运行无法充分发挥ClickHouse的高并发优势。

private async Task InsertContentsAsync2(string filePath, BlackListCreateDto dto, ulong maxId, long batchId)
 {
     Stopwatch stopwatch = new();
     var regex = RegexConst.Phone();
     var userId = CurrentUser.Id.ToString();
     var isTxt = filePath.Split(".").Last() == "txt";
     var chunkSize = 1500000;
     List<string> blackLists = new();
     int offset = 0;
     int total = 0;
     stopwatch.Start();
     if (isTxt)
     {
         using StreamReader sr = new(filePath);
         string line;
         while ((line = await sr.ReadLineAsync()) != null)
         {
             if (regex.IsMatch(line))
             {
                 blackLists.Add(line);
                 offset++;
                 total++;
             }
             if (offset >= chunkSize)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }

         if (offset >= 0)
         {
             await InsertDataAsync(blackLists);
             offset = 0;
             blackLists.Clear();
         }
     }
     else
     {

         var rows = await MiniExcel.QueryAsync(filePath);
         foreach (IEnumerable<dynamic> row in rows)
         {
             var pair = (IDictionary<string, object>)row;
             var cells = pair.Values;
             if (cells != null && cells.Any())
             {
                 foreach (var cell in cells)
                 {
                     if (cell is not null && cell is string str)
                     {
                         blackLists.Add(str);
                         offset++;
                         total++;
                     }

                 }
             }
             if (offset >= chunkSize)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }

         if (offset >= 0)
         {
             await InsertDataAsync(blackLists);
             offset = 0;
             blackLists.Clear();
         }
     }

     async Task InsertDataAsync(List<string> blackLists)
     {
         List<BlackList> list = blackLists.Select(it => CreateBlackList(dto, batchId, ++maxId, userId, it.ToString())).ToList();
         var result = await _clickHouseClientRepository.BulkInsertAsync(list, _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
         blackLists.Clear();
     }
     stopwatch.Stop();
     Logger.LogInformation("文件路径:{filePath},{total}条数据, 耗时:{Elapsed} ", filePath, total, stopwatch.Elapsed);
 }

优化后的代码

优化后的代码将单线程执行模式改为多消费者并行处理,采用生产者-消费者模式分离IO和计算操作。文件读取时使用最大缓冲区和顺序扫描:

await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);

关键优化点包括:

  • 使用异步文件读取(FileStream异步API)
  • 预分配列表内存(new List(chunkSize))
  • 采用ClickHouse批量并行提交而非单条插入
  • 设置chunkSize为10万条
  • 使用BoundedChannel进行管道容量控制

关于管道容量控制的作用:

  1. 防止内存溢出:限制管道中积压的未处理数据量,避免生产者过快导致内存问题
  2. 实现自动背压(Backpressure):管道满时生产者会自动阻塞或丢弃数据(取决于配置),直到消费者处理部分数据

选择10万而非150万的原因:

原因1:内存考量

  • 150万条数据(假设每条20字节)单批次占用约30MB
  • 并行处理时多个批次会显著增加内存压力,可能触发GC影响性能
  • 10万条仅占用约2MB,对GC更友好,适合长期批处理

原因2:ClickHouse性能

  • BulkInsert在5万~10万批次时吞吐量最佳

原因3:并行处理效率

  • 10万条/批次能更好分配任务到多线程/核心
  • 150万条/批次可能导致任务分配不均

原因4:I/O优化

  • 大数据批次可能导致I/O阻塞,CPU空闲等待
  • 小批次(10万)能更好重叠I/O和计算,提升吞吐量
private async Task InsertContentsAsync(string filePath, BlackListCreateDto dto, ulong maxId, long batchId)
 {
     var stopwatch = Stopwatch.StartNew();
     var regex = RegexConst.Phone();
     var userId = CurrentUser.Id.ToString();
     var isTxt = filePath.EndsWith(".txt", StringComparison.OrdinalIgnoreCase);
     const int chunkSize = 100_000; 
     var totalProcessed = 0L;

     // 并行处理管道
     var processingChannel = Channel.CreateBounded<List<string>>(new BoundedChannelOptions(5)
     {
         SingleWriter = true,
         SingleReader = false,
         FullMode = BoundedChannelFullMode.Wait
     });

     // 启动并行消费者
     var processingTasks = Enumerable.Range(0, Environment.ProcessorCount)
         .Select(_ => Task.Run(async () =>
         {
             await foreach (var batch in processingChannel.Reader.ReadAllAsync())
             {
                 await ProcessBatchAsync(batch);
                 Interlocked.Add(ref totalProcessed, batch.Count);
             }
         })).ToArray();

     try
     {
         if (isTxt)
         {
             await ProcessTextFileAsync(filePath, regex, processingChannel.Writer, chunkSize);
         }
         else
         {
             await ProcessExcelFileAsync(filePath, processingChannel.Writer, chunkSize);
         }
     }
     finally
     {
         processingChannel.Writer.Complete();
         await Task.WhenAll(processingTasks);
         stopwatch.Stop();

         Logger.LogInformation("文件路径:{FilePath}, {Total}条数据, 耗时:{Elapsed}ms",
             filePath, totalProcessed, stopwatch.ElapsedMilliseconds);
     }

     async Task ProcessBatchAsync(List<string> batch)
     {
         var entities = batch
             .Select(phone => CreateBlackList(dto, batchId, ++maxId, userId, phone))
             .ToList();

         await _clickHouseClientRepository.BulkInsertAsync(
             entities,
             _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
     }
 }

 private async Task ProcessTextFileAsync(string filePath, Regex regex, ChannelWriter<List<string>> writer, int chunkSize)
 {
     await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);
     using var sr = new StreamReader(fs);

     var batch = new List<string>(chunkSize);
     string line;

     while ((line = await sr.ReadLineAsync()) != null)
     {
         if (regex.IsMatch(line))
         {
             batch.Add(line);

             if (batch.Count >= chunkSize)
             {
                 await writer.WriteAsync(batch);
                 batch = new List<string>(chunkSize);
             }
         }
     }

     if (batch.Count > 0)
     {
         await writer.WriteAsync(batch);
     }
 }

 private async Task ProcessExcelFileAsync(string filePath, ChannelWriter<List<string>> writer, int chunkSize)
 {
     await using var stream = File.Open(filePath, FileMode.Open, FileAccess.Read);
     using var reader = MiniExcel.QueryAsDataTable(stream);
     var batch = new List<string>(chunkSize);
     foreach (DataRow row in reader.Rows)
     {
         var cell = row[0].ToString();
         if (cell is string str && !string.IsNullOrWhiteSpace(str))
         {
             batch.Add(str);

             if (batch.Count >= chunkSize)
             {
                 await writer.WriteAsync(batch);
                 batch = new List<string>(chunkSize);
             }
         }
     }

     if (batch.Count > 0)
     {
         await writer.WriteAsync(batch);
     }
 }

总结

到此这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这了,更多相关C#千万数据秒级导入内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C# 多线程学习之基础入门

    C# 多线程学习之基础入门

    线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。今天小编就带大家了解一下C#中的多线程,快来学习一下吧
    2021-12-12
  • C#使用SevenZipSharp实现压缩文件和目录

    C#使用SevenZipSharp实现压缩文件和目录

    SevenZipSharp压缩/解压(.7z .zip)”是指使用SevenZipSharp库进行7z和zip格式的文件压缩与解压缩操作,SevenZipSharp是C#语言封装的7-Zip API,它使得在.NET环境中调用7-Zip的功能变得简单易行,本文给大家介绍了C#使用SevenZipSharp实现压缩文件和目录
    2025-01-01
  • C#设计模式之Builder生成器模式解决带老婆配置电脑问题实例

    C#设计模式之Builder生成器模式解决带老婆配置电脑问题实例

    这篇文章主要介绍了C#设计模式之Builder生成器模式解决带老婆配置电脑问题,简单介绍了生成器模式的概念、功能并结合具体实例形式分析了C#生成器模式解决配电脑问题的步骤与相关操作技巧,需要的朋友可以参考下
    2017-09-09
  • 初步认识C#中的Lambda表达式和匿名方法

    初步认识C#中的Lambda表达式和匿名方法

    这篇文章主要介绍了初步认识C#中的Lambda表达式和匿名方法,本文着重讲解Lambda表达式和匿名方法的基础知识,需要的朋友可以参考下
    2015-01-01
  • WPF设置窗体可以使用鼠标拖动大小的方法

    WPF设置窗体可以使用鼠标拖动大小的方法

    这篇文章主要介绍了WPF设置窗体可以使用鼠标拖动大小的方法,涉及针对窗口的操作与设置技巧,具有很好的借鉴价值,需要的朋友可以参考下
    2014-11-11
  • 适用于WebForm Mvc的Pager分页组件C#实现

    适用于WebForm Mvc的Pager分页组件C#实现

    这篇文章主要为大家分享了适用于WebForm Mvc的Pager分页组件,由C#实现,感兴趣的小伙伴们可以参考一下
    2016-04-04
  • 解决Unity项目中UI脚本丢失的问题

    解决Unity项目中UI脚本丢失的问题

    这篇文章主要介绍了解决Unity项目中UI脚本丢失的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04
  • Winform ComboBox如何独立绘制下拉选项的字体颜色

    Winform ComboBox如何独立绘制下拉选项的字体颜色

    这篇文章主要介绍了Winform ComboBox如何独立绘制下拉选项的字体颜色,帮助大家更好的理解和使用c# winform,感兴趣的朋友可以了解下
    2020-11-11
  • 基于C#实现进程回收管理工具

    基于C#实现进程回收管理工具

    这篇文章主要为大家详细介绍了入户基于C#实现一个进程回收管理工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-04-04
  • c# 进程内部的同步

    c# 进程内部的同步

    这篇文章主要介绍了c# 进程内部的同步,帮助大家更好的理解和学习c#,感兴趣的朋友可以了解下
    2020-10-10

最新评论