August 9, 2017

A tour of Riemann : Coalesce

A small article about the Riemann coalesce stream.

How it works ?

The API doc is pretty clear. Coalesce remembers the most recent event for each host/service, and send every N seconds a vector containing the most recent states.

For example:

  (with :ttl 60
    (coalesce 10
     #(info %))))
  • At time t = 1 : Riemann receives `

{:host "foo" :service "bar" :metric 10 :time 1}

Coalesce instantly emits :

[{:host "foo" :service "bar" :metric 20 :time 1}]

Coalesce will always emit the first event it receives.

  • At time t = 4 : Riemann receives

{:host "foo" :service "foobar" :metric 12 :time 4}
  • At time t = 7 : Riemann receives

{:host "foo" :service "bar" :metric 30 :time 7}
  • At time t = 11 : Coalesce emits

[{:host "foo" :service "foobar" :metric 12 :time 4} {:host "foo" :service "bar" :metric 30 :time 7}]

As you can see, coalesce emits the last event seen for each host/service.

  • At time t = 13 : Riemann receives

{:host "foo" :service "bar" :metric 40 :time 13}
  • At time t = 21 : Coalesce emits

[{:host "foo" :service "foobar" :metric 12 :time 4} {:host "foo" :service "bar" :metric 40 :time 13}]

Again, coalesce emits the last states for each host and service.

Now, an example. I want to monitor a Cassandra cluster, and fire an email if the cpu mean of the cluster is greater than 60.

The code

First, create a mycorp/system/cpu.clj file:

(ns mycorp.system.cpu
  "check cpu"
  (:require [riemann.config :refer :all]
            [riemann.streams :refer :all]
            [riemann.test :refer :all]
            [riemann.folds :as folds]
            [ :as email]
            [ :refer :all]))

(def cpu-mean-alert-cassandra
  "A stream checking if the cpu mean for all hosts with service = `cpu` and tagged `cassandra` is > to 60"
  ;; filter by services and tags
  (where (and (tagged "cassandra") (service "cpu"))
  ;; every 10 seconds, send the last state for each host/service
  ;; :service will always be "cpu" so we only have differents hosts on events
    (coalesce 10
      ;; apply mean using smap to compute the cpu mean
      (smap folds/mean
        ;; update the event, remove :host and update the description
        (with {:host nil :service "cassandra-cpu-mean"}
          ;; check if mean > 60
          (where (> (:metric event) 60)
            ;; send email
            (io (email/email ""))
            ;; tap for tests
            (tap :cpu-mean-alert-tap)))))))

  (deftest cpu-mean-alert-test
    (let [result (inject! [mycorp.system.cpu/cpu-mean-alert-cassandra]
                          [{:host "foo"
                            :service "cpu"
                            :metric 65
                            :tags ["cassandra"]
                            :ttl 60
                            :time 1}
                           {:host "bar"
                            :service "cpu"
                            :metric 50
                            :tags ["cassandra"]
                            :ttl 50
                            :time 2}
                           ;; not tagged
                           {:host "baz"
                            :service "cpu"
                            :metric 99
                            :tags []
                            :ttl 60
                            :time 3}
                           {:host "foobar"
                            :service "cpu"
                            :metric 95
                            :tags ["cassandra"]
                            :ttl 60
                            :time 12}
                           {:host "foobar"
                            :service "riemann"
                            :metric 95
                            :tags []
                            :ttl 60
                            :time 22}])]
      (is (= (:cpu-mean-alert-tap result)
            [{:service "cassandra-cpu-mean"
              :metric 65
              :tags ["cassandra"]
              :ttl 60
              :time 1}
             {:service "cassandra-cpu-mean"
              :metric 70
              :tags ["cassandra"]
              :ttl 60
              :time 12}])))))

(Don’t forget to add cpu.clj file in your riemann.config file ;))

In the cpu-mean-alert-cassandra stream, i first filter Cassandra/cpu events with where, and use coalesce 10 (so coalesce will emit a vector of events every 10 seconds).

After that, i use smap to apply folds/mean on the vector of events emitted by coalesce. I now have an event representing the cpu mean of the cluster.

I use with to update the event, and where again to check the mean value against a threshold. If the mean is > to 60, i fire an email.

Easy !

More use cases

Use coalesce when you want to aggregate common events from multiple hosts. In the previous example, i used folds/mean to calculate the mean, but i could do anything i want, for example:

  • Get max, min etc…​ values.

  • Check for unbalanced values between hosts, for example : why do i have 10 nodes at 30 % CPU but one constantly at 95 % ? I used coalesce to detects unbalanced partitions in Kafka topics (caused by a bad partition key) for example.

  • …​

Event expiration

Coalesce takes care to expire events. From the API

When events expire, they are included in the emitted sequence of events once, and removed from the state table thereafter.

This stream is perfect for monitoring distributed systems.

Code here.

Tags: devops english

Add a comment

If you have a bug/issue with the commenting system, please send me an email (my email is in the "About" section).

Top of page