Source code for mindpype.source

"""
Currently supported sources:
    - Lab Streaming Layer
    - xdf files


"""

from .core import MPBase, MPEnums
from .containers import Tensor
from scipy.io import loadmat
import numpy as np
import pylsl
import pyxdf
import os
import re
import sys
import warnings
import liesl
import threading
import json
import time

[docs] class InputXDFFile(MPBase): """ Utility class for extracting trial data from an XDF file for MindPype. Parameters ---------- sess : Session Object Session where the MPXDF data source will exist. files : list of str XDF file(s) where data should be extracted from. tasks : list or tuple of strings List or Tuple of strings corresponding to the tasks to be completed by the user. For example, the tasks 'target' and 'non-target'/'flash' can be used for P300-type setups. channels : list or tuple of int Values corresponding to the stream channels used during the session relative_start : float, default = 0 Value corresponding to the start of the trial relative to the marker onset. Ns : int, default = 1 Number of samples to be extracted per trial. For epoched data, this value determines the size of each epoch, whereas this value is used in polling for continuous data. mode : 'continuous', 'class-separated' or 'epoched', default = 'epoched' Mode indicates whether the inputted data will be epoched sequentially as individual trials, epoched by class, or to leave the data in a continuous format .. warning:: The task list used in the InputXDFFile object MUST REFLECT the task list used in the XDF file. Differences will cause the program to fail. .. note:: There are 3 types of modes for the MPXDF object: 'continuous', 'class-separated' and 'epoched'. Continuous mode will leave the data in a continuous format, and will poll the data for the next Ns samples each time the poll_data method is called. Class-separated mode will epoch the data by class, and will poll the data for the next Ns samples of the specified class each time the poll_data method is called. Epoched mode will epoch the data sequentially, and will poll the data for the next Ns samples of the next trial (Ns < length of the epoch) each time the poll_data method is called. For P300/MI paradigms, where there are specified task names (i.e. 'target' and 'non-target'/'flash', etc.), class-separated mode is recommended. For other paradigms, where there are no specified task names, and data will be polled sequentially, either continuous or epoched mode is recommended. Class-separated mode will store the data in a dictionary with the following format: .. code-block:: python self.trial_data = { "Data": {"time_series": {task_name1: np.array([Nt x Nc x Ns]), task_name2: np.array([Nt x Nc x Ns]),}, "time_stamps": np.array([Ns])}}, "Markers": {"time_series": np.array([Ns]), "time_stamps": np.array([Ns])}, } Continuous mode will store the data in a dictionary with the following format: .. code-block:: python self.trial_data = { "Data": {"time_series": np.array([Nc x Ns]), "time_stamps": np.array([Ns])}, "Markers": {"time_series": np.array([Ns]), "time_stamps": np.array([Ns])}, Epoched mode will store the data in a dictionary with the following format: .. code-block:: python self.trial_data = { "Data": {"time_series": np.array([Nt x Nc x Ns]), "time_stamps": np.array([Ns])}, "Markers": {"time_series": np.array([Ns]), "time_stamps": np.array([Ns])}, } Attributes ---------- files : list of str XDF file(s) where data should be extracted from. relative_start : float, default = 0 Value corresponding to the start of the trial relative to the marker onset. Ns : int, default = 1 Number of samples to be extracted per trial. For epoched data, this value determines the size of each epoch, whereas this value is used in polling for continuous data. tasks : list or tuple of strings List or Tuple of strings corresponding to the tasks to be completed by the user. For example, the tasks 'target' and 'non-target'/'flash' can be used for P300-type setups. channels : list or tuple of int Values corresponding to the stream channels used during the session mode : 'continuous', 'class-separated' or 'epoched', default = 'epoched' Mode indicates whether the inputted data will be epoched sequentially as individual trials, epoched by class, or to leave the data in a continuous format stream_type: str Type of stream (Data or Markers) """ def __init__(self, sess, files, channels, tasks=None, relative_start=0, Ns=1, stype='EEG', mode="epoched"): """ Create a new xdf file reader interface """ super().__init__(MPEnums.SRC, sess) if type(files) == str: files = [files] self.files = files self.relative_start = relative_start self.Ns = int(Ns) self.tasks = tasks self._inferred_tasks = tasks is None self.channels = channels self._label_counter = None self.mode = mode self.stream_type = stype # Epoched mode will store trial data in a 3D array, with the first dimension corresponding to the trial number # and the second and third dimensions corresponding to the channel and sample number, respectively # The markers will be stored in a 1D tuple, with the first dimension corresponding to the sample number. if mode == "epoched": self._data = {"Data": [], "Markers": []} for filename in files: # open file and extract data data, header = pyxdf.load_xdf(filename) # extract the marker and data streams marker_stream = None data_stream = None for stream in data: if (stream["info"]["type"][0] == "Marker" or stream["info"]["type"][0] == "Markers"): if marker_stream is None or stream["time_stamps"].shape[0] > marker_stream["time_stamps"].shape[0]: marker_stream = stream elif stream["info"]["type"][0] == self.stream_type: data_stream = stream if marker_stream is None or data_stream is None: raise ValueError(f"The XDF file {filename} does not contain the required streams") sample_indices = np.zeros(data_stream["time_stamps"].shape) # used to extract data samples, pre-allocated here # filter the marker stream for the specified tasks marker_stream = self._filter_marker_stream(marker_stream) # iterate throught the markers and extract the data for each task for i_m, marker in enumerate(marker_stream["time_series"]): # compute the correct start and end indices for the current trial marker_time = marker_stream["time_stamps"][i_m] data_window_start = marker_time + relative_start # find the index of the first sample after the marker sample_indices = np.array(data_stream["time_stamps"] >= data_window_start) first_sample_ix = np.argwhere(sample_indices)[0][0] sample_indices[first_sample_ix + self.Ns:] = False # remove the samples after the end of the trial # extract the data and append to the data dictionary sample_data = data_stream["time_series"][np.ix_(sample_indices, channels)].T # Nc X Ns self._data["Data"].append(sample_data) self._data["Markers"].append(marker) # convert the data to a numpy array and the markers to a tuple self._data["Data"] = np.stack(self._data["Data"], axis=0) # Nt x Nc x Ns self._data["Markers"] = tuple(self._data["Markers"]) # create a corresponding numerical task label for each task self._data["numerical_labels"] = np.array([self.tasks.index(task) for task in self._data["Markers"]]) # Continuous mode will leave the data in a continuous format, and will poll the data for the next Ns samples elif mode == "continuous": self._data = {"Data": {"time_series": None, "time_stamps": None}, "Markers": {"time_series": None, "time_stamps": None}} # First order the files by the first marker timestamp file_first_marker = np.zeros((len(files),)) for i_f, filename in enumerate(files): data, _ = pyxdf.load_xdf(filename) # extract the first marker timestamp from the file for stream in data: if (stream["info"]["type"][0] == "Marker" or stream["info"]["type"][0] == "Markers"): file_first_marker[i_f] = stream["time_series"][0][0] # Sort the files by the first marker value file_order = np.argsort(file_first_marker) files = [files[i] for i in file_order] data_streams = [] marker_streams = [] # Iterate through all files and extract the data for filename in files: data, _ = pyxdf.load_xdf(filename) # Iterate through all streams in every file, add current file's data to the previously loaded data for stream in data: if (stream["info"]["type"][0] == "Marker" or stream["info"]["type"][0] == "Markers"): marker_stream = stream # If the data stream already exists, concatenate the new data to the existing data elif stream["info"]["type"][0] == self.stream_type: data_stream = stream if marker_stream is None or data_stream is None: raise ValueError(f"The XDF file {filename} does not contain the required streams") # Extract the data from the data stream data_stream["time_series"] = data_stream["time_series"][:, channels].T # Filter the marker stream for the specified tasks marker_stream = self._filter_marker_stream(marker_stream) # Append the data and marker streams to the list data_streams.append(data_stream) marker_streams.append(marker_stream) # Concatenate the data and marker streams self._data["Data"]["time_series"] = np.concatenate([stream["time_series"] for stream in data_streams], axis=1) self._data["Data"]["time_stamps"] = np.concatenate([stream["time_stamps"] for stream in data_streams]) self._data["Markers"]["time_series"] = marker_streams[0]["time_series"] if len(marker_streams) > 1: for stream in marker_streams[1:]: self._data["Markers"]["time_series"].extend(stream["time_series"]) self._data["Markers"]["time_stamps"] = np.concatenate([stream["time_stamps"] for stream in marker_streams]) self._data["numerical_labels"] = np.array([self._tasks.index(task) for task in self._data["Markers"]["time_series"]]) # create a counter to keep track of the number of trials extracted when polling self._task_counter = {task: 0 for task in self.tasks} def _filter_marker_stream(self, marker_stream): """ Filter the marker stream for the specified tasks. If no task list is provided, try to infer the tasks from the marker stream (currently only supported for Mindset P300 data). Parameters ---------- marker_stream: dictionary Time series and time stamps for data Returns ------- marker_stream: dictionary Time series and time stamps for inferred tasks """ if not self._inferred_tasks and self._tasks: # filter for markers that are tasks task_marker_mask = np.array([marker[0] in self.tasks for marker in marker_stream["time_series"]]) marker_stream["time_series"] = [marker[0] for marker, mask in zip(marker_stream["time_series"], task_marker_mask) if mask] marker_stream["time_stamps"] = marker_stream["time_stamps"][task_marker_mask] else: # infer tasks from Marker stream - only works for Mindset P300 data warnings.warn("No task list provided. Infering tasks from the marker stream. This is only supported for Mindset P300 data.", RuntimeWarning, stacklevel=2) marker_stream["time_series"] = [marker[0] for marker in marker_stream["time_series"]] self._tasks = ['non-target', 'target'] # default tasks for Mindset P300 data inferred_markers = [] inferred_marker_times = [] current_target = None for i_m, marker in enumerate(marker_stream["time_series"]): if "target" in marker: # if the marker identifies a target, store the target grid current_target = json.loads(marker)["target"] elif current_target is not None and "flash" in marker: # if the marker is a flash, check if it is a target or non-target flash_positions = json.loads(marker)["flash"] if current_target in flash_positions: inferred_markers.append("target") else: inferred_markers.append("non-target") # record the time of the marker inferred_marker_times.append(marker_stream["time_stamps"][i_m]) # overwrite the original marker stream with the inferred markers marker_stream["time_series"] = inferred_markers marker_stream["time_stamps"] = np.array(inferred_marker_times) return marker_stream
[docs] def poll_data(self, label=None): """ Polls the data source for new data. Parameters ---------- label : string Marker of next trial to be polled. If None, the next trial according to timestamps will be polled. Returns ------- sample_data: dictionary """ if label is not None and label not in self.tasks: # check if the coorresponding numerical label has been provided if label in self._data["numerical_labels"]: label = self.tasks[np.argwhere(self._data["numerical_labels"]==label)[0][0]] else: raise ValueError(f"Label {label} is not in the list of tasks") # determine the index of the next trial to be polled if self._mode == "epoched": markers = self._data["Markers"] else: markers = self._data["Markers"]["time_series"] if label is None: num_prev_polled = sum(self._task_counter.values()) poll_index = num_prev_polled # default, assumes that the trials have been polled in order for task in self.tasks: # find the first trial for the specified task that has not been polled task_min = markers.index(task, self._task_counter[task]) if task_min < poll_index: poll_index = task_min label = markers[poll_index] else: poll_index = markers.index(label, self._task_counter[label]) if self._mode == "epoched": # Extract sample data from epoched trial data sample_data = self._data["Data"][poll_index, :, :] else: # Extract the nth marker timestamp, corresponding to the nth trial in the XDF file data_window_start = ( self._data["Markers"]["time_stamps"][poll_index] + self.relative_start ) # Construct the boolean array for samples that fall after the marker timestamp sample_indices = self._data["Data"]["time_stamps"] >= data_window_start first_sample_ix = np.argwhere(sample_indices)[0][0] sample_indices[first_sample_ix + self.Ns:] = False # remove the samples after the end of the trial sample_data = self._data["Data"]["time_series"][:, sample_indices] # increment the task counter self._task_counter[label] += 1 return sample_data
[docs] def load_into_tensors(self, include_timestamps=False): """ Loads entirity of InputXDFFile data object into a tensor. Returns 2-4 MindPype Tensor objects, in the following order. 1. Tensor containing the Stream data 2. Tensor containing the Marker data 3. Tensor containing the Stream timestamps (if continuous data and include_timestamps is True) 4. Tensor containing the Marker timestamps (if continuous data and include_timestamps is True) Parameters ---------- include_timestamps : bool, default = False If True, the function will return the Marker timestamps as well as the data. Only applicable for continuous data. Returns ------- data : Tensor Tensor containing the stream data labels : Tensor Tensor containing the numerical encoded markers data_ts : Tensor Tensor containing the stream timestamps labels_ts : Tensor Tensor containing the Marker timestamps """ if self._mode == "epoched": data = Tensor.create_from_data(self.session, self._data["Data"]) labels = Tensor.create_from_data(self.session, self._data["numerical_labels"]) return data, labels elif self._mode == "continuous": data = Tensor.create_from_data(self.session, self._data["Data"]["time_series"]) labels = Tensor.create_from_data(self.session, self._data["numerical_labels"]) if include_timestamps: data_ts = Tensor.create_from_data(self.session, self._data["Data"]["time_stamps"]) labels_ts = Tensor.create_from_data(self.session, self._data["Markers"]["time_stamps"]) return data, labels, data_ts, labels_ts else: return data, labels
[docs] @classmethod def create_continuous(cls, sess, files, channels, tasks=None, relative_start=0, Ns=1): """ Factory Method for creating continuous XDF File input source. Parameters --------- sess : Session Object Session where the MPXDF data source will exist. files : list of str XDF file(s) where data should be extracted from. tasks : list or tuple of strings (default = None) List or Tuple of strings corresponding to the tasks to be completed by the user. For P300-type setups, the tasks 'target' and 'non-target'/'flash' can be used. If None, the tasks will be inferred from the marker stream. This is only supported for P300 data recorded using Mindset. channels : list or tuple of int Values corresponding to the data stream channels used during the session relative_start : float, default = 0 Value corresponding to the start of the trial relative to the marker onset. Ns : int, default = 1 Number of samples to be extracted per trial. For epoched data, this value determines the size of each epoch, whereas this value is used in polling for continuous data. Returns ------- src: InputXDFFile Continous XDF file input source """ src = cls(sess, files, channels, tasks, relative_start, Ns, mode="continuous") sess.add_ext_src(src) return src
[docs] @classmethod def create_epoched(cls, sess, files, channels, tasks=None, relative_start=0, Ns=1, stype='EEG'): """ Factory Method for creating epoched XDF File input source. Parameters --------- sess : Session Object Session where the MPXDF data source will exist. files : list of str XDF file(s) where data should be extracted from. tasks : list or tuple of strings (default = None) List or Tuple of strings corresponding to the tasks to be completed by the user. For P300-type setups, the tasks 'target' and 'non-target'/'flash' can be used. If None, the tasks will be inferred from the marker stream. This is only supported for P300 data recorded using Mindset. channels : list or tuple of int Values corresponding to the data stream channels used during the session relative_start : float, default = 0 Value corresponding to the start of the trial relative to the marker onset. stype : str, default = EEG String indicating the data type Ns : int, default = 1 Number of samples to be extracted per trial. For class-separated data, this value determines the size of each epoch, whereas this value is used in polling for continuous data. Returns ------- src: InputXDFFile Epoched XDF file input source """ src = cls(sess, files, channels, tasks, relative_start, Ns, stype=stype, mode="epoched") sess.add_ext_src(src) return src
[docs] class InputLSLStream(MPBase): """ An object for maintaining an LSL inlet Attributes ---------- data_buffer : dict {'Data': np.array, 'time_stamps': np.array} A dictionary containing the data and time stamps from past samples (used when trials have overlapping data) data_inlet : pylsl.StreamInlet The LSL inlet object marker_inlet : pylsl.StreamInlet The LSL inlet object for the marker stream marker_pattern : re.Pattern The regular expression pattern for the marker stream. Use "task1$|task2$|task3$" if task1, task2, and task3 are the markers channels : tuple of ints Index value of channels to poll from the stream, if None all channels will be polled. TODO: update attributes docstring """ MAX_NULL_READS = 1000 def __init__( self, sess, pred=None, channels=None, relative_start=0, marker_coupled=True, marker_fmt=None, marker_pred=None, stream_info=None, marker_stream_info=None, active=True, interval=None, Ns=1 ): """ Create a new LSL inlet stream object Parameters ---------- sess : session object Session object where the data source will exist pred : str The predicate string, e.g. "name='BioSemi'" or "type='EEG' and starts-with(name, 'BioSemi') and count(description/desc/channels/channel)=32" channels : tuple of ints Index value of channels to poll from the stream, if None all channels will be polled relative_start : float, default = 0 Duration of tiem before marker from which samples should be extracted during polling. marker_coupled : bool true if there is an associated marker to indicate relative time where data should begin to be polled marker_fmt : Regex or list Regular expression template of the marker to be matched, if none all markers will be matched. Alternatively, a list of markers can be provided. marker_pred : str The predicate string for the marker stream stream_info : pylsl.StreamInfo The stream info object for the stream can be passed instead of the predicate to avoid the need to resolve the stream marker_stream_info : pylsl.StreamInfo The stream info object for the marker stream can be passed instead of the predicate to avoid the need to resolve the stream active : bool True if the stream should be opened immediately, false if the stream should be opened later interval : float The minimum interval between polling the stream for new data. Only used for marker uncoupled streams. If None, then the stream will be polled as fast as possible. Ns : int, default = 1 The number of samples to be extracted per poll. .. note:: The active parameter is used when the session is created before the LSL stream is started, or the stream is not available when the session is created. In that case, the stream can be updated later by calling the update_input_stream() method. """ super().__init__(MPEnums.SRC, sess) self.marker_coupled = marker_coupled self._marker_inlet = None self.marker_pattern = None self.relative_start = relative_start self._already_peeked = False self._peeked_marker = None self._marker_buffer = {"time_series": None, "time_stamps": None} # only keeps most recent value, can expand in future if needed self._time_correction = None self._interval = interval self.channels = channels self.Ns = Ns self._data_buffer = {"time_series": None, "time_stamps": None} if active: self._active = False # will be set to True when the stream is opened self.update_input_streams(pred, channels, marker_coupled, marker_fmt, marker_pred, stream_info, marker_stream_info, Ns)
[docs] def poll_data(self, label=None): """ Pull data from the inlet stream until we have Ns data points for each channel. Parameters ---------- Ns: int number of samples to collect Label : None used for file-based polling, not used here """ if not self._active: raise RuntimeWarning("InputLSLStream.poll_data() called on inactive stream. Please call update_input_streams() first to configure the stream object.") if self._marker_inlet is not None: # start by getting the timestamp for this trial's marker t_begin = None null_reads = 0 while t_begin is None: marker, t = self._marker_inlet.pull_sample(timeout=0.0) if marker is not None: null_reads = 0 # reset the null reads counter marker = marker[0] # extract the string portion of the marker if (self.marker_pattern is None) or self.marker_pattern.match(marker): t_begin = t self._marker_buffer["time_stamps"] = t_begin self._marker_buffer["time_series"] = marker else: null_reads += 1 if null_reads > self.MAX_NULL_READS: raise RuntimeError( f"The marker stream has not been updated in the last {self.MAX_NULL_READS} read attemps. Please check the stream." ) time.sleep(0.001) else: # marker-uncoupled stream, determine the start time based on the interval attribute if self._data_buffer["time_series"] is not None: if self._interval is not None: t_begin = self._data_buffer["time_stamps"][0] + self._interval # shift forward by interval elif self._data_buffer["time_stamps"].shape[0] > 1: t_begin = self._data_buffer["time_stamps"][1] # shift forward by 1 sample else: # rare situation where the buffer only contains one sample # and the interval is None. Shift forward by a very small amount. t_begin = self._data_buffer["time_stamps"][0] + 10**(-6) # shift forward by 1 microsecond else: t_begin = 0 # i.e. all data is valid t_begin += self.relative_start # pull the data in chunks until we get the total number of samples samples_polled = 0 # First, pull the data required data from the buffer if self._data_buffer["time_series"] is not None: # Create a boolean array to index the data buffer for the required data valid_indices = self._data_buffer["time_stamps"] >= t_begin # Find the number of samples in the buffer that are valid samples_polled = np.sum(valid_indices) # discard old data self._data_buffer["time_series"] = self._data_buffer["time_series"][:, valid_indices] self._data_buffer["time_stamps"] = self._data_buffer["time_stamps"][valid_indices] # If the number of samples in the buffer is greater than the number of samples required, extract the required data if samples_polled >= self.Ns: # Buffer contains a backlog of data, warn that execution may be too slow for target polling rate warnings.warn("Buffer contains a backlog of data. Execution may be too slow for target polling rate.", RuntimeWarning, stacklevel=2) if self.marker_coupled: # if this is a marker-coupled stream, use the oldest valid data in the buffer # to ensure that the data is aligned with the marker self._trial_data = self._data_buffer["time_series"][:, :self.Ns] self._trial_timestamps = self._data_buffer["time_stamps"][:self.Ns] else: # if this is a marker-uncoupled stream, use the newest valid data in the buffer # to ensure that the data is as recent as possible self._trial_data = self._data_buffer["time_series"][:, -self.Ns:] self._trial_timestamps = self._data_buffer["time_stamps"][-self.Ns:] # If the number of valid samples in the buffer is less than the number of samples required, extract all the data in the buffer else: self._trial_data[:, :samples_polled] = self._data_buffer["time_series"] self._trial_timestamps[:samples_polled] = self._data_buffer["time_stamps"] # If the buffer does not contain enough data, pull data from the inlet null_reads = 0 while samples_polled < self.Ns: data, timestamps = self._data_inlet.pull_chunk(timeout=0.0) if len(timestamps) > 0: timestamps = np.asarray(timestamps) null_reads = 0 # reset the null reads counter # apply time correction to timestamps self._time_correction = self._data_inlet.time_correction() timestamps += self._time_correction # check if the data is within the target time window if np.any(timestamps >= t_begin): # convert data to numpy arrays data = np.asarray(data).T # now in Nchannel x Nsamples format valid_timestamps = timestamps >= t_begin # discard extra channels and old data data = data[np.ix_(self.channels, valid_timestamps)] timestamps = timestamps[valid_timestamps] # append the latest chunk to the trial_data array # start by indentifying the start and end indices # of the source and destination arrays chunk_sz = data.shape[1] if samples_polled + chunk_sz > self.Ns: # more data in the chunk than required dst_end_ix = self.Ns src_end_ix = self.Ns - samples_polled else: # less data in the chunk than required dst_end_ix = samples_polled + chunk_sz src_end_ix = chunk_sz self._trial_data[:, samples_polled:dst_end_ix] = data[:, :src_end_ix] self._trial_timestamps[samples_polled:dst_end_ix] = timestamps[:src_end_ix] if dst_end_ix == self.Ns: # we have polled enough data, update the buffer # with the latest data plus any extra data # that we did not use in this trial self._data_buffer["time_series"] = np.concatenate( (self._trial_data, data[:, src_end_ix:]), axis=1 ) self._data_buffer["time_stamps"] = np.concatenate( (self._trial_timestamps, timestamps[src_end_ix:]) ) samples_polled += chunk_sz else: null_reads += 1 if null_reads > self.MAX_NULL_READS: raise RuntimeError( f"The stream has not been updated in the last {self.MAX_NULL_READS} read attemps. Please check the stream." ) time.sleep(0.001) if self.marker_coupled: # reset the maker peeked flag since we have polled new data self._already_peeked = False return self._trial_data
[docs] def peek_marker(self): """ Peek at the next marker in the marker stream Returns ------- marker : str The marker string """ if not self._active: raise RuntimeError("InputLSLStream.peek_marker() called on inactive stream. Please call update_input_streams() first to configure the stream object.") if self._already_peeked: return self._peeked_marker marker, t = self.peek_marker_inlet.pull_sample() read_attemps = 0 while (self.marker_pattern is not None and not self.marker_pattern.match(marker[0])): marker, t = self.peek_marker_inlet.pull_sample(timeout=0.0) read_attemps += 1 if read_attemps > self.MAX_NULL_READS: raise RuntimeError( f"The marker stream has not been updated in the last {self.MAX_NULL_READS} read attemps. Please check the stream." ) self._peeked_marker = marker[0] self._already_peeked = True return marker[0]
[docs] def last_marker(self): """ Get the last marker in the marker stream Returns ------- marker : str The last marker string """ if not self._active: raise RuntimeError("InputLSLStream.last_marker() called on inactive stream. Please call update_input_streams() first to configure the stream object.") return self._marker_buffer["time_series"]
[docs] def update_input_streams( self, pred=None, channels=None, marker_coupled=True, marker_fmt=None, marker_pred=None, stream_info=None, marker_stream_info=None, Ns=1 ): """ Update the input stream with new parameters Parameters ---------- pred : str The predicate string, e.g. "name='BioSemi'" or "type='EEG' and starts-with(name, 'BioSemi') and count(description/desc/channels/channel)=32" channels : tuple of ints Index value of channels to poll from the stream, if None all channels will be polled marker_coupled : bool true if there is an associated marker to indicate relative time where data should begin to be polled marker_fmt : Regex or list Regular expression template of the marker to be matched, if none all markers will be matched. Alternatively, a list of markers can be provided. marker_pred : str The predicate string for the marker stream stream_info : pylsl.StreamInfo The stream info object for the stream can be passed instead of the predicate to avoid the need to resolve the stream marker_stream_info : pylsl.StreamInfo The stream info object for the marker stream can be passed instead of the predicate to avoid the need to resolve the stream Ns : int, default = 1 The number of samples to be extracted per poll. """ if self._active: return if not stream_info: # resolve the stream on the LSL network available_streams = pylsl.resolve_bypred(pred) else: available_streams = [stream_info] if len(available_streams) == 0: raise RuntimeError("No streams found matching the predicate") else: warnings.warn("More than one stream found matching the predicate. Using the first stream found.", RuntimeWarning, stacklevel=2) self._data_buffer = {"time_series": None, "time_stamps": None} self._data_inlet = pylsl.StreamInlet( available_streams[0], processing_flags=pylsl.proc_clocksync | pylsl.proc_dejitter, recover=False, ) self._data_inlet.open_stream() if channels is not None: if max(channels) >= self._data_inlet.channel_count or min(channels) < 0: raise ValueError( "The number of channels in the stream does not match the channel indices specified in the channels parameter. Please check the channels parameter and try again." ) self.channels = channels else: self.channels = tuple([_ for _ in range(self._data_inlet.channel_count)]) if marker_coupled: if not marker_stream_info: # resolve the stream on the LSL network marker_streams = pylsl.resolve_bypred(marker_pred) else: marker_streams = [marker_stream_info] if len(marker_streams) == 0: raise RuntimeError("No marker streams found matching the predicate") else: warnings.warn("More than one marker stream found matching the predicate. Using the first stream found.", RuntimeWarning, stacklevel=2) self._marker_inlet = pylsl.StreamInlet(marker_streams[0]) self._peek_marker_inlet = pylsl.StreamInlet(marker_streams[0]) # open the inlet self._marker_inlet.open_stream() self._peek_marker_inlet.open_stream() if marker_fmt: self.marker_pattern = re.compile(marker_fmt) self.Ns = Ns # allocate array for trial data and timestamps self._trial_data = np.zeros((len(self.channels), self.Ns)) self._trial_timestamps = np.zeros((self.Ns,)) self._active = True
[docs] @classmethod def create_marker_coupled_data_stream( cls, sess, pred=None, channels=None, relative_start=0, marker_fmt=None, marker_pred="type='Markers'", stream_info=None, marker_stream_info=None, Ns=1, active=True, ): """ Create a LSLStream data object that maintains a data stream and a marker stream Parameters ----------- sess : session object Session object where the data source will exist pred : str The predicate string, e.g. "name='BioSemi'" or "type='EEG' and starts-with(name, 'BioSemi') and count(description/desc/channels/channel)=32" channels : tuple or list of ints Index value of channels to poll from the stream, if None all channels will be polled marker_fmt : str Regular expression template of the marker to be matched, if none all markers will be matched marker_pred : str Predicate string to match the marker stream, if None all streams will be matched stream_info : StreamInfo object StreamInfo object to use for the data stream, if None a default StreamInfo object will be created Ns : int, default = 1 Number of samples to be extracted per poll. """ src = cls( sess, pred, channels, relative_start, True, marker_fmt, marker_pred, stream_info, marker_stream_info, active, Ns=Ns ) sess.add_ext_src(src) return src
[docs] @classmethod def create_marker_uncoupled_data_stream(cls, sess, pred=None, channels=None, relative_start=0, active=True, interval=None, Ns=1): """ Create a LSLStream data object that maintains only a data stream with no associated marker stream Parameters ---------- sess : session object Session object where the data source will exist pred : str The predicate string, e.g. "name='BioSemi'" or "type='EEG' and starts-with(name, 'BioSemi') and count(description/desc/channels/channel)=32" channels : tuple or list of ints Index value of channels to poll from the stream, if None all channels will be polled active : bool Flag to indicate whether the stream is active or will be activated in the future interval : float The minimum interval at which the stream will be polled Ns : int, default = 1 Number of samples to be extracted per poll. """ src = cls(sess, pred, channels, relative_start, marker_coupled=False, active=active, interval=interval, Ns=Ns) sess.add_ext_src(src) return src
[docs] class OutputLSLStream(MPBase): """ An object for maintaining an LSL outlet """ def __init__(self, sess, stream_info, filesave=None, chunk_size=0, max_buffer=360): """Establish a new stream outlet. This makes the stream discoverable. Parameters ---------- stream_info : StreamInfo StreamInfo object to describe this stream. Stays constant over the lifetime of the outlet. chunk_size : int, default = 0 Optionally the desired chunk granularity (in samples) for transmission. If unspecified, each push operation yields one chunk. Inlets can override this setting. (default 0) max_buffered : default = 360 The maximum amount of data to buffer (in seconds if there is a nominal sampling rate, otherwise x100 in samples). The default is 6 minutes of data. Note that, for high-bandwidth data, you will want to use a lower value here to avoid running out of RAM. """ super().__init__(MPEnums.SRC, sess) self._sess = sess self.stream_info = stream_info # resolve the stream on the LSL network self._lsl_marker_outlet = pylsl.StreamOutlet(stream_info, chunk_size, max_buffer) self._liesl_session = None # Start LieSL recording if the user has specified a filesave warnings.filterwarnings( action="ignore", category=RuntimeWarning, module="subprocess" ) output_save_thread = threading.Thread(target=self.check_status, args=(filesave,)) output_save_thread.start()
[docs] def check_status(self, filesave): """ TODO: add description Parameters ---------- filesave: TODO - add type """ if filesave is not None: streamargs = [ { "name": self.stream_info.name(), "type": self.stream_info.type(), "channel_count": self.stream_info.channel_count(), "nominal_srate": self.stream_info.nominal_srate(), "channel_format": self.stream_info.channel_format(), "source_id": self.stream_info.source_id(), } ] self._liesl_session = liesl.Session( mainfolder=f"{os.path.dirname(os.path.realpath(__file__))}\labrecordings", streamargs=streamargs, ) with self._liesl_session(filesave): while True: time.sleep(0.1) if not threading.main_thread().is_alive(): # Suppress output from pyLiesl sys.stdout = open(os.devnull, "w") sys.stderr = open(os.devnull, "w") self._liesl_session.stop_recording() sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ return else: warnings.warn("No file save specified. Data will not be saved to disk.") return
[docs] def push_data(self, data, label=None): """ Push data to the outlet stream. Parameters ---------- data: Tensor Data to be pushed to the output stream """ try: self._lsl_marker_outlet.push_sample(data, pylsl.local_clock()) except (ValueError, TypeError) as ve: try: self._lsl_marker_outlet.push_sample(data[0], pylsl.local_clock()) except Exception as e: additional_msg = "Push data - Irreparable Error in LSL Output. No data pushed to output stream" if sys.version_info[:2] >= (3, 11): e.add_note(additional_msg) else: pretty_msg = f"{'*'*len(additional_msg)}\n{additional_msg}\n{'*'*len(additional_msg)}" print(pretty_msg) raise
@classmethod def _create_outlet_from_streaminfo(cls, sess, stream_info, filesave=None): """ Factory method to create a OutletLSLStream mindpype object from a pylsl.StreamInfo object. Parameters ----------- sess : session object Session object where the data source will exist stream_info : pylsl.StreamInfo object pylsl.StreamInfo object that describes the stream to be created Returns ------- src: OutputLSLStream Output LSL Stream """ src = cls(sess, stream_info, filesave) sess.add_ext_out(src) return src
[docs] @classmethod def create_outlet( cls, sess, name="untitled", type="", channel_count=1, nominal_srate=0.0, channel_format=1, source_id="", filesave=None, ): """ Factory Method to create an OutletLSLStream mindpype object from scratch. Parameters ---------- sess : session object Session object where the data source will exist name : str, default = 'untitled' * Name of the stream. * Describes the device (or product series) that this stream makes available. type str, default = '' * Content type of the stream. * By convention LSL uses the content types defined in the XDF file format specification where applicable. channel_count : int, default = 1 * Number of channels per sample. This stays constant for the lifetime of the stream. nominal_srate : float, default = 0.0 * The sampling rate (in Hz) as advertised by the data source. channel_format : int or str, default = 1 * Format/type of each channel (ie. 'float32'). source_id : str, default = '' * Unique identifier of the device or source of the data, if available (such as the serial number). * This is critical for system robustness since it allows recipients to recover from failure even after the serving app, device or computer crashes (just by finding a stream with the same source id on the network again). filesave : str, default = None If not None, the data will be saved to the given file. Returns ------- src: OutputLSLStream Output LSL Stream """ stream_info = pylsl.StreamInfo( name, type, channel_count, nominal_srate, channel_format, source_id="1007988689", ) src = cls(sess, stream_info, filesave) sess.add_ext_out(src) return src