大型互联网平台日志系统(FileBeat+Kafka+LogStash+Elastic+Storm+MySql)小白的入门实战篇

大型互联网平台日志系统(FileBeat+Kafka+LogStash+Elastic+Storm+MySql)小白从入门实战篇
大型互联网平台日志系统(FileBeat+Kafka+LogStash+Elastic+Storm+MySql)小白从入门实战篇

原计划昨天的公众号图文直播因公司年会抽奖而没有进行,今天在自己的电脑上实战了FileBeat+Kafka+LogStash+Elastic+Storm+MySql的环境搭建,由于今天的实验过程并不顺利,所以整体实验方案砍掉了Storm、MySql,这将是明天的套餐。

作为一个有态度的资深公众号运营人员,写一篇文章不仅会为了达到原创标准凑够300字,同时也会把本次实战的心得与经验放到前面,让小伙伴能以最快的速度收获一些经验性的东西。同时,实战是一步一步操作,并跑并跑起来,是在坑里爬了很久,才写出来的,如果你有兴趣可以照着做一篇,是可以跑起来。看不明白的,可以留言问,包教包会~

完整的读完本实验总结,可以有以下收获:

  1. 能搭建互联网平台日志收集系统
  2. 能知道如何处理开源产品配置报错

本次实战的小心得如下:

  1. 玩开源产品,版本多,不要怕提示出错,网上多找找,一般都能找到答案。
  2. 这不是在堆代码,只是使用开源的产品,不需要编程功力。
  3. 先了解一下相关产品的设计思路,用得思路会更好些

本次实验用到的开源产品:

elasticsearch-6.5.4

elasticsearch-head

filebeat-6.5.4-windows-x86_64

kafka_2.12-2.1.0

logstash-6.6.0

zookeeper-3.4.12

autovisit.bat ( 这个一小段脚本,用来不断访问页面,以产生访问日志,自己DIY也可以,很简单。加头我放到Github上吸粉

本次实现新增加了Kafka\LogStash\zookeeper,先来看一下实现录的小视频,直观的感受一波操作。

关注公众号 Yuema约吗 可以查看录制的视频

一、zookeeper

kafka用zookeeper来协调集群节点,apache下很多开源产品都是用zookeeper来协调集群节点。知道这么回事就好。本次实战就不讲原理了,感兴趣的小伙伴可以去查资料。跑命令 zkServer.cmd 启动!配置文件:zookeeper-3.4.12\zookeeper-3.4.12-1\conf\zoo.cfg [把zoo_sample.cfg修改成zoo.cfg],用单机模拟的集群,所以后面有带了三个节点配置。纯体验的话,可以去掉,直接跑单机,看到的效果是一样的。

zookeeper_config
zookeeper_config

#存储内存中数据库快照的位置,如果不设置参数,更新事务日志将被存储到默认位置。

dataDir=../zookeeperData

#日志文件的位置

dataLogDir=../zookeeperLog

#mock a cluster

server.1=127.0.0.1:12888:1388

server.2=127.0.0.1:12889:1389

server.3=127.0.0.1:12887:1387

zookeeper_three_node
zookeeper_three_node

偏好配置还需要zookeeper-3.4.12\zookeeper-3.4.12-1\zookeeperData中加一个无扩展名的myid,三个节点可分别取值为1,2,3

zookeeper_three_node_file
zookeeper_three_node_file
zookeeper_three_node_file_myid
zookeeper_three_node_file_myid

二、Kafka

三个节点,把文件压解出来,先配置好一个,再复制几个,再微调一下。结构如下。因为是单机模拟集群,所以就这么干啦。

打开配置kafka_2.12-2.1.0\kafka_2.12-2.1.0-2\config\server.properties

重点配置敲黑板,注意,单机三个节点的话,端口改成9092,9093,9094,日志文件如果像我一样傻傻的用绝对路径的话,一定要每个点节都加上自己的编号区分开,不然会一直报错!

关键配置

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka-logs-2

启动kafka

kafka-server-start.bat ../../config/server.properties

注意:这里用的是相对路径,跑LogStash的时候也要这么干,网上找到一个教程没有这么干,一直报错

相关命令送上来:

创建主题

kafka-topics.bat –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic dqtest

罗列kafka主题

kafka-topics.bat –list –zookeeper 127.0.0.1:2181

消费topic

kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic dqtest –from-beginning

敲黑板:不同的版本命令有所差异,如果跑不起来,可以核对一下版本对不对。同时,除了搜索,还可以打开config中的文件配置,查看睦一下到底是用的bootstrap-server还是zookeeper,嗯~就是这样子,算是很容易掉坑里的环节。

生产topic消息

kafka-console-producer.bat –broker-list localhost:9092 –topic dqtest

三 LogStash

3.1 填坑之路:报错找不到主类

解决:logstash-6.6.0\logstash-6.6.0\bin\logstash.bat

将55行的  %CLASSPATH% 修改 “%CLASSPATH%”

3. 启动命令

logstash -f ../config/first-pipeline.conf –config.reload.automatic

因为找到的资料用的是first-pipeline,所以也跟着敲了,唯一要注意的是,这里记得加上../config/,不然找不到配置文件的。

见证奇迹的时刻来了,折腾大半天,最后将FileBeat->Kafka->LogStash->Elasticsearch配通了,结果会是什么样子的呢?

哈哈,就是这样子的啦~今天的实战说真的不太顺利,遇到的小坑都有点意外,其中还需到报路径太长,之前是D:\DQ\xxxx\0008.Job\100010.日志系统\filebeat-6.5.4-windows-x86_64,报错后,修改成了D:\BigData\filebeat-6.5.4-windows-x86_64

明天的大餐是将Storm+MySql配置进来,Storm从Kafka消费数据,将运算结算存到mysql,关注公众号就能收看明天的大型实时日志系统实验总结!期待明天相聚在约吗图文直播间。

巧妙拆分bolt提升Storm集群吞吐量

FileBeat+ElasticSearch+Kibana 实时日志系统搭建从入门到放弃 小白玩大数据日志分析系统经典入门实操篇

 

大数据实时日志系统搭建
大数据实时日志系统搭建

距离全链路跟踪分析系统第二个迭代已经有一小阵子了,由于在项目中主要在写ES查询\Storm Bolt逻辑,都没有去搭建实时日志分析系统,全链路跟踪分析系统采用的开源产品组合为FileBeat、Kafka、LogStash、Elastic、Storm,外加自主前端、自定义日志。今天挤出时间,选用FileBeat、ElasticSearch、Kibana搭建了一个实时日志系统。搭建之前,看了一下Elastic Stack产品组织中的LogStash,一般建议FileBeat输出到LogStash,再由LogStash到ElasticSearch,今天的实操并没有使用到kafka、LogStash、Storm,这是明天的主餐,感兴趣的可以关注公众号,明天接着看。

看完本篇文章,您可以有以下收获:

a)您能够搭建一个实时的日志分析系统,并能知道如何处理遇到的问题。

