join

Create new points from the points of multiple input streams.

join
   [-nearest :duration: | -zip <true|:duration:> | -once true]
   [-table inputNumber, inputNumber, ...]
   [-outer inputNumber]
   [fieldName1, fieldName2,...fieldNameN]

It should be placed after a merge point in a parallel Juttle expression, like this:

(
  stream-1;
  stream-2;
)
| join [options]
| ... // process joined stream

For example:

read source1
             \
               join | reduce ...
             /
read source2

In its simplest form, join aligns its input streams by time stamp and produces new output points by unioning the fields of successive matching input points.

To join only points with matching field values, use join fieldNameN where fieldNameN serves as a foreign key in SQL join terms.

By default, an inner join is performed; use -outer option to do an outer join instead. Left or right outer join is achieved by specifying the input number of the "base" stream.

The -nearest option causes a streaming many-to-one join, while -zip option causes a streaming one-to-one join.

The time intervals between points are ignored by default when joining streams; set :duration: parameter in -nearest or -zip to join only points that are close in time.

For a non-streaming join that runs over the entire set of inputs, use -once option.

For a join that treats one or more inputs as a lookup table with timeless points, akin to a SQL relational join, use -table option.

Join can be used on a single stream, see Notes.

Parameter Description Required?
-nearest :duration: Streaming many-to-one join of each input point or batch with the points or batches on the other inputs having the nearest time stamp.

If one stream has fewer points than the others, its points may be joined multiple times as needed so that all points or batches on the other inputs participate in a join. The duration is the maximum acceptable difference in time stamps between matched groups of points, and will cause points to be dropped rather than joined if the gap in time is too large.
No; defaults to unbounded duration; mutually exclusive with -zip and -once.
-zip <true | :duration:> Streaming one-to-one join of each input point or batch with the points or batched on the other inputs having the nearest time stamp.

If one stream has more points than the others, extra points are dropped, such that any point is joined at most once.

If specified, the durationis the maximum acceptable difference in time stamps between matched groups of points. If instead -zip trueis specified, there is no limit to the difference in time stamps.
No; mutually exclusive with -nearest and -once.
-once true Non-streaming join over the entire set of input points, after all points have been received.

The time field may be specified as a join field if desired, giving results similar to a streaming one-to-one join with exactly matching time stamps. There is no analogue to the duration parameter of -zip and -nearest for non-streaming joins.
No; mutually exclusive with -nearest and -zip.
-table inputNumber1 [, 2, ...] Treat the specified input streams as timeless, i.e. as passive tables with no associated time stamp; input streams are numbered top to bottom (or left to right) from 1 in the program. No
-outer inputNumber Perform an outer join preserving the specified input stream. Input streams are numbered top to bottom (or left to right) from 1 in the program (that is, -outer 1specifies an outer left join where the first series in the flowgraph is preserved). When a point on the outer input does not have a matching join key on the other inputs, -outerforwards it unchanged, or partially joined against any matching inputs. Without -outer an inner join is performed, which only produces points when all inputs match. No; defaults to inner join.
fieldnameN The fields to match when joining points. No; by default, points will be unioned without matching on fields.
Notes
Single Stream Join

If used on a single stream, its points are grouped by their time stamp or batch, and all points in a group are unioned to create an output point. If optional joinFields are specified for the single stream, they act like group-by fields, and there is an output point for each distinct value of the joinFields among the points.

Multiple Stream Join

Options to join can cause it to operate like an SQL relational join across multiple input streams, where batches of points on an input are analogous to SQL tables, and individual points are analogous to rows in those tables. An optional list of fieldNames specifies relational join keys between the input batches. When points with matching key values appear on each input, new points are produced containing the union of these matching points' fields.

Batched vs Unbatched Join

