Data

rxsci.data.cache(field=None)

cache items

Each received item is cached, and the cached item is emitted in its place. This operator can save memory on graphs that buffer items when many of those items have identical values.

The source must be an Observable.

Parameters

field – [Optional] When set, only the provided field of each item is cached. Otherwise the whole item is cached. When field is set, this field value must be accessible via the getattr function.

Returns

An Observable emitting the same items as the source observable, but with cached values.
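The memory saving comes from equal items sharing a single object. The following is a plain-Python sketch of that idea, not rxsci's implementation: cache_items is a hypothetical helper, and it assumes that items are hashable.

```python
def cache_items(items):
    """Yield, for each item, the first equal object seen so far."""
    seen = {}
    for item in items:
        # setdefault stores the item the first time a value is seen and
        # returns the stored object for every later equal value.
        yield seen.setdefault(item, item)


# Two equal tuples built separately; after caching, both output entries
# reference the object that was seen first.
a = tuple([1, 2])
b = tuple([1, 2])
out = list(cache_items([a, b]))
print(out[0] is out[1])
```

Downstream buffers then hold many references to one object instead of many equal copies.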

rxsci.data.clip(lower_bound=None, higher_bound=None)

Clips values between lower_bound (inclusive) and higher_bound (inclusive).

The source can be an Observable or a MuxObservable.

Parameters
  • lower_bound – [Optional] The minimal value to emit.

  • higher_bound – [Optional] The maximal value to emit.

Returns

An observable emitting the source items, clipped to the provided bounds.

Raises

ValueError – if no bound is provided, or if lower_bound is greater than higher_bound.
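The clipping and validation rules described above can be sketched in plain Python. This is an illustration of the documented behavior on a list, not the operator itself; clip_values is a hypothetical helper name.

```python
def clip_values(values, lower_bound=None, higher_bound=None):
    """Return values with each element limited to the inclusive bounds."""
    if lower_bound is None and higher_bound is None:
        raise ValueError("at least one bound must be provided")
    if (lower_bound is not None and higher_bound is not None
            and lower_bound > higher_bound):
        raise ValueError("lower_bound must not be greater than higher_bound")

    clipped = []
    for value in values:
        if lower_bound is not None and value < lower_bound:
            value = lower_bound
        elif higher_bound is not None and value > higher_bound:
            value = higher_bound
        clipped.append(value)
    return clipped


print(clip_values([1, 5, 12], lower_bound=2, higher_bound=10))  # [2, 5, 10]
print(clip_values([1, 5, 12], higher_bound=4))                  # [1, 4, 4]
```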

rxsci.data.fill_none(value)

Replaces None values with value

The source can be an Observable or a MuxObservable.

Parameters

value – The value used to replace None values.

Returns

An observable where None items are replaced with value.
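As a plain-Python sketch of the replacement rule (fill_none_values is a hypothetical helper, not part of rxsci), note that only None is replaced. Other falsy items such as 0 are assumed to pass through unchanged.

```python
def fill_none_values(items, value):
    """Return items with every None replaced by value."""
    # Compare with `is None` so that falsy items such as 0 or "" are kept.
    return [value if item is None else item for item in items]


print(fill_none_values([1, None, 0, None], value=-1))  # [1, -1, 0, -1]
```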

rxsci.data.lag(size=1, data_type='obj')

Buffers source items so that each item is paired with the item received size items earlier.

The source must be a MuxObservable.

Parameters
  • size – [Optional] size of the lag.

  • data_type – [Optional] the type of the lag data.

Returns

An observable where each item is a tuple of (lag, current) items. During the first iterations, before enough items have been received, the tuple (first, current) is emitted.
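One reading of the (lag, current) pairing is sketched below in plain Python. It assumes that "first" means the first item of the source, which stands in for the lagged item until size earlier items exist. lag_pairs is a hypothetical helper, and the data_type parameter is not modeled.

```python
from collections import deque


def lag_pairs(items, size=1):
    """Yield (lagged, current) tuples for each item."""
    # Keep the current item plus the `size` items before it.
    window = deque(maxlen=size + 1)
    for item in items:
        window.append(item)
        # window[0] is the item received `size` steps earlier, or the very
        # first item while fewer than `size` earlier items exist.
        yield (window[0], item)


print(list(lag_pairs([1, 2, 3, 4])))          # [(1, 1), (1, 2), (2, 3), (3, 4)]
print(list(lag_pairs([1, 2, 3, 4], size=2)))  # [(1, 1), (1, 2), (1, 3), (2, 4)]
```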

rxsci.data.roll(window, stride, pipeline)

Projects each element of an observable sequence into zero or more windows which are produced based on element window information.

The source must be a MuxObservable.

Examples

>>> rs.data.roll(3),
>>> rs.data.roll(window=3, stride=2),

Parameters
  • window – Length of each window.

  • stride – Number of elements to step between creation of consecutive windows.

  • pipeline – The Rx pipe to execute on each window.

Returns

An observable sequence of windows.

Raises

ValueError – if window or stride is negative.
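How window and stride interact can be sketched on a list in plain Python. This is not the operator's implementation: roll_windows is a hypothetical helper, pipeline is modeled as a plain callable applied to each window, and only full windows are produced. Whether the operator also emits trailing partial windows is not stated above.

```python
def roll_windows(items, window, stride, pipeline=list):
    """Apply pipeline to each full window of `window` items, `stride` apart."""
    # The text above documents ValueError for negative values; this sketch
    # also rejects zero, which could never produce a usable window.
    if window <= 0 or stride <= 0:
        raise ValueError("window and stride must be positive")

    items = list(items)
    last_start = len(items) - window
    return [
        pipeline(items[start:start + window])
        for start in range(0, last_start + 1, stride)
    ]


print(roll_windows([1, 2, 3, 4, 5], window=3, stride=2))
# [[1, 2, 3], [3, 4, 5]]
print(roll_windows([1, 2, 3, 4, 5], window=3, stride=1, pipeline=sum))
# [6, 9, 12]
```

With stride smaller than window the windows overlap. With stride equal to window they tile the source without overlap.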

rxsci.data.sort(key=<function <lambda>>)

sort items according to key

Items are sorted in ascending order.

Implementation note: This operator caches all the items of the source observable before sorting them. It can be used only on batch sources, and it can consume a lot of memory.

The source must be an Observable.

Parameters

key – [Optional] function used to extract the sorting key on each item.

Returns

An observable emitting the sorted items of the source observable.
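The batch-only constraint follows from the need to see every item before the first one can be emitted. A minimal plain-Python sketch of that behavior follows; sort_items and the sample readings are hypothetical.

```python
def sort_items(items, key=lambda item: item):
    """Buffer every item, then return them in ascending key order."""
    buffered = list(items)   # the whole source is held in memory here
    buffered.sort(key=key)   # ascending order
    return buffered


readings = [
    {"id": "b", "value": 3},
    {"id": "a", "value": 1},
    {"id": "c", "value": 2},
]
ordered = sort_items(readings, key=lambda r: r["value"])
print([r["id"] for r in ordered])  # ['a', 'c', 'b']
```

An unbounded source never completes, so nothing could ever be emitted from it.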

rxsci.data.split(predicate, pipeline)

Splits an observable based on a predicate criterion.

The source must be a MuxObservable.

Parameters
  • predicate – A function called for each item, that returns the split criterion.

  • pipeline – The Rx pipe to execute on each split.

Returns

A higher-order observable emitting one observable for each split criterion.
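The sketch below illustrates one plausible reading of the split behavior in plain Python. It assumes that a new split starts whenever the value returned by predicate changes between consecutive items, which the text above does not state explicitly. split_runs is a hypothetical helper, and pipeline is modeled as a plain callable applied to each split.

```python
from itertools import groupby


def split_runs(items, predicate, pipeline=list):
    """Apply pipeline to each run of consecutive items sharing a criterion."""
    # groupby starts a new group each time the key changes between
    # consecutive items. Each run is materialized before it is handed over.
    return [pipeline(list(run)) for _, run in groupby(items, key=predicate)]


values = [1, 2, 11, 12, 3]
print(split_runs(values, predicate=lambda i: i > 10))
# [[1, 2], [11, 12], [3]]
print(split_runs(values, predicate=lambda i: i > 10, pipeline=sum))
# [3, 23, 3]
```

Under this assumption the last item forms its own split even though its criterion matches the first run, which differs from a group-by on the criterion.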

rxsci.data.to_deque(extend=False)

Flattens list items to a deque FIFO and publishes them when the source observable completes.

This buffers the items of the source observable and emits them as is. Items are popped from the deque as they are emitted. This is useful when working on batch data, since it allows items to be dereferenced as they are processed.

The source must be an Observable.

Parameters

extend – [Optional] When set to True, the deque is extended with each item received. The default behavior is to append each item to the deque.
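The buffer-then-drain behavior, and the difference between appending and extending, can be sketched in plain Python. drain_deque is a hypothetical helper that illustrates the description above rather than the operator's implementation.

```python
from collections import deque


def drain_deque(items, extend=False):
    """Buffer all items in a deque, then yield them in FIFO order."""
    buffer = deque()
    for item in items:
        if extend:
            buffer.extend(item)   # item is an iterable; add its elements
        else:
            buffer.append(item)   # add the item itself

    # The source is exhausted: pop items as they are emitted, so the deque
    # no longer references an item once it has been handed downstream.
    while buffer:
        yield buffer.popleft()


print(list(drain_deque([[1, 2], [3]])))               # [[1, 2], [3]]
print(list(drain_deque([[1, 2], [3]], extend=True)))  # [1, 2, 3]
```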