RocketMQ设计之异步刷盘

 更新时间:2022年03月21日 10:29:20   作者:周杰伦本人  
本文介绍RocketMQ设计之异步刷盘,RocketMQ消息存储到磁盘上,这样既保证断电后恢复,也让存储消息量超出内存限制,RocketMQ为了提高性能,会尽可能保证磁盘顺序写,消息通过Producer写入RocketMQ的时候,有两种方式,上篇介绍了同步刷盘,本文介绍异步刷盘,需要的朋友可以参考下

上一篇RocketMQ设计之同步刷盘

异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

不开启缓冲池:默认不开启,刷盘线程FlushRealTimeService会每间隔500毫秒尝试去刷盘。

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

            //每次Flush间隔500毫秒
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            //每次Flush最少4页内存数据(16KB)
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

               //距离上次刷盘时间阈值为10秒
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}
  • 判断是否超过10秒没刷盘了,如果超过强制刷盘
  • 等待Flush间隔500ms
  • 通过MappedFile刷盘
  • 设置StoreCheckpoint刷盘时间点
  • 超过500ms的刷盘记录日志
  • Broker正常停止前,把内存page中的数据刷盘

开启缓冲池:

class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            //每次提交间隔200毫秒
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

            //每次提交最少4页内存数据(16KB)
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            //距离上次提交时间阈值为200毫秒
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}

RocketMQ申请一块和CommitLog文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由FlushRealTimeService来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。

  • 判断是否超过200毫秒没提交,需要强制提交
  • 提交到MappedFile,此时还未刷盘
  • 然后唤醒刷盘线程
  • 在Broker正常停止前,提交内存page中的数据

到此这篇关于RocketMQ设计之异步刷盘的文章就介绍到这了,更多相关RocketMQ异步刷盘内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java利用Dijkstra算法求解拓扑关系最短路径

    Java利用Dijkstra算法求解拓扑关系最短路径

    迪杰斯特拉算法(Dijkstra)是由荷兰计算机科学迪家迪杰斯特拉于1959年提出的,因此又叫狄克斯特拉算法。本文将利用迪克斯特拉(Dijkstra)算法求拓扑关系最短路径,感兴趣的可以了解一下
    2022-07-07
  • Java定义队列结构,并实现入队、出队操作完整示例

    Java定义队列结构,并实现入队、出队操作完整示例

    这篇文章主要介绍了Java定义队列结构,并实现入队、出队操作,结合完整实例形式分析了java数据结构中队列的定义、入队、出队、判断队列是否为空、打印队列元素等相关操作技巧,需要的朋友可以参考下
    2020-02-02
  • java实现静默加载Class示例代码

    java实现静默加载Class示例代码

    这篇文章主要给大家介绍了关于java实现静默加载Class的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用java具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-10-10
  • SpringBoot中使用Cookie实现记住登录的示例代码

    SpringBoot中使用Cookie实现记住登录的示例代码

    这篇文章主要介绍了SpringBoot中使用Cookie实现记住登录的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • Java反应式框架Reactor中的Mono和Flux

    Java反应式框架Reactor中的Mono和Flux

    这篇文章主要介绍了Java反应式框架Reactor中的Mono和Flux,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-07-07
  • JavaWeb dbutils执行sql命令并遍历结果集时不能查到内容的原因分析

    JavaWeb dbutils执行sql命令并遍历结果集时不能查到内容的原因分析

    这篇文章主要介绍了JavaWeb dbutils执行sql命令并遍历结果集时不能查到内容的原因分析及简单处理方法,文中给大家介绍了javaweb中dbutils的使用,需要的朋友可以参考下
    2017-12-12
  • 详解Java中JSON数据的生成与解析

    详解Java中JSON数据的生成与解析

    今天给大家带来的是关于Java的相关知识,文章围绕着Java中JSON数据的生成与解析展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • springboot @Controller和@RestController的区别及应用详解

    springboot @Controller和@RestController的区别及应用详解

    这篇文章主要介绍了springboot @Controller和@RestController的区别及应用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Mybatis中注入执行sql查询、更新、新增及建表语句案例代码

    Mybatis中注入执行sql查询、更新、新增及建表语句案例代码

    这篇文章主要介绍了Mybatis中注入执行sql查询、更新、新增以及建表语句,主要说明一个另类的操作,注入sql,并使用mybatis执行,结合案例代码详解讲解,需要的朋友可以参考下
    2023-02-02
  • Java中的ReadWriteLock读写锁详解

    Java中的ReadWriteLock读写锁详解

    这篇文章主要介绍了Java中的ReadWriteLock读写锁详解,ReadWriteLock也是一个接口,提供了readLock和writeLock两种锁的操作机制,一个资源可以被多个线程同时读,或者被一个线程写,但是不能同时存在读和写线程,需要的朋友可以参考下
    2023-12-12

最新评论