Skip to content

Commit

Permalink
Merge pull request tomdz#3 from maxichan/master
Browse files Browse the repository at this point in the history
Bolt updates - pass class instead of strings for input/output fields, fixed tests
  • Loading branch information
maxinechan authored and GitHub Enterprise committed Sep 15, 2016
2 parents 8c4a322 + 8dd002b commit c07c773
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 384 deletions.
14 changes: 12 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<groupId>org.tomdz.storm</groupId>
<artifactId>storm-esper</artifactId>
<name>storm-esper</name>
<version>1.0.2</version>
<version>1.0.4</version>
<packaging>jar</packaging>
<description>Storm + Esper integration</description>
<licenses>
Expand Down Expand Up @@ -67,12 +67,16 @@
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<!--
Use "provided" scope to keep storm out of the jar-with-dependencies
For IntelliJ dev, intellij will load properly.
-->
<scope>${provided.scope}</scope>
</dependency>
<dependency>
<groupId>com.espertech</groupId>
Expand Down Expand Up @@ -260,5 +264,11 @@
</plugins>
</build>
</profile>
<profile>
<id>intellij</id>
<properties>
<provided.scope>compile</provided.scope>
</properties>
</profile>
</profiles>
</project>
28 changes: 28 additions & 0 deletions src/main/java/org/tomdz/storm/esper/EsperBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.tomdz.storm.esper.model.Event;

import java.util.*;

Expand Down Expand Up @@ -74,6 +75,28 @@ public AliasedInputBuilder aliasStream(String componentId, String streamId)
{
return new AliasedInputBuilder(bolt, new StreamId(componentId, streamId));
}

public InputsBuilder onStreamEvent(Event event, String componentId)
{
return getInputsBuilderFromAliasedBuilder(
aliasStream(componentId, event.getStreamName()),
event
);
}

public InputsBuilder onComponentEvent(Event event, String componentId)
{
return getInputsBuilderFromAliasedBuilder(aliasComponent(componentId), event);
}

private InputsBuilder getInputsBuilderFromAliasedBuilder(AliasedInputBuilder aliasedBuilder, Event event)
{
Map<Class, String[]> model = event.getModelByType();
for (Class field : model.keySet()) {
aliasedBuilder = aliasedBuilder.withFields(model.get(field)).ofType(field);
}
return aliasedBuilder.toEventType(event.getEventName());
}
}

public static final class AliasedInputBuilder
Expand Down Expand Up @@ -151,6 +174,11 @@ public OutputStreamBuilder onDefaultStream()
{
return new OutputStreamBuilder(bolt, "default");
}

public OutputsBuilder onEvent(Event event)
{
return onStream(event.getStreamName()).fromEventType(event.getEventName()).emit(event.getModels());
}
}

public static final class OutputStreamBuilder
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/tomdz/storm/esper/model/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.tomdz.storm.esper.model;

import java.util.Map;

public interface Event {
Map<Class, String[]> getModelByType();
String[] getModels();
String getStreamName();
String getEventName();
}
35 changes: 0 additions & 35 deletions src/test/java/org/tomdz/storm/esper/Connection.java

This file was deleted.

52 changes: 52 additions & 0 deletions src/test/java/org/tomdz/storm/esper/EsperTestJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.tomdz.storm.esper;

import org.apache.storm.Config;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashSet;

import static org.testng.Assert.assertEquals;

public abstract class EsperTestJob implements TestJob {

/**
* Check that all outgoing fields from the Esper bolt match the incoming fields of the
* gathering bolt.
* @param esperBolt
* @param gatheringBolt
*/
public void assertEventTypesEqual(EsperBolt esperBolt, GatheringBolt gatheringBolt) {
for (Tuple tuple : gatheringBolt.getGatheredData()) {
String streamId = tuple.getSourceStreamId();

EventTypeDescriptor eventType = esperBolt.getEventTypeForStreamId(streamId);

assertEquals(new HashSet<String>(tuple.getFields().toList()),
new HashSet<String>(eventType.getFields().toList()));
}
}

public CompleteTopologyParam createTestDataConfig(String spoutName, Values... values) {
MockedSources mockedSources = new MockedSources();
mockedSources.addMockData(spoutName, values);

return createConfig(mockedSources);
}

public CompleteTopologyParam createConfig(MockedSources mockedSources) {
Config conf = new Config();
conf.setNumWorkers(1);
conf.setDebug(false);
conf.put(Config.TOPOLOGY_DEBUG, false);
conf.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, 0);

CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
completeTopologyParam.setMockedSources(mockedSources);
completeTopologyParam.setStormConf(conf);
return completeTopologyParam;
}
}
70 changes: 0 additions & 70 deletions src/test/java/org/tomdz/storm/esper/Event.java

This file was deleted.

Loading

0 comments on commit c07c773

Please sign in to comment.