Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
rename async-result -> async-promise
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Apr 17, 2013
1 parent 8c10d7e commit 0164b77
Show file tree
Hide file tree
Showing 18 changed files with 101 additions and 82 deletions.
6 changes: 3 additions & 3 deletions README.markdown
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Lamina

An event is a value that we don't yet have. Lamina contains a rich set of operators for dealing with these unrealized values, both individually and collectively. If you're new to Lamina, you should start by reading about [how it deals with individual events](https://github.com/ztellman/lamina/wiki/Introduction).
Lamina is for describing and analyzing streams of data. It provides a rich set of operators for dealing with these unrealized values, both individually and collectively. If you're new to Lamina, you should start by reading about [how it deals with individual events](https://github.com/ztellman/lamina/wiki/Introduction).

## Installation

[lamina "0.5.0-beta15"]
[lamina "0.5.0-beta16"]

## Rationale

Expand Down Expand Up @@ -33,7 +33,7 @@ nil
> (map* dec ch)
<== [...]
> (enqueue ch 1 2 3)
:lamina/enqueued
<< ... >>
> (view-graph ch)
```

Expand Down
4 changes: 2 additions & 2 deletions src/lamina/connections.clj
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@
(defn client
"Layers a client-side request-response communication model on top of a bidirectional socket. Returns
a function which takes the `request` and optionally a `timeout` in milliseconds, and returns an
async-result representing the response.
async-promise representing the response.
`connection-generator` is a function which takes zero parameters, and returns a channel representing
a bidirectional socket, or an async-result which will be realized as a bidirectional socket. It will
a bidirectional socket, or an async-promise which will be realized as a bidirectional socket. It will
only be called once a request has been made, and whenever the socket channel subsequently closes.
If a request is made while there is not a live connection, it will be sent once the connection is opened.
Expand Down
10 changes: 5 additions & 5 deletions src/lamina/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
[lamina.core.result

result-channel
async-result?
async-promise?
with-timeout
expiring-result
timed-result
Expand Down Expand Up @@ -197,7 +197,7 @@
If more than two channels are specified, `siphon` becomes transitive. `(siphon a b c)` is equivalent to
`(siphon a b)` and `(siphon b c)`."
([src dst]
(if (async-result? src)
(if (async-promise? src)
(r/siphon-result src dst)
(ch/siphon src dst)))
([src dst & rest]
Expand All @@ -212,7 +212,7 @@
If more than two channels are specified, `join` becomes transitive. `(join a b c)` is equivalent to
`(join a b)` and `(join b c)`."
([src dst]
(if (async-result? src)
(if (async-promise? src)
(do
(r/siphon-result src dst)
(r/subscribe dst (r/result-callback (fn [_]) #(u/error src % false))))
Expand All @@ -222,10 +222,10 @@
(apply join dst rest)))

(defn error-value
"Returns the error-value of the channel or async-result if it's in an error state, and 'default-value'
"Returns the error-value of the channel or async-promise if it's in an error state, and 'default-value'
otherwise"
[x default-value]
(if (async-result? x)
(if (async-promise? x)
(r/error-value x default-value)
(ch/error-value x default-value)))

Expand Down
8 changes: 4 additions & 4 deletions src/lamina/core/graph/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
msg
(not grounded?))]

(when (r/async-result? result)
(when (r/async-promise? result)
(reset-meta! result
{:description {:type :queue, :name description}, :timestamp (now)}))

Expand Down Expand Up @@ -391,7 +391,7 @@
consumer-description (.description e)
result (enqueue-and-release lock state msg true)]

#_(when (r/async-result? result)
#_(when (r/async-promise? result)
(reset-meta! result
{:description {:type queue, :name consumer-description}}))

Expand Down Expand Up @@ -481,7 +481,7 @@
::split
(if-let [v (.get cancellations name)]

(if (r/async-result? v)
(if (r/async-promise? v)
::already-registered
::invalid-name)

Expand Down Expand Up @@ -790,7 +790,7 @@
(identical? ::split x)
(cancel (.split state) name)

(r/async-result? x)
(r/async-promise? x)
(l/with-lock lock
(q/cancel-receive (.queue state) x))

Expand Down
40 changes: 22 additions & 18 deletions src/lamina/core/operators.clj
Original file line number Diff line number Diff line change
Expand Up @@ -700,19 +700,21 @@
The returned channel will emit each message from `channel` only once the designated time has arrived.
This assumes the timestamp for each message is monotonically increasing."
[{:keys [timestamp task-queue auto-advance?]} channel]
[{:keys [timestamp task-queue auto-advance?]} ch]
(let [ch* (lamina.core.channel/channel)]

(p/run-pipeline auto-advance?

;; allow for consumption to be deferred until the topology is built
(fn [auto-advance?]

(receive-in-order channel
(bridge-in-order ch ch* "defer-onto-queue"
:wait-on-callback? true
:callback
(fn [msg]
(let [r (r/result-channel)
t (timestamp msg)]

(t/invoke-at task-queue t
(with-meta
(fn []
Expand All @@ -724,13 +726,12 @@

;; advance to the message entering the topology
(when auto-advance?
(t/advance-until task-queue t))
(t/advance-until task-queue t)
(when (drained? ch)
))

;; if we've auto-advanced, this should always be realized
r))))

(fn [_]
(close ch*)))
r)))))

ch*))

Expand Down Expand Up @@ -813,17 +814,19 @@
period (t/period)}}
ch]

;; ch -> dist -> intermediate -> aggregator -> out

;; distribution
(let [ch* (mimic ch)
(let [intermediate (mimic ch)
on-clearance (atom nil)
dist (distributor
{:facet facet
:on-clearance #(close ch*)
:on-clearance #(close intermediate)
:initializer (fn [facet-value ch]
(let [generated (->> ch
(generator facet-value)
(map* #(vector facet-value %)))]
(siphon generated ch*)
(siphon generated intermediate)
generated))})
propagator (-> dist meta ::propagator)]

Expand All @@ -838,7 +841,7 @@
(map re-nil (keys %))
(map re-nil (vals %))))]

(receive-all ch*
(receive-all intermediate
(fn [[facet msg]]
(let [facet (de-nil facet)
msg (de-nil msg)]
Expand All @@ -851,8 +854,9 @@
(doto (ConcurrentHashMap.)
(.put facet msg))))

;; the flush predicate returns true
(and (= (.size m) (count propagator))
;; we've filled up all available slots
(and (not (closed? ch))
(= (.size m) (count propagator))
(= (keys m) (dist/facets propagator))
(reset! aggregator (ConcurrentHashMap.))))
m)))]
Expand All @@ -876,10 +880,10 @@
(cancel))))))

;; hook up everything
(on-closed out #(close intermediate))
(on-closed intermediate #(do (flush-aggregator @aggregator) (close out)))
(on-error intermediate #(error out % false))
(on-error out #(error intermediate % false))
(join ch dist)
(on-closed out #(close ch*))
(on-closed ch* #(do (flush-aggregator @aggregator) (close out)))
(on-error ch* #(error out % false))
(on-error out #(error ch* % false))

out)))
10 changes: 5 additions & 5 deletions src/lamina/core/pipeline.clj
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
(defn- unwind-stages [idx stages remaining subscribe-expr unwrap?]
`(cond

(r/async-result? val##)
(r/async-promise? val##)
(let [value# (r/success-value val## ::unrealized)]
(if (identical? ::unrealized value#)
~(subscribe-expr idx)
Expand Down Expand Up @@ -234,9 +234,9 @@
"A means for composing asynchronous functions. Returns a function which will pass the value into the first
function, the result from that function into the second, and so on.
If any function returns an unrealized async-result, the next function won't be called until that value is realized.
The call into the pipeline itself returns an async-result, which won't be realized until all functions have completed.
If any function throws an exception or returns an async-result that realizes as an error, this will short-circuit all
If any function returns an unrealized async-promise, the next function won't be called until that value is realized.
The call into the pipeline itself returns an async-promise, which won't be realized until all functions have completed.
If any function throws an exception or returns an async-promise that realizes as an error, this will short-circuit all
calls to subsequent functions, and cause the pipeline's result to be realized as an error.
Loops and other more complex flows may be created if any stage returns a redirect signal by returning the result of
Expand All @@ -262,7 +262,7 @@
Defaults to false.
`:unwrap?` - if true, and the pipeline does not need to pause between streams, the pipeline will return an actual value
rather than an async-result.
rather than an async-promise.
`:with-bindings` - if true, conveys the binding context of the initial invocation of the pipeline into any deferred stages."
[& opts+stages]
Expand Down
4 changes: 2 additions & 2 deletions src/lamina/core/queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@
(recur (cons c cs)))))))))))]

