-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdev.clj
153 lines (133 loc) · 4.7 KB
/
dev.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
(ns dev
"Tools for interactive development with the REPL. This file should
not be included in a production build of the application.
Call `(reset)` to reload modified code and (re)start the system.
The system under development is `system`, referred from
`com.stuartsierra.component.repl/system`.
See also https://github.com/stuartsierra/component.repl"
(:require
[clojure.java.io :as io]
[clojure.java.javadoc :refer [javadoc]]
[clojure.pprint :refer [pprint]]
[clojure.reflect :refer [reflect]]
[clojure.repl :refer [apropos dir doc find-doc pst source]]
[clojure.set :as set]
[clojure.spec.alpha :as s]
[clojure.string :as str]
[clojure.test :as test]
[clojure.tools.namespace.repl :refer [refresh refresh-all clear]]
[com.stuartsierra.component :as component]
[com.stuartsierra.component.repl :refer [reset set-init start stop system]]
[criterium.core :as criterium]
[org.purefn.kurosawa.config :as conf]
[org.purefn.kurosawa.log.core :as klog]
[org.purefn.river :as river]
[org.purefn.river.batch :as batch]
[org.purefn.river.flush :as flush]
[org.purefn.river.serdes.nippy :as serdes]
[taoensso.timbre :as log])
(:import [java.io File]
[java.util UUID]
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]))
;; Do not try to load source code from 'resources' directory
(clojure.tools.namespace.repl/set-refresh-dirs "dev" "src" "test")
;;--------------------------------------------------------------------------------
;; Kafka Producer
(defrecord Producer
[producer config]
component/Lifecycle
(start [this]
(let [{:keys [config producer]} this]
(if producer
this
(assoc this :producer (KafkaProducer. config
(serdes/nippy-serializer)
(serdes/nippy-serializer))))))
(stop [this]
(when producer
(.close producer))
(assoc this :producer nil)))
(defn producer
[]
(map->Producer
{:config {"bootstrap.servers" "localhost:9092"
"client.id" (str (UUID/randomUUID))
"acks" "all"
"retries" (Integer. 0)
"batch.size" (Integer. 16384)
"linger.ms" 1
"max.block.ms" (Integer. 1000)
"request.timeout.ms" (Integer. 1000)
"compression.type" "none"
"buffer.memory" 33554432}}))
(defn send-record
[^Producer producer topic key value]
(.send (:producer producer) (ProducerRecord. topic key value)))
(defn send-same-partition
[]
(send-record (:producer system) "firefly" 444 (UUID/randomUUID)))
(defn send-guid
[]
(send-record (:producer system) "firefly" (UUID/randomUUID) (UUID/randomUUID)))
;;--------------------------------------------------------------------------------
;; System
(defn to-map
[r]
{:topic (.topic r)
:partition (.partition r)
:offset (.offset r)
:timestamp (.timestamp r)
:timetamp-type (.timestampType r)
:serialized-key-size (.serializedKeySize r)
:serialized-value-size (.serializedValueSize r)
:key (.key r)
:value (.value r)
:headers (.headers r)})
(def cr nil)
(defn file-writer
[{:keys [file]} state records commit]
(with-open [w (io/writer file :append true)]
(doseq [{:keys [key value] :as r} records]
(def cr r)
(.write w (str key ":" value "\n"))))
(commit))
(defn batch-writer
[{:keys [file]} records]
(log/info "got" (count records))
(with-open [w (io/writer file :append true)]
(.write w (reduce
(fn [acc val]
(str acc val "\n"))
""
records))))
(def processor
(-> batch-writer
(flush/flush)
(flush/seen 5)
(flush/timed 1000)
(flush/max-records 10)
(flush/accumulate)
(flush/transform (comp
(filter (constantly false))
(map :value)))))
(defn dev-system
"Constructs a system map suitable for interactive development."
[]
(component/system-map
:producer (producer)
:consumer (component/using
(batch/batch-consumer
(assoc (batch/default-config)
::batch/timeout 5000
::batch/bootstrap-servers "localhost:9092"
::batch/topics ["firefly"]
::batch/group-id "serenity"
::batch/max-poll-records 1
::batch/max-poll-interval-ms 300000)
#'processor)
[:file])
:file (File. "./simon.txt")
))
(set-init (fn [_] (let [sys (dev-system)]
(klog/init-dev-logging sys)
sys)))