微服务 vs NeuronStorm 一次微服务的实战真实感悟 简化通道化身为神经元玩法 减少微服务中通道代码占比 节省代码时长

最近在微服务的工程下面增加业务功能,设计涉及三个业务领域,也就是微服务的堆栈要处理多个工程代码,类似于【通道代码】,本身并不是业务代码,占的比例还不少!构想一个NeuronStorm,由平台完成通道的管理与维护,消灭通道代码,同时可视化编程
最近在微服务的工程下面增加业务功能,设计涉及三个业务领域,也就是微服务的堆栈要处理多个工程代码,类似于【通道代码】,本身并不是业务代码,占的比例还不少!构想一个NeuronStorm,由平台完成通道的管理与维护,消灭通道代码,同时可视化编程

最近在微服务的工程下面增加业务功能,设计涉及三个业务领域,也就是微服务的堆栈要处理webapi<—>Service<—->Agent<—>SDK<—->webapi<—–>bll<—->memcache,建立微服务通道花了一些时间,占了一定比例的开发时间与代码工作,而真正业务代码量是比较少。

微服务的堆栈要处理webapi<--->Service<---->Agent<--->SDK<---->webapi<----->bll<---->memcache,涉及多个工程
微服务的堆栈要处理webapi<—>Service<—->Agent<—>SDK<—->webapi<—–>bll<—->memcache,涉及多个工程

微服务中通道代码能否做一些处理,以节省没有必要的时间占用呢?这是开发过程中一直在思考的问题。我想有以下两方面可以考虑:

  1. 代码生成器,对于微服务中各工程的代码,通道代码可以抽成模板或通过架构手段变成可生成的。当然这种玩法并没有改变微服务在通道代码上面的冗余。这种方案相对比较保守。但是对于现有工程来讲,不涉及大的变化,是比较好实施的。
  2. NeuronStorm,这种方案的话,是将代码逻辑封装到一个处理单元中,Neuron并不用关心自己在哪个业务领域处理哪个机器上。由NeuronStorm统一调度,由业务人员自行配置。这种方案下,Cell之间的通道是基于NeuronStorm统一实现,所以将微服务中的通道代码全部消灭掉了。
NueronStorm中的Nueron单元共享NeuronStorm通道机制,省掉通道代码工作量
NueronStorm中的Nueron单元共享NeuronStorm通道机制,省掉通道代码工作量

NeuronStorm的最小逻辑单元为Neuron,可以理解为逻辑单元,不同的Neuron可组合成业务流,通过组织Neuron单元形成业务流,使得代码逻辑流可以可视化呈现。

复杂的NueronStorm逻辑流程图
复杂的NueronStorm逻辑流程图

NeuronStorm是一个可视化的编程平台,依据设想,基于NeuronStorm的程序,只需要集中在Neuron单元中编写业务逻辑,通道代码由NeuronStorm集群控制。NeuronStorm借鉴了Apache Storm的Bolt概念,注重于可视化编程。

目前NueronStorm处于概念阶段,github的存储库已经建立起来,有兴趣的可以加入进来。

https://github.com/Lancker/NeuronStorm

巧妙拆分bolt提升Storm集群吞吐量

巧妙拆分bolt提升Storm集群吞吐量 增加并行处理速度 Storm & kafka处理实时日志实战topology经验谈

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

项目背景介绍:

我们通过日志系统跟踪每个接口的运行状态,收发时间,平均速度,成功率,平均响应时长。日志被收集到了kafka,日志实时处理采用storm框架。一个请求有一条服务接收日志,一条服务器响应日志。需要通过日志实时处理,统计出每个接口的每分钟的指标,存到ES。

昨天的Topology图画成这个样子,如果有看过昨天的文章的话,应该还有印象。

Storm & kafka 日志统计代码流程转成多个Bolt

昨天写文章的时候,就思考要不要将CountBolt拆成MergeLogBolt加CountBolt,原来在一个CountBolt中即要做日志合并,又要做统计。今天整理代码的时候反复的思考,考虑到并行度的、代理管理维护、协作开发,多个维度,同时也考虑代码逻辑清晰度,果断拆开了。所以今天的图变成了:

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