b)您将了解Elastic+Kibana在日志分析、商业大屏、航空看板、商业分析的应用,并能结合自身公司的业务情况,有选择的为公司实施大数据,提升公司的数据价值挖掘能力。

c)看不懂,包教包会

作为一篇有态度的技术文章,先把本次实操感受写在前面。

一、ElasticSearch-head揪心之作

1.1 揪心之作

部署完ElasticSearch,没有UI界面,这跟很多开源产品的现状很像,apache出品的不少开源产品,好多UI也是丑丑的。elasticsearch-head作为ElasticSearch UI的空白,算是一个惊喜,在Github上获得了5000多个星星,梦想中的星星数量啊。揪心的是,用起来,就是有点不爽啊,滚动条拖到尾部才能找到,也是丑丑的~复杂查询也是不够好用~

1.2 部署经验

npm install跑不成功,yarn install成功跑起来。最好将源改为国内大厂源,不然,应该是直接失败吧。

1.3 开源收获粉丝还是存在很多机会

ElasticSearch-head弥补了ElasticSearch UI确实,还有优化空间。kibana也有完善空间。

二、安全问题也许就是网上教程传出来的

ElasticSearch默认不允许跨域,网上查到的解决处理是

http.cors.enabled: true

http.cors.allow-origin: “*”

对于正式使用的话,还是有些风险的。自己玩弄着完是可以,真正使用的话,要限定一下。这招防君子不防小人。

三、完美的不可用产品kibana

这么说可能会引起kibana粉丝的不爽,对于一个全链路跟踪系统来讲,kibana真的是满足不了,kibana有着非常漂亮的UI,灵活的可视化呈现,却无法适应我们的业务场景,或者说,就着kibana的话,会付出更多的二次开发成本。kibana有着丰富的可视化组件,却没有我们所需的调用图。全链路跟踪分析系统没有使用kibana,而是自主研发的前端。

kibana有对于现有产品IIS\Apache\Mysql\服务器等等,都有数据采集组件,可以对系统所采用的软件进行监控。对于特定的业务,需要二次开发,比较复杂的业务的话,需要在kibana基础上进行二次开发,会比较麻烦。

四、FileBeat只是Beat产品中的一个

今天看资料才发现,Beat产品组中还有很多,FileBeat只是其中一个,目前完整组合是Filebeat,Metricbeat,Packetbeat,Winlogbeat,Auditbeat,Heartbeat,Functionbeat!全方位收集日志,监控无所不及。我们只用FileBeat~

五、跑开源产品不要怕报错

虽然会吓一跳,但是仔细看提示,还是能看到错误提示中给出的解决办法。

5-1. 跑FileBeat收集IIS Log时,会发现,ElasticSearch中查到的结果显示正则解析失败!

因为默认的grok规则配置是记录所有IIS字段,而IIS默认的log没有输入所有的字段,打造IIS站点日志配置,将其他没有输出的字段都勾选上。

尴尬:grok的正则写法丢到Match Tracer(一个正则调试工具)中,直接懵圈了,放一条日志进去匹配不到。字段对不上。

配置现场:

filebeat-6.5.4-windows-x86_64\module\iis\access\ingest\default.json

除了iis module,还有很多丰富的module备选,apache2,auditd,elasticsearch,haproxy,icinga,iis,kafka,kibana,logstash,mongodb,mysql,nginx,osquery,postgresql,redis,suricata,system,traefik

5-2.使用IIS module,要开启相应的插件

不要慌,报错上面有提示,跑2条命令。玩开源啊,要有看报错的准备,从报错中看答案。看不出来再去搜索~

六、where is Ambari

