// ReachTopology.java
// Forked from nathanmarz/storm-starter.
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.*;
/**
 * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
 * compute the reach of any URL on Twitter in real time by parallelizing the whole computation.
 * <p>
 * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
 * who tweeted the URL, get all the followers of all those people, deduplicate that set of followers, and then count
 * the unique followers. It's an intense computation that can involve thousands of database calls and tens of millions
 * of follower records.
 * <p>
 * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
 * minutes on a single machine into one that takes just a couple of seconds.
 * <p>
 * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hash maps.
 * <p>
 * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.
 */
public class ReachTopology {
/**
 * In-memory stand-in for the "who tweeted this URL" database: maps a URL to the list of
 * user names that tweeted it. A URL that is not present maps to {@code null}.
 */
public static Map<String, List<String>> TWEETERS_DB = buildTweetersDb();

/**
 * Builds the demo URL-to-tweeters table.
 * <p>
 * A plain {@link HashMap} is populated explicitly instead of using double-brace
 * initialization, which would create an extra anonymous {@code HashMap} subclass.
 *
 * @return a new mutable map from URL to the users who tweeted it
 */
private static Map<String, List<String>> buildTweetersDb() {
    Map<String, List<String>> db = new HashMap<String, List<String>>();
    db.put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
    db.put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
    db.put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
    return db;
}
/**
 * In-memory stand-in for the followers database: maps a user name to the list of
 * user names following that user. A user that is not present maps to {@code null}.
 */
public static Map<String, List<String>> FOLLOWERS_DB = buildFollowersDb();

/**
 * Builds the demo user-to-followers table.
 * <p>
 * A plain {@link HashMap} is populated explicitly instead of using double-brace
 * initialization, which would create an extra anonymous {@code HashMap} subclass.
 *
 * @return a new mutable map from user name to that user's followers
 */
private static Map<String, List<String>> buildFollowersDb() {
    Map<String, List<String>> db = new HashMap<String, List<String>>();
    db.put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
    db.put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
    db.put("tim", Arrays.asList("alex"));
    db.put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
    db.put("adam", Arrays.asList("david", "carissa"));
    db.put("mike", Arrays.asList("john", "bob"));
    db.put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
    return db;
}
/**
 * Bolt that expands a requested URL into one output tuple per user who tweeted it,
 * using {@code TWEETERS_DB} as the lookup table.
 */
public static class GetTweeters extends BaseBasicBolt {
    /** Declares the two output fields: the request id and a single tweeter name. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "tweeter"));
    }

    /**
     * Reads the request id from field 0 and the URL from field 1, then emits
     * {@code (id, tweeter)} for every tweeter recorded for that URL.
     * Nothing is emitted when the URL has no entry in {@code TWEETERS_DB}.
     *
     * @param tuple     input tuple carrying the request id and the URL
     * @param collector collector that receives the emitted tuples
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final Object requestId = tuple.getValue(0);
        final String requestedUrl = tuple.getString(1);
        final List<String> users = TWEETERS_DB.get(requestedUrl);
        if (users == null) {
            // Unknown URL: there is nobody to report for this request.
            return;
        }
        for (String user : users) {
            collector.emit(new Values(requestId, user));
        }
    }
}
/**
 * Bolt that expands a tweeter into one output tuple per follower of that tweeter,
 * using {@code FOLLOWERS_DB} as the lookup table.
 */
public static class GetFollowers extends BaseBasicBolt {
    /** Declares the two output fields: the request id and a single follower name. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "follower"));
    }

    /**
     * Reads the request id from field 0 and the tweeter name from field 1, then emits
     * {@code (id, follower)} for every follower recorded for that tweeter.
     * Nothing is emitted when the tweeter has no entry in {@code FOLLOWERS_DB}.
     *
     * @param tuple     input tuple carrying the request id and the tweeter name
     * @param collector collector that receives the emitted tuples
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final Object requestId = tuple.getValue(0);
        final String user = tuple.getString(1);
        final List<String> fans = FOLLOWERS_DB.get(user);
        if (fans == null) {
            // No follower record for this user: emit nothing.
            return;
        }
        for (String fan : fans) {
            collector.emit(new Values(requestId, fan));
        }
    }
}
/**
 * Batch bolt that collects the distinct follower names it receives and, when the batch
 * finishes, emits how many distinct names it saw.
 * <p>
 * NOTE(review): the set is never cleared, so this relies on a fresh instance being used
 * for each batch id; confirm against the BaseBatchBolt contract.
 */
public static class PartialUniquer extends BaseBatchBolt {
    /** Collector handed over in {@code prepare}; used to emit the partial count. */
    BatchOutputCollector _collector;
    /** Batch id handed over in {@code prepare}; echoed back in the emitted tuple. */
    Object _id;
    /** Distinct follower names received so far. */
    Set<String> _followers = new HashSet<String>();