如果细心看的话,每一个bolt都可以调整并行度了,可以依据每个bolt的处理速度配置合理科学的并行度。比如,要是EsBolt慢,可以增加EsBolt并行度,如果EsBolt并不是瓶颈所在,一个EsBolt也可以的话,可以配置为并行度为1。同时还可以考虑调整bolt之间的流控制,综合日志特点再调整并行度配置,从而提升集群处理的吞吐量。

目前我们拆分的Bolt细化到了PareBolt,MergeLogBolt,CountBolt,MergeCountBolt,EsBolt,划分原则如下:

1、单一职责,每个Bolt仅做一件事情。比如MergeBolt,CountBolt,原来是划到一个CountBolt,进行代码梳理的时候发现,MergeBolt需要维护一个组装log的HashMap,并要定时进行清理一定时间窗口未组装成功的孤立日志,组装成功的要及时从HashMap 中删除。而CountBolt维护统计数据是按timeKey,域名、业务名来组织 HashMap,时间窗口的处理逻辑完全不一样。对于MergeLog来讲,组一个删除一个,再定时清理孤立日志就好。而CountBolt则不一样。感觉夹杂在一起,给同事介绍起来也费劲,代码组织也不够清晰。拆成两个bolt之后,感觉代码逻辑更加清晰了。也更好的利用Storm提供的实时计算能力,可依据计算情况,为每个bolt设置相应的并行度,达到更高的吞吐量。简单的来讲,就是将一个大的逻辑,拆成一个一个小的逻辑,用tuple串起来,每个bolt都有明确的输入tuple,明确的输出tuple,也好进行代码逻辑问题定位。

2、协作原则,相对于在一整串代码逻辑中去写逻辑,别人读懂代码会更加费劲,往往是懵menbelity,而划成一个一个bolt之后,一个bolt的逻辑更加简单好懂。每个Bolt拿到tuple只做一件明确的事情,可以更好的并行开发。大家只要约定好输入的tuple,产出的tuple。比如,我们将Es操作划成一个Bolt,落地数据的小伙伴只要专注于Es操作,做合并的小伙伴只要专注于合并。哈哈,是不是很神奇。

收尾,这是一天下来的工作心得,对bolt的划分经验也是经过不断进行代码整理思考的结果。开始是在一个bolt里完成业务所需的所有逻辑,之后才拆成一个一个bolt来对待,并不是一上来就拆成一个一个bolt,第一天的时候比较担心日志无法成对的问题,所以都不敢拆,实验完GroupByField之后才敢拆。这个是有一个过程的。首先要知道业务要干什么,打算怎么实现,拆的话会引起什么问题,怎么解决,经过同事的指点与摸索,算是有了些心得,至少现在拆分出的几个bolt,基本上感觉是OK,已经是拆到一个合理的地步了。当然我也是刚玩Storm,如有好的bolt拆分心得,也欢迎指点,欢迎分享分享。

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

Storm & kafka 日志统计代码流程转成多个Bolt

Storm & kafka 日志统计代码流程转成多个Bolt

所谓一图击破迷团。我们使用kafkaSpout接上kafka消息队列消息,使用ParseBolt将kafka消息中的message拿出来,得到我们统计日志要使用的日志节点信息traceId(跟踪Id)、日志时间、日志类型等信息,ParseBolt仅干这件事,把拿到的日志节点信息emit出去!抵达下一个CoutBolt,采用了GroupByField进行数据流控制,将同traceId的 tuple送到一个Bolt处理。CountBolt处理会进行日志合并统计,这个CountBolt有点点肥大,目前还没有再拆,统计逻辑在这个CountBolt里面。达到一定周期后,会将自身的统计数据emit出去,抵达MergeBolt,MergeBolt只做合并,将CountBolt节点的数据进行合并,达到一定周期后,将合并好的数据emit出去,抵达EsBolt,EsBolt将结果写入ElasticSearch。

