Data

rxsci.data.cache(field=None)

Caches items.

Each received item is cached, and the cached item is returned. This operator can save memory on graphs that buffer items with many identical values.

The source must be an Observable.

Parameters

field – [Optional] When set, only the provided field is cached. Otherwise the whole item is cached. When field is set, the field value must be accessible via the getattr function.

Returns

An Observable emitting the same items as the source observable, but with cached values.
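The memory saving described above comes from replacing equal values with one shared instance. The following is a minimal pure-Python sketch of that idea, not the rxsci implementation: the helper name cache_items is hypothetical, it works on a plain iterable instead of an Observable, and it assumes hashable values and, when field is set, items whose attributes can be reassigned.

```python
from types import SimpleNamespace


def cache_items(items, field=None):
    """Return items where equal values share a single cached instance.

    Hypothetical helper illustrating the caching idea on a plain iterable.
    """
    seen = {}  # maps a value to the first instance seen with that value
    out = []
    for item in items:
        if field is None:
            # Whole-item caching: reuse the first equal item.
            item = seen.setdefault(item, item)
        else:
            # Field caching: the field is read with getattr, as documented.
            value = getattr(item, field)
            setattr(item, field, seen.setdefault(value, value))
        out.append(item)
    return out


# Two equal tuples built at runtime are distinct objects before caching.
a = tuple([1, 2, 3])
b = tuple([1, 2, 3])
whole = cache_items([a, b])

# Field caching on simple attribute containers.
records = cache_items(
    [SimpleNamespace(label=tuple([1, 2])), SimpleNamespace(label=tuple([1, 2]))],
    field="label",
)
```

After caching, both entries of whole refer to the same object, so only one copy of the value has to stay alive when the items are buffered downstream.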

rxsci.data.clip(lower_bound=None, higher_bound=None)

Clips values between lower_bound (included) and higher_bound (included).

The source can be an Observable or a MuxObservable.

Parameters
  • lower_bound – [Optional] The minimal value to emit.

  • higher_bound – [Optional] The maximal value to emit.

Returns

An observable emitting the source items, clipped to the provided bounds.

Raises

ValueError if no bound is provided or lower_bound is greater than higher_bound
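The clipping rule and its error cases can be illustrated on a plain list. This is a sketch of the documented behavior only: clip_values is a hypothetical helper, not part of rxsci, and it processes a list instead of an Observable or MuxObservable.

```python
def clip_values(values, lower_bound=None, higher_bound=None):
    """Clip each value to [lower_bound, higher_bound], both bounds included.

    Hypothetical helper mirroring the documented rules on a plain iterable.
    """
    if lower_bound is None and higher_bound is None:
        raise ValueError("at least one bound must be provided")
    if (
        lower_bound is not None
        and higher_bound is not None
        and lower_bound > higher_bound
    ):
        raise ValueError("lower_bound must not be greater than higher_bound")

    clipped = []
    for value in values:
        if lower_bound is not None and value < lower_bound:
            value = lower_bound
        elif higher_bound is not None and value > higher_bound:
            value = higher_bound
        clipped.append(value)
    return clipped


both = clip_values([1, 5, 10], lower_bound=2, higher_bound=8)  # [2, 5, 8]
lower_only = clip_values([1, 5, 10], lower_bound=2)            # [2, 5, 10]
```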

rxsci.data.fill_none(value)

Replaces None values with value

The source can be an Observable or a MuxObservable.

Parameters

value – The value used to replace None values.

Returns

An observable where None items are replaced with value.
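As an illustration of the replacement rule, the sketch below applies it to a plain list. The helper name fill_none_values is hypothetical; note that only None is replaced, so other falsy values such as 0 are kept.

```python
def fill_none_values(items, value):
    """Replace None items with value, leaving every other item untouched.

    Hypothetical helper working on a plain iterable.
    """
    return [value if item is None else item for item in items]


filled = fill_none_values([1, None, 0, None], value=-1)  # [1, -1, 0, -1]
```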

rxsci.data.lag(size=1, data_type='obj')

Buffers a lag of size on source items

The source must be a MuxObservable.

Parameters
  • size – [Optional] size of the lag.

  • data_type – [Optional] the type of the lag data.

Returns

An observable where each item is a tuple of (lag, current) items. On the first iterations, before the lag buffer is full, the tuple (first, current) is emitted, where first is the first item received.
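One way to read the (lag, current) description is with a bounded buffer holding the last size + 1 items. The sketch below follows that reading on a plain list. It is an assumption about the semantics, not the rxsci implementation; lag_pairs is a hypothetical name and the data_type parameter is ignored.

```python
from collections import deque


def lag_pairs(items, size=1):
    """Return (lagged, current) tuples, where lagged is `size` items behind.

    Hypothetical helper. While fewer than `size` previous items exist,
    the first item received is used as the lagged value.
    """
    buffer = deque(maxlen=size + 1)  # oldest entry is the lagged item
    pairs = []
    for item in items:
        buffer.append(item)
        pairs.append((buffer[0], item))
    return pairs


lag_1 = lag_pairs([1, 2, 3], size=1)     # [(1, 1), (1, 2), (2, 3)]
lag_2 = lag_pairs([1, 2, 3, 4], size=2)  # [(1, 1), (1, 2), (1, 3), (2, 4)]
```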

rxsci.data.roll(window, stride, pipeline)

Projects each element of an observable sequence into zero or more windows which are produced based on element window information.

The source must be a MuxObservable.

Examples

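>>> # pipeline is a placeholder for the Rx pipe applied to each window
>>> rs.data.roll(window=3, stride=3, pipeline=pipeline)
>>> rs.data.roll(window=3, stride=2, pipeline=pipeline)
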
Parameters
  • window – Length of each window.

  • stride – Number of elements to step between creation of consecutive windows.

  • pipeline – The Rx pipe to execute on each window.

Returns

An observable sequence of windows.

Raises

ValueError if window or stride is negative
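The interaction between window and stride can be shown on a plain list. This sketch rests on two assumptions that the text above does not confirm: only complete windows are emitted, and a zero window or stride is rejected along with negative values. The helper name roll_windows is hypothetical, and the pipeline parameter is left out (a plain function can be applied to each returned window instead).

```python
def roll_windows(items, window, stride):
    """Return windows of `window` items, starting a new one every `stride` items.

    Hypothetical helper. Assumption: incomplete trailing windows are dropped.
    """
    # The documentation mentions negative values; this sketch also rejects zero.
    if window <= 0 or stride <= 0:
        raise ValueError("window and stride must be positive")
    items = list(items)
    return [
        items[start:start + window]
        for start in range(0, len(items) - window + 1, stride)
    ]


overlapping = roll_windows([1, 2, 3, 4, 5], window=3, stride=2)
# [[1, 2, 3], [3, 4, 5]]
tumbling = roll_windows(range(6), window=3, stride=3)
# [[0, 1, 2], [3, 4, 5]]
window_sums = [sum(w) for w in overlapping]  # stand-in for a pipeline: [6, 12]
```

When stride is smaller than window, consecutive windows overlap. When both are equal, the windows tile the source without overlap.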

rxsci.data.sort(key=<function <lambda>>)

Sorts items according to key.

Items are sorted in ascending order.

Implementation note: This operator caches all the items of the source observable before sorting them. It can be used only on batch sources, and it can consume a lot of memory.

The source must be an Observable.

Parameters

key – [Optional] function used to extract the sorting key on each item.

Returns

An observable emitting the sorted items of the source observable.
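The implementation note explains why this operator only suits batch sources: nothing can be emitted until the source has completed. A minimal sketch of that buffering behavior on a plain iterable, with the hypothetical helper name sort_batch:

```python
def sort_batch(items, key=lambda item: item):
    """Buffer every item, sort in ascending order, then emit the items.

    Hypothetical helper. The whole source is held in memory before the
    first item is produced, so it cannot work on an unbounded source.
    """
    buffered = list(items)  # consumes the complete source first
    buffered.sort(key=key)
    yield from buffered


numbers = list(sort_batch([3, 1, 2]))                # [1, 2, 3]
by_length = list(sort_batch(["bb", "a", "ccc"], key=len))  # ['a', 'bb', 'ccc']
```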

rxsci.data.split(predicate, pipeline)

Splits an observable based on a predicate criterion.

The source must be a MuxObservable.

Parameters
  • predicate – A function called for each item that returns the split criterion.

  • pipeline – The Rx pipe to execute on each split.

Returns

A higher-order observable emitting one observable for each split criterion.
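The sketch below shows one possible reading of this operator on a plain list: a new split starts each time the value returned by predicate changes. That reading is an assumption and is not confirmed by the text above. The helper name split_on_change is hypothetical, and the pipeline parameter is left out.

```python
from itertools import groupby


def split_on_change(items, predicate):
    """Group consecutive items that share the same predicate value.

    Hypothetical helper. Assumption: a split is closed as soon as the
    predicate value changes, so a value seen again later opens a new
    split instead of rejoining an earlier one.
    """
    return [list(group) for _, group in groupby(items, key=predicate)]


events = [("a", 1), ("a", 2), ("b", 3), ("a", 4)]
splits = split_on_change(events, predicate=lambda event: event[0])
# [[('a', 1), ('a', 2)], [('b', 3)], [('a', 4)]]
```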

rxsci.data.time_split(time_mapper, active_timeout=None, inactive_timeout=None, closing_mapper=None, include_closing_item=True, pipeline=None)

Splits an observable based on timing criteria.

Timestamps used to create and expire windows are retrieved from the time_mapper function.

The first item of a window is used as a reference to close the window after a duration of active_timeout. Each timestamp of the source items is used to expire the window after a duration of inactive_timeout without receiving any event.

Additional custom logic can be implemented with the closing_mapper: in addition to the active and inactive timeouts, this mapper can force the current window to close and a new one to be created.

The source must be a MuxObservable.

Parameters
  • time_mapper (Callable[[Any], datetime]) – A function that maps the source items to a datetime object.

  • active_timeout (Optional[timedelta]) – The window expiration duration from the reception date of the first item.

  • inactive_timeout (Optional[timedelta]) – The window expiration duration when no event is received.

  • closing_mapper (Optional[Callable[[Any], bool]]) – A function called for each source item that returns whether the window must be closed.

  • include_closing_item (bool) – Whether to include closing items from closing_mapper in the current window (True) or the next window (False).

  • pipeline – The Rx pipe to execute on each split.

Returns

A higher-order observable emitting one observable for each split window.
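The three closing rules (active timeout, inactive timeout, closing_mapper) can be traced on a plain list. The following is a sketch under stated assumptions, not the rxsci implementation: time_split_sketch is a hypothetical name, windows are returned as lists instead of observables, the pipeline parameter is left out, and a window is treated as expired when the elapsed time is greater than or equal to the timeout (the exact boundary is not specified above).

```python
from datetime import datetime, timedelta


def time_split_sketch(items, time_mapper, active_timeout=None,
                      inactive_timeout=None, closing_mapper=None,
                      include_closing_item=True):
    """Split items into windows from timestamps and an optional closing rule."""
    windows, current = [], []
    start = last = None  # timestamps of the first and latest item in the window
    for item in items:
        ts = time_mapper(item)
        if current:
            expired = (
                (active_timeout is not None and ts - start >= active_timeout)
                or (inactive_timeout is not None
                    and ts - last >= inactive_timeout)
            )
            if expired:
                windows.append(current)
                current = []
        closing = closing_mapper is not None and closing_mapper(item)
        if closing and not include_closing_item and current:
            # The closing item opens the next window.
            windows.append(current)
            current = []
        if not current:
            start = ts  # first item of a window is the active reference
        current.append(item)
        last = ts
        if closing and include_closing_item:
            # The closing item ends the current window.
            windows.append(current)
            current = []
    if current:
        windows.append(current)
    return windows


base = datetime(2024, 1, 1)


def to_time(seconds):
    # Items are second offsets in this example.
    return base + timedelta(seconds=seconds)


idle = time_split_sketch([0, 1, 2, 10, 11], to_time,
                         inactive_timeout=timedelta(seconds=5))
# [[0, 1, 2], [10, 11]]
active = time_split_sketch([0, 1, 2, 3, 4], to_time,
                           active_timeout=timedelta(seconds=3))
# [[0, 1, 2], [3, 4]]
```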

rxsci.data.to_deque(extend=False)

Flattens list items to a FIFO deque and publishes them when the source observable completes.

This buffers and emits the items of the source observable as is. Items on the deque are popped as they are emitted. This is useful when working on batch data since it allows items to be dereferenced as they are processed.

The source must be an Observable.

Parameters

extend – [Optional] When set to True, the deque is extended with each item received. The default behavior is to append items to the deque.
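The difference between appending and extending, and the pop-on-emit behavior, can be sketched with collections.deque on a plain iterable. The helper name drain_deque is hypothetical and this is not the rxsci implementation.

```python
from collections import deque


def drain_deque(items, extend=False):
    """Buffer all items in a FIFO deque, then emit them while popping.

    Hypothetical helper. With extend=True each received item must be
    iterable and its elements are added one by one.
    """
    queue = deque()
    for item in items:
        if extend:
            queue.extend(item)
        else:
            queue.append(item)
    # Emission starts only once the source is exhausted. Each item is
    # removed from the deque as it is emitted, so the deque no longer
    # keeps it alive while later items are processed.
    while queue:
        yield queue.popleft()


appended = list(drain_deque([[1, 2], [3]]))                # [[1, 2], [3]]
extended = list(drain_deque([[1, 2], [3]], extend=True))   # [1, 2, 3]
```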