Apache Ambari能傻瓜式的部署安装Hadoop,Elastic系列产品少了一个类似的产品。对于开源届来讲,不了一个统一的傻瓜式部署平台。如果有的话,请留言告诉我哈~

七、where is  智能调优

一堆产品组合成一个解决方案,人肉调优好心碎,也不专业。

八、 Kibana机器学习初感

初看,是看不出什么内涵,就罗列了一堆维度,看不出什么机器学习的能力。好像一个营销噱头,也可能我没有看到kibana的机器学习功力。

能坚持看完吐槽的小伙伴,真的是铁丝。接下来会将本次实验的步骤截图展示出来。很多都是网上可以查到的!也不要太失望,我截了不少效果图,对于不想动手搭建环境的老铁,可以直接看效果图,再决定要不要玩起来。

部署说明,本次搭建的实时日志分析系统用到了FileBeat,ElasticSearch,Kibana,都可以从官方网站www.elastic.co直接下载。elasticsearch-head请从github上下载。基本上都是开箱即用的,只有少部分要调整,依据我的部署说明来,应该能通关,如有遇到解决不了的问题,也要可以在公众号中留言提问,我会解救你。

部署步骤:

前置:安装jdk,我用的1.8版本,自行安装解决。

  1. 安装elasticsearch-6.5.4
  2. elasticsearch-head
  3. kibana-6.5.4-windows-x86_64
  4. filebeat-6.5.4-windows-x86_64

不要怕,都是解压出来就能跑

大数据实时日志系统搭建文件目录
大数据实时日志系统搭建文件目录

1.安装elasticsearch-6.5.4

解压出来,跑 elasticsearch-6.5.4\bin\elasticsearch.bat

跑完,可以查看localhost:9200,能查看到类似以下界面!纯接口,人类表示无法看,于是要安装一个elasticsearch-head,类似一个管理查看的。

elasticsearch_9200安装成功效果图
elasticsearch_9200安装成功效果图

2.安装elasticsearch-head

将https://github.com/mobz/elasticsearch-head

克隆或者下载到本店,下载一个node.js,安装好

因为这个东东需要用到node.直接到node.js上下载一个,一路next,就安装好了node.js。

cmd进入到elasticsearch-head的目录

依次输入

yarn install

yarn start

命令说明:yarn install是恢复elasticsearch-head引用到的包,yarn start是用于启动。当然,也可以使用npm install,npm start,最终效果是一样的,如果你没有安装yarn的话,可以用npm。因为资源可能在国外,所以执行命令会慢,等一等,如果失败的话,请更换成国内的源。(可以搜索,也可以来问我哈)

见证奇迹的时刻来了:

elastic-header-cors_problem
elastic-header-cors_problem

报错了,提示跨域了。解决方案:修改elasticsearch-6.5.4\config\elasticsearch.yml,增加:

http.cors.enabled: true

http.cors.allow-origin: “*”

备注:这里用的是*,如果用于生产环境,建议写具体的URI

处理完的效果图

elastic-header-cors_ok
elastic-header-cors_ok

3.安装kibana-6.5.4-windows-x86_64

解压后执行命令  kibana-6.5.4-windows-x86_64\bin\kibana.bat

进入http://localhost:5601/ 查看kibana,进入是空白的,kibana自带了一些示例数据,在里面点点点就找到了,进入Dashboards

  • eCommerce
  • Logs
  • Flights

官方示例数据是三大领域,商业、日志、航班,覆盖了多数场景,可以照着官方的示例完成自身的业务需求。

4.filebeat-6.5.4-windows-x86_64

Filebeat收集IIS有点小插曲,不碍事。

将filebeat-6.5.4-windows-x86_64\modules.d\iis.yml.disabled 改名为 iis.yml

跑不起来,但是有提示。就像问道游戏,每个任务都会有明确的提示,不会晕菜。

小插曲来了,elaticsearch中的抓到的日志,message中显示解析失败,日志没有按一个一个字段存下来,而是一整条,看起来怪怪的。怎么破?

到IIS中,选择站点,右则会有日志,点开,选择字段,全部选上。

配置完,就能正确收到IIS日志信息了。

这就是我选这个图当封面的原因,坑啊。

怎么不断产生访问日志?

写一个.bat文件,多点开几个,跑着就好了。

:S

echo hello world

curl http://www.yue.ma

goto S

效果图

安装部署流程讲完了,不懂的地方可以随时留言!喜欢看效果图的小伙伴有福了,我截了不少kibana的效果图,一起来看看吧!效果图后面,还会分析一下,为什么kibana效果也不差,而且还是开源的,我们为什么不用kibanan

Kibana用来全方位监控服务器、数据库、网络状态、商业数据等等都是不错的选择,但是我们的全链路跟踪分析系统需求有些特殊,需要对各站点日志请求进行分析,归到不同的业务上,并依据日志生成调用关系图。这就是实时日志系统搭建从入门到放弃的来源,因为明天会升级方案!

明天将介绍FileBeat+Kafka+LogStash+ElasticSearch+Storm,全链路分析系统的大数据日志环境搭建!敬请关注明天的大数据日志分析实战图文直播!

微服务 vs NeuronStorm 一次微服务的实战真实感悟 简化通道化身为神经元玩法 减少微服务中通道代码占比 节省代码时长