QA1:为什么我们要用TraceID进行流控制?

因为一条日志只记录接口收到的时间或响应时间,需合成一对,得到处理时长。

QA2:GroupByFeild怎么感觉理解起来很怪异?

是的,如果新接触Storm,会听到一堆新词汇,Spout,Bolt,Topology,minbus,supervisor,worker,task等等。感觉用路由来理解这里的GroupByFeild按字段分组数据流,可能会比较好理解,至少我是这么觉得的。

QA3:Bolt到底拆分成几个好?

其实,刚开始的时候,我是打算一个KafkaSpout,一个Bolt,先把业务逻辑整出来,再来拆。不管拆成多少个Bolt,完成业务所需要的逻辑步骤一个都不能少,不管怎么拆。一方面刚玩Storm,也得找感觉。写完整理逻辑后,再针对性感受一下拆开处理会有哪些问题,能不能拆?拆完之后有什么好处,拆成多个Bolt后,数据怎么合并,为什么要拆。反复推敲之后,得到大神指点,也考虑后续集群运行的计算资源使用率,得提高吞吐量是更好。将EsBolt单拿出来,主要考虑合并数据逻辑与Es操作逻辑本身就两个人写,拆开也好开发,逻辑也清晰。

QA4:还可以怎么拆?

CountBolt逻辑有两大逻辑,一是日志合二为一,二是请求计数,时长计数。日志合二为一,将收发时间组成对,得到单个请求时长,这个逻辑与计数可以相互独立出来。即CountBolt拆成ReformBolt+CountBolt,Reform仅对日志进行合二为一,CountBolt仅统计。想想是不是这样子?

QA5:并行度怎么调优?

这个暂时没有太多的经验,初步来看,EsBolt可能着重考虑一下,应该要做到批量插入。网上有小伙伴已经踩进一条一条插入到hbase超时的坑。Es看起来也类似。更详细的并行度分享,等我后续实战后再分享。

Storm让人开心的是,这种Bolt串起来的方式,很有趣,感觉非常适合做可视化编程控制,不仅仅是用在Storm上,可以将程序按这种思想拆起一个一个Bolt小球,通过可视化的界面来配置逻辑流程,相当于可视化编程或者可视化生成Storm Topology,想想都非常有意思。

今天的实战分享就到这里啦,每天都会分享工作中实战的点点滴滴,并不能保证自己当前的做法就是百分之百正确或合理,敬请指教。

Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm

storm hello world & kafka storm
storm hello world & kafka storm

Storm是一个实时计算框架,有开源的大神为我们搭好了平台,按照大神的玩法,Storm的作业是topology,而topolgy是由spout,blot组成,spout是取数据,blot是处理数据,一个topology由一个spout加多个blot组成。将topology丢到storm上就能跑起来,可以在本地模式下跑,也可以在集群模式下跑。

Storm的使用,可以查看小伙伴田海龙的经典Blog示例。一个经典的wordcount示例。这个示范了取数据到处理数据的过程。实际使用Storm的时候, 通常是Storm+kafka,按照我们公司的日志系统的情况来看,是这么搭配。

玩Storm能增值不?

当然可以,提升自身身价。同时,也可以考虑星火理财专业手段直接增值 。新技术能提升个人的技术身价,不过,玩Storm,玩到什么程度又是一个境界了。这个要看如何平衡了。一般日志成型后,可能也不会怎么动了,一直在不断的接触新东西,而storm的源代码读过否?

storm与财富增值
storm与财富增值

Storm难吗?

如果说使用storm,相对说讲,还是比较easy。比如用storm写一个word count,或者用storm读取kafka的消息。一般来讲,拿来用是比较简单。如果要去深入看storm,就另说了。

Storm如何集成Kafka?

其实有的时候找来找去,很多答案就在github.com上面,Storm的官方源码中,有kafka的集成示范代码,如果看不明白的话,其实也没有很大的关系。因为我们不还可以找国内小伙伴分享的一些经典。

