Apache Omid TSO 组件源码实现原理解析

 更新时间:2024年07月22日 10:41:16   作者:FlyingZCC  
这篇文章主要介绍了Apache Omid TSO 组件源码实现原理解析,本文给大家介绍的非常详细,感兴趣的朋友一起看看吧

Apache Omid TSO 组件实现原理

作用

独立进程,处理全局事务之间的并发冲突。

流程

TSOChannelHandler#channelRead -> AbstractRequestProcessor -> PersistenceProcessorHandler

总体流程

thread1
    TSOChannelHandler#channelRead
        AbstractRequestProcessor#timestampRequest 接收 client 请求,创建 RequestEvent 并 publish
thread2
    AbstractRequestProcessor#onEvent 处理 RequestEvent 请求
        AbstractRequestProcessor#handleRequest
            PersistenceProcessorImpl#addTimestampToBatch 创建 PersistEvent,当 batch 满了发送事件
thread3
    PersistenceProcessorHandler#onEvent 持久化事件处理

TSOChannelHandler

继承自 Netty 的 ChannelInboundHandlerAdapter,用于处理 TSO 的入站请求

channelRead

委托 requestProcessor 创建 timestampRequest 和 commitRequest 请求事件。

AbstractRequestProcessor

处理 timestamp 和 commit 事件。

onEvent

处理 RequestEvent 事件,按照事件类型派发给 handleTimestamp 和 handleCommit 方法进行处理。

handleTimestamp

1.通过 timestampOracle 获取下一个时间戳;
2.PersistenceProcessorImpl#addBatch 事件添加到 batch,但是后续对 timestamp 请求不会额外处理。

handleCommit

主要通过 hasConflictsWithCommittedTransactions 判断 writeSet 和 CommitHashMap 里是否有事务写冲突,如果没有则可以提交事务,分配 commitTimestamp。

private void handleCommit(RequestEvent event) throws Exception {
    long startTimestamp = event.getStartTimestamp(); // startTimestamp
    Iterable<Long> writeSet = event.writeSet(); // 写入集,存储的是 cellIds
    Collection<Long> tableIdSet = event.getTableIdSet();
    boolean isCommitRetry = event.isCommitRetry();
    boolean nonEmptyWriteSet = writeSet.iterator().hasNext(); // 检查写集合是否为空,即事务是否有写操作
    if (startTimestamp > lowWatermark &&
        !hasConflictsWithFences(startTimestamp, tableIdSet) &&
        !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) { // 检查事务是否满足提交条件,通过 hasConflictsWithCommittedTransactions 判断是否有事务写冲突
        // 可以进行事务提交
        long commitTimestamp = timestampOracle.next(); // 获取提交时间戳
        Optional<Long> forwardNewWaterMark = Optional.absent();
        if (nonEmptyWriteSet) { // 写集合非空
            long newLowWatermark = lowWatermark;
            for (long r : writeSet) { // 遍历写集合中的每个元素,更新其最新的写入时间戳,并计算新的低水位线
                long removed = hashmap.putLatestWriteForCell(r, commitTimestamp); // 更新 cellId 对应的 commitTimestamp, 返回之前的 oldest commitTimestamp
                newLowWatermark = Math.max(removed, newLowWatermark); // 更新低水位线
            }
            if (newLowWatermark != lowWatermark) { // 更新低水位线
                lowWatermark = newLowWatermark;
                forwardNewWaterMark = Optional.of(lowWatermark);
            }
        }
        forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);  // 持久化 commit 请求
    } else { // 事务不满足提交条件
        if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
            forwardCommitRetry(startTimestamp, c, event.getMonCtx());  // 若是提交重试,再次检查是否已提交以避免因响应延迟导致的重复提交
        } else {
            forwardAbort(startTimestamp, c, event.getMonCtx()); // 否则,中止事务
        }
    }
}

CommitHashMap

通过 LongCache 缓存 cellId -> lastCommittedTimestamp 的映射。

getLatestWriteForCell 方法:
根据 cellId 获取 lastCommittedTimestamp。

putLatestWriteForCell 方法:
更新 cellId 对应的 lastCommittedTimestamp。

LongCache

缓存 cellId -> lastCommittedTimestamp 的映射。

get 和 set 操作都是先将原始 cellId 进行 hash 操作找到位置,所以可能存在冲突。

set

更新 cellId 对应的 lastCommittedTimestamp。

public long set(long key, long value) {
    final int index = index(key); // cellId 取模返回下标,可能会冲突
    int oldestIndex = 0;
    long oldestValue = Long.MAX_VALUE;
    for (int i = 0; i < associativity; ++i) {
        int currIndex = 2 * (index + i); // 计算 key 下标
        if (cache[currIndex] == key) { // 相同事务 cellId, 替换场景
            oldestValue = 0;
            oldestIndex = currIndex;
            break;
        }
        if (cache[currIndex + 1] <= oldestValue) { // 没找到相同的key.通过和 oldestValue 比较会将最小的 timestamp 剔除
            oldestValue = cache[currIndex + 1];
            oldestIndex = currIndex;
        }
    }
    // 替换最旧的键值对,将其更新为新的键值对
    cache[oldestIndex] = key;
    cache[oldestIndex + 1] = value;
    return oldestValue;
}

