Data Classification with the PhysioNet Dataset

Hi, I am going to use the PhysioNet EEG Motor Movement/Imagery dataset: https://www.physionet.org/content/eegmmidb/1.0.0/
Here is my code showing how I get the epochs and classify them; the task labels for left vs. right are stored in the annotations as T1 and T2. The accuracy is pretty low, and I want to make sure that I preprocess the data correctly (see also the annotation-based sketch after the code).

Here is my code:

import glob
import os

import mne


def add_edf_files_in_directory_to_dataset(directory_path):
    dataset = []
    desired_file_paths = []
    file_paths = glob.glob(os.path.join(directory_path, 'S024', '*.edf'), recursive=True)
    desired_runs = ['03', '07', '11']  # runs with real left/right fist movement
    for file_path in file_paths:
        run_number = file_path[-6:-4]
        if run_number in desired_runs:
            raw = mne.io.read_raw_edf(file_path, preload=True)
            raw.set_eeg_reference()
            raw.notch_filter(freqs=60, picks=raw.ch_names)
            raw.filter(l_freq=2, h_freq=60, picks=raw.ch_names)
            epochs = mne.make_fixed_length_epochs(raw, duration=4, overlap=0, preload=False)
            epochs = epochs.drop_bad()
            dataset.append(epochs)
            desired_file_paths.append(file_path)
        else:
            continue  # Skip files that are not one of the desired runs
    return dataset, desired_file_paths

def label_epochs(dataset):
    epochs_data = []
    labels = []
    for epochs in dataset:
        epochs.load_data()
        if len(epochs) != len(epochs.annotations.description):
            continue  # Skip recordings where epochs and annotations don't line up
        for epoch_idx in range(len(epochs)):
            epoch_data = epochs[epoch_idx].get_data(copy=False)
            annotation_desc = epochs.annotations.description[epoch_idx]
            if annotation_desc == 'T1':
                epochs_data.append(epoch_data)
                labels.append('left')
            elif annotation_desc == 'T2':
                epochs_data.append(epoch_data)
                labels.append('right')
    return epochs_data, labels


Dataset, desired_file_paths = add_edf_files_in_directory_to_dataset("Data")
epochs_data, labels = label_epochs(Dataset)
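
For comparison, I have also been considering epoching directly from the annotations instead of using fixed-length windows. A rough, untested sketch of what I mean (it assumes the standard mne.events_from_annotations / mne.Epochs API, and the same folder layout as above):

import mne

# Sketch: epoch around the T1/T2 cue onsets instead of using fixed-length windows
raw = mne.io.read_raw_edf("Data/S024/S024R03.edf", preload=True)  # a single run
events, event_id = mne.events_from_annotations(raw)  # 'T0'/'T1'/'T2' -> integer codes
epochs = mne.Epochs(
    raw,
    events,
    event_id={'T1': event_id['T1'], 'T2': event_id['T2']},  # keep only left/right cues
    tmin=0.0,
    tmax=4.0,
    baseline=None,
    preload=True,
)
labels = epochs.events[:, -1]  # numeric labels aligned one-to-one with the epochs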

This looks okay at first glance. The question is how you actually set up your classification pipeline…

Thank you so much for your reply @richard .
Here is my classification pipeline:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
# Train the model
def train_model(model, train_loader, val_loader, optimizer, scheduler, loss_function, epochs, patience, device):
    train_losses, val_losses, train_accuracies, val_accuracies = [], [], [], []
    best_val_accuracy = 0
    early_stopping_counter = 0

    for epoch in range(epochs):
        model.train()
        total_train_loss, total_val_loss = 0, 0
        correct_train, correct_val = 0, 0
        total_train, total_val = 0, 0

        for input_seq, label in train_loader:
            # print("Input shape before model:", input_seq.shape)
            input_seq, label = input_seq.to(device), label.to(device)
            optimizer.zero_grad()
            y_pred = model(input_seq)
            loss = loss_function(y_pred, label)
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()
            correct_train += (y_pred.argmax(1) == label).sum().item()
            total_train += label.size(0)

        train_loss = total_train_loss / len(train_loader)
        train_accuracy = 100 * correct_train / total_train
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        model.eval()
        with torch.no_grad():
            for input_seq, label in val_loader:
                input_seq, label = input_seq.to(device), label.to(device)
                y_pred = model(input_seq)
                loss = loss_function(y_pred, label)
                total_val_loss += loss.item()
                correct_val += (y_pred.argmax(1) == label).sum().item()
                total_val += label.size(0)

        val_loss = total_val_loss / len(val_loader)
        val_accuracy = 100 * correct_val / total_val
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f'Epoch {epoch+1}/{epochs} - '
              f'Train Loss: {train_loss:.4f}, '
              f'Train Accuracy: {train_accuracy:.2f}%, '
              f'Val Loss: {val_loss:.4f}, '
              f'Val Accuracy: {val_accuracy:.2f}%')

        scheduler.step(val_loss)

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= patience:
                print("Early stopping triggered.")
                break

    # Plotting training/validation loss and accuracy
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Loss per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Training Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Accuracy per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

    return train_losses, val_losses, train_accuracies, val_accuracies

# Assume Dataset is already defined and loaded from the previous steps
epochs_data, labels = label_epochs(Dataset)   # Use the labeled data from the preprocessing step

# label_epochs returns a list of (1, n_channels, n_times) arrays;
# stack them into a single (n_epochs, n_channels, n_times) array
epochs_data = np.concatenate(epochs_data, axis=0)
print("Shape of epochs_data:", epochs_data.shape)

# label_epochs returns 'left'/'right' strings, so map those to class indices
label_to_int = {'left': 0, 'right': 1}
int_labels = [label_to_int[label] for label in labels]

# Convert the data to the appropriate PyTorch format;
# Conv1d expects (batch, channels, length), which matches the array layout above
X = torch.FloatTensor(epochs_data)
y = torch.LongTensor(int_labels)
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create TensorDataset objects for train and validation sets
train_data = TensorDataset(X_train, y_train)
val_data = TensorDataset(X_test, y_test)

# Create DataLoader objects for train and validation sets
train_loader = DataLoader(train_data, batch_size=20, shuffle=True)
val_loader = DataLoader(val_data, batch_size=10, shuffle=False)  # no need to shuffle the validation set

# CNN model definition (four conv blocks followed by a linear classifier)
class ModifiedCNNModel(nn.Module):
    def __init__(self, num_channels=64, seq_len=656, output_size=2, dropout=0.7):
        super(ModifiedCNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=num_channels, out_channels=64, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=5, padding=2)
        self.bn3 = nn.BatchNorm1d(256)
        self.conv4 = nn.Conv1d(in_channels=256, out_channels=512, kernel_size=5, padding=2)  # Additional convolutional layer
        self.bn4 = nn.BatchNorm1d(512)  # Additional batch normalization layer
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.dropout = nn.Dropout(dropout)

        self.flattened_size = 512 * (seq_len // 16)  # Adjust the flattened size based on the added layers
        self.fc1 = nn.Linear(self.flattened_size, output_size)
    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.pool(F.relu(self.bn4(self.conv4(x))))  # Apply the additional convolutional layer and batch normalization
        x = self.dropout(x)

        x = x.view(-1, self.flattened_size)
        x = self.fc1(x)
        return x
# Initialize model, optimizer, scheduler
model = ModifiedCNNModel(seq_len=X.shape[-1])  # pass the actual number of samples per epoch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=20, factor=0.5)  # 'min' because we step the scheduler on val_loss
loss_function = nn.CrossEntropyLoss()
# Train the model with the defined function
train_losses, val_losses, train_accuracies, val_accuracies = train_model(
    model, train_loader, val_loader, optimizer, scheduler, loss_function, epochs=1000, patience=100, device=device)
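
Before training I also run a quick shape sanity check so that flattened_size and the data length line up (a small sketch; it assumes the 4 s epochs give 640 samples at the dataset's 160 Hz sampling rate):

# Sanity check: push a dummy batch through the model before training
with torch.no_grad():
    dummy = torch.randn(2, 64, X.shape[-1], device=device)  # (batch, channels, samples)
    print("Model output shape:", model(dummy).shape)        # expected: torch.Size([2, 2])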

I would greatly appreciate it if you could take a look and give me any feedback on how to improve it.

I cannot help you with your PyTorch model, sorry. But you may want to consider taking a look at Braindecode. There are examples using PhysioNet data:

https://braindecode.org/stable/auto_examples/index.html#applied-examples-on-real-world-datasets
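
Loading this dataset there only takes a few lines; roughly something like the following (an untested sketch from memory, assuming the MOABBDataset and create_windows_from_events helpers shown in those examples, with "PhysionetMI" as the MOABB name for this dataset):

from braindecode.datasets import MOABBDataset
from braindecode.preprocessing import create_windows_from_events

# Fetch subject 24 of the PhysioNet motor imagery data via MOABB
dataset = MOABBDataset(dataset_name="PhysionetMI", subject_ids=[24])

# Cut one window per annotated trial
windows = create_windows_from_events(
    dataset, trial_start_offset_samples=0, trial_stop_offset_samples=0
)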

Richard