我的经验分享,也是我们结合网上教程实践一些心得。我们知道topology是由spout与bolt组成的,那么跟kafka集成,必然要一个kafkaSpout,这个有现成的,网上的代码只需要实现一个Scheme,对kafka的输出进行转化,把字节流转成字符串。简单的说,kafkaspout别人都写好了。只需要写一个bytebuffer to string的方法。

小坑之一: String 转UTF8

网上的代码报错,找了一段bytebuffer to string替换掉

小坑之二:zookeeper

到底是跟kafka共用一个,还是独立一套zookeeper给storm用。这个问题纠结了一下,因为zookeeper要是每人上集群都配置一套的话,会有好多zookeeper,不过,老司机校友也是这么推荐的,那就听老司机的。单独给storm部署一的大套。插曲:同时听我要zookeeper的同事吓一跳,因为TA觉得是给kafka用的,不想给我这个Storm用。而我们今天只是想把storm与kafka跑通来,并不想再去部署一个zookeeper。哈哈,最好还是蹭的kafka的zookeeper!

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦

有的人写Blog,帖很多代码,都不带写些文字,也有的人喜欢写很多文字,看不到代码。我喜欢全二为一,尽量把自己的一些想法加在里面。还记录一下坑啊什么的,可以查看田海龙的一个入门级demo  http://www.tianhailong.com/?p=1358

WordCounter.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.youku.demo.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.youku.demo.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
    public void cleanup() {}
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordReader.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.youku.demo.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }
    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }
    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

TopologyMain.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.youku.demo;
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}

pom.xml:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>demo-storm</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

words.txt:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

运行的时候需要配置参数:src/main/resources/words.txt

因为入门的wordcount单独展示的,会给人感觉没有太大的实用价值。将kafka与storm的放一起,可以更好的理解怎么与kafka集成。

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦。

赞助商小伙伴链接:

如何享受宜信星火金服宜心理财:

  1. 扫码二维码
  2. 通过宜信星火金服活动链接 http://www.ixinghuo.com/qcode.php?yixinqcode
  3. 通过宜信星火金服理财师店铺链接:https://xinghuo.yixin.com/yiidea
  4. 通过宜信星火金服理财师移动端邀请页面 https://xinghuo.yixin.com/mobile/activityPage/shareShop/yiidea
    5.通过宜信星火金服理财师店铺宜心理财团队短链接:

http://yixin.hk

http://yixin.ceo

http://yue.ma

  1. 通过宜信星火金服宜心理财团队网站页面

http://www.yixinlicai.com.cn

  1. 通过  宜信.公司 | 宜信.网络 | 宜信.net

FlexJS vs BaibianJS: Inject what I want 不用再替换js,直接注入想要的JS代码 更省事了

FlexJs
FlexJs

自从BaibianJS发布之后,一直在想,我们将目标网站的jquery替换的这种玩法,有点low,在当前项目上虽然用上了,并能成功用于当前项目的网页爬虫,也是针对项目中需要在jqury.js加载完后要立即拦截xhr。更多情况下,并不需要拿来拦截xhr,用BaibianJS就显得有些不恰当了,始终给人一种勉强的感觉,需要先查看网页用到了哪个js,再针对性的替换。于是,对BaibianJS进行了适当的调整,产生了新的FlexJS,两种使用场景还是有很大的区别。

BaibianJS主要应用于接口爬取,需要对请求进行拦截,所以使用替换,及时将拦截代码注入进去。

FlexJS主要用于增强网页功能,当页面完全加载完之后再注入。

两者都是Chrome Extension,都解除了跨域限制,直接加载到Chrome扩张插件中就可以使用。二话不说上代码。

代码结构

manifest.json

