pyemma.coordinates.data._base.transformer.StreamingTransformer

class pyemma.coordinates.data._base.transformer.StreamingTransformer(chunksize=1000)

Base class for pipelined transformers.

This class derives from DataSource, so follow-up pipeline elements can stream the output of this class.

Parameters:chunksize (int, optional) – the chunk size used to batch-process the underlying data.
__init__(chunksize=1000)

Methods

__init__([chunksize])
describe() Get a descriptive string representation of this class.
dimension()
fit_transform(X[, y]) Fit to data, then transform it.
get_output([dimensions, stride, skip, chunk]) Maps all input data of this transformer and returns it as an array or list of arrays
iterator([stride, lag, chunk, ...]) creates an iterator to stream over the (transformed) data.
n_chunks(chunksize[, stride, skip]) how many chunks an iterator of this source will output, starting (e.g. after calling reset())
n_frames_total([stride, skip]) Returns total number of frames.
number_of_trajectories() Returns the number of trajectories.
output_type() By default transformers return single precision floats.
register_progress_callback(call_back[, stage]) Registers the progress reporter.
trajectory_length(itraj[, stride, skip]) Returns the length of trajectory of the requested index.
trajectory_lengths([stride, skip]) Returns the length of each trajectory.
transform(X) Maps the input data through the transformer to correspondingly shaped output data array/list.
write_to_csv([filename, extension, ...]) write all data to csv with numpy.savetxt

Attributes

chunksize chunksize defines how much data is being processed at once.
data_producer
default_chunksize How much data will be processed at once, in case no chunksize has been provided.
filenames Property which returns a list of filenames the data is originally from.
in_memory are results stored in memory?
is_random_accessible Check if self._is_random_accessible is set to true and if all the random access strategies are implemented.
is_reader Property telling if this data source is a reader or not.
logger The logger for this class instance
name The name of this instance
ndim
ntraj
ra_itraj_cuboid Implementation of random access with slicing that can be up to 3-dimensional, where the first dimension corresponds to the trajectory index, the second dimension corresponds to the frames and the third dimension corresponds to the dimensions of the frames.
ra_itraj_jagged Behaves like ra_itraj_cuboid, except that the trajectories are not truncated and are returned as a list.
ra_itraj_linear Implementation of random access that takes arguments as the default random access (i.e., up to three dimensions with trajs, frames and dims, respectively), but which considers the frame indexing to be contiguous.
ra_linear Implementation of random access that takes a (maximal) two-dimensional slice where the first component corresponds to the frames and the second component corresponds to the dimensions.
show_progress whether to show the progress of heavy calculations on this object.
chunksize

chunksize defines how much data is being processed at once.

default_chunksize

How much data will be processed at once, in case no chunksize has been provided.

describe()

Get a descriptive string representation of this class.

filenames

Property which returns a list of filenames the data is originally from.

Returns:list of filenames if the data originates from a file-based reader
Return type:list of str

fit_transform(X, y=None, **fit_params)

Fit to data, then transform it.

Fits transformer to X and y with optional parameters fit_params and returns a transformed version of X.

Parameters:
  • X (numpy array of shape [n_samples, n_features]) – Training set.
  • y (numpy array of shape [n_samples]) – Target values.

Returns:X_new – Transformed array.
Return type:numpy array of shape [n_samples, n_features_new]
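
The fit-then-transform contract can be illustrated with a small stand-alone sketch. MeanCenterer below is a hypothetical class written only for this example (it is not part of PyEMMA), and it assumes the common convention that fit_transform(X) is equivalent to fit(X).transform(X):

```python
class MeanCenterer:
    """Hypothetical transformer used only for illustration (not part of PyEMMA)."""

    def fit(self, X, y=None):
        # Column-wise mean over all samples; y is accepted but ignored.
        n_samples = len(X)
        n_features = len(X[0])
        self.mean_ = [sum(row[j] for row in X) / n_samples
                      for j in range(n_features)]
        return self

    def transform(self, X):
        # Subtract the fitted mean from every sample.
        return [[x - m for x, m in zip(row, self.mean_)] for row in X]

    def fit_transform(self, X, y=None, **fit_params):
        # Fit to X (and y), then return the transformed version of X.
        return self.fit(X, y, **fit_params).transform(X)


X = [[1.0, 10.0], [3.0, 30.0]]
X_new = MeanCenterer().fit_transform(X)
print(X_new)  # [[-1.0, -10.0], [1.0, 10.0]]
```

The output has the same number of samples as the input; a real transformer may change the number of features.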
get_output(dimensions=slice(0, None, None), stride=1, skip=0, chunk=None)

Maps all input data of this transformer and returns it as an array or list of arrays

Parameters:
  • dimensions (list-like of indexes or slice, default=all) – indices of dimensions you like to keep.
  • stride (int, default=1) – only take every n’th frame.
  • skip (int, default=0) – initially skip n frames of each file.
  • chunk (int, default=None) – How many frames to process at once. If not given obtain the chunk size from the source.
Returns:

output – the mapped data, where T_i is the number of time steps of the i-th input trajectory or, if stride > 1, floor(T_in / stride), and d is the output dimension of this transformer. If the input consists of a list of trajectories, the output will also be a corresponding list of trajectories.

Return type:

list of ndarray(T_i, d)

in_memory

are results stored in memory?

is_random_accessible

Check if self._is_random_accessible is set to true and if all the random access strategies are implemented.

Returns:True if random accessible via strategies and False otherwise.
Return type:bool

is_reader

Property telling if this data source is a reader or not.

Returns:True if this data source is a reader and False otherwise
Return type:bool

iterator(stride=1, lag=0, chunk=None, return_trajindex=True, cols=None, skip=0)

creates an iterator to stream over the (transformed) data.

If your data is too large to fit into memory and you want to compute some quantities on it incrementally, you can create an iterator on a reader or transformer (e.g. TICA) to avoid running out of memory.

Parameters:
  • stride (int, default=1) – Take only every stride’th frame.
  • lag (int, default=0) – how many frames to omit for each file.
  • chunk (int, default=None) – How many frames to process at once. If not given obtain the chunk size from the source.
  • return_trajindex (boolean, default=True) – if False, the iterator yields a chunk of data; otherwise it yields a tuple of (trajindex, data).
  • cols (array like, default=None) – return only the given columns.
  • skip (int, default=0) – skip ‘n’ first frames of each trajectory.
Returns:

iter – an implementation of a DataSourceIterator to stream over the data

Return type:

instance of DataSourceIterator

Examples

>>> from pyemma.coordinates import source; import numpy as np
>>> data = [np.arange(3), np.arange(4, 7)]
>>> reader = source(data)
>>> iterator = reader.iterator(chunk=1)
>>> for array_index, chunk in iterator:
...     print(array_index, chunk)
0 [[0]]
0 [[1]]
0 [[2]]
1 [[4]]
1 [[5]]
1 [[6]]
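
The chunk-wise streaming shown above can be imitated in plain Python. The sketch below is not PyEMMA's implementation: iter_chunks is a hypothetical name, it operates on plain lists, and it assumes that frames are selected like the slice traj[skip::stride] and that chunks never span two trajectories:

```python
def iter_chunks(trajectories, chunk, stride=1, skip=0, return_trajindex=True):
    """Hypothetical stand-in for a streaming iterator over a list of trajectories."""
    for itraj, traj in enumerate(trajectories):
        # Assumption: drop the first `skip` frames, then keep every stride-th frame.
        frames = traj[skip::stride]
        for start in range(0, len(frames), chunk):
            data = frames[start:start + chunk]
            # Yield (trajindex, data) tuples or bare chunks, depending on the flag.
            yield (itraj, data) if return_trajindex else data


trajs = [[0, 1, 2, 3, 4], [10, 11, 12]]
chunks = list(iter_chunks(trajs, chunk=2))
for itraj, data in chunks:
    print(itraj, data)
```

Because only one chunk is materialized at a time, quantities such as running sums can be accumulated without holding all frames in memory.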
logger

The logger for this class instance

n_chunks(chunksize, stride=1, skip=0)

how many chunks an iterator of this source will output, starting (e.g. after calling reset())

Parameters:
  • chunksize
  • stride
  • skip
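
As a rough illustration of how such a chunk count could be derived, the sketch below sums the chunks per trajectory. count_chunks is a hypothetical helper, and it assumes that each trajectory is chunked separately and that frames are selected like range(skip, length, stride); the library's own bookkeeping may differ:

```python
import math


def count_chunks(lengths, chunksize, stride=1, skip=0):
    """Hypothetical helper: number of chunks if each trajectory is chunked separately."""
    total = 0
    for n in lengths:
        # Frames that remain after skipping and striding (assumed slice semantics).
        n_frames = len(range(skip, n, stride))
        # A partially filled last chunk still counts as one chunk.
        total += math.ceil(n_frames / chunksize)
    return total


n = count_chunks([10, 20, 10], chunksize=8)
print(n)  # 2 + 3 + 2 = 7
```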
n_frames_total(stride=1, skip=0)

Returns total number of frames.

Parameters:
  • stride (int) – return value is the number of frames in trajectories when running through them with a step size of stride.
  • skip (int, default=0) – skip the first initial n frames per trajectory.
Returns:

n_frames_total – total number of frames.

Return type:

int
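
The effect of stride and skip on frame counts can be sketched with plain Python ranges. frames_after_selection is a hypothetical helper, and the example assumes that the selected frames are skip, skip + stride, skip + 2 * stride, and so on; the exact rounding used by the library is not confirmed here:

```python
def frames_after_selection(length, stride=1, skip=0):
    """Hypothetical helper: frames left when taking indices skip, skip+stride, ..."""
    return len(range(skip, length, stride))


lengths = [10, 20, 10]
per_traj = [frames_after_selection(n, stride=3, skip=1) for n in lengths]
total = sum(per_traj)
print(per_traj, total)  # [3, 7, 3] 13
```

Under the same assumption, the per-trajectory list corresponds to what trajectory_lengths reports and the sum to n_frames_total.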

name

The name of this instance

number_of_trajectories()

Returns the number of trajectories.

Parameters:stride (None (default) or np.ndarray) –
Returns:number of trajectories
Return type:int
output_type()

By default transformers return single precision floats.

ra_itraj_cuboid

Implementation of random access with slicing that can be up to 3-dimensional, where the first dimension corresponds to the trajectory index, the second dimension corresponds to the frames and the third dimension corresponds to the dimensions of the frames.

The frames selected by the frame slice are loaded from each trajectory selected by the trajectory slice and are then sliced with the dimension slice. For example, suppose the data consists of three trajectories with lengths 10, 20 and 10, respectively. The slice data[:, :15, :3] returns a 3D array of shape (3, 10, 3), where the first component corresponds to the three trajectories, the second component corresponds to 10 frames (the last 5 of the 15 requested frames are truncated because the other two trajectories only have 10 frames) and the third component corresponds to the first three dimensions.

Returns:Returns an object that allows access by slices in the described manner.
ra_itraj_jagged

Behaves like ra_itraj_cuboid, except that the trajectories are not truncated and are returned as a list.

Returns:Returns an object that allows access by slices in the described manner.
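
The difference between the cuboid and the jagged strategy can be sketched by computing only the resulting shapes. Both helpers below are hypothetical and do not touch PyEMMA; ndim=5 is an arbitrary assumption for the number of dimensions per frame:

```python
def jagged_lengths(lengths, traj_sel, frame_sel):
    """Hypothetical helper: frames selected per trajectory, without truncation."""
    return [len(range(n)[frame_sel]) for n in lengths[traj_sel]]


def cuboid_shape(lengths, ndim, traj_sel, frame_sel, dim_sel):
    """Hypothetical helper: shape of the 3D array after truncating to the shortest selection."""
    per_traj = jagged_lengths(lengths, traj_sel, frame_sel)
    n_dims = len(range(ndim)[dim_sel])
    return (len(per_traj), min(per_traj), n_dims)


lengths = [10, 20, 10]
shape = cuboid_shape(lengths, 5, slice(None), slice(None, 15), slice(None, 3))
jagged = jagged_lengths(lengths, slice(None), slice(None, 15))
print(shape)   # (3, 10, 3)
print(jagged)  # [10, 15, 10]
```

The cuboid result is limited by the shortest selected trajectory, while the jagged result keeps all 15 requested frames of the second trajectory.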
ra_itraj_linear

Implementation of random access that takes arguments as the default random access (i.e., up to three dimensions with trajs, frames and dims, respectively), but which considers the frame indexing to be contiguous. Therefore, it returns a simple 2D array.

Returns:A 2D array of the sliced data containing [frames, dims].
ra_linear

Implementation of random access that takes a (maximal) two-dimensional slice where the first component corresponds to the frames and the second component corresponds to the dimensions. Here it is assumed that the frame indexing is contiguous, i.e., the first frame of the second trajectory has the index of the last frame of the first trajectory plus one.

Returns:Returns an object that allows access by slices in the described manner.
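
Contiguous frame indexing can be illustrated by mapping a linear frame index back to a trajectory index and a local frame index. locate_frame is a hypothetical helper built on cumulative trajectory lengths; it is a sketch of the indexing convention described above, not the library's code:

```python
from bisect import bisect_right
from itertools import accumulate


def locate_frame(lengths, linear_index):
    """Hypothetical helper: map a contiguous frame index to (trajectory, local frame)."""
    ends = list(accumulate(lengths))  # cumulative end offsets, e.g. [10, 30, 40]
    if not 0 <= linear_index < ends[-1]:
        raise IndexError("frame index out of range")
    # The first trajectory whose end offset exceeds the index contains the frame.
    itraj = bisect_right(ends, linear_index)
    start = ends[itraj] - lengths[itraj]
    return itraj, linear_index - start


print(locate_frame([10, 20, 10], 9))   # (0, 9)
print(locate_frame([10, 20, 10], 10))  # (1, 0)
```

Index 10 lands on the first frame of the second trajectory, that is, one past the last frame of the first trajectory.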
register_progress_callback(call_back, stage=0)

Registers the progress reporter.

Parameters:
  • call_back (function) –

    This function will be called with the following arguments:

    1. stage (int)
    2. instance of pyemma.utils.progressbar.ProgressBar
    3. optional *args and named keywords (**kw), for future changes
  • stage (int, optional, default=0) – The stage you want the given call back function to be fired.
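
A callback compatible with the argument list above might look like the following sketch. DummyProgressBar is a hypothetical stand-in for pyemma.utils.progressbar.ProgressBar so that the example runs without PyEMMA, and the call at the end only simulates what a transformer would do:

```python
calls = []


def report_progress(stage, progressbar, *args, **kw):
    # Accept extra positional and keyword arguments so that future
    # additions to the call signature do not break this callback.
    calls.append((stage, progressbar))


class DummyProgressBar:
    """Hypothetical stand-in for pyemma.utils.progressbar.ProgressBar."""


# Simulate the call a transformer would make for stage 0.
report_progress(0, DummyProgressBar(), "extra", note="ignored")
print(len(calls), calls[0][0])  # 1 0
```

With a real transformer instance, such a function would be passed as register_progress_callback(report_progress, stage=0).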
show_progress

whether to show the progress of heavy calculations on this object.

trajectory_length(itraj, stride=1, skip=None)

Returns the length of trajectory of the requested index.

Parameters:
  • itraj (int) – trajectory index
  • stride (int) – return value is the number of frames in the trajectory when running through it with a step size of stride.
  • skip (int or None) – skip n frames.
Returns:

length of trajectory

Return type:

int

trajectory_lengths(stride=1, skip=0)

Returns the length of each trajectory.

Parameters:
  • stride (int) – return value is the number of frames of the trajectories when running through them with a step size of stride.
  • skip (int) – skip parameter
Returns:

containing length of each trajectory

Return type:

array(dtype=int)

transform(X)

Maps the input data through the transformer to correspondingly shaped output data array/list.

Parameters:X (ndarray(T, n) or list of ndarray(T_i, n)) – The input data, where T is the number of time steps and n is the number of dimensions. If a list is provided, the number of time steps is allowed to vary, but the number of dimensions is required to be consistent.
Returns:Y – The mapped data, where T is the number of time steps of the input data and d is the output dimension of this transformer. If called with a list of trajectories, Y will also be a corresponding list of trajectories.
Return type:ndarray(T, d) or list of ndarray(T_i, d)
write_to_csv(filename=None, extension='.dat', overwrite=False, stride=1, chunksize=100, **kw)

write all data to csv with numpy.savetxt

Parameters:
  • filename (str, optional) –

    filename string, which may contain placeholders {itraj} and {stride}:

    • itraj will be replaced by the trajectory index
    • stride is stride argument of this method

    If filename is not given, the method tries to obtain the filenames from the data source of this iterator.

  • extension (str, optional, default='.dat') – filename extension of created files
  • overwrite (bool, optional, default=False) – whether existing files should be overwritten. If False and a file already exists, this method raises an error.
  • stride (int) – only take every n’th frame
  • chunksize (int) – how many frames to process at once
  • kw (dict) – named arguments passed to numpy.savetxt (header, delimiter, etc.)

Example

Assume you want to save features calculated by some FeatureReader to ASCII:

>>> import numpy as np, pyemma
>>> import os
>>> from pyemma.util.files import TemporaryDirectory
>>> from pyemma.util.contexts import settings
>>> data = [np.random.random((10,3))] * 3
>>> reader = pyemma.coordinates.source(data)
>>> filename = "distances_{itraj}.dat"
>>> with TemporaryDirectory() as td, settings(show_progress_bars=False):
...    out = os.path.join(td, filename)
...    reader.write_to_csv(out, header='', delimiter=';')
...    print(sorted(os.listdir(td)))
['distances_0.dat', 'distances_1.dat', 'distances_2.dat']
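
How the {itraj} and {stride} placeholders expand can be sketched with str.format. This assumes that the substitution behaves like Python's standard format fields, which the text above suggests but does not state explicitly; the template name is made up for the example:

```python
# Hypothetical filename template using both documented placeholders.
template = "distances_{itraj}_stride{stride}.dat"

# One output file per trajectory index, all written with the same stride.
names = [template.format(itraj=i, stride=2) for i in range(3)]
print(names)
```

This yields one distinct filename per trajectory, which avoids overwriting earlier output when several trajectories are written.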