最近在微服务的工程下面增加业务功能,设计涉及三个业务领域,也就是微服务的堆栈要处理多个工程代码,类似于【通道代码】,本身并不是业务代码,占的比例还不少!构想一个NeuronStorm,由平台完成通道的管理与维护,消灭通道代码,同时可视化编程
最近在微服务的工程下面增加业务功能,设计涉及三个业务领域,也就是微服务的堆栈要处理多个工程代码,类似于【通道代码】,本身并不是业务代码,占的比例还不少!构想一个NeuronStorm,由平台完成通道的管理与维护,消灭通道代码,同时可视化编程

最近在微服务的工程下面增加业务功能,设计涉及三个业务领域,也就是微服务的堆栈要处理webapi<—>Service<—->Agent<—>SDK<—->webapi<—–>bll<—->memcache,建立微服务通道花了一些时间,占了一定比例的开发时间与代码工作,而真正业务代码量是比较少。

微服务的堆栈要处理webapi<--->Service<---->Agent<--->SDK<---->webapi<----->bll<---->memcache,涉及多个工程
微服务的堆栈要处理webapi<—>Service<—->Agent<—>SDK<—->webapi<—–>bll<—->memcache,涉及多个工程

微服务中通道代码能否做一些处理,以节省没有必要的时间占用呢?这是开发过程中一直在思考的问题。我想有以下两方面可以考虑:

  1. 代码生成器,对于微服务中各工程的代码,通道代码可以抽成模板或通过架构手段变成可生成的。当然这种玩法并没有改变微服务在通道代码上面的冗余。这种方案相对比较保守。但是对于现有工程来讲,不涉及大的变化,是比较好实施的。
  2. NeuronStorm,这种方案的话,是将代码逻辑封装到一个处理单元中,Neuron并不用关心自己在哪个业务领域处理哪个机器上。由NeuronStorm统一调度,由业务人员自行配置。这种方案下,Cell之间的通道是基于NeuronStorm统一实现,所以将微服务中的通道代码全部消灭掉了。
NueronStorm中的Nueron单元共享NeuronStorm通道机制,省掉通道代码工作量
NueronStorm中的Nueron单元共享NeuronStorm通道机制,省掉通道代码工作量

NeuronStorm的最小逻辑单元为Neuron,可以理解为逻辑单元,不同的Neuron可组合成业务流,通过组织Neuron单元形成业务流,使得代码逻辑流可以可视化呈现。

复杂的NueronStorm逻辑流程图
复杂的NueronStorm逻辑流程图

NeuronStorm是一个可视化的编程平台,依据设想,基于NeuronStorm的程序,只需要集中在Neuron单元中编写业务逻辑,通道代码由NeuronStorm集群控制。NeuronStorm借鉴了Apache Storm的Bolt概念,注重于可视化编程。

目前NueronStorm处于概念阶段,github的存储库已经建立起来,有兴趣的可以加入进来。

https://github.com/Lancker/NeuronStorm

巧妙拆分bolt提升Storm集群吞吐量

巧妙拆分bolt提升Storm集群吞吐量 增加并行处理速度 Storm & kafka处理实时日志实战topology经验谈

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

项目背景介绍:

我们通过日志系统跟踪每个接口的运行状态,收发时间,平均速度,成功率,平均响应时长。日志被收集到了kafka,日志实时处理采用storm框架。一个请求有一条服务接收日志,一条服务器响应日志。需要通过日志实时处理,统计出每个接口的每分钟的指标,存到ES。

昨天的Topology图画成这个样子,如果有看过昨天的文章的话,应该还有印象。

Storm & kafka 日志统计代码流程转成多个Bolt

昨天写文章的时候,就思考要不要将CountBolt拆成MergeLogBolt加CountBolt,原来在一个CountBolt中即要做日志合并,又要做统计。今天整理代码的时候反复的思考,考虑到并行度的、代理管理维护、协作开发,多个维度,同时也考虑代码逻辑清晰度,果断拆开了。所以今天的图变成了:

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

如果细心看的话,每一个bolt都可以调整并行度了,可以依据每个bolt的处理速度配置合理科学的并行度。比如,要是EsBolt慢,可以增加EsBolt并行度,如果EsBolt并不是瓶颈所在,一个EsBolt也可以的话,可以配置为并行度为1。同时还可以考虑调整bolt之间的流控制,综合日志特点再调整并行度配置,从而提升集群处理的吞吐量。

目前我们拆分的Bolt细化到了PareBolt,MergeLogBolt,CountBolt,MergeCountBolt,EsBolt,划分原则如下:

1、单一职责,每个Bolt仅做一件事情。比如MergeBolt,CountBolt,原来是划到一个CountBolt,进行代码梳理的时候发现,MergeBolt需要维护一个组装log的HashMap,并要定时进行清理一定时间窗口未组装成功的孤立日志,组装成功的要及时从HashMap 中删除。而CountBolt维护统计数据是按timeKey,域名、业务名来组织 HashMap,时间窗口的处理逻辑完全不一样。对于MergeLog来讲,组一个删除一个,再定时清理孤立日志就好。而CountBolt则不一样。感觉夹杂在一起,给同事介绍起来也费劲,代码组织也不够清晰。拆成两个bolt之后,感觉代码逻辑更加清晰了。也更好的利用Storm提供的实时计算能力,可依据计算情况,为每个bolt设置相应的并行度,达到更高的吞吐量。简单的来讲,就是将一个大的逻辑,拆成一个一个小的逻辑,用tuple串起来,每个bolt都有明确的输入tuple,明确的输出tuple,也好进行代码逻辑问题定位。