{

“name”: “FlexJS”,

“version”: “1.0”,

“description”: “FlexJS,FlexJS主要有两个功能,一是注入JS,实现任性注入。二是修改Access-Control-Allow-Origin,实现任性跨域”,

 

“content_scripts”:

[

{

“matches”:  [“<all_urls>”],

“js”: [  “js/content-script.js”]

}

],

 

 

“permissions”: [“webRequest”,  “webRequestBlocking”,

“http://*/*”],

“background”: {

“scripts”: [  “background.js”]

},

“web_accessible_resources”:  [“js/inject.js”],

“manifest_version”: 2

}

 

background.js

chrome.webRequest.onHeadersReceived.addListener(function(details)  {

details.responseHeaders.push({name:’Access-Control-Allow-Origin’,value:”*”});

console.log(details.responseHeaders)

return  {responseHeaders:details.responseHeaders};

},{urls:  [“<all_urls>”]},  [“responseHeaders”,”blocking”]);

 

 

JS/content-script.js

// 向页面注入JS

function  injectCustomJs(jsPath)

{

jsPath = jsPath || ‘js/inject.js’;

var temp =  document.createElement(‘script’);

temp.setAttribute(‘type’,  ‘text/javascript’);

// 获得的地址类似:chrome-extension://ihcokhadfjfchaeagdoclpnjdiokfakg/js/inject.js

temp.src =  chrome.extension.getURL(jsPath);

temp.onload = function()

{

// 放在页面不好看,执行完后移除掉

this.parentNode.removeChild(this);

};

document.head.appendChild(temp);

}

 

injectCustomJs();

 

JS/inject.js

console.log($(“title”).html());

 

FlexJS采用了互联网上的新鲜源代码组装而成,去掉了不相掉的代码,仅有两个实用功能,一是解除跨域限制,二是注入JS。inject.js是放飞梦想的地方,可以针对特定网站进行功能加强。Good luck with FlexJS!

Storm历险记之浅入浅出:Storm Hello World入门示例

俗话说一图胜千言万语,想了解Storm的话,先来看几张图,直观的了解一下Storm。图片有官方的图片,也有技术人自己画的图片,均来自互联网,在看代码之前先来简单的看一下图片。请快速的看一下图片,找一下感觉,如果一下子看不明白,其实也没有关系。图片流之后,会有一小段文字说明。本文适合Storm小白看,大神吐槽或在1秒内关掉。

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图1

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图2

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图3

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图4

Storm历险记之浅入浅出:Storm Hello World入门示例
Storm历险记之浅入浅出:Storm Hello World入门示例

图5

Storm是实时流式处理计算框架,不断的取数据,不断的处理数据,这个过程就像水流一样。官方配图就是一个水龙头来诠释Storm内涵。数据处理流程的开始是Spout,取数据,中间的过程是多个Bolt组合,Bolt是处理数据的单元,Spout与Bolt就像流程图的开始与中间处理过程。Spout与Bolt组合成了一个topology作业,丢给storm就能跑起来。Storm有本地模式,也有远程模式,今天的Storm Hello World采用本地模式。

 

代码结构:

图6

代码源自https://www.cnblogs.com/xuwujing/p/8584684.html,因为用的最新的storm-core 1.2.2,代码有些改变。从eclipse报错提示来看,store代码中原来 superclass中的部分方法被移到了interfase中,所以有些@Override要去掉。

 

POM文件

<!– https://mvnrepository.com/artifact/ring-cors/ring-cors –>

<dependency>

<groupId>ring-cors</groupId>

<artifactId>ring-cors</artifactId>

<version>0.1.12</version>

</dependency>

<!– https://mvnrepository.com/artifact/org.apache.storm/storm-core –>

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-core</artifactId>

<version>1.2.2</version>

<scope>provided</scope>

</dependency>

修正过的代码

TestSpout

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichSpout;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

 

public class TestSpout extends BaseRichSpout{

 

private static final long serialVersionUID = 225243592780939490L;

 

private SpoutOutputCollector collector;

private static final String field=”word”;

private int count=1;

private String[] message =  {

“Storm Hello World”,

“http://www.jishudao.com storm blog”,

“Play with storm”

};

 

/**

* open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。

* 有三个参数:

* 1.Storm配置的Map;

* 2.topology中组件的信息;

* 3.发射tuple的方法;

*/

public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {

System.out.println(“open:”+map.get(“test”));

this.collector = collector;

}

 

/**

* nextTuple()方法是Spout实现的核心。

* 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。

*/

public void nextTuple() {

 

if(count<=message.length){

System.out.println(“第”+count +”次开始发送数据…”);

this.collector.emit(new Values(message[count-1]));

}

count++;

}

 

 

/**

* declareOutputFields是在IComponent接口中定义,用于声明数据格式。

* 即输出的一个Tuple中,包含几个字段。

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

System.out.println(“定义格式…”);

declarer.declare(new Fields(field));

}

 

/**

* 当一个Tuple处理成功时,会调用这个方法

*/

@Override

public void ack(Object obj) {

System.out.println(“ack:”+obj);

}

 

/**

* 当Topology停止时,会调用这个方法

*/

@Override

public void close() {

System.out.println(“关闭…”);

}

 

/**

* 当一个Tuple处理失败时,会调用这个方法

*/

@Override

public void fail(Object obj) {

System.out.println(“失败:”+obj);

}

 

}

 

TestBolt

import java.util.Map;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;

 

public class TestBolt extends BaseRichBolt{

 

/**

*

*/

private static final long serialVersionUID = 4743224635827696343L;

 

private OutputCollector collector;

 

/**

* 在Bolt启动前执行,提供Bolt启动环境配置的入口

* 一般对于不可序列化的对象进行实例化。

* 注:如果是可以序列化的对象,那么最好是使用构造函数。

*/

public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {

System.out.println(“prepare:”+map.get(“test”));

this.collector=collector;

}

 

/**

* execute()方法是Bolt实现的核心。

* 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。

*/

public void execute(Tuple tuple) {

String msg=tuple.getStringByField(“word”);

System.out.println(“开始分割单词:”+msg);

String[] words = msg.toLowerCase().split(” “);

for (String word : words) {

this.collector.emit(new Values(word));//向下一个bolt发射数据

}

 

}

 

/**

* 声明数据格式

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields(“count”));

}

 

/**

* cleanup是IBolt接口中定义,用于释放bolt占用的资源。

* Storm在终止一个bolt之前会调用这个方法。

*/

@Override

public void cleanup() {

System.out.println(“TestBolt的资源释放”);

}

}

 

Test2Bolt

import java.util.HashMap;

import java.util.Map;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;

 

public class Test2Bolt extends BaseRichBolt{

 

/**

*

*/

private static final long serialVersionUID = 4743224635827696343L;

 

 

/**

* 保存单词和对应的计数

*/

private HashMap<String, Integer> counts = null;

 

private long count=1;

/**

* 在Bolt启动前执行,提供Bolt启动环境配置的入口

* 一般对于不可序列化的对象进行实例化。

* 注:如果是可以序列化的对象,那么最好是使用构造函数。

*/

public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {

System.out.println(“prepare:”+map.get(“test”));

this.counts=new HashMap<String, Integer>();

}

 

/**

* execute()方法是Bolt实现的核心。

* 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。

*

*/

public void execute(Tuple tuple) {

String msg=tuple.getStringByField(“count”);

System.out.println(“第”+count+”次统计单词出现的次数”);

/**

* 如果不包含该单词,说明在该map是第一次出现

* 否则进行加1

*/

if (!counts.containsKey(msg)) {

counts.put(msg, 1);

} else {

counts.put(msg, counts.get(msg)+1);

}

count++;

}

 

 

/**

* cleanup是IBolt接口中定义,用于释放bolt占用的资源。

* Storm在终止一个bolt之前会调用这个方法。

*/

@Override

public void cleanup() {

System.out.println(“===========开始显示单词数量============”);

for (Map.Entry<String, Integer> entry : counts.entrySet()) {

System.out.println(entry.getKey() + “: ” + entry.getValue());

}

System.out.println(“===========结束============”);

System.out.println(“Test2Bolt的资源释放”);

}

 

/**

* 声明数据格式

*/

public void declareOutputFields(OutputFieldsDeclarer arg0) {

 

}

}

 

 

 

App

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.StormSubmitter;

import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.tuple.Fields;

 

public class App {

 

private static final String test_spout=”test_spout”;

private static final String test_bolt=”test_bolt”;

private static final String test2_bolt=”test2_bolt”;

 

public static void main(String[] args)  {

//定义一个拓扑

TopologyBuilder builder=new TopologyBuilder();

//设置一个Executeor(线程),默认一个

builder.setSpout(test_spout, new TestSpout(),1);

//shuffleGrouping:表示是随机分组

//设置一个Executeor(线程),和一个task

builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout);

//fieldsGrouping:表示是按字段分组

//设置一个Executeor(线程),和一个task

builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields(“count”));

Config conf = new Config();

conf.put(“test”, “test”);

try{

//运行拓扑

if(args !=null&&args.length>0){ //有参数时,表示向集群提交作业,并把第一个参数当做topology名称

System.out.println(“运行远程模式”);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} else{//没有参数时,本地提交

//启动本地模式

System.out.println(“运行本地模式”);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology(“Word-counts” ,conf,  builder.createTopology() );

Thread.sleep(20000);

//  //关闭本地集群

cluster.shutdown();

}

}catch (Exception e){

e.printStackTrace();

}

}

}

 

 

