Data

rxsci.data.cache(field=None)

cache items

Each received item is cached, and the cached item is returned in its place. This operator can save memory in graphs that buffer many items with identical values.

The source must be an Observable.

Parameters:

field – [Optional] When set, only the value of this field is cached. Otherwise the whole item is cached. The field value must be accessible via the getattr function.

Returns:

An Observable emitting the same items as the source observable, but with cached values.
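The caching behavior can be illustrated with a minimal pure-Python sketch. This is not the rxsci implementation: a plain iterable stands in for the source Observable, and `cache` here is a hypothetical generator function. It assumes that cached values are hashable (they are used as dictionary keys) and, when field is set, that items are mutable so the cached value can be set back with setattr.

```python
from types import SimpleNamespace


def cache(items, field=None):
    """Hypothetical stand-alone sketch of the cache operator over an iterable."""
    store = {}  # maps each value seen so far to its first (shared) instance
    for item in items:
        if field is None:
            # Return the first stored object equal to this item
            yield store.setdefault(item, item)
        else:
            value = getattr(item, field)
            # Assumption: the item is mutable, so the cached value is set back on it
            setattr(item, field, store.setdefault(value, value))
            yield item


# Two equal strings built at runtime are usually distinct objects
a = "".join(["ab", "c"])
b = "".join(["a", "bc"])
out = list(cache([a, b]))
print(out[0] is out[1])  # True: both outputs share one object

rows = [SimpleNamespace(name=a), SimpleNamespace(name=b)]
rows = list(cache(rows, field="name"))
print(rows[0].name is rows[1].name)  # True
```

Only one object per distinct value stays referenced, which is where the memory saving comes from when many buffered items are equal.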

rxsci.data.clip(lower_bound=None, higher_bound=None)

clip values between lower_bound (included) and higher_bound (included)

The source can be an Observable or a MuxObservable.

Parameters:
  • lower_bound – [Optional] The minimal value to emit.

  • higher_bound – [Optional] The maximal value to emit.

Returns:

An observable emitting the source items, clipped to the provided bounds.

Raises:

ValueError if no bound is provided or if lower_bound is bigger than higher_bound.
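A minimal pure-Python sketch of the clipping rules and the validation described above. It is not the rxsci code: `clip` here is a hypothetical factory that validates its bounds eagerly and returns a function applied to a plain iterable.

```python
def clip(lower_bound=None, higher_bound=None):
    """Hypothetical sketch of clip: returns a function that clips an iterable."""
    if lower_bound is None and higher_bound is None:
        raise ValueError("at least one bound must be provided")
    if lower_bound is not None and higher_bound is not None and lower_bound > higher_bound:
        raise ValueError("lower_bound is bigger than higher_bound")

    def _clip(items):
        for item in items:
            # Both bounds are inclusive: values equal to a bound pass unchanged
            if lower_bound is not None and item < lower_bound:
                item = lower_bound
            elif higher_bound is not None and item > higher_bound:
                item = higher_bound
            yield item

    return _clip


print(list(clip(0, 10)([-5, 0, 5, 10, 15])))  # [0, 0, 5, 10, 10]
print(list(clip(higher_bound=1)([0, 2])))     # [0, 1]
```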

rxsci.data.decode(encoding='utf8', incremental=True)

Decode bytes to strings

Parameters:
  • encoding – [Optional] The encoding to use.

  • incremental – [Optional] When True, items are decoded incrementally, as consecutive chunks of a single stream. Otherwise each item is decoded as independent data.

Returns:

An Observable of strings.
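The difference between incremental and independent decoding can be shown with Python's standard `codecs` module. This is a hypothetical sketch over a plain iterable of bytes, not the rxsci implementation; it only assumes that incremental mode behaves like a single stateful decoder shared by all items.

```python
import codecs


def decode(chunks, encoding="utf8", incremental=True):
    """Hypothetical sketch of decode over an iterable of bytes objects."""
    if incremental:
        # One stateful decoder for the whole sequence: a multi-byte character
        # split across two chunks is completed when the next chunk arrives
        decoder = codecs.getincrementaldecoder(encoding)()
        for chunk in chunks:
            yield decoder.decode(chunk)
        # Flush at the end; raises if the stream stops mid-character
        tail = decoder.decode(b"", final=True)
        if tail:
            yield tail
    else:
        # Each chunk is decoded on its own
        for chunk in chunks:
            yield chunk.decode(encoding)


# "é" is encoded as b"\xc3\xa9" in UTF-8; here it is split across two chunks
chunks = [b"caf\xc3", b"\xa9!"]
print(list(decode(chunks)))  # ['caf', 'é!']
```

Decoding the same chunks with incremental=False fails on the first chunk with a UnicodeDecodeError, because b"caf\xc3" ends in the middle of a character.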

rxsci.data.encode(encoding='utf8', incremental=True)

Encode strings to bytes

Parameters:
  • encoding – [Optional] The encoding to use.

  • incremental – [Optional] When True, items are encoded incrementally, as consecutive chunks of a single stream. Otherwise each item is encoded as independent data.

Returns:

An Observable of bytes.
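A hypothetical sketch of the two encoding modes using Python's standard `codecs` module (not the rxsci implementation; a plain iterable of strings stands in for the source). UTF-16 is used in the example because, for ordinary text, UTF-8 produces the same bytes in both modes, while UTF-16 writes a byte order mark (BOM) at the start of each independent encoding but only once per incremental stream.

```python
import codecs


def encode(texts, encoding="utf8", incremental=True):
    """Hypothetical sketch of encode over an iterable of str objects."""
    if incremental:
        # One stateful encoder: stream-level data such as a BOM is written once
        encoder = codecs.getincrementalencoder(encoding)()
        for text in texts:
            yield encoder.encode(text)
        tail = encoder.encode("", final=True)
        if tail:
            yield tail
    else:
        # Each string is encoded as a complete, independent document
        for text in texts:
            yield text.encode(encoding)


# UTF-16 prefixes a 2-byte BOM to each independently encoded string
print([len(b) for b in encode(["ab", "cd"], "utf-16")])                     # [6, 4]
print([len(b) for b in encode(["ab", "cd"], "utf-16", incremental=False)])  # [6, 6]
```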

rxsci.data.fill_none(value)

Replaces None values with value

The source can be an Observable or a MuxObservable.

Parameters:

value – The value used to replace None values.

Returns:

An observable where None items are replaced with value.

rxsci.data.lag(size=1, data_type='obj')

Buffers a lag of size items on the source items.

The source must be a MuxObservable.

Parameters:
  • size – [Optional] size of the lag.

  • data_type – [Optional] the type of the lag data.

Returns:

An observable where each item is a tuple of (lag, current) items. During the first iterations, before size previous items are available, the tuple (first, current) is emitted, where first is the first source item.
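A minimal sketch of the (lag, current) pairing, assuming a plain iterable instead of a MuxObservable. `lag` here is a hypothetical generator, not the rxsci operator, and it ignores the data_type parameter, which only affects how the lag buffer is stored.

```python
from collections import deque


def lag(items, size=1):
    """Hypothetical sketch of lag over an iterable; yields (lag, current) tuples."""
    history = deque(maxlen=size + 1)  # the current item plus up to `size` previous ones
    for item in items:
        history.append(item)
        # history[0] is the item received `size` steps earlier, or the
        # first item during the first `size` iterations
        yield history[0], item


print(list(lag([1, 2, 3, 4])))          # [(1, 1), (1, 2), (2, 3), (3, 4)]
print(list(lag([1, 2, 3, 4], size=2)))  # [(1, 1), (1, 2), (1, 3), (2, 4)]
```

A bounded deque keeps memory proportional to size, whatever the length of the source.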

rxsci.data.roll(window, stride, pipeline)

Projects each element of an observable sequence into zero or more windows which are produced based on element window information.

The source must be a MuxObservable.

Examples

>>> rs.data.roll(3, 3, pipeline)
>>> rs.data.roll(window=3, stride=2, pipeline=pipeline)

Parameters:
  • window – Length of each window.

  • stride – Number of elements to step between creation of consecutive windows.

  • pipeline – The Rx pipe to execute on each window.

Returns:

An observable sequence of windows.

Raises:

ValueError if window or stride is negative
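The windowing arithmetic can be sketched in plain Python. This is a hypothetical illustration, not the rxsci operator: it buffers a finite iterable for clarity instead of streaming, treats pipeline as an ordinary function applied to each window, rejects non-positive sizes (slightly stricter than the documented check), and assumes that incomplete trailing windows are dropped; the real operator may handle them differently.

```python
def roll(items, window, stride, pipeline):
    """Hypothetical sketch of roll over a finite iterable."""
    # Assumption: non-positive sizes are rejected (the doc only mentions negative ones)
    if window < 1 or stride < 1:
        raise ValueError("window and stride must be positive")
    buffered = list(items)
    # A new window starts every `stride` items; only complete windows are kept here
    for start in range(0, len(buffered) - window + 1, stride):
        yield pipeline(buffered[start:start + window])


print(list(roll([1, 2, 3, 4, 5, 6], window=3, stride=3, pipeline=sum)))  # [6, 15]
print(list(roll([1, 2, 3, 4, 5, 6], window=3, stride=2, pipeline=sum)))  # [6, 12]
```

With stride equal to window the windows are disjoint; a smaller stride makes consecutive windows overlap.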

rxsci.data.sort(key=<function <lambda>>, reverse=False)

sort items according to key

Items are sorted in ascending order by default. When reverse is set to True, they are sorted in descending order.

Implementation note: This operator caches all the items of the source observable before sorting them. So it can only be used on a batch source, and it can lead to high memory usage.

The source must be an Observable.

Parameters:
  • key – [Optional] function used to extract the sorting key on each item.

  • reverse – [Optional] Set to True for descending sorting.

Returns:

An observable emitting the sorted items of the source observable.
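The implementation note above can be made concrete with a hypothetical generator sketch (not the rxsci code). It assumes the default key is the identity function, which the rendered signature suggests but does not state. The example shows that the first sorted item only comes out after the whole source has been consumed.

```python
def sort(items, key=lambda i: i, reverse=False):
    """Hypothetical sketch of sort over an iterable."""
    # Every item must be received before the first one can be emitted,
    # which is why the operator only suits batch (finite) sources
    buffered = list(items)
    buffered.sort(key=key, reverse=reverse)
    yield from buffered


seen = []


def source():
    for x in [3, 1, 2]:
        seen.append(x)  # records each item as it is pulled from the source
        yield x


first = next(sort(source()))
print(first, seen)  # 1 [3, 1, 2]: the whole source was consumed first
```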

rxsci.data.split(predicate, pipeline)

Split an observable based on a predicate criterion.

The source must be a MuxObservable.

Parameters:
  • predicate – A function called for each item, that returns the split criteria.

  • pipeline – The Rx pipe to execute on each split.

Returns:

A higher order observable returning one observable for each split criterion.
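A hypothetical sketch of splitting over a plain iterable, not the rxsci implementation. It assumes that a new split starts each time the predicate value changes, as with `itertools.groupby`, and it treats pipeline as an ordinary function applied to each split, returning (criterion, result) pairs instead of a higher order observable.

```python
from itertools import groupby


def split(items, predicate, pipeline):
    """Hypothetical sketch of split over an iterable; yields (criterion, result) pairs."""
    # Assumption: a new split starts each time the predicate value changes
    for criterion, group in groupby(items, key=predicate):
        yield criterion, pipeline(list(group))


print(list(split([1, 3, 2, 4, 5], predicate=lambda x: x % 2, pipeline=sum)))
# [(1, 4), (0, 6), (1, 5)]
```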

rxsci.data.time_split(time_mapper, active_timeout=None, inactive_timeout=None, closing_mapper=None, include_closing_item=True, pipeline=None)

Splits an observable based on timing criteria.

Timestamps used to create and expire windows are retrieved from the time_mapper function.

The first item of a window is used as a reference to close the window after a duration of active_timeout. Each timestamp of the source items is used to expire the window after a duration of inactive_timeout without receiving any event.

Additional custom logic can be implemented with the closing_mapper: in addition to the active and inactive timeouts, this mapper can force the closing of the current window and the creation of a new one.

The source must be a MuxObservable.

Parameters:
  • time_mapper (Callable[[Any], datetime]) – A function that maps the source items to a datetime object.

  • active_timeout (Optional[timedelta]) – The window expiration duration from the reception date of the first item.

  • inactive_timeout (Optional[timedelta]) – The window expiration duration when no event is received.

  • closing_mapper (Optional[Callable[[Any], bool]]) – A function called for each source item that returns whether the window must be closed.

  • include_closing_item (bool) – whether to include closing items from closing_mapper in the current window (True) or the next window (False).

  • pipeline – The Rx pipe to execute on each split.

Returns:

A higher order observable returning one observable for each split window.
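The interplay of the two timeouts and the closing_mapper can be sketched over a finite iterable. This is a hypothetical illustration, not the rxsci implementation: windows are yielded as plain lists instead of being passed to a pipeline, timeouts are only checked when the next item arrives (there are no timers), and a window is assumed to expire when the elapsed time is greater than or equal to the timeout.

```python
from datetime import datetime, timedelta


def time_split(items, time_mapper, active_timeout=None, inactive_timeout=None,
               closing_mapper=None, include_closing_item=True):
    """Hypothetical sketch of time_split over a finite iterable; yields lists."""
    window = []
    start = last = None  # timestamps of the window's first item and of the last item
    for item in items:
        t = time_mapper(item)
        if window:
            # Timeouts are only checked when a new item arrives (no timers here)
            too_old = active_timeout is not None and t - start >= active_timeout
            too_quiet = inactive_timeout is not None and t - last >= inactive_timeout
            if too_old or too_quiet:
                yield window
                window = []
        closing = closing_mapper is not None and closing_mapper(item)
        if closing and not include_closing_item and window:
            # The closing item opens the next window instead
            yield window
            window = []
        if not window:
            start = t
        window.append(item)
        last = t
        if closing and include_closing_item:
            yield window
            window = []
    if window:
        yield window


BASE = datetime(2020, 1, 1)


def to_time(minute):
    """Maps an item (here, a minute offset) to a datetime."""
    return BASE + timedelta(minutes=minute)


print(list(time_split([0, 1, 2, 10, 11], to_time, inactive_timeout=timedelta(minutes=5))))
# [[0, 1, 2], [10, 11]]
print(list(time_split([0, 1, 2, 3], to_time, active_timeout=timedelta(minutes=2))))
# [[0, 1], [2, 3]]
print(list(time_split([1, 2, 3, 4], to_time, closing_mapper=lambda m: m == 2)))
# [[1, 2], [3, 4]]
print(list(time_split([1, 2, 3, 4], to_time, closing_mapper=lambda m: m == 2,
                      include_closing_item=False)))
# [[1], [2, 3, 4]]
```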

rxsci.data.to_array(typecode)

flattens list items to an array and publishes them when the source observable completes.

The source can be an Observable or a MuxObservable.

Parameters:

typecode – The type of data to use in the array

rxsci.data.to_deque(extend=False)

Flattens list items to a FIFO deque and publishes them when the source observable completes.

This buffers the items of the source observable and emits them as is. Items are popped from the deque as they are emitted. This is useful when working on batch data, since it allows items to be dereferenced as they are processed.

The source must be an Observable.

Parameters:

extend – [Optional] When set to True, the deque is extended with each received item. The default behavior is to append items to the deque.
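A hypothetical sketch of the buffering and popping behavior described above, using `collections.deque` over a plain iterable. It is not the rxsci implementation; it only illustrates the difference between appending and extending, and why popping on emission lets emitted items be released.

```python
from collections import deque


def to_deque(items, extend=False):
    """Hypothetical sketch of to_deque over an iterable."""
    buffer = deque()
    for item in items:
        if extend:
            buffer.extend(item)  # each item is an iterable whose elements are queued
        else:
            buffer.append(item)  # each item is queued as is
    # The source is complete: emit in FIFO order, popping each item so the
    # deque no longer holds a reference to it once it has been emitted
    while buffer:
        yield buffer.popleft()


print(list(to_deque([[1, 2], [3]])))               # [[1, 2], [3]]
print(list(to_deque([[1, 2], [3]], extend=True)))  # [1, 2, 3]
```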

rxsci.data.to_list()

Flattens list items to a list and publishes it when the source observable completes.

The source can be an Observable or a MuxObservable.

Returns:

An observable emitting a single list item when the source observable completes.