A tour of Riemann: Coalesce
A small article about the Riemann coalesce stream.
How does it work?
The API doc is pretty clear. Coalesce remembers the most recent event for each host/service pair and, every N seconds, sends a vector containing the most recent states.
For example:
(streams
  (with :ttl 60
    (coalesce 10
      #(info %))))
- At time t = 1, Riemann receives:
  {:host "foo" :service "bar" :metric 10 :time 1}
  Coalesce instantly emits:
  [{:host "foo" :service "bar" :metric 10 :time 1}]
  Coalesce always emits the first event it receives.
- At time t = 4, Riemann receives:
  {:host "foo" :service "foobar" :metric 12 :time 4}
- At time t = 7, Riemann receives:
  {:host "foo" :service "bar" :metric 30 :time 7}
- At time t = 11, Coalesce emits:
  [{:host "foo" :service "foobar" :metric 12 :time 4} {:host "foo" :service "bar" :metric 30 :time 7}]
  As you can see, coalesce emits the last event seen for each host/service.
- At time t = 13, Riemann receives:
  {:host "foo" :service "bar" :metric 40 :time 13}
- At time t = 21, Coalesce emits:
  [{:host "foo" :service "foobar" :metric 12 :time 4} {:host "foo" :service "bar" :metric 40 :time 13}]
  Again, coalesce emits the last state for each host and service.
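The bookkeeping behind this timeline can be sketched in plain Clojure. This is only a simplified model, not Riemann's implementation: the helper names remember and snapshot are hypothetical, and the model ignores the emission timer and TTL expiry.

```clojure
;; Simplified model of coalesce's state, NOT Riemann's actual code.
;; `remember` and `snapshot` are hypothetical names used for illustration.

(defn remember
  "Store `event` as the latest one seen for its [host service] pair."
  [state event]
  (assoc state [(:host event) (:service event)] event))

(defn snapshot
  "Return the remembered events as a vector (what gets emitted at each interval)."
  [state]
  (vec (vals state)))

;; Events received at t = 1, 4 and 7 in the timeline above.
(def state-at-11
  (reduce remember {}
          [{:host "foo" :service "bar"    :metric 10 :time 1}
           {:host "foo" :service "foobar" :metric 12 :time 4}
           {:host "foo" :service "bar"    :metric 30 :time 7}]))

;; The event received at t = 13 replaces the previous foo/bar event.
(def state-at-21
  (remember state-at-11
            {:host "foo" :service "bar" :metric 40 :time 13}))
```

Calling snapshot on state-at-11 and state-at-21 gives the two vectors emitted at t = 11 and t = 21 (the order of the events inside the vector is not significant in this model).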
Now, an example.
I want to monitor a Cassandra cluster and send an email if the mean CPU usage of the cluster is greater than 60.
The code
First, create a mycorp/system/cpu.clj file:
(ns mycorp.system.cpu
  "check cpu"
  (:require [riemann.config :refer :all]
            [riemann.streams :refer :all]
            [riemann.test :refer :all]
            [riemann.folds :as folds]
            [mycorp.output.email :as email]
            [clojure.tools.logging :refer :all]))
(def cpu-mean-alert-cassandra
  "A stream checking whether the mean cpu of all hosts with service = `cpu` and tagged `cassandra` is greater than 60"
  ;; filter by service and tag
  (where (and (tagged "cassandra") (service "cpu"))
    ;; every 10 seconds, send the last state for each host/service
    ;; :service will always be "cpu", so the events only differ by host
    (coalesce 10
      ;; apply folds/mean with smap to compute the cpu mean
      (smap folds/mean
        ;; update the event: remove :host and set the :service
        (with {:host nil :service "cassandra-cpu-mean"}
          ;; check if mean > 60
          (where (> (:metric event) 60)
            ;; send email
            (io (email/email "foo@mcorbin.fr"))
            ;; tap for tests
            (tap :cpu-mean-alert-tap)))))))
(tests
  (deftest cpu-mean-alert-test
    (let [result (inject! [mycorp.system.cpu/cpu-mean-alert-cassandra]
                          [{:host "foo"
                            :service "cpu"
                            :metric 65
                            :tags ["cassandra"]
                            :ttl 60
                            :time 1}
                           {:host "bar"
                            :service "cpu"
                            :metric 50
                            :tags ["cassandra"]
                            :ttl 50
                            :time 2}
                           ;; not tagged
                           {:host "baz"
                            :service "cpu"
                            :metric 99
                            :tags []
                            :ttl 60
                            :time 3}
                           {:host "foobar"
                            :service "cpu"
                            :metric 95
                            :tags ["cassandra"]
                            :ttl 60
                            :time 12}
                           {:host "foobar"
                            :service "riemann"
                            :metric 95
                            :tags []
                            :ttl 60
                            :time 22}])]
      (is (= (:cpu-mean-alert-tap result)
             [{:service "cassandra-cpu-mean"
               :metric 65
               :tags ["cassandra"]
               :ttl 60
               :time 1}
              {:service "cassandra-cpu-mean"
               :metric 70
               :tags ["cassandra"]
               :ttl 60
               :time 12}])))))
(Don't forget to load the cpu.clj file from your riemann.config file ;))
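How the file gets loaded is not shown here. One possible sketch uses the include function from riemann.config. It assumes that the mycorp directory sits next to riemann.config, and that the mycorp.output.email namespace (not shown in this article, so its path below is a guess) is loaded first:

```clojure
;; riemann.config (sketch; the file layout is an assumption)

;; Load the namespaces. The email file path is hypothetical.
(include "mycorp/output/email.clj")
(include "mycorp/system/cpu.clj")

;; Register the stream defined in cpu.clj.
(streams
  mycorp.system.cpu/cpu-mean-alert-cassandra)
```

With this layout, the tests in cpu.clj should be runnable with the riemann test riemann.config command.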
In the cpu-mean-alert-cassandra stream, I first filter the Cassandra/cpu events with where, then use coalesce 10 (so coalesce will emit a vector of events every 10 seconds).
After that, I use smap to apply folds/mean to the vector of events emitted by coalesce. I now have a single event representing the mean cpu of the cluster.
I use with to update the event, and where again to check the mean value against the threshold.
If the mean is greater than 60, I send an email.
Easy!
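To see where the 65 and 70 expected by the test come from, here is the arithmetic in plain Clojure. The mean-metric function is a hypothetical stand-in that only computes the number; folds/mean itself returns a whole event carrying that value as its :metric.

```clojure
;; Hypothetical helper: only the arithmetic, not Riemann's folds/mean.
(defn mean-metric
  "Arithmetic mean of the :metric values of a vector of events."
  [events]
  (/ (reduce + (map :metric events)) (count events)))

;; Vector emitted at time 1: only host foo is known.
(def emitted-at-1
  [{:host "foo" :service "cpu" :metric 65}])

;; Vector emitted at time 12: foo, bar and foobar (baz is not tagged cassandra).
(def emitted-at-12
  [{:host "foo"    :service "cpu" :metric 65}
   {:host "bar"    :service "cpu" :metric 50}
   {:host "foobar" :service "cpu" :metric 95}])

(mean-metric emitted-at-1)  ; => 65
(mean-metric emitted-at-12) ; => 70, i.e. (65 + 50 + 95) / 3
```

Both values are above 60, which is why the tap in the test contains two events.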
More use cases
Use coalesce when you want to aggregate common events from multiple hosts.
In the previous example, I used folds/mean to calculate the mean, but I could do anything I want, for example:
- Get the max, min, etc. values.
- Check for unbalanced values between hosts. For example: why do I have 10 nodes at 30% CPU but one constantly at 95%? I used coalesce to detect unbalanced partitions in Kafka topics (caused by a bad partition key), for example.
- …
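As an illustration of the unbalanced-hosts case, here is a minimal sketch in plain Clojure. The unbalanced-hosts function and its threshold are hypothetical, not part of Riemann; it works on the kind of vector that coalesce emits.

```clojure
;; Hypothetical helper, not part of Riemann.
(defn unbalanced-hosts
  "Return the events whose :metric exceeds the mean of all events by more
  than `threshold`, or nil if there are none (or if `events` is empty)."
  [threshold events]
  (when (seq events)
    (let [metrics (map :metric events)
          mean    (/ (reduce + metrics) (count metrics))]
      (not-empty
        (filterv #(> (- (:metric %) mean) threshold) events)))))

;; 10 nodes at 30% CPU and one at 95%.
(def cluster
  (conj (vec (for [i (range 10)]
               {:host (str "node" i) :service "cpu" :metric 30}))
        {:host "node10" :service "cpu" :metric 95}))

;; Only node10 is more than 40 points above the cluster mean.
(map :host (unbalanced-hosts 40 cluster)) ; => ("node10")
```

In a Riemann config, such a function could presumably replace folds/mean in the stream above, for example with (smap (partial unbalanced-hosts 40) ...), since smap does not pass nil results on to its children.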