踩坑 The POM for ring-cors:ring-cors:jar:0.1.5 is missing

解决办法:

https://mvnrepository.com/artifact/ring-cors/ring-cors/0.1.12

直接把ring-cors-0.1.12.jar ring-cors-0.1.12.pom下载下来,放到maven本地库中。

 

有初步的Hello World感觉之后,可以再详细看看官方的资料,除了看别人翻译的,强烈建议对比着官方的看。官方有详细的文档清单,不要着急,一个一个慢慢看。

Basics of Storm

Layers on top of Storm

Trident

Trident is an alternative interface to Storm. It provides exactly-once processing, “transactional” datastore persistence, and a set of common stream analytics operations.

Streams API

Stream APIs is another alternative interface to Storm. It provides a typed API for expressing streaming computations and supports functional style operations.

NOTE: Streams API is an experimental feature, and further works might break backward compatibility. We’re also notifying it via annotating classes with marker interface @InterfaceStability.Unstable.

SQL

The Storm SQL integration allows users to run SQL queries over streaming data in Storm.

NOTE: Storm SQL is an experimental feature, so the internals of Storm SQL and supported features are subject to change. But small change will not affect the user experience. We will notify the user when breaking UX change is introduced.

Flux

Setup and Deploying

Intermediate

Debugging

