This UDF aggregates incoming points either by groups or by batches, depending on the input edge type (stream or batch).
The files in this directory demonstrate AggregateUDF usage.
The streamAggregateUDF variant can be used only with a stream input edge:
@streamAggregateUDF()
.aggregate('last(idle) as idle.last')
.aggregate('mean(idle) as idle.mean')
.aggregate('last(interrupt) as interrupt.last')
.aggregate('mean(interrupt) as interrupt.mean')
.aggregate('last(nice) as nice.last')
.aggregate('mean(nice) as nice.mean')
.aggregate('last(softirq) as softirq.last')
.aggregate('mean(softirq) as softirq.mean')
.aggregate('last(steal) as steal.last')
.aggregate('mean(steal) as steal.mean')
.aggregate('last(system) as system.last')
.aggregate('mean(system) as system.mean')
.aggregate('last(user) as user.last')
.aggregate('mean(user) as user.mean')
.aggregate('last(wait) as wait.last')
.aggregate('mean(wait) as wait.mean')
.timeAggregateRule('last')
.emitTimeout(10s)
.tolerance(1s)
- aggregate -- defines one aggregation using the syntax <aggregateFunction>(<fieldName>) as <resultFieldName>. Currently available aggregate functions are: first, last, min, max, mean.
- timeAggregateRule -- defines the aggregation rule for the timestamp of the result point. It takes one of the available aggregate functions as its argument.
- emitTimeout -- the UDF accumulates several points before processing them. The emitTimeout property sets the interval between two consecutive processing moments.
- tolerance (optional) -- defines the maximum time interval between the first and the last points in an aggregated batch. The default tolerance value is 0s, which means that only points with equal timestamps are aggregated together.
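The tolerance semantics can be illustrated with a small Python model: at each emitTimeout tick, the buffered points are split into groups whose first and last timestamps are at most tolerance apart. This is a sketch under stated assumptions, not the UDF's source: the Point class, the group_by_tolerance name and the greedy grouping strategy are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Point:
    # Hypothetical representation: timestamp in seconds plus field values.
    time: float
    fields: dict = field(default_factory=dict)


def group_by_tolerance(points, tolerance):
    """Split buffered points into groups whose first-to-last time span does
    not exceed `tolerance` seconds.

    Assumed greedy strategy: points are sorted by time, and a new group
    starts whenever the next point lies more than `tolerance` after the
    first point of the current group. With tolerance 0, only points with
    equal timestamps end up in the same group.
    """
    groups = []
    for p in sorted(points, key=lambda p: p.time):
        if groups and p.time - groups[-1][0].time <= tolerance:
            groups[-1].append(p)
        else:
            groups.append([p])
    return groups


if __name__ == "__main__":
    buffered = [Point(0.0), Point(0.5), Point(2.0)]
    print([[p.time for p in g] for g in group_by_tolerance(buffered, 1.0)])
    # prints [[0.0, 0.5], [2.0]]
```

Each resulting group would then be aggregated into one output point, with its timestamp chosen by timeAggregateRule.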
The batchAggregateUDF variant can be used only with a batch input edge. It has
no emitTimeout or tolerance properties, because it aggregates every batch it
receives separately:
@batchAggregateUDF()
.aggregate('last(idle) as idle.last')
.aggregate('mean(idle) as idle.mean')
.aggregate('last(interrupt) as interrupt.last')
.aggregate('mean(interrupt) as interrupt.mean')
.aggregate('last(nice) as nice.last')
.aggregate('mean(nice) as nice.mean')
.aggregate('last(softirq) as softirq.last')
.aggregate('mean(softirq) as softirq.mean')
.aggregate('last(steal) as steal.last')
.aggregate('mean(steal) as steal.mean')
.aggregate('last(system) as system.last')
.aggregate('mean(system) as system.mean')
.aggregate('last(user) as user.last')
.aggregate('mean(user) as user.mean')
.aggregate('last(wait) as wait.last')
.aggregate('mean(wait) as wait.mean')
.timeAggregateRule('last')
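To make the aggregate syntax and timeAggregateRule concrete, the following Python sketch models what happens to a single batch: each .aggregate() string is parsed as <aggregateFunction>(<fieldName>) as <resultFieldName>, applied to the batch's values, and the result timestamp is computed with the time rule. It is an illustration only. The (time, fields) tuple representation, the parse_spec and aggregate_batch names, and the choice to skip points missing a field are assumptions, not the UDF's actual code.

```python
import re
from statistics import mean

# Aggregate functions listed in this README. "first" and "last" assume
# the values are ordered by time.
AGGREGATES = {
    "first": lambda xs: xs[0],
    "last": lambda xs: xs[-1],
    "min": min,
    "max": max,
    "mean": mean,
}

# <aggregateFunction>(<fieldName>) as <resultFieldName>
SPEC_RE = re.compile(r"^\s*(\w+)\((\w+)\)\s+as\s+([\w.]+)\s*$")


def parse_spec(spec):
    """Parse one .aggregate() argument into (function, field, result name)."""
    m = SPEC_RE.match(spec)
    if m is None or m.group(1) not in AGGREGATES:
        raise ValueError(f"invalid aggregate spec: {spec!r}")
    return m.group(1), m.group(2), m.group(3)


def aggregate_batch(batch, specs, time_rule):
    """Aggregate one batch of (time, fields) pairs into a single point.

    Every batch is processed independently, which is why the batch variant
    needs no emitTimeout or tolerance. Returns (timestamp, result_fields).
    """
    batch = sorted(batch, key=lambda p: p[0])  # order by time for first/last
    times = [t for t, _ in batch]
    result = {}
    for spec in specs:
        fn, src, dst = parse_spec(spec)
        # Assumption: points that lack the source field are skipped.
        values = [fields[src] for _, fields in batch if src in fields]
        if values:
            result[dst] = AGGREGATES[fn](values)
    return AGGREGATES[time_rule](times), result
```

For example, aggregate_batch([(0.0, {"idle": 90.0}), (1.0, {"idle": 80.0})], ["last(idle) as idle.last", "mean(idle) as idle.mean"], "last") returns (1.0, {"idle.last": 80.0, "idle.mean": 85.0}).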
To start the example, run the following in the current directory:
$ docker-compose up
You can now run kapacitor commands directly from your host system. For example,
you can define a task from cpu.tick, enable it, and watch its results:
$ kapacitor define cpu_task -tick cpu.tick
$ kapacitor enable cpu_task
$ kapacitor watch cpu_task
After that, you will be able to see the points coming from the UDF.
To remove the intermediate build image, run:
$ docker image prune --filter "label=stage=builder" --filter "label=project=aggregate_udf_example" --force
This example uses the following Docker containers:
- kapacitor-udf -- container running the UDF. Uses a Docker image built from the Dockerfile.
- collectd -- metrics producer. Sends metrics to InfluxDB; in principle, it can be replaced with another producer.
- influxdb -- InfluxDB container. Parses the Graphite-format data coming from collectd and provides metrics for Kapacitor.
- kapacitor -- Kapacitor container. Subscribes to InfluxDB to receive metrics, processes them with TICK scripts, and communicates with the UDF.