    /** Declares the two output fields: the batch id and the distinct-follower count. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "partial-count"));
    }

    /** Stores the collector and the batch id for later use in {@code finishBatch}. */
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        this._collector = collector;
        this._id = id;
    }

    /** Records the follower name found in field 1; duplicates collapse in the set. */
    @Override
    public void execute(Tuple tuple) {
        final String follower = tuple.getString(1);
        this._followers.add(follower);
    }

    /** Emits {@code (id, number of distinct followers seen)}. */
    @Override
    public void finishBatch() {
        final int distinctFollowers = this._followers.size();
        this._collector.emit(new Values(this._id, distinctFollowers));
    }
}
/**
 * Batch bolt that sums the integer found in field 1 of every tuple it receives and,
 * when the batch finishes, emits that total as the reach.
 * <p>
 * NOTE(review): the running total is never reset, so this relies on a fresh instance
 * being used for each batch id; confirm against the BaseBatchBolt contract.
 */
public static class CountAggregator extends BaseBatchBolt {
    /** Collector handed over in {@code prepare}; used to emit the final total. */
    BatchOutputCollector _collector;
    /** Batch id handed over in {@code prepare}; echoed back in the emitted tuple. */
    Object _id;
    /** Running sum of the partial counts received so far. */
    int _count = 0;

    /** Declares the two output fields: the batch id and the total reach. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "reach"));
    }

    /** Stores the collector and the batch id for later use in {@code finishBatch}. */
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        this._collector = collector;
        this._id = id;
    }

    /** Adds the partial count carried in field 1 to the running total. */
    @Override
    public void execute(Tuple tuple) {
        final Integer partialCount = tuple.getInteger(1);
        this._count = this._count + partialCount;
    }

    /** Emits {@code (id, accumulated total)}. */
    @Override
    public void finishBatch() {
        this._collector.emit(new Values(this._id, this._count));
    }
}
/**
 * Assembles the linear DRPC topology for the "reach" function.
 * <p>
 * Stages, in order: {@code GetTweeters} (parallelism 4), {@code GetFollowers}
 * (parallelism 12, shuffle grouping), {@code PartialUniquer} (parallelism 6, grouped
 * on id and follower) and {@code CountAggregator} (parallelism 3, grouped on id).
 *
 * @return the configured builder; the caller decides whether to create a local or a
 *         remote topology from it
 */
public static LinearDRPCTopologyBuilder construct() {
    final Fields byRequestAndFollower = new Fields("id", "follower");
    final Fields byRequest = new Fields("id");

    final LinearDRPCTopologyBuilder reach = new LinearDRPCTopologyBuilder("reach");
    reach.addBolt(new GetTweeters(), 4);
    reach.addBolt(new GetFollowers(), 12)
            .shuffleGrouping();
    reach.addBolt(new PartialUniquer(), 6)
            .fieldsGrouping(byRequestAndFollower);
    reach.addBolt(new CountAggregator(), 3)
            .fieldsGrouping(byRequest);
    return reach;
}
/**
 * Entry point.
 * <p>
 * With no arguments, runs the topology on a {@code LocalCluster} with a {@code LocalDRPC},
 * prints the reach of three sample URLs, and shuts both down. With at least one argument,
 * submits the topology through {@code StormSubmitter} under the name {@code args[0]},
 * configured for 6 workers.
 *
 * @param args optional; {@code args[0]} is the topology name for remote submission
 * @throws Exception if submitting the topology or executing a DRPC request fails
 */
public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = construct();
    Config conf = new Config();
    if (args == null || args.length == 0) {
        conf.setMaxTaskParallelism(3);
        LocalDRPC drpc = new LocalDRPC();
        try {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
                String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
                for (String url : urlsToTry) {
                    System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
                }
            } finally {
                // Shut down even when submission or a DRPC call throws, so the local
                // cluster is not left running. Cluster goes first, then the DRPC server.
                cluster.shutdown();
            }
        } finally {
            drpc.shutdown();
        }
    } else {
        conf.setNumWorkers(6);
        StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
    }
}
}