Integration With External Systems, and Other Libraries

Container, Resource Management System Integration

Advanced

本文适合Storm小白看,大神吐槽或在1秒内关掉。也适合.net转java的软件工程师查看。建议先照流程跑起代码,再自己照着示例一行一行敲一遍,感受一下storm关键词,加深印象。不能带你深入浅出,只能浅入浅出。我也刚看,欢迎关注公众号,一直学习成长!

 

参考与引用:

http://storm.apache.org/releases/current/index.html

https://www.cnblogs.com/xuwujing/p/8584684.html

BaibianJS(百变JS) 快速注入JS 随意跨域

BaibianJS(百变JS) is a little and amazing mini chrome extension!It will help you to inject js and send cross domain xhr request by setting Access-Control-Allow-Origin.百变JS是一个小巧实用的Chrome插件,主要2个功能,1是能帮您任性注入JS,2是通过让您任性跨域!

为什么会有BaibianJS

为了注入JS和跨域,最先的方案都是用winform+webbrowser,玩得不要不要的,但是调试起来不方便!在实际使用过程中上,也用过Fiddler进入注入,但是替换JS要计算请求长度,用起来会比较烦。而最佳的注入节点又是jq 下方,所以Fiddler实现了想要的注入,但是却很烦。最后,想用chrome Extension主要在于,替换更加灵活,而且有chrome强大的console辅助调试。

