streamsx.standard.relational

Stream transformations using relational predicates.

Classes

  • Aggregate(window, schema[, group, name]) – Aggregation against a window of a structured schema stream.
  • Filter(stream[, non_matching, name]) – Removes tuples from a stream by passing along only those tuples that satisfy a user-specified condition.
  • Functor(stream, schemas[, filter, name]) – Transform input tuples into output ones, and optionally filter them.
  • Join(left, right, schemas[, match, name]) – Correlate tuples from two streams based on user-specified match predicates and window configurations.
class streamsx.standard.relational.Aggregate(window, schema, group=None, name=None)

Bases: streamsx.spl.op.Map

Aggregation against a window of a structured schema stream.

The resulting stream (attribute stream) contains aggregations of the window, as defined by the methods invoked against the instance.

The aggregation is invoked when the window is triggered. A grouped aggregation results in N output tuples per aggregation, where N is the number of groups in the window at the time of the trigger. A non-grouped aggregation results in a single tuple per aggregation. A window punctuation mark is submitted after the tuples resulting from the aggregation.
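The trigger behaviour described above can be modelled in plain Python. This is a sketch under stated assumptions, not the streamsx API: the function on_trigger, the dictionaries standing in for tuples, and the WINDOW_MARKER string standing in for a window punctuation mark are all hypothetical.

```python
# Hypothetical stand-in for the window punctuation mark submitted after each aggregation.
WINDOW_MARKER = "WINDOW_MARKER"


def on_trigger(window, group=None):
    """Model one trigger: compute max('reading') per group over the window contents."""
    groups = {}  # dicts keep insertion order, so groups appear in first-seen order
    for t in window:
        key = t[group] if group is not None else None  # non-grouped: one implicit group
        groups.setdefault(key, []).append(t)

    out = []
    for key, members in groups.items():
        row = {"max_reading": max(m["reading"] for m in members)}
        if group is not None:
            row[group] = key
        out.append(row)
    out.append(WINDOW_MARKER)  # punctuation follows the aggregation's tuples
    return out


window = [
    {"id": 1, "reading": 2.0},
    {"id": 2, "reading": 5.0},
    {"id": 1, "reading": 3.5},
]
grouped = on_trigger(window, group="id")  # two groups: two tuples, then the marker
ungrouped = on_trigger(window)            # one tuple over the whole window, then the marker
```

With two sensor ids in the window, the grouped trigger emits two tuples while the non-grouped trigger emits one, and both end with the marker.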

In all examples, an input schema of tuple<int32 id, timestamp ts, float64 reading> is assumed, representing an input stream of sensor readings.

Example of aggregating sensor readings to produce a stream containing the maximum, minimum and average reading over the last ten minutes updating every minute and grouped by sensor:

import datetime

from streamsx.standard.relational import Aggregate

# Declare the window over the input stream readings:
# the last ten minutes of tuples, triggered every minute.
win = readings.last(datetime.timedelta(minutes=10)).trigger(datetime.timedelta(minutes=1))

# Declare the output schema
schema = 'tuple<int32 id, timestamp ts, float64 max_reading, float64 min_reading, float64 avg_reading>'

# Invoke the aggregation, grouped by sensor id
agg = Aggregate.invoke(win, schema, group='id')

# Declare the output attribute assignments.
agg.min_reading = agg.min('reading')
agg.max_reading = agg.max('reading')
agg.avg_reading = agg.average('reading')

# The resulting stream is agg.stream
agg.stream.print()

When an output attribute is not assigned and it has a matching input attribute, the value from the last tuple for the group is used. In the example above, id is set to the sensor identifier of the group and ts is set to the timestamp of the most recent tuple for the group.
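The default-assignment rule can be sketched in plain Python as follows. This is a hypothetical model, not the streamsx implementation: aggregate_group and values are illustrative names, timestamps are plain integers for simplicity, and raising an error for an attribute with neither an assignment nor a matching input attribute is a choice of this sketch.

```python
def aggregate_group(members, output_attrs, assignments):
    """Build one output tuple for a group of input tuples (oldest first)."""
    last = members[-1]
    row = {}
    for attr in output_attrs:
        if attr in assignments:
            row[attr] = assignments[attr](members)  # explicit aggregation
        elif attr in last:
            row[attr] = last[attr]                  # default: value from the last tuple
        else:
            raise ValueError(f"output attribute {attr!r} has no assignment")
    return row


def values(members):
    """Extract the reading attribute from each tuple in the group."""
    return [m["reading"] for m in members]


members = [
    {"id": 7, "ts": 100, "reading": 1.0},  # ts as plain seconds, a simplification
    {"id": 7, "ts": 160, "reading": 4.0},
]
row = aggregate_group(
    members,
    ["id", "ts", "max_reading", "min_reading", "avg_reading"],
    {
        "max_reading": lambda ms: max(values(ms)),
        "min_reading": lambda ms: min(values(ms)),
        "avg_reading": lambda ms: sum(values(ms)) / len(ms),
    },
)
```

Here id and ts are not assigned, so they are copied from the most recent tuple of the group (ts is 160, not 100).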

For output attribute assignment methods that return a per-group value, such as count() and max(), the value is computed over all tuples in the window when the window is not grouped.

The aggregation is implemented using the spl.relational::Aggregate SPL primitive operator from the SPL Standard toolkit.

average(attribute)

Average of values for an input attribute.

Returns an output expression representing the average of values of the input attribute in the group.

Example:

# Get the average reading of the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 avg_reading>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.avg_reading = agg.average('reading')
Parameters: attribute (str) – Attribute name to find the average value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
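The average example above uses a count window of the last ten tuples, triggered by every arriving tuple and grouped by sensor id. A plain-Python sketch of that behaviour, assuming the count window spans the whole stream (not one window per group); sliding_group_average is a hypothetical name, not part of streamsx:

```python
from collections import deque


def sliding_group_average(stream, size=10, group="id", attr="reading"):
    """Per-group average over the last `size` tuples, recomputed on every arrival."""
    window = deque(maxlen=size)  # evicts the oldest tuple once the window is full
    results = []
    for t in stream:
        window.append(t)  # each arrival triggers an aggregation
        sums, counts = {}, {}
        for w in window:
            k = w[group]
            sums[k] = sums.get(k, 0.0) + w[attr]
            counts[k] = counts.get(k, 0) + 1
        results.append({k: sums[k] / counts[k] for k in sums})
    return results


# Two alternating sensors (ids 0 and 1) with readings 0.0 .. 11.0.
stream = [{"id": i % 2, "reading": float(i)} for i in range(12)]
results = sliding_group_average(stream)
```

After twelve tuples the window holds readings 2.0 to 11.0, so sensor 0 averages 2, 4, 6, 8, 10 and sensor 1 averages 3, 5, 7, 9, 11.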
count()

Count of tuples in the group.

Returns an output expression of type int32 representing the number of tuples in the group.

Example:

# Count the number of tuples grouped by sensor id in the last minute
schema = 'tuple<int32 id, timestamp ts, int32 n>'
agg = Aggregate.invoke(s.last(datetime.timedelta(minutes=1)), schema, group='id')
agg.n = agg.count()
Returns: Output expression with the type int32.
Return type: Expression
count_all()

Count of all tuples in the window.

Returns: Output expression with the type int32.
Return type: Expression
count_groups()

Count of all groups in the window.

Returns: Output expression with the type int32.
Return type: Expression
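The difference between count(), count_all() and count_groups() can be illustrated with a plain-Python model of one grouped trigger. This is a hypothetical sketch (the function name counts is illustrative): count() varies per group, while count_all() and count_groups() describe the whole window and so would carry the same value on every group's output tuple.

```python
def counts(window, group="id"):
    """Model the three counting aggregations for one trigger of a grouped window."""
    per_group = {}
    for t in window:
        per_group[t[group]] = per_group.get(t[group], 0) + 1
    return {
        "count": per_group,             # count(): tuples in each group
        "count_all": len(window),       # count_all(): tuples in the whole window
        "count_groups": len(per_group), # count_groups(): number of distinct groups
    }


result = counts([{"id": 1}, {"id": 2}, {"id": 1}, {"id": 3}, {"id": 1}])
```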
first(attribute)

First value for an input attribute.

Returns an output expression representing the first (earliest) value of the input attribute in the group.

Example:

# Get the first reading of the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 first_reading>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.first_reading = agg.first('reading')
Parameters: attribute (str) – Attribute name to find the first value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
interval_end()

Get the end of the current time-interval window.

New in version 1.2.

Returns: Output expression with the type timestamp.
Return type: Expression
interval_start()

Get the start of the current time-interval window.

New in version 1.2.

Returns: Output expression with the type timestamp.
Return type: Expression
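To illustrate what interval_start() and interval_end() report, here is a plain-Python sketch of the bounds of a tumbling time interval containing a given timestamp. The alignment to the Unix epoch plus an optional offset, and the half-open [start, end) convention, are assumptions of this sketch rather than documented behaviour; interval_bounds is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def interval_bounds(ts, duration, offset=timedelta(0)):
    """Return (start, end) of the interval [start, end) that contains ts."""
    n = (ts - EPOCH - offset) // duration  # whole intervals elapsed since the aligned origin
    start = EPOCH + offset + n * duration
    return start, start + duration


ts = datetime(2024, 1, 1, 12, 7, 30, tzinfo=timezone.utc)
start, end = interval_bounds(ts, timedelta(minutes=5))
```

With five-minute intervals, a tuple at 12:07:30 falls into the interval from 12:05 to 12:10.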
static invoke(window, schema, group=None, name=None)

Invoke an aggregation against a window.

Parameters:
  • window (streamsx.topology.topology.Window) – Window to aggregate against.
  • schema (str,StreamSchema) – Schema of output stream containing aggregations.
  • group (str) – Attribute name to group aggregations.
  • name (str) – Invocation name, defaults to a generated name.
Returns:

Aggregate invocation.

Return type:

Aggregate

last(attribute)

Last value for an input attribute.

Returns an output expression representing the last (latest, most recent) value of the input attribute in the group.

Example:

# Get the last reading of the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 last_reading>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.last_reading = agg.last('reading')
Parameters: attribute (str) – Attribute name to find the last value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
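The first() and last() aggregations depend only on arrival order within each group. A minimal plain-Python sketch, assuming the window lists tuples oldest first; first_last is a hypothetical helper, not a streamsx function:

```python
def first_last(window, group="id", attr="reading"):
    """Earliest and most recent value of attr for each group in the window."""
    result = {}
    for t in window:  # window is in arrival order, oldest first
        k = t[group]
        if k not in result:
            result[k] = {"first": t[attr], "last": t[attr]}
        else:
            result[k]["last"] = t[attr]  # later tuples overwrite the last value
    return result


fl = first_last([
    {"id": 1, "reading": 10.0},
    {"id": 2, "reading": 20.0},
    {"id": 1, "reading": 11.0},
    {"id": 1, "reading": 12.0},
])
```

A group with a single tuple, like sensor 2 here, has the same first and last value.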
max(attribute)

Maximum value for an input attribute.

Returns an output expression representing the maximum value of the input attribute in the group.

Example:

# Get the maximum reading of the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 max_reading>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.max_reading = agg.max('reading')
Parameters: attribute (str) – Attribute name to find the maximum value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
min(attribute)

Minimum value for an input attribute.

Returns an output expression representing the minimum value of the input attribute in the group.

Example:

# Get the minimum reading of the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 min_reading>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.min_reading = agg.min('reading')
Parameters: attribute (str) – Attribute name to find the minimum value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
pane_timing()

Get timing of a time-interval window pane.

Returns the timing of a window pane trigger relative to the enclosing operator’s watermark, which is used to predict pane completion. The value is one of:

  • “paneEarly”: The system has not yet predicted that it has seen all data which may contribute to a pane’s window.
  • “paneOnComplete”: The system predicts that it has seen all data which may contribute to a pane’s window.
  • “paneLate”: The system encountered new data for a pane’s window after predicting no more could arrive.

New in version 1.3.

Returns: Output expression with the type rstring.
Return type: Expression
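The three pane timing values can be illustrated with a deliberately simplified, hypothetical model: it assumes a pane is predicted complete once the watermark reaches the pane's end, and that any trigger after the complete one is caused by late data. PaneTimingModel is an illustrative class, not part of streamsx or SPL.

```python
class PaneTimingModel:
    """Simplified model of pane timing for one pane, driven by the watermark."""

    def __init__(self, pane_end):
        self.pane_end = pane_end
        self.completed = False  # has the on-complete trigger already fired?

    def on_trigger(self, watermark):
        if watermark < self.pane_end:
            return "paneEarly"        # more data may still arrive for this pane
        if not self.completed:
            self.completed = True
            return "paneOnComplete"   # predicted that all data has been seen
        return "paneLate"             # data arrived after completion was predicted


model = PaneTimingModel(pane_end=100)
timings = [model.on_trigger(w) for w in (80, 100, 120)]
```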
std(attribute, sample=False)

Standard deviation of values for an input attribute.

Returns an output expression representing the standard deviation of values of the input attribute in the group.

Example:

# Get the standard deviation of the readings in the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 sd_reading>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.sd_reading = agg.std('reading')
Parameters:
  • attribute (str) – Attribute name to find the standard deviation of.
  • sample (bool) – True to use the sample standard deviation; otherwise the population standard deviation is used.
Returns:

Output expression with the type of the input attribute.

Return type:

Expression
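The sample parameter selects between the two standard formulas: the population standard deviation divides the sum of squared deviations by n, the sample standard deviation by n - 1. The Python standard library exposes both, which makes the difference easy to check; the mapping to sample=False and sample=True follows the parameter description above.

```python
import statistics

readings = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

population_sd = statistics.pstdev(readings)  # corresponds to sample=False (divides by n)
sample_sd = statistics.stdev(readings)       # corresponds to sample=True (divides by n - 1)
```

Here the mean is 5.0 and the squared deviations sum to 32, so the population value is sqrt(32 / 8) = 2.0 and the sample value is sqrt(32 / 7), slightly larger.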

sum(attribute)

Sum of values for an input attribute.

Returns an output expression representing the sum of values of the input attribute in the group.

Example:

# Get the sum of the readings in the last ten tuples grouped by sensor id
# updating every input tuple.
schema = 'tuple<int32 id, timestamp ts, float64 sum_readings>'

agg = Aggregate.invoke(s.last(10), schema, group='id')
agg.sum_readings = agg.sum('reading')
Parameters: attribute (str) – Attribute name to find the sum of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
class streamsx.standard.relational.Filter(stream, non_matching=False, name=None)

Bases: streamsx.spl.op.Invoke

Removes tuples from a stream by passing along only those tuples that satisfy a user-specified condition.

Non-matching tuples can be sent to a second optional output.

The filtering is implemented using the spl.relational::Filter SPL primitive operator from the SPL Standard toolkit.

Example with one output stream:

import streamsx.standard.relational as R
import streamsx.standard.utility as U
from streamsx.topology.topology import Topology

topo = Topology()
s = U.sequence(topo, iterations=4)
matches = R.Filter.matching(s, filter='seq>=2ul')

Example with matching and non-matching streams:

topo = Topology()
s = U.sequence(topo, iterations=4)
matches, non_matches = R.Filter.matching(s, filter='seq<2ul', non_matching=True)
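The routing performed by Filter.matching can be modelled in plain Python, with a Python predicate standing in for the SPL filter expression. This is a hypothetical sketch (filter_matching is an illustrative name), and the sequence values 0 to 3 are an assumption about what U.sequence with four iterations produces.

```python
def filter_matching(tuples, predicate, non_matching=False):
    """Route each tuple to the matching or the non-matching output."""
    matches, rest = [], []
    for t in tuples:
        (matches if predicate(t) else rest).append(t)  # predicate evaluated once per tuple
    return (matches, rest) if non_matching else matches


seq = [{"seq": i} for i in range(4)]  # assumed sequence values 0..3

matches = filter_matching(seq, lambda t: t["seq"] >= 2)
low, high = filter_matching(seq, lambda t: t["seq"] < 2, non_matching=True)
```

With non_matching left at False, tuples failing the condition are simply dropped; with True, they go to the second output instead.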
static matching(stream, filter, non_matching=False, name=None)

Filters input tuples to one or two output streams.

Parameters:
  • stream – Input stream
  • filter (str) – Specifies the condition that determines the tuples to be passed along by the Filter operator.
  • non_matching (bool) – If True, non-matching tuples are sent to a second output stream.
  • name (str) – Invocation name, defaults to a generated name.
Returns:

Stream of matching tuples; when non_matching is True, a second stream of non-matching tuples is also returned.

Return type:

Stream

class streamsx.standard.relational.Functor(stream, schemas, filter=None, name=None)

Bases: streamsx.spl.op.Invoke

Transform input tuples into output ones, and optionally filter them.

Each input tuple that is not filtered out results in one tuple on each output stream.

The schema transformation is implemented using the spl.relational::Functor SPL primitive operator from the SPL Standard toolkit.

Example with schema transformation and two output streams:

import streamsx.standard.relational as R
import streamsx.standard.utility as U
from streamsx.topology.schema import StreamSchema
from streamsx.topology.topology import Topology

topo = Topology()
s = U.sequence(topo, iterations=10) # schema is 'tuple<uint64 seq, timestamp ts>'
fo = R.Functor.map(s, [StreamSchema('tuple<uint64 seq>'), StreamSchema('tuple<timestamp ts>')])
seq_stream = fo.outputs[0] # schema is 'tuple<uint64 seq>' only
ts_stream = fo.outputs[1] # schema is 'tuple<timestamp ts>' only

Example that filters some tuples:

topo = Topology()
s = U.sequence(topo, iterations=5)
fo = R.Functor.map(s, StreamSchema('tuple<uint64 seq>'), filter='seq>=2ul')
fstream = fo.outputs[0]
fstream.print()
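The two Functor examples can be modelled in plain Python: each output schema keeps the input attributes whose names it lists, and a filtered-out tuple produces no output on any stream. This is a hypothetical sketch (functor_map is an illustrative name, schemas are lists of attribute names, and ts is a plain integer), not how the SPL operator is implemented.

```python
def functor_map(tuples, schemas, predicate=None):
    """Project each kept input tuple onto every output schema."""
    outputs = [[] for _ in schemas]
    for t in tuples:
        if predicate is not None and not predicate(t):
            continue  # filtered-out tuples produce no output on any stream
        for port, attrs in enumerate(schemas):
            outputs[port].append({a: t[a] for a in attrs})  # copy attributes by name
    return outputs


s = [{"seq": i, "ts": 1000 + i} for i in range(5)]

seq_out, ts_out = functor_map(s, [["seq"], ["ts"]])
filtered = functor_map(s, [["seq"]], predicate=lambda t: t["seq"] >= 2)[0]
```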
static map(stream, schema, filter=None, name=None)

Map input stream schema to one or more output schemas.

Parameters:
  • stream – Input stream
  • schema (str,StreamSchema) – Schema of the output stream, or a list of schemas to create one output stream per schema.
  • filter (str) – Specifies the condition that determines which input tuples are to be operated on.
  • name (str) – Invocation name, defaults to a generated name.
Returns:

Functor invocation.

Return type:

Functor

class streamsx.standard.relational.Join(left, right, schemas, match=None, name=None)

Bases: streamsx.spl.op.Invoke

Correlate tuples from two streams based on user-specified match predicates and window configurations.

The correlation is implemented using the spl.relational::Join SPL primitive operator from the SPL Standard toolkit.

static lookup(reference, reference_key, lookup, lookup_key, schema, match=None, name=None)

Correlates tuples from two streams based on user-specified match predicates and window configurations.

When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple is produced for each match.

The reference_key and lookup_key parameters are used to specify equijoin match predicates, which result in using a hash-based join implementation.

Parameters:
  • reference (streamsx.topology.topology.Window) – Input window
  • reference_key (str) – Name of the attribute in the reference tuples used as the equijoin key.
  • lookup (Stream) – Input stream
  • lookup_key (str) – Name of the attribute in the lookup tuples used as the equijoin key.
  • schema (str,StreamSchema) – Schema of output stream.
  • match (str) – Specifies an expression to be used for matching the tuples. The expression might refer to attributes from both input ports. When this parameter is omitted, the default value of true is used.
  • name (str) – Invocation name, defaults to a generated name.
Returns:

Join invocation.

Return type:

Join
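The hash-based equijoin used by lookup can be sketched in plain Python for a single snapshot: the reference window is already populated and tuples arrive only on the lookup stream. The function lookup_join, the Python callable standing in for the SPL match expression, and merging the two tuples into one dictionary (lookup values win on name clashes) are all simplifications of this hypothetical sketch.

```python
from collections import defaultdict


def lookup_join(reference_window, reference_key, lookups, lookup_key, match=None):
    """Hash equijoin of lookup tuples against a snapshot of the reference window."""
    index = defaultdict(list)  # hash index: key value -> reference tuples
    for ref in reference_window:
        index[ref[reference_key]].append(ref)

    out = []
    for probe in lookups:  # each lookup tuple probes the index once
        for ref in index.get(probe[lookup_key], []):
            if match is None or match(ref, probe):  # omitted match behaves like true
                out.append({**ref, **probe})  # one output tuple per match
    return out


reference = [{"id": 1, "name": "north"}, {"id": 2, "name": "south"}]
lookups = [
    {"sensor": 1, "reading": 3.5},
    {"sensor": 3, "reading": 1.0},  # no reference tuple with id 3, so no output
    {"sensor": 2, "reading": 7.0},
]
joined = lookup_join(reference, "id", lookups, "sensor")
high = lookup_join(reference, "id", lookups, "sensor", match=lambda r, p: p["reading"] > 5)
```

The hash index avoids comparing every lookup tuple with every reference tuple; the optional match predicate is applied only to candidates that already share the key.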