博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spout详解
阅读量:4310 次
发布时间:2019-06-06

本文共 2739 字,大约阅读时间需要 9 分钟。

spout放在每个executer执行,我们先从spoutExecutors的初始化开始往下看,spoutExecutors是在一个worker中管理其中的tasks,在SpoutExecutors的构造函数中初始化一些组件:taskId,topologyId,spout等,在这个线程中,除了一些常见的属性,可以看到还会去创建并设置两个对象,将待执行的task信息传入:
1、TaskTransfer
2、TaskHeartbeatTrigger
 
构造完成之后,init方法进行一些初始化,在这里执行spout的open方法同时进行事件注册:
this.spout.open(storm_conf, userTopologyCtx, outputCollector);     LOG.info("Successfully open SpoutExecutors " + idStr);     taskHbTrigger.register();     int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);     // wait other bolt is ready     JStormUtils.sleepMs(delayRun * 1000);     if (taskStatus.isRun()) {
spout.activate(); } else {
spout.deactivate(); } LOG.info(idStr + " is ready "); }
在spout调用open初始化完成之后,spout需要根据配置文件每10秒读取一次数据,这个是怎么实现的呢?发现在调用open之后,会调用taskHbTrigger.register(),taskHeartbeatTrigger是一个TimerTrigger的继承类,他会根据配置,通过ScheduledExecutorService设置每隔一段时间执行task。
 

Spout.emit过程:

真正执行emit的是SpoutCollector.sendMsg
public List
sendMsg(String out_stream_id, List
values, Object message_id, Integer out_task_id, ICollectorCallback callback) {
final long startTime = emitTotalTimer.getTime(); try {
boolean needAck = (message_id != null) && (ackerNum > 0); //needAck满足的两个条件 Long root_id = getRootId(message_id);//如果需要ack,随机生成rootId,并对rootId做一次去重校验 java.util.List
out_tasks; if (out_task_id != null) {
out_tasks = sendTargets.get(out_task_id, out_stream_id, values, null, root_id); } else {
out_tasks = sendTargets.get(out_stream_id, values, null, root_id); } if (out_tasks.size() == 0) {
// don't need send tuple to other task return out_tasks; } List
ackSeq = new ArrayList
(); for (Integer t : out_tasks) {
MessageId msgid; if (needAck) {
// Long as = MessageId.generateId(); Long as = MessageId.generateId(random); msgid = MessageId.makeRootId(root_id, as); ackSeq.add(as); } else {
msgid = MessageId.makeUnanchored(); } TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid); tp.setTargetTaskId(t); transfer_fn.transfer(tp); } sendMsgToAck(out_stream_id, values, message_id, root_id, ackSeq, needAck); if (callback != null) callback.execute(out_tasks); return out_tasks; } finally {
emitTotalTimer.updateTime(startTime); } }
 

转载于:https://www.cnblogs.com/harlenzhang/p/5544964.html

你可能感兴趣的文章
20141103
查看>>
HTML <hr> 标签定义和用法
查看>>
使用File查询出所有的文件和目录的信息
查看>>
.NET Micro Framework V4.2 QFE2新版本简介
查看>>
Vue.js学习笔记(2)vue-router
查看>>
python中函数和方法的区别
查看>>
(转载)java线程 - 线程唤醒后并被执行时,是在上次阻塞的代码行重新往下执行,而不是从头开始执行...
查看>>
【codeforces 483B】Friends and Presents
查看>>
【codeforces 767B】The Queue
查看>>
【codeforces 190C】STL
查看>>
041魔法方法:构造和析构
查看>>
7月/暑假集训总结1
查看>>
通悉IDC刘雨生带您查看BGP线路服务器的优势
查看>>
js在html中的三种写法
查看>>
数据切分——Atlas读写分离Mysql集群的搭建
查看>>
学习Learn Python The Hard Way 前言中的一段话,可与君共勉
查看>>
步步为营-79-缓存
查看>>
二分图匹配
查看>>
vim基本操作键盘图
查看>>
Bios-》主引导记录(MBR)-》启动文件-》操作系统
查看>>