2、协作原则,相对于在一整串代码逻辑中去写逻辑,别人读懂代码会更加费劲,往往是懵menbelity,而划成一个一个bolt之后,一个bolt的逻辑更加简单好懂。每个Bolt拿到tuple只做一件明确的事情,可以更好的并行开发。大家只要约定好输入的tuple,产出的tuple。比如,我们将Es操作划成一个Bolt,落地数据的小伙伴只要专注于Es操作,做合并的小伙伴只要专注于合并。哈哈,是不是很神奇。

收尾,这是一天下来的工作心得,对bolt的划分经验也是经过不断进行代码整理思考的结果。开始是在一个bolt里完成业务所需的所有逻辑,之后才拆成一个一个bolt来对待,并不是一上来就拆成一个一个bolt,第一天的时候比较担心日志无法成对的问题,所以都不敢拆,实验完GroupByField之后才敢拆。这个是有一个过程的。首先要知道业务要干什么,打算怎么实现,拆的话会引起什么问题,怎么解决,经过同事的指点与摸索,算是有了些心得,至少现在拆分出的几个bolt,基本上感觉是OK,已经是拆到一个合理的地步了。当然我也是刚玩Storm,如有好的bolt拆分心得,也欢迎指点,欢迎分享分享。

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

Storm & kafka 日志统计代码流程转成多个Bolt

Storm & kafka 日志统计代码流程转成多个Bolt

所谓一图击破迷团。我们使用kafkaSpout接上kafka消息队列消息,使用ParseBolt将kafka消息中的message拿出来,得到我们统计日志要使用的日志节点信息traceId(跟踪Id)、日志时间、日志类型等信息,ParseBolt仅干这件事,把拿到的日志节点信息emit出去!抵达下一个CoutBolt,采用了GroupByField进行数据流控制,将同traceId的 tuple送到一个Bolt处理。CountBolt处理会进行日志合并统计,这个CountBolt有点点肥大,目前还没有再拆,统计逻辑在这个CountBolt里面。达到一定周期后,会将自身的统计数据emit出去,抵达MergeBolt,MergeBolt只做合并,将CountBolt节点的数据进行合并,达到一定周期后,将合并好的数据emit出去,抵达EsBolt,EsBolt将结果写入ElasticSearch。

QA1:为什么我们要用TraceID进行流控制?

因为一条日志只记录接口收到的时间或响应时间,需合成一对,得到处理时长。

QA2:GroupByFeild怎么感觉理解起来很怪异?

是的,如果新接触Storm,会听到一堆新词汇,Spout,Bolt,Topology,minbus,supervisor,worker,task等等。感觉用路由来理解这里的GroupByFeild按字段分组数据流,可能会比较好理解,至少我是这么觉得的。

QA3:Bolt到底拆分成几个好?

其实,刚开始的时候,我是打算一个KafkaSpout,一个Bolt,先把业务逻辑整出来,再来拆。不管拆成多少个Bolt,完成业务所需要的逻辑步骤一个都不能少,不管怎么拆。一方面刚玩Storm,也得找感觉。写完整理逻辑后,再针对性感受一下拆开处理会有哪些问题,能不能拆?拆完之后有什么好处,拆成多个Bolt后,数据怎么合并,为什么要拆。反复推敲之后,得到大神指点,也考虑后续集群运行的计算资源使用率,得提高吞吐量是更好。将EsBolt单拿出来,主要考虑合并数据逻辑与Es操作逻辑本身就两个人写,拆开也好开发,逻辑也清晰。

QA4:还可以怎么拆?

CountBolt逻辑有两大逻辑,一是日志合二为一,二是请求计数,时长计数。日志合二为一,将收发时间组成对,得到单个请求时长,这个逻辑与计数可以相互独立出来。即CountBolt拆成ReformBolt+CountBolt,Reform仅对日志进行合二为一,CountBolt仅统计。想想是不是这样子?

QA5:并行度怎么调优?

这个暂时没有太多的经验,初步来看,EsBolt可能着重考虑一下,应该要做到批量插入。网上有小伙伴已经踩进一条一条插入到hbase超时的坑。Es看起来也类似。更详细的并行度分享,等我后续实战后再分享。

Storm让人开心的是,这种Bolt串起来的方式,很有趣,感觉非常适合做可视化编程控制,不仅仅是用在Storm上,可以将程序按这种思想拆起一个一个Bolt小球,通过可视化的界面来配置逻辑流程,相当于可视化编程或者可视化生成Storm Topology,想想都非常有意思。

今天的实战分享就到这里啦,每天都会分享工作中实战的点点滴滴,并不能保证自己当前的做法就是百分之百正确或合理,敬请指教。

Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm

storm hello world & kafka storm
storm hello world & kafka storm

Storm是一个实时计算框架,有开源的大神为我们搭好了平台,按照大神的玩法,Storm的作业是topology,而topolgy是由spout,blot组成,spout是取数据,blot是处理数据,一个topology由一个spout加多个blot组成。将topology丢到storm上就能跑起来,可以在本地模式下跑,也可以在集群模式下跑。