(cond
(r/async-result? x)
(r/async-promise? x)
x

(identical? :lamina/discarded x)
Expand Down Expand Up @@ -533,7 +533,7 @@
(recur (cons c cs))))))))))))]

(cond
(r/async-result? x)
(r/async-promise? x)
x

(identical? :lamina/discarded x)
Expand Down
6 changes: 3 additions & 3 deletions src/lamina/core/result.clj
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@
(defn result-callback [on-success on-error]
(ResultCallback. on-success on-error))

(defn async-result?
(defn async-promise?
"Returns true if `x` is a result."
[x]
(or
Expand Down Expand Up @@ -552,7 +552,7 @@
result)))

(defn merge-results
"Given n `results` returns a single async-result which will be realized as a sequence of all the realized
"Given n `results` returns a single async-promise which will be realized as a sequence of all the realized
results."
[& results]
(let [cnt (count results)
Expand All @@ -570,7 +570,7 @@
combined-result)

(let [r (first results)]
(if-not (async-result? r)
(if-not (async-promise? r)

;; not a result - set, decrement, and recur
(do
Expand Down
2 changes: 1 addition & 1 deletion src/lamina/executor/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
(fn [_]
(let [result (context/with-context (context/assoc-context :timer timer)
(f))]
(when (r/async-result? result)
(when (r/async-promise? result)
(t/mark-waiting timer))
result))
(fn [result]
Expand Down
2 changes: 1 addition & 1 deletion src/lamina/query/operators.clj
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
values (if (set? value)
value
(->> value :options vals (map normalize-for-comparison) set))]
#(contains? values %))))
#(contains? values (f %)))))

;;;

Expand Down
5 changes: 2 additions & 3 deletions src/lamina/stats.clj
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,12 @@
window
quantiles
task-queue]
:or {quantiles [50 75 95 99 99.9]
:or {quantiles [0.5 0.75 0.95 0.99 0.999]
task-queue (t/task-queue)
window (t/minutes 5)}
:as options}
ch]
(let [quantiles (map #(double (/ % 100)) quantiles)
sampler (sample/moving-sampler options)
(let [sampler (sample/moving-sampler options)
ch* (bridge-accumulate ch (mimic ch) "moving-quantiles"
(merge options
(number-accumulator "moving-quantiles" #(update sampler %))
Expand Down
13 changes: 7 additions & 6 deletions src/lamina/trace.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@
(map* distill-timing)

(q/query-stream
'[(zip
{:duration-quantiles [:durations concat moving-quantiles]
:calls [:durations concat rate rolling-sum]
:total-duration [:durations concat rolling-sum]
:sub-tasks [:sub-tasks concat (group-by :task [recur])]
})]
'[(group-by :task
[(zip
{:duration-quantiles [:durations concat moving-quantiles]
:calls [:durations concat rate rolling-sum]
:total-duration [:durations concat rolling-sum]
:sub-tasks [:sub-tasks concat recur]
})])]
options)))

;;;
Expand Down
2 changes: 1 addition & 1 deletion src/lamina/trace/instrument.clj
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
(try
(let [result# (context/with-context (context/assoc-context :timer timer#)
~invoke)]
(when (async-result? result#)
(when (async-promise? result#)
(t/mark-waiting timer#))
(run-pipeline result#
{:error-handler (fn [err#] (t/mark-error timer# err#))}
Expand Down
4 changes: 2 additions & 2 deletions src/lamina/trace/probe.clj
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@
;;;

(defn probe-result
"Allows an async-result to be treated like a probe-channel that only accepts a single value
"Allows an async-promise to be treated like a probe-channel that only accepts a single value
before deactivating."
[result]
(when-not (r/async-result? result)
(when-not (r/async-promise? result)
(throw (IllegalArgumentException. "probe-result must be given a result-channel")))
(reify
IEnqueue
Expand Down
2 changes: 1 addition & 1 deletion src/lamina/trace/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
{:error-handler (fn [ex] (t/mark-error timer ex))}
(fn [_]
(let [result (f)]
(reset! unwrap? (not (async-result? result)))
(reset! unwrap? (not (async-promise? result)))
result))
(fn [result]
(t/mark-return timer result)
Expand Down
Loading

0 comments on commit 0164b77

Please sign in to comment.