Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
make option maps actual maps everywhere, just to be consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed May 6, 2012
1 parent 5e2410d commit d6e5c0c
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 45 deletions.
8 changes: 2 additions & 6 deletions README.markdown
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
An event is a value that we don't yet have, a value which is unrealized. Lamina contains a rich set of operators for dealing with unrealized values, both individually and collectively.
An event is a value that we don't yet have. Lamina contains a rich set of operators for dealing with these unrealized values, both individually and collectively. If you're new to Lamina, you should start by reading about [how it deals with individual events](https://github.com/ztellman/lamina/wiki/Introduction).

Individual events are represented by async-results. If you're new to Lamina, you should start by [reading about them](https://github.com/ztellman/lamina/wiki/Introduction).

Streams of events are represented by channels. They are used in [Aleph](https://github.com/ztellman/aleph) to model network communication over a variety of protocols.

Much like Clojure's sequences, we can apply transforms to all events that pass through the channel:
Streams of events are represented by channels, which are used in [Aleph](https://github.com/ztellman/aleph) to model network communication over a variety of protocols. Much like Clojure's sequences, we can apply transforms to all events that pass through the channel:

```clj
> (use 'lamina.core)
Expand Down
10 changes: 5 additions & 5 deletions src/lamina/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@
callback (last transforms+callback)]
(unify-gensyms
`(let [ch## (channel)]
~(if (empty? transforms)
`(receive-all ch## ~callback)
`(do
(receive-all (->> ch## ~@transforms) ~callback)
ch##))))))
(do
~(if (empty? transforms)
`(receive-all ch## ~callback)
`(receive-all (->> ch## ~@transforms) ~callback))
ch##)))))

(defmacro split
"Returns a channel which will forward each message to all downstream-channels.
Expand Down
22 changes: 11 additions & 11 deletions src/lamina/executor/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@
"Defines a thread pool that can be used with instrument and defn-instrumented.
more goes here"
[& {:keys [idle-timeout
min-thread-count
max-thread-count
interrupt?]
:or {idle-timeout 60000
min-thread-count 1
max-thread-count Integer/MAX_VALUE
interrupt? true}
:as options}]
[{:keys [idle-timeout
min-thread-count
max-thread-count
interrupt?]
:or {idle-timeout 60000
min-thread-count 1
max-thread-count Integer/MAX_VALUE
interrupt? true}
:as options}]
(when-not (contains? options :name)
(throw (IllegalArgumentException. "Every executor must have a :name specified.")))
(let [nm (name (:name options))
Expand Down Expand Up @@ -141,5 +141,5 @@
(def
^{:doc "A default executor with an unbounded maximum thread count."}
default-executor (executor
:name "lamina-default-executor"
:idle-timeout 15000))
{:name "lamina-default-executor"
:idle-timeout 15000}))
17 changes: 10 additions & 7 deletions src/lamina/trace/instrument.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
[lamina.executor.utils :as ex]
[lamina.trace.timer :as t]))

(defrecord Enter [^long timestamp args])
(defrecord Enter [description ^long timestamp args])

(defmethod print-method Enter [x writer]
(print-method (into {} x) writer))

(defmacro instrument-task-body
[nm executor enter-probe return-probe implicit? with-bindings? timeout invoke args]
`(do
(when (probe-enabled? ~enter-probe)
(enqueue ~enter-probe (Enter. (System/currentTimeMillis) ~args)))
(enqueue ~enter-probe (Enter. ~nm (System/currentTimeMillis) ~args)))
(let [timer# (t/enqueued-timer
~executor
:description ~nm
Expand Down Expand Up @@ -68,7 +71,7 @@
(defmacro instrument-body [nm enter-probe return-probe implicit? invoke args]
`(do
(when (probe-enabled? ~enter-probe)
(enqueue ~enter-probe (Enter. (System/currentTimeMillis) ~args)))
(enqueue ~enter-probe (Enter. ~nm (System/currentTimeMillis) ~args)))
(let [timer# (t/timer
:description ~nm
:args ~args
Expand Down Expand Up @@ -159,10 +162,10 @@
computing the value, and the 'return' probe will include an :enqueued-duration
parameter that describes the time, in nanoseconds, spent waiting to be executed."

[f & {:keys [executor timeout probes implicit? with-bindings?]
:as options
:or {implicit? true
with-bindings? false}}]
[f {:keys [executor timeout probes implicit? with-bindings?]
:as options
:or {implicit? true
with-bindings? false}}]
(when-not (contains? options :name)
(throw (IllegalArgumentException. "Instrumented functions must have a :name defined.")))
(if executor
Expand Down
16 changes: 10 additions & 6 deletions src/lamina/trace/timer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
-1
(unchecked-subtract (long ~'return) (long ~'enter)))
:args ~'args
:result ~'result
:sub-tasks (when-not (.isEmpty ~'sub-tasks)
(doall (map #(timing % start#) ~'sub-tasks)))
~@extras)]
Expand All @@ -78,6 +79,7 @@
^long timestamp
^long enqueued
args
^{:volatile-mutable true} result
^{:volatile-mutable true, :tag long} enter
^{:volatile-mutable true, :tag long} return
^ConcurrentLinkedQueue sub-tasks]
Expand All @@ -100,11 +102,11 @@
(ex/trace-error executor timing)))
(mark-return [this val]
(set! return (System/nanoTime))
(let [timing (make-timing EnqueuedTiming enter
(set! result val)
(let [timing (make-timing EnqueuedTiming enqueued
:enqueued-duration (if (= Long/MIN_VALUE enter)
-1
(unchecked-subtract (long enter) (long enqueued)))
:result val)]
(unchecked-subtract (long enter) (long enqueued))))]
(when return-probe
(enqueue return-probe timing))
(ex/trace-return executor timing))))
Expand All @@ -124,6 +126,7 @@
(System/currentTimeMillis)
(System/nanoTime)
args
nil
Long/MIN_VALUE
Long/MIN_VALUE
(ConcurrentLinkedQueue.))]
Expand Down Expand Up @@ -171,6 +174,7 @@
error-probe
^long start-stage
args
^{:volatile-mutable true} result
^long timestamp
^long enter
^{:volatile-mutable true, :tag long} return
Expand All @@ -189,11 +193,10 @@
(assoc (timing this enter) :error err)))
(mark-return [this val]
(set! return (System/nanoTime))

(set! result val)
(when return-probe
(enqueue return-probe
(make-timing Timing enter
:result val)))))
(make-timing Timing enter)))))

(defn timer-
[description
Expand All @@ -213,6 +216,7 @@
error-probe
start-stage
args
nil
(System/currentTimeMillis)
(System/nanoTime)
Long/MIN_VALUE
Expand Down
6 changes: 3 additions & 3 deletions test/lamina/test/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
(def n 1e3)

(defn benchmark-active-probe [description probe]
(let [nm (gensym "name")
x (executor :name nm :max-thread-count 1)
(let [nm (gensym "name")
x (executor {:name nm :max-thread-count 1})
p (probe-channel [nm probe])
f (instrument #(.countDown ^CountDownLatch %)
:executor x
Expand All @@ -42,7 +42,7 @@
(dotimes [_ n]
(.execute x #(.countDown latch)))
(.await latch))))
(let [x (executor :name "test" :max-thread-count 1)
(let [x (executor {:name "test" :max-thread-count 1})
f (instrument #(.countDown ^CountDownLatch %)
:executor x
:name :foo)]
Expand Down
2 changes: 1 addition & 1 deletion test/lamina/test/pipeline.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
(defn boom [_]
(throw (Exception. "boom")))

(def exc (executor :name :test-pipeline))
(def exc (executor {:name :test-pipeline}))

(defmacro repeated-pipeline [n f]
`(pipeline ~@(repeat n f)))
Expand Down
12 changes: 6 additions & 6 deletions test/lamina/test/trace.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

;;;

(def exc (executor :name :test-executor))
(def exc (executor {:name :test-executor}))

;;;

Expand All @@ -46,7 +46,7 @@
(let [nm (gensym "name")
probe (probe-channel [nm probe-type])
[val callback] (capture)
f (apply instrument f (apply concat (assoc options :name nm)))]
f (instrument f (assoc options :name nm))]
(receive-all probe callback)
(try
(let [result (apply f args)]
Expand Down Expand Up @@ -139,7 +139,7 @@
(defn benchmark-active-probe [description probe]
(let [nm (gensym "name")
p (probe-channel [nm probe])
f (instrument + :name nm)]
f (instrument + {:name nm})]
(receive-all p (fn [_]))
(bench description
(f 1 1))))
Expand All @@ -149,13 +149,13 @@
(+ 1 1 1))
(bench "baseline addition with apply"
(apply + [1 1 1]))
(let [f (instrument + :name :foo)]
(let [f (instrument + {:name :foo})]
(bench "instrument addition with one arg"
(f 1)))
(let [f (instrument + :name :foo)]
(let [f (instrument + {:name :foo})]
(bench "instrument addition with three args"
(f 1 1 1)))
(let [f (instrument + :name :foo)]
(let [f (instrument + {:name :foo})]
(bench "instrument addition with five args"
(f 1 1 1 1 1)))
(benchmark-active-probe "instrument addition with return probe" :return)
Expand Down

0 comments on commit d6e5c0c

Please sign in to comment.