Reentrancy problem? Using numpy.savez to save processed EEG data, and the files are corrupted when using multiprocessing

Not sure this is appropriate for this group, but I wasn't sure where else to go.

I have a function that processes an EEG file, performing filtering, epoching, etc. It then saves the data in a .npz file, which I will use for training an ML model.

Running single-threaded, this works fine.

for filename in rml_filenames:
    process_file(filename)

But when running in multiple processes, the contents of the .npz files seem to be truncated.

pool = multiprocessing.Pool(processes=20)
pool.map(process_file, rml_filenames)
pool.close()
pool.join()

I don’t use any global variables in my script, so everything should be created on the fly, then discarded afterward.

Are there any reentrancy concerns in the MNE code, such as mne.Epochs, mne.io.read_raw, eeg.filter(), eeg.add_channels, etc.?

Thanks for any help, or pointers to more appropriate fora.

Hello,

The MNE API will work fine in threads or processes. I've used pool.starmap very often to run a processing function on a large batch of files. Whatever is going on is due to how your process_file function is designed. If you post the code of process_file, maybe we can spot something.

Mathieu

1 Like

Thanks for the reply!

Code follows.
I added the verify_npz option after discovering this issue (though I’m not sure how robust it is).
I don’t have one of the bad .npz files at hand, but it seems most of the y values were 0, with a smattering of other sleep stages (1-4).

I didn’t notice this problem with 6 processes (although it may have occurred in a mild form), but when I increased the pool to 20 processes, it was very obvious something was up when my model predicted almost everything was sleep stage 0.

process_ann_eeg() filters and epochs the annotated EEG file multiple times, returning those data in the features variable, which is a numpy array. group is just a file identifier.

A thought I just had: could the X, y arrays go out of scope before np.savez is finished writing them?

import mne
import numpy as np
from os.path import basename, exists, join, splitext

def process_file(rml_filename, verify_npz=False, provenance="none"):
    filename, ext = splitext(rml_filename)
    eeg_filename = filename + "[001].edf"
    base_filename = basename(filename)

    annotated_eeg_filename = join(
        f"../data/annotated/", base_filename + f"_{FS}Hz_ann_raw.fif"
    )

    if exists(annotated_eeg_filename):
        annotated_eeg = mne.io.read_raw(annotated_eeg_filename, preload=True)
    else:
        annotated_eeg = label_eeg_using_rml(eeg_filename, rml_filename)

    epoched_eeg, features, group = process_ann_eeg(annotated_eeg)

    output_width, num_channels = features.shape[1], features.shape[2]
    num_epochs = len(epoched_eeg.events)
    X = np.empty((num_epochs, output_width, num_channels), dtype=np.float32)
    y = np.empty((num_epochs,), dtype=np.int16)

    # annotations = get_contiguous_annotations(epoched_eeg)

    print(f"Processing {group}: {num_epochs} events")
    for ep_ndx in range(num_epochs):
        X[ep_ndx, :, :] = features[ep_ndx, :, :]
        y[ep_ndx] = epoched_eeg.events[ep_ndx][2]  # class

    not_bad = y != 5
    X = X[not_bad, :, :]
    y = y[not_bad]

    if len(y) == 0:
        print(f"Skipped {group}, y is empty")
        return

    classes, counts = np.unique(y, return_counts=True)
    for ndx in range(len(classes)):
        print(f"{ALL_STAGES[int(classes[ndx])]:10s}: {counts[ndx]:7,d}")

    processed_filename = (
        f"../data/processed_data/{group}_{FS}Hz_{NUM_SAMPLES}x{NUM_CHANNELS}.npz"
    )
    np.savez(
        processed_filename,
        X=X,
        y=y,
        group=group,
        provenance=provenance,
    )
    print(f"Saved: {processed_filename}")

    if verify_npz:
        data = np.load(processed_filename)
        assert np.array_equal(X, data["X"]), "X did not verify!"
        assert np.array_equal(y, data["y"]), "y did not verify!"
        assert np.array_equal(group, data["group"]), "group did not verify!"
        assert provenance == data["provenance"], "provenance did not verify!"
        print("Verified")

    if not exists(annotated_eeg_filename):
        annotated_eeg.save(annotated_eeg_filename)

Sorry, nothing obvious comes to mind.

A thought I just had: could the X, y arrays go out of scope before np.savez is finished writing them?

I doubt it. np.savez should block until the save is complete. To confirm, you can time a write of a very small array and then of a very, very large one. If the second call returns almost instantaneously, in roughly the same time as the first, then the call is non-blocking, and in that case, yes, you might have an issue at hand.
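A standalone sketch of that check (array sizes are arbitrary; the file-size check at the end also confirms the archive is complete on disk by the time the call returns):

```python
import os
import tempfile
import time

import numpy as np

small = np.zeros(10, dtype=np.float32)
large = np.zeros(10_000_000, dtype=np.float32)  # ~40 MB of raw data

with tempfile.TemporaryDirectory() as d:
    path_small = os.path.join(d, "small.npz")
    path_large = os.path.join(d, "large.npz")

    t0 = time.perf_counter()
    np.savez(path_small, x=small)
    t_small = time.perf_counter() - t0

    t0 = time.perf_counter()
    np.savez(path_large, x=large)
    t_large = time.perf_counter() - t0

    # If savez blocked, the large archive is already fully written here.
    # np.savez stores arrays uncompressed, so the file is at least as big
    # as the raw array bytes.
    large_size = os.path.getsize(path_large)

print(f"small: {t_small:.4f}s  large: {t_large:.4f}s  "
      f"large file: {large_size} bytes")
```

If the large write takes proportionally longer and the file is full-size before the call returns, the save is blocking, which rules out the out-of-scope theory.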

Mathieu

1 Like