get

获取 cellId 对应的 lastCommittedTimestamp,找不到则返回 0.

public long get(long key) {
    final int index = index(key);
    for (int i = 0; i < associativity; ++i) { // associativity 里存储的元素key应该是相同的
        int currIndex = 2 * (index + i); // 计算 key 的下标
        if (cache[currIndex] == key) { // 找到 cache key
            return cache[currIndex + 1]; // 返回对应的 value
        }
    }
    return 0;
}

PersistenceProcessorImpl

将 startTimestamp 和 commitTimestamp 放入 batch.

addCommitToBatch

创建 event,添加到 current batch
如果 current batch is full
    triggerCurrentBatchFlush

triggerCurrentBatchFlush

创建 PersistBatchEvent 并发送事件

PersistenceProcessorHandler

处理上面 PersistenceProcessorImpl 发送过来的事件,进行持久化处理。

onEvent

实际上只处理 commit 事件,会创建 put 对象将事务信息持久化到 hbase 的 commitTable (OMID_COMMIT_TABLE).

HBaseCommitTable

构造方法: 根据 HBaseCommitTableConfig 配置初始化

到此这篇关于Apache Omid TSO 组件源码实现原理的文章就介绍到这了,更多相关Apache Omid TSO 组件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • ubuntu下的虚拟环境中安装Django的操作方法

    ubuntu下的虚拟环境中安装Django的操作方法

    这篇文章主要介绍了ubuntu下的虚拟环境中安装Django的操作方法,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-09-09
  • CentOS系统Maven安装教程分享

    CentOS系统Maven安装教程分享

    本文介绍了如何在CentOS系统中安装Maven,并提供了一个简单的实际应用案例,安装Maven需要先安装Java和设置环境变量,Maven可以自动管理项目的构建、报告和文档
    2025-02-02
  • Linux+php+apache+oracle环境搭建之CentOS下安装Oracle数据库

    Linux+php+apache+oracle环境搭建之CentOS下安装Oracle数据库

    研究了两天Linux下安装Oracle,重装了两次虚拟机,终于安装成功。很有收获的。记录下安装过程。大神们如有更好的方式,请联系我!
    2014-08-08
  • 在 Linux 上使用 Multitail命令的教程

    在 Linux 上使用 Multitail命令的教程

    MultiTail是一个开源的ncurses的实用工具,可用于在一个窗口或单一外壳,显示实时一样的尾巴命令,该命令拆分控制台为更多子窗口的日志文件的最后几行。这篇文章主要介绍了在 Linux 上使用 Multitail命令的教程,需要的朋友可以参考下
    2019-12-12
  • 详解git中配置的.gitignore不生效的解决办法

    详解git中配置的.gitignore不生效的解决办法

    这篇文章主要介绍了详解git中配置的.gitignore不生效的解决办法的相关资料,这里提供解决办法希望能帮助到大家,需要的朋友可以参考下
    2017-09-09
  • Linux proc目录下子文件或子文件夹的作用

    Linux proc目录下子文件或子文件夹的作用

    这篇文章主要介绍了Linux proc目录下子文件或子文件夹的作用,以及读取这些信息的实际操作命令,需要的朋友可以参考下
    2014-03-03
  • DDNS 的工作原理及其在 Linux 上的实现

    DDNS 的工作原理及其在 Linux 上的实现

    DDNS (Dynamic DNS) 扩展了 DNS 将客户端 IP 与其域名进行静态映射的功能,它可以将同一域名实时地解析为不同的动态 IP,而不需要额外的人工干预
    2016-09-09
  • linux两台服务器实现自动同步文件

    linux两台服务器实现自动同步文件

    这篇文章主要介绍了linux两台服务器实现自动同步文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • Linux命令行快速技巧之定位一个文件的方法

    Linux命令行快速技巧之定位一个文件的方法

    Linux 命令行专门设计了很多非常有用的命令行工具在你的电脑上查找文件。下面我们看一下它们其中三个:ls、tree 和 find。 感兴趣的朋友跟随小编一起看看吧
    2018-11-11
  • Linux下activeMQ的启动和停止命令方式

    Linux下activeMQ的启动和停止命令方式

    文章介绍了在Linux环境下启动和停止Apache ActiveMQ的步骤,启动前需要确保服务未运行,使用`./activemq start`命令启动,停止时使用`./activemq stop`命令,启动后可以通过访问`http://127.0.0.1:8161/admin/`来验证服务是否成功启动
    2024-12-12

最新评论