Streaming joins are computed continually as points or batches of points arrive at the inputs. When a stream is batched, all points in a batch are treated as a group for joining (as if they all had the batch's ending time stamp). For an unbatched stream, all points having the same time stamp are treated as a batch. It is okay to join a batched stream with an unbatched stream.

Time Matching in Many-to-One and One-to-One Join

To require exact time stamp matching for batches to be joined, specify time as a join key along with any other join keys. If time is not listed as a join key, inexact time matches are allowed. Equivalent time stamps will always be joined when they are present. A streaming one-to-one join without exact time stamp matching is specified with -zip true or -zip :duration:. A streaming many-to-one join with -nearest :duration: (or simply as join with no options). A one-to-one join matches each successive input batch with one on its other inputs, matching by nearest time stamp, and drops any extra input batches that do not have matches. A many-to-one join performs a similar matching, but will re-use an input batch if it remains the best match.

Both -zip and -nearest accept a duration limiting the difference in time stamps allowed between matched batches, with :0: behaving as if time had been specified as a join key. When time stamps do not match exactly, an output point will be given the maximum time stamp of its input points. A join without -zip or -nearest or time as a join key is implicitly a -nearest join with no limiting duration.

Join with Table Input

A streaming join can treat some of its input batches as passive tables with no associated time stamp. For example, this is useful for joining a stream of event points having user IDs against a table of user names, annotating the event point with its particular user name. The -table option lists which inputs are to be treated in this way. The "table" input can be timeless (i.e. its data points do not contain a time field at all), or if it has timestamps, they will be ignored when joining.

A batch on a table input remains there until updated by a later batch, and joins are always performed against the most recently received complete batch. Joins are only triggered by the arrival of new points on a timeful input. Because tables are timeless, a join is never triggered by an update to a table, and no guarantees can be made about precisely when (in stream time) an update will displace a current table batch.

Non-streaming Join

A non-streaming join over all points at once (at the end of the run) can be specified by the -once flag. The join is not computed until all points have arrived at the inputs (that is, it only makes sense for historic queries or bounded live queries). Time stamps will be ignored unless the time field is specified as a join field, which forces time stamps to be matched exactly between the input sets.

Note: -nearest, -zip, and -once are mutually exclusive. If none are specified, the behavior is as if -nearest was specified with an unbounded duration.

Join Examples

Example: merging metric points

One use for join is with a source that provides several different metrics as name/value pairs on separate points having identical time stamps. Use join to merge these onto a single point for each time stamp:

emit -points [
  {time:"2015-11-26T11:22:33.000Z", "name":"furlongs", value:25},
  {time:"2015-11-26T11:22:33.000Z", "name":"fortnights", value:7},
  {time:"2015-11-27T10:20:50.000Z", "name":"furlongs", value:34},
  {time:"2015-11-27T10:20:50.000Z", "name":"fortnights", value:5}
]
  | put *name = value      // turn {name:"furlongs", value:20} into {furlongs:20}
  | remove name, value       // those fields are no longer needed
  | join                   // combine points that have the same time value
  | put speed = furlongs/fortnights

Example: streaming one-to-one relational join

In this example, two input streams containing distance and time measurements are joined so that a rate can be computed:

// Join 1-1
//
// This demonstrates a synchronized join between two metric
// streams, allowing us to "divide" one stream by another
// to create a new metric. It does this by creating new points
// containing one metric from each input stream.
//
// one-to-one joining is triggered by 'join -zip <duration>'.
// an input point is matched to the point on the other
// input that is nearest (within <duration>) without being in the future.
// If we had specified <duration> of :0s:, timestamps would need to be equal.
// We instead specified -zip true, so any offset is allowed (but exact
// matches will always be made when they are present)
//
( emit 
  | put furlongs = Math.random()
  ; 
  emit -from :+0.5 s: 
  | put fortnights = Math.random()
) | join -zip true 
| put speed = furlongs / fortnights 
| view table

The output is the result of the join: points containing a new metric: "speed".

Example: streaming many-to-one relational join

In this example, two emits create two input streams for the join:

  • A stream of "parts" that include a board-ID
  • A "table" of board-id -> board name mappings. The example strips their time stamps away so that they are treated as a group (similar results occur if they all have the same time stamp)

Joining the stream of parts against the table of board IDs creates output points containing both the part name and its board name.

// 9 parts having board_ids are joined against table of board_id->boardname
( emit -hz 1000 -limit 9
  | put part = "part-"+Number.toString(count()), board_id = count()%3 + 1, t = time-:now:  
  ; 
  emit -from :0: -hz 1000 -limit 3 
| put board = "board-" + Number.toString(count()), board_id = count() 
| remove time
) | join board_id 
| batch 100 
| keep t, part, board 
| view table

The input to the join is two tables: 3 rows of board_id->board names, and 9 rows of parts (table commands are not included in the Juttle above).

The output is the result of the join: points containing part names and board names.

Example: streaming four-way right outer join

In this example, a stream of IDs (on points replayed by emit -points) is simultaneously outer joined against three tables of personal information.

Joining the stream of parts against the table of board IDs creates output points containing both the part name and its board name.

// 4-way right outer join of a point stream of ids against  tables of personal information.
//
// The points in the "tables" all have the same timestamp.
// For the join, the ID in each emit point
// is matched against each table, and an output point is created that is the union of all 
// matching points. This demonstrates partial joins when not all tables have an entry for 
// an ID. There are no matches at all for ID 5, so that point is passed through unchanged.
//---------------------------------------------------------------------------
    const name = [
      {time:"1970-01-01T00:00:00.000Z", "id":1, "name":"fred"},
      {time:"1970-01-01T00:00:00.000Z", "id":2, "name":"wilma"},
      {time:"1970-01-01T00:00:00.000Z", "id":3, "name":"dino"},
      {time:"1970-01-01T00:00:00.000Z", "id":4, "name":"barney"},
      {time:"1970-01-01T00:00:00.000Z", "id":6, "name":"bambam"},
       ]
    ;
    const haircolor = [
      {time:"1970-01-01T00:00:00.000Z", "id":1, "haircolor":"black"},
      {time:"1970-01-01T00:00:00.000Z", "id":2, "haircolor":"orange"},
      {time:"1970-01-01T00:00:00.000Z", "id":4, "haircolor":"blonde"},
      {time:"1970-01-01T00:00:00.000Z", "id":6, "haircolor":"blonde"}
       ]
    ;
    const hobby = [
       {time:"1970-01-01T00:00:00.000Z", "id":1, "hobby":"bowling"},
       {time:"1970-01-01T00:00:00.000Z", "id":3, "hobby":"singing"},
       {time:"1970-01-01T00:00:00.000Z", "id":6, "hobby":"home improvement"},
       ]
    ;
    ( emit -points name
      ;
      emit -points haircolor
      ;
      emit -points hobby
      ;
      emit -from :0: -limit 6
      | put id = count()
    ) | join -outer 4 id
    | remove time, type
    | view table