spout放在每个executer执行,我们先从spoutExecutors的初始化开始往下看,spoutExecutors是在一个worker中管理其中的tasks,在SpoutExecutors的构造函数中初始化一些组件:taskId,topologyId,spout等,在这个线程中,除了一些常见的属性,可以看到还会去创建并设置两个对象,将待执行的task信息传入:
构造完成之后,init方法进行一些初始化,在这里执行spout的open方法同时进行事件注册:
this.spout.open(storm_conf, userTopologyCtx, outputCollector); LOG.info("Successfully open SpoutExecutors " + idStr); taskHbTrigger.register(); int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf); // wait other bolt is ready JStormUtils.sleepMs(delayRun * 1000); if (taskStatus.isRun()) { spout.activate(); } else { spout.deactivate(); } LOG.info(idStr + " is ready "); }
在spout调用open初始化完成之后,spout需要根据配置文件每10秒读取一次数据,这个是怎么实现的呢?发现在调用open之后,会调用taskHbTrigger.register(),taskHeartbeatTrigger是一个TimerTrigger的继承类,他会根据配置,通过ScheduledExecutorService设置每隔一段时间执行task。
Spout.emit过程:
真正执行emit的是SpoutCollector.sendMsg
public List sendMsg(String out_stream_id, List