diff --git a/project.clj b/project.clj index b5752c5..640de72 100644 --- a/project.clj +++ b/project.clj @@ -2,6 +2,6 @@ :description "Testing utilities for storm" :source-path "src/clj" :java-source-path "src/jvm" - :dependencies [[org.clojure/clojure "1.2.0"] + :dependencies [[org.clojure/clojure "1.4.0"] [org.clojure/clojure-contrib "1.2.0"]] - :dev-dependencies [[storm "0.6.0"]]) + :dev-dependencies [[storm "0.8.1"]]) diff --git a/src/clj/storm/test/capturing_topology.clj b/src/clj/storm/test/capturing_topology.clj index f1bb886..d5284d4 100644 --- a/src/clj/storm/test/capturing_topology.clj +++ b/src/clj/storm/test/capturing_topology.clj @@ -8,7 +8,6 @@ (:import [storm.test MultiStreamFeederSpout]) (:require [backtype.storm.daemon [common :as common]]) (:use [storm.test.persistent-tuple-capture-bolt]) - (:use [clojure.contrib.def :only [defnk]]) (:use [backtype.storm util config thrift clojure testing log])) (defnk capturing-topology @@ -38,10 +37,11 @@ (.set_bolts topology (assoc (clojurify-structure bolts) (uuid) - (Bolt. - (into {} (for [id all-streams] [id (mk-global-grouping)])) - (serialize-component-object capturer) - (mk-plain-component-common {} nil)))) + (let [input-spec (into {} (for [id all-streams] + [id (mk-global-grouping)]))] + (Bolt. + (serialize-component-object capturer) + (mk-plain-component-common input-spec {} nil))))) (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology) (let [storm-id (common/get-storm-id state storm-name) ] diff --git a/src/clj/storm/test/visualization.clj b/src/clj/storm/test/visualization.clj index 4ceb736..123d654 100644 --- a/src/clj/storm/test/visualization.clj +++ b/src/clj/storm/test/visualization.clj @@ -98,7 +98,8 @@ (fn [bolt-spec] (let [ id (key bolt-spec) bolt (val bolt-spec) - inputs (clojurify-structure (.get_inputs bolt)) ] + common (.get_common bolt) + inputs (clojurify-structure (.get_inputs common)) ] (map (fn [input] (let [ from (key input) diff --git a/src/jvm/storm/test/DRPCExclamationTopology.java b/src/jvm/storm/test/DRPCExclamationTopology.java index 2c64b86..5fafcad 100644 --- a/src/jvm/storm/test/DRPCExclamationTopology.java +++ b/src/jvm/storm/test/DRPCExclamationTopology.java @@ -20,6 +20,10 @@ public static class ExclaimBolt implements IBasicBolt { public void prepare(Map conf, TopologyContext context) { } + public Map getComponentConfiguration() { + return null; + } + @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); diff --git a/src/jvm/storm/test/MultiStreamFeederSpout.java b/src/jvm/storm/test/MultiStreamFeederSpout.java index e991702..1b2525a 100644 --- a/src/jvm/storm/test/MultiStreamFeederSpout.java +++ b/src/jvm/storm/test/MultiStreamFeederSpout.java @@ -110,6 +110,9 @@ public void ack(Object msgId) { } } + public void activate() {} + public void deactivate() {} + public void fail(Object msgId) { synchronized(failed) { int curr = get(failed, _context.getStormId(), 0);