Storm的使用,可以查看小伙伴田海龙的经典Blog示例。一个经典的wordcount示例。这个示范了取数据到处理数据的过程。实际使用Storm的时候, 通常是Storm+kafka,按照我们公司的日志系统的情况来看,是这么搭配。

玩Storm能增值不?

当然可以,提升自身身价。同时,也可以考虑星火理财专业手段直接增值 。新技术能提升个人的技术身价,不过,玩Storm,玩到什么程度又是一个境界了。这个要看如何平衡了。一般日志成型后,可能也不会怎么动了,一直在不断的接触新东西,而storm的源代码读过否?

storm与财富增值
storm与财富增值

Storm难吗?

如果说使用storm,相对说讲,还是比较easy。比如用storm写一个word count,或者用storm读取kafka的消息。一般来讲,拿来用是比较简单。如果要去深入看storm,就另说了。

Storm如何集成Kafka?

其实有的时候找来找去,很多答案就在github.com上面,Storm的官方源码中,有kafka的集成示范代码,如果看不明白的话,其实也没有很大的关系。因为我们不还可以找国内小伙伴分享的一些经典。

我的经验分享,也是我们结合网上教程实践一些心得。我们知道topology是由spout与bolt组成的,那么跟kafka集成,必然要一个kafkaSpout,这个有现成的,网上的代码只需要实现一个Scheme,对kafka的输出进行转化,把字节流转成字符串。简单的说,kafkaspout别人都写好了。只需要写一个bytebuffer to string的方法。

小坑之一: String 转UTF8

网上的代码报错,找了一段bytebuffer to string替换掉

小坑之二:zookeeper

到底是跟kafka共用一个,还是独立一套zookeeper给storm用。这个问题纠结了一下,因为zookeeper要是每人上集群都配置一套的话,会有好多zookeeper,不过,老司机校友也是这么推荐的,那就听老司机的。单独给storm部署一的大套。插曲:同时听我要zookeeper的同事吓一跳,因为TA觉得是给kafka用的,不想给我这个Storm用。而我们今天只是想把storm与kafka跑通来,并不想再去部署一个zookeeper。哈哈,最好还是蹭的kafka的zookeeper!

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦

有的人写Blog,帖很多代码,都不带写些文字,也有的人喜欢写很多文字,看不到代码。我喜欢全二为一,尽量把自己的一些想法加在里面。还记录一下坑啊什么的,可以查看田海龙的一个入门级demo  http://www.tianhailong.com/?p=1358

WordCounter.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.youku.demo.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.youku.demo.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
    public void cleanup() {}
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordReader.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.youku.demo.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }
    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }
    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

TopologyMain.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.youku.demo;
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}

pom.xml:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>demo-storm</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

words.txt:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

运行的时候需要配置参数:src/main/resources/words.txt

因为入门的wordcount单独展示的,会给人感觉没有太大的实用价值。将kafka与storm的放一起,可以更好的理解怎么与kafka集成。

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦。

赞助商小伙伴链接:

如何享受宜信星火金服宜心理财:

  1. 扫码二维码
  2. 通过宜信星火金服活动链接 http://www.ixinghuo.com/qcode.php?yixinqcode
  3. 通过宜信星火金服理财师店铺链接:https://xinghuo.yixin.com/yiidea
  4. 通过宜信星火金服理财师移动端邀请页面 https://xinghuo.yixin.com/mobile/activityPage/shareShop/yiidea
    5.通过宜信星火金服理财师店铺宜心理财团队短链接:

http://yixin.hk

http://yixin.ceo

http://yue.ma

  1. 通过宜信星火金服宜心理财团队网站页面

http://www.yixinlicai.com.cn

  1. 通过  宜信.公司 | 宜信.网络 | 宜信.net

FlexJS vs BaibianJS: Inject what I want 不用再替换js,直接注入想要的JS代码 更省事了

FlexJs
FlexJs

自从BaibianJS发布之后,一直在想,我们将目标网站的jquery替换的这种玩法,有点low,在当前项目上虽然用上了,并能成功用于当前项目的网页爬虫,也是针对项目中需要在jqury.js加载完后要立即拦截xhr。更多情况下,并不需要拿来拦截xhr,用BaibianJS就显得有些不恰当了,始终给人一种勉强的感觉,需要先查看网页用到了哪个js,再针对性的替换。于是,对BaibianJS进行了适当的调整,产生了新的FlexJS,两种使用场景还是有很大的区别。

BaibianJS主要应用于接口爬取,需要对请求进行拦截,所以使用替换,及时将拦截代码注入进去。

FlexJS主要用于增强网页功能,当页面完全加载完之后再注入。

两者都是Chrome Extension,都解除了跨域限制,直接加载到Chrome扩张插件中就可以使用。二话不说上代码。

代码结构

manifest.json

