streamsx.standard.files
Reading and writing of files.
Classes
BlockFilesReader([block_size, compression, …])
    Reads files given by the input stream and generates tuples with the file content (a block of binary data) on the output stream.
CSVFilesReader([header, encoding, …])
    Reads files given by the input stream and generates tuples with the file content on the output stream.
CSVReader(schema, file[, header, encoding, …])
    Read a comma separated value file as a stream.
CSVWriter(file[, append, encoding, …])
    Write a stream as a comma separated value file.
DirectoryScan(directory[, pattern, schema])
    Watches a directory and generates file names on the output stream, one for each file found in the directory.
FileSink(file, **options)
    Write a stream to a file.
LineFilesReader([file_name, compression])
    Reads files, given by the input stream, line by line and generates tuples with the file content on the output stream.
class streamsx.standard.files.BlockFilesReader(block_size=None, compression=None, file_name=None)
Bases: streamsx.topology.composite.Map

Reads files given by the input stream and generates tuples with the file content (a block of binary data) on the output stream.
Note
Each input tuple holds the name of the file to be read.
Example: scanning for files with the “wav” file extension and reading them:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern='.*\.wav$'))
r = s.map(files.BlockFilesReader(block_size=512, file_name='filename'),
          schema=StreamSchema('tuple<blob speech, rstring filename>'))
Example: scanning for files with the “zip” file extension and reading them:

import streamsx.standard.files as files
from streamsx.standard import Compression
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern='.*\.zip$'))
r = s.map(files.BlockFilesReader(block_size=1024, compression=Compression.gzip.name),
          schema=StreamSchema('tuple<blob data>'))
Parameters: - block_size (int) – Specifies the block size. If the block_size parameter is not specified, the entire file is read into a single tuple.
- compression (str) – Specifies that the source file is compressed. There are three valid values, representing available compression algorithms. These values are: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.
- file_name (str) – Each output tuple contains the name of the file that the tuple is read from. Ensure that the name given with this parameter is part of the output schema.
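The block_size behavior can be sketched in plain Python. This is only an illustration of the documented semantics, not the operator's implementation; `read_blocks` is a hypothetical helper name:

```python
import io

def read_blocks(data, block_size=None):
    # Illustrative sketch of the block_size semantics, not the
    # BlockFilesReader implementation: with block_size=None the whole
    # file content becomes a single block, otherwise fixed-size blocks
    # are emitted until the content is exhausted (the last block may
    # be shorter).
    if block_size is None:
        yield data
        return
    buf = io.BytesIO(data)
    while True:
        block = buf.read(block_size)
        if not block:
            break
        yield block

# A 1300-byte file with block_size=512 yields blocks of 512, 512 and 276 bytes.
blocks = list(read_blocks(b'x' * 1300, block_size=512))
```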
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- schema – Schema passed into map.
- name – Name passed into map.
- **options – Future options passed to map.
Returns: Single stream representing the transformation of stream.
Return type: Stream
class streamsx.standard.files.CSVFilesReader(header=False, encoding=None, separator=None, ignoreExtraFields=False, file_name=None, compression=None)
Bases: streamsx.topology.composite.Map
Reads files given by input stream and generates tuples with the file content on the output stream.
Note
Each input tuple holds the name of the file to be read.
See also
Use CSVReader() for a single file given as a parameter.
Example: scanning for files with the “csv” file extension and reading them:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern='.*\.csv$'))
r = s.map(files.CSVFilesReader(), schema=StreamSchema('tuple<rstring a, int32 b>'))
Parameters:
- header (bool) – Specifies whether the file contains a header line.
- encoding (str) – Specifies the character set encoding that is used in the source file.
- separator (str) – Separator between records (defaults to comma ,).
- ignoreExtraFields (bool) – When True, fields in the file that have no corresponding attribute in the schema are ignored. Otherwise, extra fields raise an error.
- file_name (str) – Each output tuple contains the name of the file that the tuple is read from. Ensure that the name given with this parameter is part of the output schema.
- compression (str) – Specifies that the source file is compressed. There are three valid values, representing the available compression algorithms: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.
New in version 1.1.
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- schema – Schema passed into map.
- name – Name passed into map.
- **options – Future options passed to map.
Returns: Single stream representing the transformation of stream.
Return type: Stream
class streamsx.standard.files.CSVReader(schema, file, header=False, encoding=None, separator=None, ignoreExtraFields=False, hot=False, compression=None)
Bases: streamsx.topology.composite.Source
Read a comma separated value file as a stream.
The file defined by file is read and mapped to a stream with a structured schema of schema.
Note
Reads a single file only.
See also
Use CSVFilesReader() for reading multiple files, where each input tuple holds the name of a file to be read.
Example: reading a file from the application directory (the file is part of the application bundle):

import os
import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
sample_file = '/tmp/local/data.csv'           # local file
topo.add_file_dependency(sample_file, 'etc')  # add sample file to the etc dir in the bundle
fn = os.path.join('etc', 'data.csv')          # file name relative to the application dir
sch = 'tuple<rstring a, int32 b>'
r = topo.source(files.CSVReader(schema=sch, file=fn))
Example: reading a file from a file system accessible to the running job, for example a persistent volume claim (Cloud Pak for Data):

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
sample_file = '/opt/ibm/streams-ext/data.csv'  # file location accessible from the running Streams application
r = topo.source(files.CSVReader(schema='tuple<rstring a, int32 b>', file=sample_file))
Parameters:
- schema (StreamSchema) – Schema of the returned stream.
- file (str|Expression) – Name of the source file. A relative path is interpreted relative to the application directory, for example when the file is added to the application bundle.
- header (bool) – Specifies whether the file contains a header line.
- encoding (str) – Specifies the character set encoding that is used in the source file.
- separator (str) – Separator between records (defaults to comma ,).
- ignoreExtraFields (bool) – When True, fields in the file that have no corresponding attribute in the schema are ignored. Otherwise, extra fields raise an error.
- hot (bool) – Specifies whether the input file is hot, which means it is appended continuously.
- compression (str) – Specifies that the source file is compressed. There are three valid values, representing the available compression algorithms: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.
New in version 1.1.
Returns: Stream containing records from the file.
Return type: (Stream)
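The interplay of separator and ignoreExtraFields can be illustrated with a small plain-Python sketch. This is not the operator's code; `parse_records` is a hypothetical helper showing the documented field-matching rule:

```python
import csv
import io

def parse_records(text, schema_attrs, separator=',', ignore_extra_fields=False):
    # Illustrative sketch of the documented field handling, not the
    # CSVReader implementation: each record's fields are mapped onto
    # the schema attributes; surplus fields are dropped when
    # ignore_extra_fields is True and raise an error otherwise.
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter=separator):
        if len(fields) > len(schema_attrs) and not ignore_extra_fields:
            raise ValueError('record has more fields than schema attributes')
        rows.append(dict(zip(schema_attrs, fields)))
    return rows

records = parse_records('x,1\ny,2', ['a', 'b'])
# [{'a': 'x', 'b': '1'}, {'a': 'y', 'b': '2'}]
```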
populate(topology, name, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters:
- topology – Topology containing the source.
- name – Name passed into source.
- **options – Future options passed to source.
Returns: Single stream representing the source.
Return type: Stream
class streamsx.standard.files.CSVWriter(file, append=None, encoding=None, separator=None, flush=None)
Bases: streamsx.topology.composite.ForEach
Write a stream as a comma separated value file.
The file defined by file is used as output file.
Example for writing lines to a file:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(range(13))
sch = 'tuple<rstring a, int32 b>'
s = s.map(lambda v: ('A'+str(v), v+7), schema=sch)
s.for_each(files.CSVWriter(file='/tmp/data.txt'))
Note
Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist.
Parameters:
- file (str|Expression) – Name of the output file. A relative path is interpreted relative to the data directory.
- append (bool) – Specifies that the generated tuples are appended to the output file. If this parameter is false or not specified, the output file is truncated before the tuples are generated.
- encoding (str) – Specifies the character set encoding that is used in the output file.
- separator (str) – Separator between records (defaults to comma ,).
- flush (int) – Specifies the number of tuples after which to flush the output file. By default no flushing on tuple numbers is performed.
Returns: Sink operator
Return type: streamsx.topology.topology.Sink

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink
Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- name – Name passed into for_each.
- **options – Future options passed to for_each.
Returns: Termination for this composite transformation of stream.
Return type: streamsx.topology.topology.Sink
class streamsx.standard.files.DirectoryScan(directory, pattern=None, schema=CommonSchema.String, **options)
Bases: streamsx.topology.composite.Source

Watches a directory and generates file names on the output stream, one for each file found in the directory.
Example: scanning for files in the application directory:

import streamsx.spl.op
import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
dir = streamsx.spl.op.Expression.expression('getApplicationDir()+"/etc"')
s = topo.source(files.DirectoryScan(directory=dir))
Example: scanning for files with the “csv” file extension:

s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern='.*\.csv$'))
directory
Specifies the name of the directory to be scanned.
Type: str|Expression

pattern
Instructs the operator to ignore file names that do not match the regular expression pattern.
Type: str

schema
Output schema, defaults to CommonSchema.String.
Type: StreamSchema

options
The additional optional parameters as variable keyword arguments.
Type: kwargs

ignore_dot_files
Specifies whether the DirectoryScan operator ignores files with a leading period (.) in the directory. By default, the value is set to false and files with a leading period are processed.
Type: bool

ignore_existing_files_at_startup
Specifies whether the DirectoryScan operator ignores pre-existing files in the directory. By default, the value is set to false and all files are processed as usual. If set to true, any files present in the directory are marked as already processed, and not submitted.
Type: bool

init_delay
Specifies the number of seconds that the DirectoryScan operator delays before it starts to produce tuples.
Type: float

move_to_directory
Specifies the name of the directory to which files are moved before the output tuple is generated.
Type: str

order
Controls how the sort_by parameter sorts the files. The valid values are ascending and descending. If the order parameter is not specified, the default value is ascending.
Type: enum
populate(topology, name, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters:
- topology – Topology containing the source.
- name – Name passed into source.
- **options – Future options passed to source.
Returns: Single stream representing the source.
Return type: Stream
sleep_time
Specifies the minimal time between scans of the directory, in seconds. If this parameter is not specified, the default is 5.0 seconds.
Type: float

sort_by
Determines the order in which file names are generated during a single scan of the directory when there are multiple valid files at the same time. The valid values are date and name. If the sort_by parameter is not specified, the default sort order is date.
Type: enum
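The selection rules described by pattern, ignore_dot_files, sort_by and order can be sketched in plain Python. This is only an illustration of the documented semantics, not the SPL operator's code; `scan_once` is a hypothetical name:

```python
import os
import re

def scan_once(directory, pattern=None, ignore_dot_files=False,
              sort_by='date', order='ascending'):
    # Illustrative sketch of one DirectoryScan pass, not the operator's
    # implementation: collect regular files, apply the pattern and
    # dot-file filters, then sort by modification date or by name.
    rx = re.compile(pattern) if pattern else None
    entries = []
    for entry in os.scandir(directory):
        if not entry.is_file():
            continue
        if ignore_dot_files and entry.name.startswith('.'):
            continue
        if rx and not rx.match(entry.name):
            continue
        entries.append(entry)
    key = (lambda e: e.stat().st_mtime) if sort_by == 'date' else (lambda e: e.name)
    entries.sort(key=key, reverse=(order == 'descending'))
    return [e.path for e in entries]
```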
class streamsx.standard.files.FileSink(file, **options)
Bases: streamsx.topology.composite.ForEach

Write a stream to a file.
Note
Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist.
Example for writing a stream to a file:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(files.FileSink(file='/tmp/data.txt'))
Example specifying parameters as kwargs and constructing the name of the file from the attribute filename of the input stream:

config = {
    'format': Format.txt.name,
    'tuples_per_file': 50000,
    'close_mode': CloseMode.count.name,
    'write_punctuations': True,
    'suppress': 'filename'
}
fsink = files.FileSink(file=streamsx.spl.op.Expression.expression('"/tmp/"+filename'), **config)
to_file.for_each(fsink)
New in version 0.5.
file
Name of the output file.
Type: str

options
The additional optional parameters as variable keyword arguments.
Type: kwargs

append
Specifies that the generated tuples are appended to the output file. If this parameter is false or not specified, the output file is truncated before the tuples are generated.
Type: bool

bytes_per_file
Specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. This parameter must be specified when the close_mode parameter is set to size.
Type: int
close_mode
Specifies when the file closes and a new one opens. This parameter has type enum {punct, count, size, time, dynamic, never} (CloseMode). The default value is never. For any value except dynamic, when the specified condition is satisfied, the current output file is closed and a new file is opened for writing. The parameter values are:
- punct: close the file when a window or final punctuation is received.
- count: used with the tuples_per_file parameter to close the file when the specified number of tuples have been received.
- size: used with the bytes_per_file parameter to close the file when the specified number of bytes have been received.
- time: used with the time_per_file parameter to close the file when the specified time has elapsed.
- dynamic: the file parameter can reference input attributes and is evaluated at each input tuple to compute the file name. If the file name differs from the previous value, the output file closes and a new file opens.
In all cases, the file parameter can contain modifiers that are used to generate the file name. The supported modifiers are:
- {id}: replaced with the file number created by the FileSink operator; it has the value 0 for the first file, 1 for the second file, and so on.
- {localtime:strftimeString}: replaced by the current local time, formatted as if by the strftime library call.
- {gmtime:strftimeString}: replaced by the current time in the GMT timezone, formatted as if by the strftime library call.
The modifiers can be repeated in the string and are all replaced with their values. If close_mode is dynamic, the file names are compared after the modifiers are substituted.
Type: enum
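The file-name modifiers can be illustrated with a small plain-Python sketch. `expand_file_name` is a hypothetical helper showing the documented substitution rules, not the FileSink implementation:

```python
import re
import time

def expand_file_name(template, file_number, now=None):
    # Illustrative sketch of the {id}, {localtime:...} and {gmtime:...}
    # file-name modifiers, not the operator's substitution code.
    now = time.time() if now is None else now
    out = template.replace('{id}', str(file_number))
    out = re.sub(r'\{localtime:([^}]*)\}',
                 lambda m: time.strftime(m.group(1), time.localtime(now)), out)
    out = re.sub(r'\{gmtime:([^}]*)\}',
                 lambda m: time.strftime(m.group(1), time.gmtime(now)), out)
    return out

# For the third file (file number 2) at the Unix epoch:
name = expand_file_name('/tmp/out_{id}_{gmtime:%Y%m%d}.txt', 2, now=0)
# '/tmp/out_2_19700101.txt'
```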
compression
Specifies the compression mode (Compression).
Type: enum

encoding
Specifies the character set encoding that is used in the output file. Data that is written to the output file is converted from the UTF-8 character set to the specified character set before any compression is performed. The encoding parameter is not valid with the bin or block formats.
Type: str

eol_marker
Specifies the end of line marker.
Type: str

flush
Specifies the number of tuples after which to flush the output file. By default no flushing on tuple numbers is performed.
Type: int

flush_on_punctuation
Specifies to flush the output file when a window or final punctuation is received. This parameter defaults to true.
Type: bool

has_delay_field
Specifies whether to output an extra attribute per tuple, which specifies the inter-arrival delay between the input tuples.
Type: bool

move_file_to_directory
Specifies that the file is moved to the named directory after the file is closed. Any existing file with the same name is removed before the file is moved to the move_file_to_directory directory.
Type: str
populate
(topology, stream, name, **options) → streamsx.topology.topology.Sink¶ Populate the topology with this composite for each transformation. Subclasses must implement the
populate
function.populate
is called when the composite is added to the topology with:sink = input_stream.for_each(myForEachComposite)
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- name – Name passed into for_each.
- **options – Future options passed to for_each.
Returns: Termination for this composite transformation of stream.
Return type: streamsx.topology.topology.Sink
quote_strings
Controls the quoting of top-level rstrings. This parameter can be used only with the csv format. This parameter value is true by default.
Type: bool

separator
Separator between records (defaults to comma ,).
Type: str

suppress
Specifies input attributes to be omitted from the output file. This parameter accepts one or more input attributes as values. Those named attributes are not output to the file. For example, you can use this parameter to omit file name inputs from the output file.
Type: str

time_per_file
Specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened. If the close_mode parameter is set to time, this parameter must be specified.
Type: float

truncate_on_reset
Specifies to truncate the file when a consistent region is reset.
Type: bool

tuples_per_file
Specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. This parameter must be specified when the close_mode parameter is set to count.
Type: int

write_failure_action
Specifies the action to take when a file write fails. This parameter has the values ignore, log, and terminate, of type WriteFailureAction.
Type: enum

write_punctuations
Specifies to write punctuations to the output file. It is false by default. This parameter can be used only with the txt, csv, and bin formats.
Type: bool

write_state_handler_callbacks
Specifies to write to the output file a commented-out line that contains the name of the invoked StateHandler callbacks. This parameter is valid only when the file is in csv format.
Type: bool
class streamsx.standard.files.LineFilesReader(file_name=None, compression=None)
Bases: streamsx.topology.composite.Map
Reads files, given by the input stream, line by line and generates tuples with the file content on the output stream.
Note
Each input tuple holds the name of the file to be read.
Example: scanning for files with the “json” file extension and reading them line by line:

import streamsx.standard.files as files
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

topo = Topology()
s = topo.source(files.DirectoryScan(directory='/opt/ibm/streams-ext/input', pattern='.*\.json$'))
r = s.map(files.LineFilesReader(), schema=CommonSchema.String)
Parameters: - file_name (str) – Each output tuple contains the name of the file that the tuple is read from. Ensure that the name given with this parameter is part of the output schema.
- compression (str) – Specifies that the source file is compressed. There are three valid values, representing available compression algorithms. These values are: zlib, gzip, and bzip2. For example, use Compression.gzip.name for gzip.
New in version 1.5.
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- schema – Schema passed into map.
- name – Name passed into map.
- **options – Future options passed to map.
Returns: Single stream representing the transformation of stream.
Return type: Stream