jstorm源码解析之bolt异常处理方法

 更新时间:2017年08月04日 09:03:03   投稿:jingxian  
下面小编就为大家带来一篇jstorm源码解析之bolt异常处理方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧

问题

用过storm或者jstorm的都知道,如果在bolt代码中发生了没被catch住的异常,所在worker进程会退出。本文就从源码角度分析一下具体设计,其实并不是“有异常然后进程崩了”这么简单。

实质

我们先看BasicBoltExecutor的源码:

  public void execute(Tuple input) {
    _collector.setContext(input);
    try {
      _bolt.execute(input, _collector);
      _collector.getOutputter().ack(input);
    } catch (FailedException e) {
      if (e instanceof ReportedFailedException) {
        _collector.reportError(e);
      }
      _collector.getOutputter().fail(input);
    }
  }

_bolt.execute(input, _collector) 就是执行我们自己编写的bolt里的excute方法。可以看到,在这里,只会catch storm自己定义的FailedException,并且发送fail消息,标记tuple处理失败, 其余异常则会被放过。

再外层是BoltExecutors的processTupleEvent方法:

    try {
      if (!isSystemBolt && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
        backpressureTrigger.handle(tuple);
      } else {
        bolt.execute(tuple);
      }
    } catch (Throwable e) {
      error = e;
      LOG.error("bolt execute error ", e);
      report_error.report(e);
    }

在这里,所有异常都会被catch住,但是只会进行report_error,并不会发fail消息,相关tuple只能等超时才能被标记为失败。

再来看report_error.report(e) 的具体实现,通过看构造函数,可以看到report_error是一个TaskReportErrorAndDie类,

  @Override
  public void report(Throwable error) {
    this.reporterror.report(error);
    this.haltfn.run();
  }

在这里,reporterror是一个AsyncLoopDefaultKill类

  @Override
  public void run() {
    JStormUtils.halt_process(1, "Async loop died!");
  }

这里就是整个过程的最终步骤了, JStormUtils.halt_process()方法会打印一条"Async loop died!"的日志后将worker进程杀死。

思考

通过代码可以出来,对于jstorm,“异常后worker退出”是一个故意设计出的特性,并非程序不健壮。猜测这一块的设计理念就是对于已知异常,开发人员自己捕获并重新抛出FailedException,使相应消息失败;未知异常则强制使进程直接失败退出,避免过度的catch导致问题被掩盖。

不过虽然话是这么说,对这个设计还是持保留意见,毕竟storm和普通的java程序不一样,storm的worker进程在退出后是会自动被重启的,所以这种异常处理方式并不能起到failfast的效果。

相反,worker的持续重启,还会带来一些其他问题。再一个,不主动将消息标为失败,而是等超时,如果设置的超时时间过长(当然超时时间太长也不合理),也会引入一些问题。比如说kafkaSpout, 一条消息没被ack之前是不会继续取后边的数据的,这样如果有一条数据需要等超时,同分区下的数据在这一个超时周期内,就都无法被处理了。

从另一方面来说,如果像FailedException一样处理其他所有异常,由于异常之后可以看到有数据fail,也并不会掩盖问题。

所以说,这一块的处理逻辑,个人感觉还是需要斟酌一下。

以上这篇jstorm源码解析之bolt异常处理方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot整合ShardingSphere5.x实现数据加解密功能(最新推荐)

    SpringBoot整合ShardingSphere5.x实现数据加解密功能(最新推荐)

    这篇文章主要介绍了SpringBoot整合ShardingSphere5.x实现数据加解密功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-06-06
  • Java通俗易懂系列设计模式之责任链模式

    Java通俗易懂系列设计模式之责任链模式

    这篇文章主要介绍了Java通俗易懂系列设计模式之责任链模式,对设计模式感兴趣的同学,一定要看一下
    2021-04-04
  • Java中Integer128的坑

    Java中Integer128的坑

    本文主要介绍了Java中Integer128的坑,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-03-03
  • 深入讲解Java 9中的九个新特性

    深入讲解Java 9中的九个新特性

    Java 8 发布三年多之后,即将快到2017年7月下一个版本发布的日期了。 你可能已经听说过 Java 9 的模块系统,但是这个新版本还有许多其它的更新。 这里有九个令人兴奋的新功能将与 Java 9 一起发布。需要的朋友可以参考学习,下面来一起看看吧。
    2017-05-05
  • 如何彻底删除SVN中的文件和文件夹(附恢复方法)

    如何彻底删除SVN中的文件和文件夹(附恢复方法)

    在SVN中如果删除某个文件或文件夹也可以在历史记录中进行找回,有的时候需要彻底删除某些文件,即不希望通过历史记录进行恢复,需要在服务器上对SVN的数据进行重新整理
    2014-08-08
  • Java实现一个简单的线程池代码示例

    Java实现一个简单的线程池代码示例

    线程池是管理线程的一个池子,通过阻塞队列管理任务,主要参数包括corePoolSize、maximumPoolSize、keepAliveTime等,这篇文章主要介绍了Java实现一个简单的线程池的相关资料,需要的朋友可以参考下
    2024-09-09
  • Spring中的构造注入

    Spring中的构造注入

    这篇文章主要介绍了Spring中的构造注入,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • Java反射如何修改private final成员变量值

    Java反射如何修改private final成员变量值

    这篇文章主要介绍了Java反射如何修改private final成员变量值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • Java版C语言版简单使用静态语言实现动态数组的方法

    Java版C语言版简单使用静态语言实现动态数组的方法

    本文给大家分享java版和C语言版简单使用静态语言实现动态数组的方法,非常不错,具有参考借鉴价值,需要的朋友参考下吧
    2017-10-10
  • Java实现HashMap排序方法的示例详解

    Java实现HashMap排序方法的示例详解

    这篇文章主要通过一些示例为大家介绍了Java对HashMap进行排序的方法,帮助大家更好的理解和使用Java,感兴趣的朋友可以了解一下
    2022-05-05

最新评论