{

“name”: “FlexJS”,

“version”: “1.0”,

“description”: “FlexJS,FlexJS主要有两个功能,一是注入JS,实现任性注入。二是修改Access-Control-Allow-Origin,实现任性跨域”,

 

“content_scripts”:

[

{

“matches”:  [“<all_urls>”],

“js”: [  “js/content-script.js”]

}

],

 

 

“permissions”: [“webRequest”,  “webRequestBlocking”,

“http://*/*”],

“background”: {

“scripts”: [  “background.js”]

},

“web_accessible_resources”:  [“js/inject.js”],

“manifest_version”: 2

}

 

background.js

chrome.webRequest.onHeadersReceived.addListener(function(details)  {

details.responseHeaders.push({name:’Access-Control-Allow-Origin’,value:”*”});

console.log(details.responseHeaders)

return  {responseHeaders:details.responseHeaders};

},{urls:  [“<all_urls>”]},  [“responseHeaders”,”blocking”]);

 

 

JS/content-script.js

// 向页面注入JS

function  injectCustomJs(jsPath)

{

jsPath = jsPath || ‘js/inject.js’;

var temp =  document.createElement(‘script’);

temp.setAttribute(‘type’,  ‘text/javascript’);

// 获得的地址类似:chrome-extension://ihcokhadfjfchaeagdoclpnjdiokfakg/js/inject.js

temp.src =  chrome.extension.getURL(jsPath);

temp.onload = function()

{

// 放在页面不好看,执行完后移除掉

this.parentNode.removeChild(this);

};

document.head.appendChild(temp);

}

 

injectCustomJs();

 

JS/inject.js

console.log($(“title”).html());

 

FlexJS采用了互联网上的新鲜源代码组装而成,去掉了不相掉的代码,仅有两个实用功能,一是解除跨域限制,二是注入JS。inject.js是放飞梦想的地方,可以针对特定网站进行功能加强。Good luck with FlexJS!

Storm历险记之浅入浅出:Storm Hello World入门示例

俗话说一图胜千言万语,想了解Storm的话,先来看几张图,直观的了解一下Storm。图片有官方的图片,也有技术人自己画的图片,均来自互联网,在看代码之前先来简单的看一下图片。请快速的看一下图片,找一下感觉,如果一下子看不明白,其实也没有关系。图片流之后,会有一小段文字说明。本文适合Storm小白看,大神吐槽或在1秒内关掉。

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图1

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图2

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图3

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图4

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图5

Storm是实时流式处理计算框架,不断的取数据,不断的处理数据,这个过程就像水流一样。官方配图就是一个水龙头来诠释Storm内涵。数据处理流程的开始是Spout,取数据,中间的过程是多个Bolt组合,Bolt是处理数据的单元,Spout与Bolt就像流程图的开始与中间处理过程。Spout与Bolt组合成了一个topology作业,丢给storm就能跑起来。Storm有本地模式,也有远程模式,今天的Storm Hello World采用本地模式。

 

代码结构:

图6

代码源自https://www.cnblogs.com/xuwujing/p/8584684.html,因为用的最新的storm-core 1.2.2,代码有些改变。从eclipse报错提示来看,store代码中原来 superclass中的部分方法被移到了interfase中,所以有些@Override要去掉。

 

POM文件

<!– https://mvnrepository.com/artifact/ring-cors/ring-cors –>

<dependency>

<groupId>ring-cors</groupId>

<artifactId>ring-cors</artifactId>

<version>0.1.12</version>

</dependency>

<!– https://mvnrepository.com/artifact/org.apache.storm/storm-core –>

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-core</artifactId>

<version>1.2.2</version>

<scope>provided</scope>

</dependency>

修正过的代码

TestSpout

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichSpout;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

 

public class TestSpout extends BaseRichSpout{

 

private static final long serialVersionUID = 225243592780939490L;

 

private SpoutOutputCollector collector;

private static final String field=”word”;

private int count=1;

private String[] message =  {

“Storm Hello World”,

“http://www.jishudao.com storm blog”,

“Play with storm”

};

 

/**

* open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。

* 有三个参数:

* 1.Storm配置的Map;

* 2.topology中组件的信息;

* 3.发射tuple的方法;

*/

public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {

System.out.println(“open:”+map.get(“test”));

this.collector = collector;

}

 

/**

* nextTuple()方法是Spout实现的核心。

* 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。

*/

public void nextTuple() {

 

if(count<=message.length){

System.out.println(“第”+count +”次开始发送数据…”);

this.collector.emit(new Values(message[count-1]));

}

count++;

}

 

 

/**

* declareOutputFields是在IComponent接口中定义,用于声明数据格式。

* 即输出的一个Tuple中,包含几个字段。

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

System.out.println(“定义格式…”);

declarer.declare(new Fields(field));

}

 

/**

* 当一个Tuple处理成功时,会调用这个方法

*/

@Override

public void ack(Object obj) {

System.out.println(“ack:”+obj);

}

 

/**

* 当Topology停止时,会调用这个方法

*/

@Override

public void close() {

System.out.println(“关闭…”);

}

 

/**

* 当一个Tuple处理失败时,会调用这个方法

*/

@Override

public void fail(Object obj) {

System.out.println(“失败:”+obj);

}

 

}

 

TestBolt

import java.util.Map;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;

 

public class TestBolt extends BaseRichBolt{

 

/**

*

*/

private static final long serialVersionUID = 4743224635827696343L;

 

private OutputCollector collector;

 

/**

* 在Bolt启动前执行,提供Bolt启动环境配置的入口

* 一般对于不可序列化的对象进行实例化。

* 注:如果是可以序列化的对象,那么最好是使用构造函数。

*/

public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {

System.out.println(“prepare:”+map.get(“test”));

this.collector=collector;

}

 

/**

* execute()方法是Bolt实现的核心。

* 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。

*/

public void execute(Tuple tuple) {

String msg=tuple.getStringByField(“word”);

System.out.println(“开始分割单词:”+msg);

String[] words = msg.toLowerCase().split(” “);

for (String word : words) {

this.collector.emit(new Values(word));//向下一个bolt发射数据

}

 

}

 

/**

* 声明数据格式

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields(“count”));

}

 

/**

* cleanup是IBolt接口中定义,用于释放bolt占用的资源。

* Storm在终止一个bolt之前会调用这个方法。

*/

@Override

public void cleanup() {

System.out.println(“TestBolt的资源释放”);

}

}

 

Test2Bolt

import java.util.HashMap;

import java.util.Map;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;

 

public class Test2Bolt extends BaseRichBolt{

 

/**

*

*/

private static final long serialVersionUID = 4743224635827696343L;

 

 

/**

* 保存单词和对应的计数

*/

private HashMap<String, Integer> counts = null;

 

private long count=1;

/**

* 在Bolt启动前执行,提供Bolt启动环境配置的入口

* 一般对于不可序列化的对象进行实例化。

* 注:如果是可以序列化的对象,那么最好是使用构造函数。

*/

public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {

System.out.println(“prepare:”+map.get(“test”));

this.counts=new HashMap<String, Integer>();

}

 

/**

* execute()方法是Bolt实现的核心。

* 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。

*

*/

public void execute(Tuple tuple) {

String msg=tuple.getStringByField(“count”);

System.out.println(“第”+count+”次统计单词出现的次数”);

/**

* 如果不包含该单词,说明在该map是第一次出现

* 否则进行加1

*/

if (!counts.containsKey(msg)) {

counts.put(msg, 1);

} else {

counts.put(msg, counts.get(msg)+1);

}

count++;

}

 

 

/**

* cleanup是IBolt接口中定义,用于释放bolt占用的资源。

* Storm在终止一个bolt之前会调用这个方法。

*/

@Override

public void cleanup() {

System.out.println(“===========开始显示单词数量============”);

for (Map.Entry<String, Integer> entry : counts.entrySet()) {

System.out.println(entry.getKey() + “: ” + entry.getValue());

}

System.out.println(“===========结束============”);

System.out.println(“Test2Bolt的资源释放”);

}

 

/**

* 声明数据格式

*/

public void declareOutputFields(OutputFieldsDeclarer arg0) {

 

}

}

 

 

 

App

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.StormSubmitter;

import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.tuple.Fields;

 

public class App {

 

private static final String test_spout=”test_spout”;

private static final String test_bolt=”test_bolt”;

private static final String test2_bolt=”test2_bolt”;

 

public static void main(String[] args)  {

//定义一个拓扑

TopologyBuilder builder=new TopologyBuilder();

//设置一个Executeor(线程),默认一个

builder.setSpout(test_spout, new TestSpout(),1);

//shuffleGrouping:表示是随机分组

//设置一个Executeor(线程),和一个task

builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout);

//fieldsGrouping:表示是按字段分组

//设置一个Executeor(线程),和一个task

builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields(“count”));

Config conf = new Config();

conf.put(“test”, “test”);

try{

//运行拓扑

if(args !=null&&args.length>0){ //有参数时,表示向集群提交作业,并把第一个参数当做topology名称

System.out.println(“运行远程模式”);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} else{//没有参数时,本地提交

//启动本地模式

System.out.println(“运行本地模式”);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology(“Word-counts” ,conf,  builder.createTopology() );

Thread.sleep(20000);

//  //关闭本地集群

cluster.shutdown();

}

}catch (Exception e){

e.printStackTrace();

}

}

}

 

 

