streamsx.standard.relational
Stream transformations using relational predicates.
Classes
Aggregate(window, schema[, group, name]) | Aggregation against a window of a structured schema stream.
Filter(stream[, non_matching, name]) | Removes tuples from a stream by passing along only those tuples that satisfy a user-specified condition.
Functor(stream, schemas[, filter, name]) | Transform input tuples into output ones, and optionally filter them.
Join(left, right, schemas[, match, name]) | Correlate tuples from two streams that are based on user-specified match predicates and window configurations.
class streamsx.standard.relational.Aggregate(window, schema, group=None, name=None)
Bases: streamsx.spl.op.Map
Aggregation against a window of a structured schema stream.
The resulting stream (attribute stream) will contain aggregations of the window defined by the methods invoked against the instance.
The aggregation is invoked when the window is triggered. A grouped aggregation will result in N output tuples per aggregation, where N is the number of groups in the window at the time of the trigger. A non-grouped window results in a single tuple per aggregation. A window punctuation mark is submitted after the tuples resulting from the aggregation.
In all examples, an input schema of tuple<int32 id, timestamp ts, float64 reading> is assumed, representing an input stream of sensor readings.
Example of aggregating sensor readings to produce a stream containing the maximum, minimum and average reading over the last ten minutes, updating every minute and grouped by sensor:
    import datetime
    from streamsx.standard.relational import Aggregate

    # Declare the window (readings is a stream with the input schema above)
    win = readings.last(datetime.timedelta(minutes=10)).trigger(datetime.timedelta(minutes=1))
    # Declare the output schema
    schema = 'tuple<int32 id, timestamp ts, float64 max_reading, float64 min_reading, float64 avg_reading>'
    # Invoke the aggregation
    agg = Aggregate.invoke(win, schema, group='id')
    # Declare the output attribute assignments.
    agg.min_reading = agg.min('reading')
    agg.max_reading = agg.max('reading')
    agg.avg_reading = agg.average('reading')
    # resulting stream is agg.stream
    agg.stream.print()
When an output attribute is not assigned and it has a matching input attribute, the value in the last tuple for the group is used. In the example above, id will be set to the sensor identifier of the group and ts will be set to the timestamp of the most recent tuple for the group.
For the output attribute assignment methods that return a grouped value, such as count() and max(), when the window is not grouped the value is computed against all tuples in the window.
The aggregation is implemented using the spl.relational::Aggregate SPL primitive operator from the SPL Standard toolkit.
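The trigger behaviour described above can be modelled in plain Python. This is a minimal sketch and not the streamsx API: the helper name aggregate_window, the dict-based tuples and the sample values are assumptions made for illustration. It shows that a grouped aggregation yields one output tuple per group, and that the unassigned attributes id and ts take their values from the last tuple of each group.

```python
def aggregate_window(window, group=None):
    """Model a single trigger of an aggregation over `window`.

    Hypothetical helper for illustration only. Tuples are plain dicts
    with the keys id, ts and reading.
    """
    # Partition the window contents by the grouping attribute.
    # Without a grouping attribute everything lands in one group.
    groups = {}
    for tpl in window:
        key = tpl[group] if group is not None else None
        groups.setdefault(key, []).append(tpl)

    results = []
    for members in groups.values():
        readings = [t['reading'] for t in members]
        last = members[-1]  # unassigned attributes come from the last tuple
        results.append({
            'id': last['id'],
            'ts': last['ts'],
            'max_reading': max(readings),
            'min_reading': min(readings),
            'avg_reading': sum(readings) / len(readings),
        })
    return results


window = [
    {'id': 1, 'ts': 100, 'reading': 20.0},
    {'id': 2, 'ts': 101, 'reading': 30.0},
    {'id': 1, 'ts': 102, 'reading': 24.0},
]
grouped = aggregate_window(window, group='id')  # one tuple per sensor
ungrouped = aggregate_window(window)            # a single tuple
```

The sketch ignores window eviction and punctuation; it only models what one trigger produces from the tuples currently in the window.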
average(attribute)
Average of values for an input attribute.
Returns an output expression representing the average of values of the input attribute in the group.
Example:
    # Get the average reading of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 avg_reading>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.avg_reading = agg.average('reading')
Parameters: attribute (str) – Attribute name to find the average value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
count()
Count of tuples in the group.
Returns an output expression of type int32 representing the number of tuples in the group.
Example:
    # Count the number of tuples grouped by sensor id in the last minute
    schema = 'tuple<int32 id, timestamp ts, int32 n>'
    agg = Aggregate.invoke(s.last(datetime.timedelta(minutes=1)), schema, group='id')
    agg.n = agg.count()
Returns: Output expression with the type int32.
Return type: Expression
count_all()
Count of all tuples in the window.
Returns: Output expression with the type int32.
Return type: Expression
count_groups()
Count of all groups in the window.
Returns: Output expression with the type int32.
Return type: Expression
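The three counting methods differ only in what they count. The following plain-Python sketch illustrates the distinction; it is not the streamsx API, and the helper name window_counts and the sample tuples are assumptions for illustration.

```python
from collections import Counter


def window_counts(window, group):
    """Hypothetical helper modelling count(), count_all() and count_groups()."""
    per_group = Counter(tpl[group] for tpl in window)
    return {
        'count': dict(per_group),        # tuples per group, as count() reports
        'count_all': len(window),        # every tuple in the window
        'count_groups': len(per_group),  # number of distinct groups
    }


window = [
    {'id': 1, 'reading': 20.0},
    {'id': 2, 'reading': 30.0},
    {'id': 1, 'reading': 24.0},
]
counts = window_counts(window, 'id')
```

In a grouped aggregation each output tuple would carry its own group's count, while count_all and count_groups are the same for every output tuple of that trigger.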
first(attribute)
First value for an input attribute.
Returns an output expression representing the first (earliest) value of the input attribute in the group.
Example:
    # Get the first reading of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 first_reading>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.first_reading = agg.first('reading')
Parameters: attribute (str) – Attribute name to find the first value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
interval_end()
Get the end of the current time-interval window.
New in version 1.2.
Returns: Output expression with the type timestamp.
Return type: Expression
interval_start()
Get the start of the current time-interval window.
New in version 1.2.
Returns: Output expression with the type timestamp.
Return type: Expression
static invoke(window, schema, group=None, name=None)
Invoke an aggregation against a window.
Parameters:
- window (streamsx.topology.topology.Window) – Window to aggregate against.
- schema (str,StreamSchema) – Schema of output stream containing aggregations.
- group (str) – Attribute name to group aggregations.
- name (str) – Invocation name, defaults to a generated name.
Returns: Aggregate invocation.
Return type: Aggregate
last(attribute)
Last value for an input attribute.
Returns an output expression representing the last (latest, most recent) value of the input attribute in the group.
Example:
    # Get the last reading of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 last_reading>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.last_reading = agg.last('reading')
Parameters: attribute (str) – Attribute name to find the last value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
max(attribute)
Maximum value for an input attribute.
Returns an output expression representing the maximum value of the input attribute in the group.
Example:
    # Get the maximum reading of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 max_reading>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.max_reading = agg.max('reading')
Parameters: attribute (str) – Attribute name to find the maximum value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
min(attribute)
Minimum value for an input attribute.
Returns an output expression representing the minimum value of the input attribute in the group.
Example:
    # Get the minimum reading of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 min_reading>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.min_reading = agg.min('reading')
Parameters: attribute (str) – Attribute name to find the minimum value of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
pane_timing()
Get timing of a time-interval window pane.
The value is the timing of the pane trigger in relation to the enclosing operator’s watermark, which is used for predicting pane completion. It is one of:
- “paneEarly”: The system has not yet predicted that it has seen all data which may contribute to a pane’s window.
- “paneOnComplete”: The system predicts that it has seen all data which may contribute to a pane’s window.
- “paneLate”: The system encountered new data for a pane’s window after predicting no more could arrive.
New in version 1.3.
Returns: Output expression with the type rstring.
Return type: Expression
std(attribute, sample=False)
Standard deviation of values for an input attribute.
Returns an output expression representing the standard deviation of values of the input attribute in the group.
Example:
    # Get the standard deviation of the readings of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 sd_reading>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.sd_reading = agg.std('reading')
Parameters:
- attribute (str) – Attribute name to find the standard deviation of.
- sample (bool) – True to use sample standard deviation, otherwise population standard deviation is used.
Returns: Output expression with the type of the input attribute.
Return type: Expression
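The difference between the two settings of sample can be illustrated with Python's statistics module. This is a sketch of the textbook definitions only: the sample values are made up, and the source does not state whether the SPL operator matches these results in every numeric edge case.

```python
import statistics

# Hypothetical readings for one group in the window.
readings = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

# Population standard deviation divides by N (the sample=False case).
population_sd = statistics.pstdev(readings)

# Sample standard deviation divides by N - 1 (the sample=True case).
sample_sd = statistics.stdev(readings)
```

For any group with more than one distinct value, the sample standard deviation is larger than the population one, and the gap shrinks as the group grows.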
sum(attribute)
Sum of values for an input attribute.
Returns an output expression representing the sum of values of the input attribute in the group.
Example:
    # Get the sum of the readings of the last ten tuples grouped by sensor id
    # updating every input tuple.
    schema = 'tuple<int32 id, timestamp ts, float64 sum_readings>'
    agg = Aggregate.invoke(s.last(10), schema, group='id')
    agg.sum_readings = agg.sum('reading')
Parameters: attribute (str) – Attribute name to find the sum of.
Returns: Output expression with the type of the input attribute.
Return type: Expression
class streamsx.standard.relational.Filter(stream, non_matching=False, name=None)
Bases: streamsx.spl.op.Invoke
Removes tuples from a stream by passing along only those tuples that satisfy a user-specified condition.
Non-matching tuples can be sent to a second optional output.
The filter is implemented using the spl.relational::Filter SPL primitive operator from the SPL Standard toolkit.
Example with one output stream:
    from streamsx.topology.topology import Topology
    import streamsx.standard.relational as R
    import streamsx.standard.utility as U

    topo = Topology()
    s = U.sequence(topo, iterations=4)
    matches = R.Filter.matching(s, filter='seq>=2ul')
Example with matching and non-matching streams:
    topo = Topology()
    s = U.sequence(topo, iterations=4)
    matches, non_matches = R.Filter.matching(s, filter='seq<2ul', non_matching=True)
static matching(stream, filter, non_matching=False, name=None)
Filters input tuples to one or two output streams.
Parameters:
- stream – Input stream
- filter (str) – Specifies the condition that determines the tuples to be passed along by the Filter operator
- non_matching (bool) – Non-matching tuples are sent to a second optional output stream
- name (str) – Invocation name, defaults to a generated name.
Returns: matching tuples (optional second stream for non-matching tuples).
Return type: Stream
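The split into matching and non-matching outputs can be modelled in plain Python. This is a minimal sketch rather than the streamsx API: the helper name split_matching, the dict-based tuples and the assumed sequence values 0 to 3 are illustrative only.

```python
def split_matching(tuples, predicate):
    """Hypothetical helper: route each tuple to exactly one of two outputs."""
    matches, non_matches = [], []
    for tpl in tuples:
        if predicate(tpl):
            matches.append(tpl)
        else:
            non_matches.append(tpl)
    return matches, non_matches


# Assumed input: four tuples with a seq attribute counting up from 0.
seq_tuples = [{'seq': n} for n in range(4)]

# Plain-Python stand-in for the SPL condition 'seq<2ul'.
matches, non_matches = split_matching(seq_tuples, lambda t: t['seq'] < 2)
```

Every input tuple appears on exactly one of the two outputs, and the tuples themselves are passed through unchanged.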
class streamsx.standard.relational.Functor(stream, schemas, filter=None, name=None)
Bases: streamsx.spl.op.Invoke
Transform input tuples into output ones, and optionally filter them.
If no filter is specified, every incoming tuple results in a tuple on each output stream.
The schema transformation is implemented using the spl.relational::Functor SPL primitive operator from the SPL Standard toolkit.
Example with schema transformation and two output streams:
    from streamsx.topology.topology import Topology
    from streamsx.topology.schema import StreamSchema
    import streamsx.standard.relational as R
    import streamsx.standard.utility as U

    topo = Topology()
    s = U.sequence(topo, iterations=10)  # schema is 'tuple<uint64 seq, timestamp ts>'
    fo = R.Functor.map(s, [StreamSchema('tuple<uint64 seq>'), StreamSchema('tuple<timestamp ts>')])
    seq_stream = fo.outputs[0]  # schema is 'tuple<uint64 seq>' only
    ts_stream = fo.outputs[1]  # schema is 'tuple<timestamp ts>' only
Example that filters some tuples:
    topo = Topology()
    s = U.sequence(topo, iterations=5)
    fo = R.Functor.map(s, StreamSchema('tuple<uint64 seq>'), filter='seq>=2ul')
    fstream = fo.outputs[0]
    fstream.print()
static map(stream, schema, filter=None, name=None)
Map input stream schema to one or more output schemas.
Parameters:
- stream – Input stream
- schema (str,StreamSchema) – Schema of output stream(s).
- filter (str) – Specifies the condition that determines which input tuples are to be operated on.
- name (str) – Invocation name, defaults to a generated name.
Returns: Functor invocation.
Return type: Functor
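The mapping of one input schema onto several output schemas can be modelled in plain Python. This is a minimal sketch and not the streamsx API: the helper name functor_map, the use of attribute-name tuples in place of schemas, and the sample values are assumptions for illustration.

```python
def functor_map(tuples, output_schemas, predicate=None):
    """Hypothetical helper: project each accepted tuple onto every output.

    `output_schemas` is a list of attribute-name sequences standing in
    for the output stream schemas.
    """
    outputs = [[] for _ in output_schemas]
    for tpl in tuples:
        # Tuples rejected by the optional filter produce no output at all.
        if predicate is not None and not predicate(tpl):
            continue
        # An accepted tuple yields one tuple on each output stream.
        for out, attrs in zip(outputs, output_schemas):
            out.append({name: tpl[name] for name in attrs})
    return outputs


# Assumed input: tuples with a seq counter and a made-up ts value.
source = [{'seq': n, 'ts': 1000 + n} for n in range(5)]
seq_out, ts_out = functor_map(
    source,
    [('seq',), ('ts',)],
    predicate=lambda t: t['seq'] >= 2,  # stand-in for 'seq>=2ul'
)
```

Both outputs always hold the same number of tuples, because the filter is applied once per input tuple before any projection takes place.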
class streamsx.standard.relational.Join(left, right, schemas, match=None, name=None)
Bases: streamsx.spl.op.Invoke
Correlate tuples from two streams that are based on user-specified match predicates and window configurations.
The correlation is implemented using the spl.relational::Join SPL primitive operator from the SPL Standard toolkit.
static lookup(reference, reference_key, lookup, lookup_key, schema, match=None, name=None)
Correlates tuples from two streams based on user-specified match predicates and window configurations.
When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple is produced for each match.
The reference_key and lookup_key parameters are used to specify equijoin match predicates, which result in using a hash-based join implementation.
Parameters:
- reference (streamsx.topology.topology.Window) – Input window
- reference_key (str) – Name of the attribute
- lookup (Stream) – Input stream
- lookup_key (str) – Name of the attribute
- schema (str,StreamSchema) – Schema of output stream.
- match (str) – Specifies an expression to be used for matching the tuples. The expression might refer to attributes from both input ports. When this parameter is omitted, the default value of true is used.
- name (str) – Invocation name, defaults to a generated name.
Returns: Join invocation.
Return type: Join
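The idea of a hash-based equijoin can be sketched in plain Python. This is not the streamsx API or the operator's actual implementation: the helper name hash_lookup, the dict-based tuples and the sample data are assumptions for illustration, and the sketch only models lookup tuples probing the current contents of the reference window, without window eviction.

```python
from collections import defaultdict


def hash_lookup(reference_window, reference_key, lookup_tuples, lookup_key,
                match=None):
    """Hypothetical helper modelling an equijoin with an optional predicate."""
    # Build a hash index over the reference window, keyed on the join attribute.
    index = defaultdict(list)
    for ref in reference_window:
        index[ref[reference_key]].append(ref)

    results = []
    for tpl in lookup_tuples:
        # Probe the index instead of scanning every reference tuple.
        for ref in index.get(tpl[lookup_key], []):
            # An omitted match predicate behaves like a constant true.
            if match is None or match(ref, tpl):
                results.append({**ref, **tpl})
    return results


sensors = [
    {'id': 1, 'location': 'roof'},
    {'id': 2, 'location': 'cellar'},
]
readings = [
    {'sensor': 1, 'reading': 20.5},
    {'sensor': 3, 'reading': 9.0},   # no reference tuple, so no output
    {'sensor': 2, 'reading': 18.0},
]
joined = hash_lookup(sensors, 'id', readings, 'sensor')
warm = hash_lookup(sensors, 'id', readings, 'sensor',
                   match=lambda ref, tpl: tpl['reading'] > 19.0)
```

The index makes each probe proportional to the number of reference tuples sharing the key rather than to the size of the whole window, which is the usual motivation for a hash-based join.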