streamsx.standard.utility

Standard utilities for processing streams.

Functions

gate(stream, control[, max_unacked, …]) Gate tuple flow based upon downstream processing.
merge(inputs[, matching, buffer_size, name]) Merge tuples across two (or more) streams.
pair(stream0, stream1[, matching, name]) Pair tuples across two streams.
spray(stream, count[, queue, name]) Spray tuples to a number of streams.
union(inputs, schema[, name]) Union structured streams with disparate schemas.

Classes

Deduplicate(count, period, key, …) Deduplicate tuples on a stream.
Delay(delay, max_delayed) Delay tuples on a stream.
Sequence(period, iterations, delay, …) A sequence source.
Throttle(rate, precise, …) Throttle the rate of a stream.
class streamsx.standard.utility.Deduplicate(count: int = None, period: float = None, key: str = None, flush_on_punctuation: bool = None)

Bases: streamsx.topology.composite.Map

Deduplicate tuples on a stream.

If a tuple on stream is followed by a duplicate tuple within count tuples or within period seconds, the duplicate is discarded from the returned stream.

Only one of count or period can be set.

Parameters:
  • count (int) – Number of tuples.
  • period (float) – Time period to check for duplicates.
  • key (string) – Expression used to determine whether a tuple is a duplicate. If this parameter is omitted, the whole tuple is used as the key.
  • flush_on_punctuation (bool) – Specifies whether punctuation causes the operator to forget all history of remembered tuples. If this parameter is not specified, the default value is False. If the parameter value is True, all remembered keys are erased when punctuation is received.
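
The count-based behaviour described above can be modelled in plain Python. This is a minimal sketch, not the operator's implementation: the helper name deduplicate_by_count is hypothetical, and it assumes that a dropped duplicate does not refresh the remembered position of its key.

```python
def deduplicate_by_count(items, count, key=lambda t: t):
    """Drop an item when an equal key was kept within the previous `count` items.

    Assumption: a dropped duplicate does not refresh the remembered position.
    """
    last_kept = {}  # key -> index of the most recently kept item with that key
    kept = []
    for index, item in enumerate(items):
        k = key(item)
        seen_at = last_kept.get(k)
        if seen_at is not None and index - seen_at <= count:
            continue  # duplicate inside the window: discard it
        last_kept[k] = index
        kept.append(item)
    return kept


values = [1, 2, 1, 4, 5, 2, 6, 3, 7, 8, 9]
result = deduplicate_by_count(values, count=10)
narrow = deduplicate_by_count(values, count=1)
```

With count=10 the second 1 and the second 2 fall inside the window and are discarded; with count=1 both repeats arrive too late to be treated as duplicates and are kept.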

Example discarding duplicate tuples with a=1 and a=2:

from streamsx.topology.topology import Topology
import streamsx.standard.utility as U

topo = Topology()
s = topo.source([1,2,1,4,5,2,6,3,7,8,9])
s = s.map(lambda v : {'a':v}, schema='tuple<int32 a>')
s = s.map(U.Deduplicate(count=10))
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

class streamsx.standard.utility.Delay(delay: float, max_delayed: int = 1000)

Bases: streamsx.topology.composite.Map

Delay tuples on a stream.

Delays tuples on stream maintaining inter-arrival times of tuples and punctuation.

Parameters:
  • delay (float) – Seconds to delay each tuple.
  • max_delayed (int) – Number of items that can be delayed before upstream processing is blocked.
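
What "maintaining inter-arrival times" means can be shown with simple arithmetic. The sketch below is an illustration only: the helper names are hypothetical, it works on timestamps rather than a live stream, and it ignores the max_delayed limit.

```python
def delayed_times(arrival_times, delay):
    """Return the submission time for each arrival when every item waits `delay` seconds."""
    return [t + delay for t in arrival_times]


def gaps(times):
    """Return the gaps between consecutive timestamps."""
    return [later - earlier for earlier, later in zip(times, times[1:])]


arrivals = [0.0, 0.25, 1.0]
submitted = delayed_times(arrivals, delay=1.5)
```

Every item is shifted by the same amount, so the gaps between submitted items equal the gaps between arrivals.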

Example delaying a stream readings by 1.5 seconds:

import streamsx.standard.utility as U

readings = readings.map(U.Delay(delay=1.5))
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

streamsx.standard.utility.SEQUENCE_SCHEMA = <streamsx.topology.schema.StreamSchema object>

Structured schema containing a sequence identifier and a timestamp.

'tuple<uint64 seq, timestamp ts>'

class streamsx.standard.utility.Sequence(period: float = None, iterations: int = None, delay: float = None, trigger_count: int = None)

Bases: streamsx.topology.composite.Source

A sequence source.

Creates a structured stream with schema SEQUENCE_SCHEMA. The seq attribute starts at zero and increases monotonically, and the ts attribute is set to the time the tuple was generated.

Parameters:
  • period (float) – Period of tuple generation in seconds. If None, tuples are generated as fast as possible.
  • iterations (int) – Number of tuples on the stream. If None, the stream is infinite.
  • delay (float) – Delay in seconds before the first tuple is submitted. If None, tuples are submitted as soon as possible.
  • trigger_count (int) – Specifies how many tuples are submitted before the operator starts to drain the pipeline of a consistent region and establish a consistent state.
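
The interaction of period, iterations and delay can be sketched as a plain Python generator. This is a model under stated assumptions, not the source's implementation: the name sequence_model is hypothetical, tuples are represented as dictionaries, ts is a float from time.time(), and trigger_count is not modelled.

```python
import itertools
import time


def sequence_model(period=None, iterations=None, delay=None):
    """Yield {'seq', 'ts'} items, mimicking the parameters described above."""
    if delay is not None:
        time.sleep(delay)  # initial delay before the first item
    # An unbounded counter models the infinite stream when iterations is None.
    counter = itertools.count() if iterations is None else range(iterations)
    for seq in counter:
        if period is not None and seq > 0:
            time.sleep(period)  # wait one period between items
        yield {'seq': seq, 'ts': time.time()}


finite = list(sequence_model(period=0.001, iterations=3))
# An infinite sequence has to be bounded by the consumer.
first_five = list(itertools.islice(sequence_model(), 5))
```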

Example, create an infinite sequence stream of twenty tuples per second:

from streamsx.topology.topology import Topology
import streamsx.standard.utility as U

topo = Topology()
# A period of 0.05 seconds yields twenty tuples per second.
seq = topo.source(U.Sequence(period=0.05), name='20Hz')

Example, start operator of an operator-driven consistent region:

from streamsx.topology.topology import Topology
import streamsx.standard.utility as U
from streamsx.topology.state import ConsistentRegionConfig

topo = Topology()
s = topo.source(U.Sequence(iterations=1000, delay=0.1, trigger_count=10))
s.set_consistent(ConsistentRegionConfig.operator_driven())
populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters:
  • topology – Topology containing the source.
  • name – Name passed into source.
  • **options – Future options passed to source.
Returns:

Single stream representing the source.

Return type:

Stream

class streamsx.standard.utility.Throttle(rate: float, precise: bool = False, include_punctuations: bool = False, period: float = None)

Bases: streamsx.topology.composite.Map

Throttle the rate of a stream.

Parameters:
  • rate (float) – Throttled rate of the returned stream in tuples/second.
  • precise (bool) – Try to make the rate precise at the cost of increased overhead.
  • include_punctuations (bool) – Specifies whether punctuation is to be included in the rate computation.
  • period (float) – The period to be used for maintaining the wanted rate in seconds. When making rate adjustments, the Throttle operator considers only the last period, going back from the current time. By default, the period is set to 10.0/rate.
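
To make the default period concrete, the arithmetic can be written out in plain Python. The helper name default_period is hypothetical and not part of this module; it only restates the 10.0/rate rule given for the period parameter.

```python
def default_period(rate):
    """Default adjustment period in seconds, following the 10.0/rate rule."""
    if rate <= 0:
        raise ValueError("rate must be positive")
    return 10.0 / rate


# At 10,000 tuples/second the default period is one millisecond,
# which at that rate spans about ten tuples.
period = default_period(10000.0)
tuples_per_period = period * 10000.0
```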

Example throttling a stream readings to around 10,000 tuples per second:

import streamsx.standard.utility as U
readings = readings.map(U.Throttle(rate=10000.0))
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

streamsx.standard.utility.gate(stream, control, max_unacked=1, ack_count=1, name=None)

Gate tuple flow based upon downstream processing.

Tuples on stream are passed through unmodified to the returned stream but the flow is gated by tuples on the control stream.

Up to max_unacked tuples flow through the gate before stream is blocked. Each tuple arriving on control acknowledges ack_count tuples and thus unblocks stream until the number of unacknowledged tuples reaches max_unacked again.
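
The bookkeeping behind max_unacked and ack_count can be sketched as a small counter. This is a simplified model, not the operator's implementation: the class GateModel and its method names are hypothetical, and blocking is represented by a False return value instead of a stalled thread.

```python
class GateModel:
    """Track unacknowledged tuples the way the description above suggests."""

    def __init__(self, max_unacked=1, ack_count=1):
        self.max_unacked = max_unacked
        self.ack_count = ack_count
        self.unacked = 0

    def try_pass(self):
        """Return True if a tuple may pass, False if the gate is blocked."""
        if self.unacked >= self.max_unacked:
            return False
        self.unacked += 1
        return True

    def ack(self):
        """Model one tuple arriving on the control stream."""
        self.unacked = max(0, self.unacked - self.ack_count)


gate = GateModel(max_unacked=2, ack_count=1)
passed = [gate.try_pass() for _ in range(3)]  # the third attempt is blocked
gate.ack()                                    # one control tuple arrives
after_ack = gate.try_pass()                   # one more tuple may pass
blocked_again = not gate.try_pass()
```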

The output of some downstream processing is typically used as control and thus control is usually a stream obtained from a streamsx.topology.topology.PendingStream.

Example with feedback loop:

import time
from streamsx.topology.topology import Topology, PendingStream
import streamsx.standard.utility as U

topo = Topology()
s = topo.source(range(100))
c = PendingStream(topo)
g = U.gate(s, c.stream, max_unacked=1)
g = g.map(lambda _ : time.time())
r = g.map(U.Delay(delay=1.0))
c.complete(r)
Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream to be gated.
  • control (streamsx.topology.topology.Stream) – Controlling stream.
  • max_unacked (int) – Maximum number of tuples allowed through the gate without acknowledgement.
  • ack_count (int) – Count of tuples to acknowledge with each tuple arriving on control.
  • name (str) – Name of resultant stream, defaults to a generated name.
Returns:

Gated stream.

Return type:

streamsx.topology.topology.Stream

streamsx.standard.utility.merge(inputs, matching=None, buffer_size=None, name=None)

Merge tuples across two (or more) streams.

This method is used to merge results from performing parallel tasks on the same stream, for example when several models score the same stream.

Holds tuples on the input streams until a matched tuple has been received by each input stream. Once matching tuples have been received for all input streams, the tuples are submitted to the returned stream in order of the input ports.

Tuples are matched according to the matching parameter which is an attribute name from the input tuple schema, typically representing the application key of the tuple.

If matching is None then a match occurs when a tuple is received, so that tuples are emitted when a tuple has been received by each input port.
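
The hold-and-match behaviour can be modelled in plain Python. This is a minimal sketch under stated assumptions, not the operator's implementation: the function merge_model is hypothetical, tuples are dictionaries, arrivals are given as (port, tuple) pairs, and buffer_size is not modelled.

```python
from collections import deque


def merge_model(arrivals, port_count, matching=None):
    """Emit matched tuples in port order once every port holds one for the key."""
    # One buffer per input port: key -> queue of tuples waiting for a match.
    pending = [dict() for _ in range(port_count)]
    emitted = []
    for port, tup in arrivals:
        # Without a matching attribute, every tuple shares the same key.
        key = tup[matching] if matching is not None else None
        pending[port].setdefault(key, deque()).append(tup)
        if all(pending[p].get(key) for p in range(port_count)):
            for p in range(port_count):
                emitted.append(pending[p][key].popleft())
    return emitted


arrivals = [
    (0, {'id': 1, 'score': 0.2}),
    (0, {'id': 2, 'score': 0.5}),
    (1, {'id': 2, 'score': 0.9}),
    (1, {'id': 1, 'score': 0.7}),
]
by_id = merge_model(arrivals, port_count=2, matching='id')
by_arrival = merge_model(arrivals, port_count=2)
```

With matching='id' the tuples for id 2 are emitted first because their match completes first. Without matching, tuples are paired in arrival order per port, so tuples with different id values end up next to each other.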

All input streams must have the same schema and the resultant stream has the same schema.

These schemas are not supported when matching is specified.

  • CommonSchema.Python
  • CommonSchema.Json
Parameters:
  • inputs (list[streamsx.topology.topology.Stream]) – Input streams to be matched.
  • matching (str) – Attribute name for matching.
  • buffer_size (int) – Specifies the size of the internal buffer that is used to queue up tuples from an input port that do not yet have matching tuples from other ports. This parameter is not supported in a consistent region.
  • name (str) – Name of resultant stream, defaults to a generated name.
Returns:

Merged stream.

Return type:

streamsx.topology.topology.Stream

streamsx.standard.utility.pair(stream0, stream1, matching=None, buffer_size: int = None, name=None)

Pair tuples across two streams.

This method is used to merge results from performing parallel tasks on the same stream, for example when several models score the same stream.

Holds tuples on the two input streams until a matched tuple has been received by both input streams. Once matching tuples have been received, the two tuples are submitted to the returned stream with the tuple from stream0 followed by the one from stream1.

Tuples are matched according to the matching parameter which is an attribute name from the input tuple schema, typically representing the application key of the tuple.

If matching is None then a match occurs when a tuple is received, so that tuples are emitted when a tuple has been received by both input streams.

stream0 and stream1 must have the same schema and the resultant stream has the same schema.

These schemas are not supported when matching is specified.

  • CommonSchema.Python
  • CommonSchema.Json

This is equivalent to merge([stream0, stream1], matching=matching, buffer_size=buffer_size, name=name).

Example of scoring in parallel:

import streamsx.standard.utility as U
from streamsx.topology.schema import StreamSchema

# Stream of customer information with customer identifier
# as the id attribute; schema is the schema of that stream.
customers = ...
score_schema = schema.extend(StreamSchema('tuple<float64 score>'))

# Score each tuple on customers in parallel
cust_churn = customers.map(customer_churn_score, schema=score_schema)
cust_renew = customers.map(customer_renew_score, schema=score_schema)

# Pair back as single stream
# cust_churn_renew stream will contain two tuples for
# each customer, the churn score followed by the renew score.
cust_churn_renew = U.pair(cust_churn, cust_renew, matching='id')
Parameters:
  • stream0 (streamsx.topology.topology.Stream) – First input stream.
  • stream1 (streamsx.topology.topology.Stream) – Second input stream.
  • matching (str) – Attribute name for matching tuples.
  • buffer_size (int) – Specifies the size of the internal buffer that is used to queue up tuples from an input port that do not yet have matching tuples from other ports. This parameter is not supported in a consistent region.
  • name (str) – Name of resultant stream, defaults to a generated name.
Returns:

Paired stream.

Return type:

streamsx.topology.topology.Stream

streamsx.standard.utility.spray(stream, count, queue=1000, name=None)

Spray tuples to a number of streams. Each tuple on stream is sent to one (and only one) of the returned streams. Which stream receives a specific tuple is not defined; each stream has a dedicated thread, and the first available thread takes the tuple and submits it.

Each tuple on stream is placed on an internal queue before it is submitted to an output stream. If the queue fills up then processing of the input stream is blocked until there is space in the queue.
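
The queue-and-threads behaviour can be sketched with the Python standard library. This is an illustrative model, not the operator's implementation: the function spray_model is hypothetical, and each output stream is represented by a list filled by its own worker thread.

```python
import queue
import threading


def spray_model(items, count, queue_size=1000):
    """Distribute items over `count` outputs through one bounded queue."""
    q = queue.Queue(maxsize=queue_size)  # put() blocks while the queue is full
    outputs = [[] for _ in range(count)]
    done = object()  # sentinel telling a worker to stop

    def worker(out):
        while True:
            item = q.get()
            if item is done:
                return
            out.append(item)  # whichever worker is free takes the item

    threads = [threading.Thread(target=worker, args=(out,)) for out in outputs]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:
        q.put(done)  # one sentinel per worker
    for t in threads:
        t.join()
    return outputs


outs = spray_model(range(100), count=8, queue_size=10)
```

Which list receives a given item varies from run to run, but every item lands in exactly one list.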

Example, spray the source tuples to 8 streams:

from streamsx.topology.topology import Topology
import streamsx.standard.utility as U

topo = Topology()
s = topo.source(U.Sequence())
outs = []
for so in U.spray(s, count=8):
    outs.append(so.map(lambda x : (x['seq'], x['ts']), schema=U.SEQUENCE_SCHEMA))
s = outs[0].union(set(outs))
Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream to be sprayed.
  • count (int) – Number of output streams the input stream will be sprayed across.
  • queue (int) – Maximum queue size.
  • name (str) – Name of the stream, if None a generated name is used.
Returns:

List of output streams.

Return type:

list(streamsx.topology.topology.Stream)

streamsx.standard.utility.union(inputs, schema, name=None)

Union structured streams with disparate schemas.

Each tuple on any of the streams in inputs results in a tuple on the returned stream.

All attributes of the output tuple are set from the input tuple, thus the schema of each input must include attributes matching (name and type) the output schema.

The order of attributes in the input schemas need not match the output schemas and the input schemas may contain additional attributes which will be discarded.
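
The attribute handling described above amounts to a projection onto the output schema. The sketch below models it in plain Python under stated assumptions: the function union_model is hypothetical, tuples are dictionaries, the output schema is a list of attribute names, and inputs are processed one after another rather than interleaved.

```python
def union_model(inputs, attributes):
    """Project every input tuple onto the output attributes."""
    result = []
    for stream in inputs:
        for tup in stream:
            # Extra input attributes are discarded; a missing one raises KeyError.
            result.append({name: tup[name] for name in attributes})
    return result


a = [{'a': 1, 'c': 10}]
b = [{'c': 20, 'b': 2}]
r = union_model([a, b], attributes=['c'])
```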

Example, the output of union() contains attribute c only:

from streamsx.topology.topology import Topology
import streamsx.standard.utility as U

topo = Topology()
...
# schema of stream a: 'tuple<int32 a, int32 c>'
# schema of stream b: 'tuple<int32 c, int32 b>'
r = U.union([a,b], schema='tuple<int32 c>')

Note

This method differs from streamsx.topology.topology.Stream.union() in that the schemas of input and output streams can differ, while union() requires matching input and output attributes.

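Parameters:
  • inputs (list[streamsx.topology.topology.Stream]) – Streams to be unioned.
  • schema – Schema of the returned stream.
  • name (str) – Name of resultant stream, defaults to a generated name.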
Returns:

Stream that is a union of inputs.

Return type:

streamsx.topology.topology.Stream