踩坑 The POM for ring-cors:ring-cors:jar:0.1.5 is missing

解决办法:

https://mvnrepository.com/artifact/ring-cors/ring-cors/0.1.12

直接把ring-cors-0.1.12.jar ring-cors-0.1.12.pom下载下来,放到maven本地库中。

 

有初步的Hello World感觉之后,可以再详细看看官方的资料,除了看别人翻译的,强烈建议对比着官方的看。官方有详细的文档清单,不要着急,一个一个慢慢看。

Basics of Storm

Layers on top of Storm

Trident

Trident is an alternative interface to Storm. It provides exactly-once processing, “transactional” datastore persistence, and a set of common stream analytics operations.

Streams API

Stream APIs is another alternative interface to Storm. It provides a typed API for expressing streaming computations and supports functional style operations.

NOTE: Streams API is an experimental feature, and further works might break backward compatibility. We’re also notifying it via annotating classes with marker interface @InterfaceStability.Unstable.

SQL

The Storm SQL integration allows users to run SQL queries over streaming data in Storm.

NOTE: Storm SQL is an experimental feature, so the internals of Storm SQL and supported features are subject to change. But small change will not affect the user experience. We will notify the user when breaking UX change is introduced.

Flux

Setup and Deploying

Intermediate

Debugging

Integration With External Systems, and Other Libraries

Container, Resource Management System Integration

Advanced

本文适合Storm小白看,大神吐槽或在1秒内关掉。也适合.net转java的软件工程师查看。建议先照流程跑起代码,再自己照着示例一行一行敲一遍,感受一下storm关键词,加深印象。不能带你深入浅出,只能浅入浅出。我也刚看,欢迎关注公众号,一直学习成长!

 

参考与引用:

http://storm.apache.org/releases/current/index.html

https://www.cnblogs.com/xuwujing/p/8584684.html