streamsx.standard.files

Reading and writing of files.

Classes

BlockFilesReader([block_size, compression, …]) Reads files given by the input stream and generates tuples with the file content (block of binary data) on the output stream.
CSVFilesReader([header, encoding, …]) Reads files given by the input stream and generates tuples with the file content on the output stream.
CSVReader(schema, file[, header, encoding, …]) Read a comma separated value file as a stream.
CSVWriter(file[, append, encoding, …]) Write a stream as a comma separated value file.
DirectoryScan(directory[, pattern, schema]) Watches a directory and generates file names on the output, one for each file that is found in the directory.
FileSink(file, **options) Write a stream to a file.
LineFilesReader([file_name, compression]) Reads, line by line, files given by the input stream and generates tuples with the file content on the output stream.
class streamsx.standard.files.BlockFilesReader(block_size=None, compression=None, file_name=None)

Bases: streamsx.topology.composite.Map

Reads files given by the input stream and generates tuples with the file content (block of binary data) on the output stream.

Note

Each input tuple holds the file name to be read

Example, scanning for files with “wav” file extension and reading them:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.wav$'))
r = s.map(files.BlockFilesReader(block_size=512, file_name='filename'), schema=StreamSchema('tuple<blob speech, rstring filename>'))

Example, scanning for files with “zip” file extension and reading them:

import streamsx.standard.files as files
from streamsx.standard import Compression
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.zip$'))
r = s.map(files.BlockFilesReader(block_size=1024, compression=Compression.gzip.name), schema=StreamSchema('tuple<blob data>'))
Parameters:
  • block_size (int) – Specifies the block size. If the block_size parameter is not specified, the entire file is read into a single tuple.
  • compression (str) – Specifies that the source file is compressed. Valid values are the available compression algorithms: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.
  • file_name (str) – Each output tuple contains the name of the file that the tuple is read from. Ensure that the name given with this parameter is part of the output schema.
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

class streamsx.standard.files.CSVFilesReader(header=False, encoding=None, separator=None, ignoreExtraFields=False, file_name=None, compression=None)

Bases: streamsx.topology.composite.Map

Reads files given by the input stream and generates tuples with the file content on the output stream.

Note

Each input tuple holds the file name to be read

See also

Use CSVReader() for reading a single file given as a parameter

Example, scanning for files with “csv” file extension and reading them:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.csv$'))
r = s.map(files.CSVFilesReader(), schema=StreamSchema('tuple<rstring a, int32 b>'))
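
Example (a minimal sketch; the directory and the filename attribute name are illustrative), reading CSV files that have a header line and adding the source file name to each output tuple. The attribute named by file_name must be part of the output schema:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.csv$'))
r = s.map(files.CSVFilesReader(header=True, file_name='filename'),
          schema=StreamSchema('tuple<rstring a, int32 b, rstring filename>'))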
Parameters:
  • header (bool) – Specifies whether the file contains a header line.
  • encoding (str) – Specifies the character set encoding that is used in the input file.
  • separator (str) – Separator between fields (defaults to comma ,).
  • ignoreExtraFields (bool) – When True, fields in the file beyond the attributes of the schema are ignored. Otherwise, extra fields raise an error.
  • file_name (str) – Each output tuple contains the name of the file that the tuple is read from. Ensure that the name given with this parameter is part of the output schema.
  • compression (str) –

    Specifies that the source file is compressed. Valid values are the available compression algorithms: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.

    New in version 1.1.

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

class streamsx.standard.files.CSVReader(schema, file, header=False, encoding=None, separator=None, ignoreExtraFields=False, hot=False, compression=None)

Bases: streamsx.topology.composite.Source

Read a comma separated value file as a stream.

The file defined by file is read and mapped to a stream with a structured schema of schema.

Note

Reads a single file only

See also

Use CSVFilesReader() for reading multiple files; each input tuple holds a file name to be read

Example for reading a file from application directory (file is part of application bundle):

import os

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
sample_file = '/tmp/local/data.csv' # local file
topo.add_file_dependency(sample_file, 'etc') # add sample file to etc dir in bundle
fn = os.path.join('etc', 'data.csv') # file name relative to application dir
sch = 'tuple<rstring a, int32 b>'
r = topo.source(files.CSVReader(schema=sch, file=fn))

Example for reading a file from file system accessible from the running job, for example persistent volume claim (Cloud Pak for Data):

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
sample_file = '/opt/ibm/streams-ext/data.csv' # file location accessible from running Streams application
r = topo.source(files.CSVReader(schema='tuple<rstring a, int32 b>', file=sample_file))
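
Example (a minimal sketch; the file location is illustrative), reading a gzip-compressed file:

import streamsx.standard.files as files
from streamsx.standard import Compression
from streamsx.topology.topology import Topology

topo = Topology()
r = topo.source(files.CSVReader(schema='tuple<rstring a, int32 b>',
                                file='/opt/ibm/streams-ext/data.csv.gz',
                                compression=Compression.gzip.name))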
Parameters:
  • schema (StreamSchema) – Schema of the returned stream.
  • file (str|Expression) – Name of the source file. A relative file name is resolved against the application directory, for example when the file is added to the application bundle.
  • header (bool) – Specifies whether the file contains a header line.
  • encoding (str) – Specifies the character set encoding that is used in the input file.
  • separator (str) – Separator between fields (defaults to comma ,).
  • ignoreExtraFields (bool) – When True, fields in the file beyond the attributes of the schema are ignored. Otherwise, extra fields raise an error.
  • hot (bool) – Specifies whether the input file is hot, which means it is appended continuously.
  • compression (str) –

    Specifies that the source file is compressed. Valid values are the available compression algorithms: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.

    New in version 1.1.

Returns:

Stream containing records from the file.

Return type:

(Stream)

populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters:
  • topology – Topology containing the source.
  • name – Name passed into source.
  • **options – Future options passed to source.
Returns:

Single stream representing the source.

Return type:

Stream

class streamsx.standard.files.CSVWriter(file, append=None, encoding=None, separator=None, flush=None)

Bases: streamsx.topology.composite.ForEach

Write a stream as a comma separated value file.

The file defined by file is used as the output file.

Example for writing lines to a file:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(range(13))
sch = 'tuple<rstring a, int32 b>'
s = s.map(lambda v: ('A'+str(v), v+7), schema=sch)
s.for_each(files.CSVWriter(file='/tmp/data.txt'))
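
Example (a minimal sketch; the path and values are illustrative), appending to an existing file and flushing after every 100 tuples:

s.for_each(files.CSVWriter(file='/tmp/data.txt', append=True, flush=100))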

Note

Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist.

Parameters:
  • file (str|Expression) – Name of the output file. A relative file name is relative to the data directory.
  • append (bool) – Specifies that the generated tuples are appended to the output file. If this parameter is false or not specified, the output file is truncated before the tuples are written.
  • encoding – Specifies the character set encoding that is used in the output file.
  • separator (str) – Separator between fields (defaults to comma ,).
  • flush (int) – Specifies the number of tuples after which to flush the output file. By default no flushing on tuple numbers is performed.
Returns:

Sink operator

Return type:

(streamsx.spl.op.Invoke)

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • name – Name passed into for_each.
  • **options – Future options passed to for_each.
Returns:

Termination for this composite transformation of stream.

Return type:

Sink

class streamsx.standard.files.DirectoryScan(directory, pattern=None, schema=CommonSchema.String, **options)

Bases: streamsx.topology.composite.Source

Watches a directory and generates file names on the output, one for each file that is found in the directory.

Example, scanning for files in application directory:

import streamsx.standard.files as files
import streamsx.spl.op
from streamsx.topology.topology import Topology

topo = Topology()
dir = streamsx.spl.op.Expression.expression('getApplicationDir()+"/etc"')
s = topo.source(files.DirectoryScan(directory=dir))

Example, scanning for files with “csv” file extension:

s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.csv$'))
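
Example (a minimal sketch; the values are illustrative and the optional parameters are described below), passing optional parameters as kwargs:

s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input',
                                    pattern=r'.*\.csv$',
                                    init_delay=5.0,       # wait 5 seconds before producing tuples
                                    ignore_dot_files=True,
                                    sort_by='name'))      # valid values per sort_by: 'date' (default) or 'name'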
directory

Specifies the name of the directory to be scanned.

Type:str|Expression
pattern

Instructs the operator to ignore file names that do not match the regular expression pattern.

Type:str
schema

Output schema, defaults to CommonSchema.String.

Type:StreamSchema
options

The additional optional parameters as variable keyword arguments.

Type:kwargs
ignore_dot_files

Specifies whether the DirectoryScan operator ignores files with a leading period (.) in the directory. By default, the value is set to false and files with a leading period are processed.

Type:bool
ignore_existing_files_at_startup

Specifies whether the DirectoryScan operator ignores pre-existing files in the directory. By default, the value is set to false and all files are processed as usual. If set to true, any files present in the directory are marked as already processed, and not submitted.

Type:bool
init_delay

Specifies the number of seconds that the DirectoryScan operator delays before it starts to produce tuples.

Type:float
move_to_directory

Specifies the name of the directory to which files are moved before the output tuple is generated.

Type:str
order

Controls how the sort_by parameter sorts the files. The valid values are ascending and descending. If the order parameter is not specified, the default value is set to ascending.

Type:enum
populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters:
  • topology – Topology containing the source.
  • name – Name passed into source.
  • **options – Future options passed to source.
Returns:

Single stream representing the source.

Return type:

Stream

sleep_time

Specifies the minimal time between scans of the directory, in seconds. If this parameter is not specified, the default is 5.0 seconds.

Type:float
sort_by

Determines the order in which file names are generated during a single scan of the directory when there are multiple valid files at the same time. The valid values are date and name. If the sort_by parameter is not specified, the default sort order is set to date.

Type:enum
class streamsx.standard.files.FileSink(file, **options)

Bases: streamsx.topology.composite.ForEach

Write a stream to a file.

Note

Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist.

Example for writing a stream to a file:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(files.FileSink(file='/tmp/data.txt'))

Example, specifying parameters as kwargs and constructing the file name from the filename attribute of the input stream:

import streamsx.standard.files as files
import streamsx.spl.op
from streamsx.standard import Format, CloseMode

config = {
    'format': Format.txt.name,
    'tuples_per_file': 50000,
    'close_mode': CloseMode.count.name,
    'write_punctuations': True,
    'suppress': 'filename'
}
# to_file is an existing stream whose schema includes a 'filename' attribute
fsink = files.FileSink(file=streamsx.spl.op.Expression.expression('"/tmp/"+filename'), **config)
to_file.for_each(fsink)

New in version 0.5.

file

Name of the output file.

Type:str
options

The additional optional parameters as variable keyword arguments.

Type:kwargs
append

Specifies that the generated tuples are appended to the output file. If this parameter is false or not specified, the output file is truncated before the tuples are generated.

Type:bool
bytes_per_file

Specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. This parameter must be specified when the close_mode() parameter is set to size.

Type:int
close_mode

Specifies when the file closes and a new one opens. This parameter has type enum {punct, count, size, time, dynamic, never} CloseMode(). The default value is never. For any value other than dynamic, when the specified condition is satisfied, the current output file is closed and a new file is opened for writing. The parameter values are:
  • punct – close the file when a window or final punctuation is received.
  • count – used with the tuples_per_file() parameter; close the file when the specified number of tuples have been received.
  • size – used with the bytes_per_file() parameter; close the file when the specified number of bytes have been received.
  • time – used with the time_per_file() parameter; close the file when the specified time has elapsed.
  • dynamic – the file parameter can reference input attributes and is evaluated at each input tuple to compute the file name. If the file name differs from the previous value, the output file closes and a new file opens.
In all cases, the file parameter can contain modifiers that are used to generate the file name. The supported modifiers are:
  • {id} – each {id} in the file name is replaced with the file number created by the FileSink operator. It has value 0 for the first file, 1 for the second file, and so on.
  • {localtime:strftimeString} – replaced with the current local time, formatted as if by the strftime library call.
  • {gmtime:strftimeString} – replaced with the current time in the GMT timezone, formatted as if by the strftime library call.
The modifiers can be repeated in the string and are all replaced with their values. If close_mode() is dynamic, the file names are compared after the modifiers are substituted.

Type:enum
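
A minimal sketch of time-based file rotation using the {id} and {localtime:strftimeString} modifiers described above (path, format, and interval are illustrative):

import streamsx.standard.files as files
from streamsx.standard import Format, CloseMode

# close the current file and open a new one roughly every 60 seconds
fsink = files.FileSink(file='/tmp/out_{id}_{localtime:%Y%m%d%H%M%S}.txt',
                       format=Format.txt.name,
                       close_mode=CloseMode.time.name,
                       time_per_file=60.0)
s.for_each(fsink)  # s is an existing stream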
compression

Specifies the compression mode, Compression().

Type:enum
encoding

Specifies the character set encoding that is used in the output file. Data that is written to the output file is converted from the UTF-8 character set to the specified character set before any compression is performed. The encoding parameter is not valid with formats bin or block.

Type:str
eol_marker

Specifies the end of line marker.

Type:str
flush

Specifies the number of tuples after which to flush the output file. By default no flushing on tuple numbers is performed.

Type:int
flush_on_punctuation

Specifies to flush the output file when a window or final punctuation is received. This parameter defaults to true.

Type:bool
format

Specifies the format of the data, Format().

Type:enum
has_delay_field

Specifies whether to output an extra attribute per tuple, which specifies the inter-arrival delays between the input tuples.

Type:bool
move_file_to_directory

Specifies that the file is moved to the named directory after the file is closed. Any existing file with the same name is removed before the file is moved to the move_file_to_directory directory.

Type:str
populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • name – Name passed into for_each.
  • **options – Future options passed to for_each.
Returns:

Termination for this composite transformation of stream.

Return type:

Sink

quote_strings

Controls the quoting of top-level rstrings. This parameter can be used only with the csv format. This parameter value is true by default.

Type:bool
separator

Separator between fields (defaults to comma ,).

Type:str
suppress

Specifies input attributes to be omitted from the output file. This parameter accepts one or more input attributes as values. Those named attributes are not output to the file. For example, you can use this parameter to omit file name inputs from the output file.

Type:str
time_per_file

Specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened. If the close_mode() parameter is set to time, this parameter must be specified.

Type:float
truncate_on_reset

Specifies to truncate the file when a consistent region is reset.

Type:bool
tuples_per_file

Specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. This parameter must be specified when the close_mode() parameter is set to count.

Type:int
write_failure_action

Specifies the action to take when a file write fails. Valid values are ignore, log, and terminate, of type WriteFailureAction().

Type:enum
write_punctuations

Specifies to write punctuations to the output file. It is false by default. This parameter can be used only with txt, csv, and bin formats.

Type:bool
write_state_handler_callbacks

Specifies to write to the output file a commented-out line that contains the names of the invoked StateHandler callbacks. This parameter is valid only when the file is in csv format.

Type:bool
class streamsx.standard.files.LineFilesReader(file_name=None, compression=None)

Bases: streamsx.topology.composite.Map

Reads, line by line, files given by the input stream and generates tuples with the file content on the output stream.

Note

Each input tuple holds the file name to be read

Example, scanning for files with “json” file extension and reading them line by line:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.json$'))
r = s.map(files.LineFilesReader(), schema=CommonSchema.String)
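
Example (a minimal sketch; the directory and pattern are illustrative), reading gzip-compressed files line by line:

from streamsx.standard import Compression

s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern=r'.*\.json\.gz$'))
r = s.map(files.LineFilesReader(compression=Compression.gzip.name), schema=CommonSchema.String)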
Parameters:
  • file_name (str) – Each output tuple contains the name of the file that the tuple is read from. Ensure that the name given with this parameter is part of the output schema.
  • compression (str) – Specifies that the source file is compressed. Valid values are the available compression algorithms: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.

New in version 1.5.

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream