reduce

Accumulate collections of points and aggregate them using reducers, optionally within a moving time window.

reduce [-every duration [-on duration-or-calendar-offset]]
  [-over duration]
  [-reset true|false]
  [-forget false]
  fieldname1=expr1 [, fieldnameN=exprN]
  [by field1, [field2, ...]]
| Parameter | Description | Required? |
| --- | --- | --- |
| -every | The interval at which values will be computed, as a duration literal. A new result is produced each time this duration passes. | No; if not specified, the upstream batch interval is used. If there is no upstream batch, a single result is produced after the stream has ended. |
| -over | The moving time window over which the value will be computed, as a duration literal. This is typically some multiple of the -every interval (for example, reduce -every :minute: -over :hour: value=avg(value) updates a trailing hourly average every minute). When -over is specified, results will only be produced for full windows of data. No results will be produced until -over has passed after the first point. No result will be produced for any final points that span an interval less than -over. If -over is :forever:, the reducer emits results cumulatively, over all points seen so far. See Moving time windows for more information about moving time windows. | No; defaults to not using a moving window. |
| -reset | Set this to 'false' to emit results cumulatively, over all points seen so far. | No; defaults to resetting every interval. |
| -forget | When grouping reducer results with 'by', set this to 'false' to cause results to be emitted for every value of 'by' that has been seen in previous batches. Values that do not appear in the current batch will report their 'empty' value (for example, count() will produce 0, avg() will produce null). | No; defaults to forgetting. |
| -from | When used with -over, the start time of the earliest full window of data. | No; if not specified, the earliest window begins with the earliest point or batch received. |
| -to | When used with -over, the end time of the last full window of data. | No; if not specified, the last point or batch time stamp received is assumed. |
| -on | A time alignment for -every. It may be a duration or a calendar offset less than -every. For example, -every :hour: -on :00:30:00: runs every hour on the half-hour, while -every :month: -on :day 10: runs a monthly computation on day 10 of the month. | No; if -on is not specified, output batches are aligned with the UNIX epoch, as if from a batch node. |
| fieldname=expr | An assignment expression, where expr can be a reducer or a constant value. See Note 2. | At least one |
| by | One or more fields for which the assignment expression is computed separately. If grouping fields are present, then the assignment expression is computed independently for each unique combination of values present in the grouping fields. See Processors for more about grouping with by. | No; if no grouping fields are present, then the assignment expression is computed over all points in the batch. |

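
As a sketch of how -every and -reset combine, the program below counts points in two-second intervals but keeps the count running instead of resetting it. The field name cnt is only illustrative. Based on the descriptions of -every and -reset, each emitted point is expected to carry the number of points seen since the start of the stream rather than the number seen in the latest interval.

```juttle
// Sketch: cumulative count, reported every two seconds.
// -reset false keeps the reducer state across intervals.
emit -from :0: -limit 10
| reduce -every :2s: -reset false cnt = count()
| view text
```

Removing -reset false should give the default behavior, where the count starts over in every interval.
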
Note 1

The output points contain:

  • All the fields specified as assignment expressions or grouping fields in the arguments
  • Each of the grouping fields with the unique value(s) for which the assignment expressions were computed
  • When operating on batched points, every output point contains a time field containing the end time of the incoming batch.

When reduce operates on batched points (that is, when there is a batch processor preceding reduce), it generates a set of output points for each incoming batch of points, resetting its internal state after each batch. When the points flowing into reduce are not batched, output points are generated for all points when the stream ends.
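
A small sketch of which fields survive a reduce, assuming the rules listed in this note. The field names host, value and total are illustrative. The incoming points carry time, host and value; the output points are expected to carry only time (the end of each three-second batch), the grouping field host, and the assigned field total. The value field is not passed through.

```juttle
// Sketch: only assigned fields, grouping fields and time appear in the output.
emit -from :0: -limit 6
| put host = 'web', value = count()
| batch :3s:
| reduce total = sum(value) by host
| view text
```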

See Field referencing for additional information relevant to this processor.

Note 2

In the assignment expression fieldname=expr, if expr is a reducer, the resulting point will have the return value of the reducer's result() function as the value of field fieldname.

If expr is a constant value (Juttle constant, string literal, number), the resulting point will have field 'fieldname' with that value. This is commonly used to place "naming" fields into the data point, such as:

... | reduce name = 'pct90', value = percentile(value, 0.9)

Assignments that try to dereference fields from the incoming data point are invalid; the field values of the point being processed by reduce are only accessible in the context of a reducer.

For example, the following is not valid Juttle. It emits the error "a is not defined" because it looks for a constant named 'a':

// INVALID JUTTLE
emit -points [ { a: 5, b: 7 }, { a: 12, b: 88 } ]
| reduce c = a + b

The following is also not valid Juttle. The fields of the incoming point are not in scope, so dereferencing 'a' and 'b' accesses those fields in the data point being created by reduce, not in the data point being processed by it. Both are null, and the program errors out when it tries to add two null values.

// INVALID JUTTLE
emit -points [ { a: 5, b: 7 }, { a: 12, b: 88 } ]
| reduce c = *'a' + *'b'

The legitimate way to achieve the desired result (compute a cumulative sum of field values for 'a' and 'b' over all points) would be to use reduce to get sums of 'a' and 'b' separately, with the built-in reducer sum(), and then add resulting field values in a put expression:

emit -points [ { a: 5, b: 7 }, { a: 12, b: 88 } ]
| reduce sum_a = sum(a), sum_b = sum(b)
| put c = sum_a + sum_b
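
An alternative sketch reverses the order: put can reference the fields of the point it is processing, so the per-point sum can be computed first and then aggregated with sum(). The field name total is illustrative. Both forms add the same four numbers (5 + 7 + 12 + 88 = 112), so they should arrive at the same result.

```juttle
// Sketch: compute a + b on each point with put, then aggregate in reduce.
emit -points [ { a: 5, b: 7 }, { a: 12, b: 88 } ]
| put c = a + b
| reduce total = sum(c)
| view text
```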

More examples of using reduce:

Example: Trailing ten-minute average

// Trailing average example:
//
// Smooth a stream of random values occurring every 10 seconds
// by computing a trailing 10 minute average
// 
sub random() {
    emit 
      -from :2014-01-01: 
      -limit 500 
      -every :10s:
    | put name = 'random'
    | put value = (Math.random() - .5) * 20 + (Math.random() - .5) * 10 + 5
}
random
| reduce
    -every :m:
    -over :10m:
    avg = avg(value), value = last(value)
| split value, avg
| view timechart

Example: Superimposing yesterday's CPU usage over today's

// Day over Day graph example:
//
// display a graph of cpu usage superimposed over the previous day
// by using a moving window to get the value from 24 hours ago.
//
// The graph will begin at 2014-01-01, but we need to start the data
// source a day earlier so the windowed reducer can produce a point
// for 2014-01-01
//
read stochastic -source 'cdn' -from :2013-12-31: -to :2014-01-05: -daily .5 -source_type 'metric' name = 'cpu'
| reduce
    -every :2h:
    value = avg(value)
| put
    -over :25h:
    prev = first(value)
| filter time >= :2014-01-01: // discard unfilled window points from Dec.
| split value,prev
| view timechart

Example: Call records, day by day

// Call Record billing example:
//
// Call records arrive as a stream of points indicating duration in minutes.
// Your phone bill is the total of these, charged at $.05/minute, from the
// 20th of each month.
//
// This program displays a day-by-day running total of your bill:
//
sub call_record() {
  emit -from :2014-01-01: -limit 4000 -every :h:
  | put name = 'duration'
  | put value = (Math.random() - .5) * 20 + (Math.random() - .5) * 10 + 5
}
call_record
| batch 
    -every :month:  
    -on :day 20: 
| put name = 'total', value = sum(value) * 0.05
| view timechart
;
//
// This program displays a table with monthly totals
//
call_record
| reduce
    -every :month:
    -on :day 20:
    value = sum(value) * 0.05
| put name = "total", value = Math.floor(value * 100) / 100 
| view table 
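
Example: Aligning hourly results on the half-hour

A sketch of -on with a duration offset instead of a calendar offset, using the -every :hour: -on :00:30:00: combination from the parameter description. The field name cnt is illustrative. One point per minute is emitted for three hours, and the hourly counts are expected to be reported at half past each hour rather than on the hour.

```juttle
// Sketch: hourly counts whose interval boundaries fall on the half-hour.
emit -from :2014-01-01: -limit 180 -every :m:
| reduce -every :hour: -on :00:30:00: cnt = count()
| view text
```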

Example: Output a single point that counts the number of points in the data set (in this case 10)

emit -hz 10 -limit 10 
| reduce count() 
| view text

Example: Any number of assignments can be used and they can be any expression

emit -hz 10 -limit 10 
| reduce x = 3 * 4, count() 
| view text

Example: Aggregate the points in batches of two seconds

emit -every :0.1s: -limit 100 
| batch :2s:
| reduce count()
| view text
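
Because reduce uses the upstream batch interval only when -every is not given, the same aggregation can be sketched without a separate batch processor. This variant passes the interval to reduce directly; it is expected to count points over the same two-second intervals as the batched form.

```juttle
// Sketch: same two-second aggregation, with the interval given to reduce itself.
emit -every :0.1s: -limit 100
| reduce -every :2s: count()
| view text
```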

Example: Aggregate the ten points by unique values of y

emit -from :0: -limit 10
| put x = Math.random() * 2, y = Math.floor(x)
| reduce by y 
| view text

Example: Count the number of points per unique value of y (over the entire stream of points)

emit -from :0: -limit 10 
| put x = Math.random() * 2, y = Math.floor(x) 
| reduce cnt = count() by y 
| view text

Example: Count the number of points per unique value of y (over batches of five seconds of points)

emit -from :0: -limit 10 
| put x = Math.random() * 2, y = Math.floor(x) 
| batch :5s: 
| reduce cnt = count() by y 
| view text
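
Example: Keep reporting groups that are missing from the current interval

A sketch of -forget false, based on its parameter description. The field names n, grp and cnt are illustrative. Six points one second apart are labeled so that the first three have grp 0 and the last three have grp 1. With the default behavior, the second three-second interval would report only grp 1; with -forget false it is expected to also report grp 0 with its empty value, a cnt of 0.

```juttle
// Sketch: grp 0 appears only in the first interval, grp 1 only in the second.
// -forget false keeps emitting a result for grp 0 after it stops appearing.
emit -from :0: -limit 6
| put n = count()
| put grp = Math.floor(n / 4)
| reduce -every :3s: -forget false cnt = count() by grp
| view text
```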