BainbianJS正是在这种纠结的背景下孕育而生,虽然只是一个简单的JS,功能却非常实用。可以非常灵活的注入JS,对网页进行分析与数据提取。

How to use

Step 1

下载BainbianJS插件文件,background.js\manifest.json,真正起作用的就这两个,放到一个BianbianJS文件夹里。进入Chrome Extension插件界面,选加载安装,选择BianbianJS文件夹,安装成功。

如何进入Chrome Extension插件管理界面

在谷歌浏览器中输入 chrome://extensions/

Step 2

安装成功后,要依据自己的使用情况修改backgroud.js中的配置,配置要替换的JS,非常简单,就不废话了。修改JS记得重载加载插件哦!

与我互动

请关注我的公众号

约吗公众号

捐助

如果你觉得不错,想捐助献爱心,请扫码

微信赞赏码

 

代码已经开源到Github

https://github.com/Lancker/BaibianJS

图云书分享朋友圈操作指南 让朋友们方便查看到自己的闲置图书图单 互换互借图书

 

图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南

将自己的图书扫码录入后,如何分享到朋友圈,这个问题已经有不少小伙伴在问小编,下面就来讲一下,怎么操作流程。

文字简要版:通过图书云公众号菜单-我的-我的书目,点击任意一本书,在图书页,点【截二维码分享到朋友圈】,有没有Catch到?

下面,小编再带大家分享图解一下:

看图之前,小编分享一下常用共享图书吃鸡分享文案:

书非借不可读也 微信识别二维码即可借阅我的闲置图书!高端大气上档次的《Angular 5 高级编程》带回家!上微信公众号图书云,与身边的朋友同事好朋友共享图书,让闲置图书

图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南
图书云-分享朋友圈操作指南

更有价值!

换成通用的即:

书非借不可读也 微信识别二维码即可借阅我的闲置图书!高端大气上档次的《XXXXXXX》带回家!上微信公众号图书云,与身边的朋友同事好朋友共享图书,让闲置图书更有价值!

亲们可以发挥想像力,让朋友们以最快的速度将自己的闲置图书搬空~~

本期《图云书分享朋友圈操作指南》讲解完毕,有不明白的地方可以通过微信公众号【图书云】给留言~

ChinaHadoop会员博客集中营 源自Hadoop云计算群

ChinaHadoop.org 源自Hadoop云计算群(qq群号:300165122),是国内最早一批投身Hadoop大数据行业的小伙伴,目前近2000人!为更好的提供交流互动平台,特创办了ChinaHadoop.org,微信公众号ChinaHadoop,我们希望通过不断完善ChinaHadoop会员机制,促进会员之间的交流与互动,让知识更快的传播!

扫码关注ChinaHadoop公众号

ChinaHadoop微信公众号二维码
ChinaHadoop微信公众号二维码 博客集中营 源自Hadoop云计算群

会员权益:

所有Hadoop云计算群成员(qq群号:300165122)可免费获得顶级域名一个( .top \.work\.tech),免费获得不限量虚拟主机空间一个。会员权益享受请联系微信13439975582

交换友情链接 ChinaHadoop   https://ChinaHadoop.org

成员博客列表

如需在https://ChinaHadoop开通博客权限,请微信联系

ChinaHadoop维护负责人微信 13439975582

转自: https://chinahadoop.org/index.php/2018/07/03/chinahadoop-hello-world/

Hadoop大数据云计算工作机会-去哪儿网大数据

去哪儿网大数据中心在招人,主要是hadoop集群运维方向 有意者可以私聊地点:北京中关村待遇:30*16左右,关于去哪儿hadoop工作,请咨询(QQ群:300165122)群成员@白杨

Hadoop云计算QQ群:300165122,汇聚了业务近2000名从业人员,是国内较早一批投身Hadopp大数据工作的聚集地!本群主要进行业务技术交流,发布业内活动、工作机会!

Hadoop百度百科介绍:
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。