# Source code:
# -*- coding: utf-8 -*-
import sys
from PyQt5.QtWidgets import QApplication,QWidget,QMessageBox
from PyQt5.QtCore import pyqtSlot,pyqtSignal
import os
import os.path as op
import numpy as np
import re
import mne
import mne_connectivity
from mne.beamformer import make_lcmv, apply_lcmv_epochs
from mne_connectivity import envelope_correlation
from mne_connectivity.viz import plot_sensors_connectivity
from mne_connectivity import spectral_connectivity_epochs
from mne.datasets import fetch_fsaverage
from gui import connectivity_analysis_gui
##import connectivity_analysis_gui
##import warnings
##warnings.filterwarnings("ignore", category=DeprecationWarning)
def read_data_into_matrix(eeg_path, chan_n):
    """Parse the decimal values of a text EEG export into a 2-D array.

    The first 65 lines of the file are skipped as a header. Every decimal
    number found on the remaining lines is collected in reading order and
    the flat sequence is reshaped (NumPy default, row-major) into
    ``(chan_n, n_samples)``.

    Args:
        eeg_path: Path of the text file to read.
        chan_n: Number of rows (channels) of the resulting matrix.

    Returns:
        A NumPy array of shape ``(chan_n, total_values // chan_n)``.

    Raises:
        ValueError: If ``chan_n`` is not positive, if the file holds fewer
            than ``chan_n`` values (including none at all), or if the
            number of values is not a multiple of ``chan_n``.
    """
    if chan_n <= 0:
        raise ValueError("chan_n 必须为正整数。")

    skip_rows = 65
    # Only plain decimals ("12.5", "-0.03") match; integers without a dot
    # and exponent notation are not picked up by this pattern.
    float_pattern = re.compile(r'-?\d+\.\d+')

    data = []
    with open(eeg_path, 'r') as file:
        for _ in range(skip_rows):
            # The default stops a short file from leaking StopIteration;
            # it is reported below as "not enough data" instead.
            next(file, None)
        for line in file:
            data.extend(map(float, float_pattern.findall(line)))

    total_len = len(data)
    # Either condition alone makes the reshape impossible, so they must be
    # combined with ``or``.
    if total_len < chan_n or total_len % chan_n != 0:
        raise ValueError("文件中的数据不足。")

    sample_n = total_len // chan_n
    return np.array(data).reshape((chan_n, sample_n))
def crop_the_data(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax):
    """Load a text EEG recording, display it, and return the cropped EEG channels.

    The file is parsed with ``read_data_into_matrix``, wrapped in an MNE
    ``RawArray`` with a fixed 64-channel layout (62 EEG + 2 EOG) and the
    ``standard_1005`` montage, shown in the interactive browser and sensor
    plot, reduced to its EEG channels and cropped in time.

    Args:
        eeg_path: Path of the text file holding the recording.
        chan_n: Number of channels in the file; must equal the length of
            the built-in channel list.
        sfreq: Sampling frequency passed to ``mne.create_info``.
        crop_tmin: Start of the kept time window, passed to ``Raw.crop``.
        crop_tmax: End of the kept time window, passed to ``Raw.crop``.

    Returns:
        The MNE raw object after channel picking and cropping.

    Raises:
        ValueError: If ``chan_n`` does not match the built-in channel list.
    """
    ch_names = ['Fp1', 'Fp2', 'PO5', 'PO6', 'Fpz', 'AF3', 'AF4', 'AF7', 'AF8', 'F1', 'F2', 'F3', 'F4', 'F5',
                'F6', 'F7', 'F8', 'Fz', 'FC1', 'FC2', 'FC3', 'FC4', 'FC5', 'FC6', 'FCz', 'FT7', 'FT8', 'C1',
                'C2', 'C3', 'C4', 'C5', 'C6', 'Cz', 'T7', 'T8', 'CP1', 'CP2', 'CP3', 'CP4', 'CP5', 'CP6',
                'CPz', 'TP7', 'TP8', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', 'Pz', 'PO3', 'PO4',
                'PO7', 'PO8', 'POz', 'O1', 'O2', 'Oz', 'HEOG', 'VEOG']
    ch_types = ['eeg'] * 62 + ['eog'] * 2

    # Fail before parsing the (potentially large) file: the data matrix
    # must have exactly one row per entry of ch_names.
    if chan_n != len(ch_names):
        raise ValueError(f"chan_n 必须等于通道名称数量 {len(ch_names)},当前为 {chan_n}。")

    data = read_data_into_matrix(eeg_path, chan_n)
    print(data.shape)

    info = mne.create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq)
    info.set_montage('standard_1005')
    info['description'] = 'My custom dataset'

    raw = mne.io.RawArray(data, info)
    # Scale every value by 1e-6 (presumably microvolts -> volts; TODO confirm
    # the unit of the exported file).
    raw.apply_function(lambda x: x * 1e-6, channel_wise=False)
    print(raw.info)
    print(raw.info['ch_names'])

    scalings = {'eeg': 0.0001, 'eog': 0.0001}
    # block=True keeps the browser window open until the user closes it.
    raw.plot(n_channels=chan_n, scalings=scalings, title='Data from matrix', show=True, block=True)
    # show_names takes a bool (or a list of names), not the string 'True'.
    raw.plot_sensors(ch_type='eeg', show_names=True)

    eeg_data = raw.pick_types(meg=False, eeg=True, eog=False, exclude='bads')
    eeg_data.crop(crop_tmin, crop_tmax)

    nchan = eeg_data.info['nchan']
    print(f"EEG通道数量:{nchan}")
    print(eeg_data.info)
    print(eeg_data.info['ch_names'])
    return eeg_data
def crop_the_data1(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax):
    """Load a text EEG recording and return cropped, 1-40 Hz filtered EEG channels.

    Non-interactive counterpart of ``crop_the_data``: the file is parsed
    with ``read_data_into_matrix``, wrapped in an MNE ``RawArray`` with a
    fixed 64-channel layout (62 EEG + 2 EOG) and the ``standard_1005``
    montage, reduced to its EEG channels, cropped in time and band-pass
    filtered. No plots are opened.

    Args:
        eeg_path: Path of the text file holding the recording.
        chan_n: Number of channels in the file; must equal the length of
            the built-in channel list.
        sfreq: Sampling frequency passed to ``mne.create_info``.
        crop_tmin: Start of the kept time window, passed to ``Raw.crop``.
        crop_tmax: End of the kept time window, passed to ``Raw.crop``.

    Returns:
        The MNE raw object after picking, cropping and filtering.

    Raises:
        ValueError: If ``chan_n`` does not match the built-in channel list.
    """
    ch_names = ['Fp1', 'Fp2', 'PO5', 'PO6', 'Fpz', 'AF3', 'AF4', 'AF7', 'AF8', 'F1', 'F2', 'F3', 'F4', 'F5',
                'F6', 'F7', 'F8', 'Fz', 'FC1', 'FC2', 'FC3', 'FC4', 'FC5', 'FC6', 'FCz', 'FT7', 'FT8', 'C1',
                'C2', 'C3', 'C4', 'C5', 'C6', 'Cz', 'T7', 'T8', 'CP1', 'CP2', 'CP3', 'CP4', 'CP5', 'CP6',
                'CPz', 'TP7', 'TP8', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', 'Pz', 'PO3', 'PO4',
                'PO7', 'PO8', 'POz', 'O1', 'O2', 'Oz', 'HEOG', 'VEOG']
    # ch_types must line up with ch_names: the first 62 entries are EEG,
    # the last two are EOG.
    ch_types = ['eeg'] * 62 + ['eog'] * 2

    # Fail before parsing the (potentially large) file: the data matrix
    # must have exactly one row per entry of ch_names.
    if chan_n != len(ch_names):
        raise ValueError(f"chan_n 必须等于通道名称数量 {len(ch_names)},当前为 {chan_n}。")

    # Data rows start after the header skipped by read_data_into_matrix.
    data = read_data_into_matrix(eeg_path, chan_n)
    print(data.shape)

    info = mne.create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq)
    info.set_montage('standard_1005')
    info['description'] = 'My custom dataset'

    raw = mne.io.RawArray(data, info)
    # Scale every value by 1e-6 (presumably microvolts -> volts; TODO confirm
    # the unit of the exported file).
    raw.apply_function(lambda x: x * 1e-6, channel_wise=False)
    print(raw.info)
    print(raw.info['ch_names'])

    eeg_data = raw.pick_types(meg=False, eeg=True, eog=False, exclude='bads')
    eeg_data.crop(crop_tmin, crop_tmax)
    # Band-pass the cropped recording to 1-40 Hz.
    eeg_data.filter(l_freq=1.0, h_freq=40.0)

    nchan = eeg_data.info['nchan']
    print(f"EEG通道数量:{nchan}")
    print(eeg_data.info)
    print(eeg_data.info['ch_names'])
    return eeg_data
def load_local_bst_resting_data(local_data_path):
    """Validate that a local copy of the ``bst_resting`` dataset is present.

    Nothing is loaded or downloaded; the function only checks the
    directory layout and hands the root path back.

    Args:
        local_data_path: Root directory of the local data store, e.g.
            ``'D:\\\\ERPMaster\\\\mne_data\\\\data'``. It must contain
            ``MNE-brainstorm-data/bst_resting``.

    Returns:
        ``local_data_path`` unchanged.

    Raises:
        FileNotFoundError: If the root directory or the ``bst_resting``
            sub-directory does not exist.
    """
    if not op.exists(local_data_path):
        raise FileNotFoundError(f"指定的本地路径不存在: {local_data_path}")

    # The dataset itself lives two levels below the root.
    bst_resting_path = op.join(local_data_path, 'MNE-brainstorm-data', 'bst_resting')
    if not op.exists(bst_resting_path):
        raise FileNotFoundError(f"数据文件未找到在指定的本地路径: {bst_resting_path}")

    return local_data_path
def source_connectivity_glass(mri_path, eeg_path, chan_n, sfreq, crop_tmin, crop_tmax, low, high):
    """Compute source-space envelope-correlation degree and show it on a glass brain.

    Steps: load the EEG through ``crop_the_data1``, resample to 80 Hz,
    band-pass to ``[low, high]``, cut 5 s epochs, build an LCMV beamformer
    on a coarse volume source space, compute the envelope correlation
    between sources, turn it into a degree map and plot it with
    ``mode='glass_brain'``.

    Args:
        mri_path: Directory that contains ``MNE-brainstorm-data/bst_resting``.
            fsaverage is fetched into the parent directory of this path.
        eeg_path: Path of the EEG text file (forwarded to ``crop_the_data1``).
        chan_n: Channel count of the file (forwarded to ``crop_the_data1``).
        sfreq: Sampling frequency (forwarded to ``crop_the_data1``).
        crop_tmin: Crop start (forwarded to ``crop_the_data1``).
        crop_tmax: Crop end (forwarded to ``crop_the_data1``).
        low: Lower band edge passed to ``Raw.filter`` before epoching.
        high: Upper band edge passed to ``Raw.filter`` before epoching.

    Raises:
        FileNotFoundError: From ``load_local_bst_resting_data`` when the
            local dataset layout is missing.
    """
    # --- Anatomy paths (resolved once; fetch_fsaverage may hit the network).
    data_path = load_local_bst_resting_data(mri_path)
    subjects_dir = op.join(data_path, 'MNE-brainstorm-data', 'bst_resting', 'subjects')
    print(subjects_dir)
    default_dir = op.dirname(mri_path)
    print(default_dir)
    fs_dir = fetch_fsaverage(default_dir, verbose=True)
    # NOTE(review): the BEM and head<->MRI transform come from fsaverage
    # while the source space and the plot use subject 'bst_resting' --
    # confirm this mix of anatomies is intended.
    trans = 'fsaverage'
    bem = op.join(fs_dir, 'bem', 'fsaverage-5120-5120-5120-bem-sol.fif')
    print("边界元模型的目录:", bem)

    # --- Load the EEG, attach electrode positions and an average reference.
    raw = crop_the_data1(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax)
    print(raw.ch_names)
    raw.set_montage(mne.channels.make_standard_montage('standard_1005'))
    raw.set_eeg_reference(projection=True)

    # --- Preprocessing: resample, broadband covariance, then band-pass
    # and fixed-length epochs for the data covariance.
    raw1 = raw.copy()
    raw1.load_data().resample(80)
    raw1.apply_proj()
    # Computed before band-pass filtering; used as noise covariance below.
    cov = mne.compute_raw_covariance(raw1)
    raw1.filter(low, high)
    events = mne.make_fixed_length_events(raw1, duration=5.)
    epochs = mne.Epochs(raw1, events=events, tmin=0, tmax=5.,
                        baseline=None, reject=dict(eeg=200e-6), preload=True)
    data_cov = mne.compute_covariance(epochs)
    print("计算完相关数据相关性及分段相关性!")
    del raw1

    # --- Forward model and LCMV filters.
    pos = 15.  # 1.5 cm grid: very coarse, chosen for speed
    src = mne.setup_volume_source_space('bst_resting', pos, bem=bem,
                                        subjects_dir=subjects_dir, verbose=True)
    fwd = mne.make_forward_solution(epochs.info, trans, src, bem)
    filters = make_lcmv(epochs.info, fwd, data_cov, 0.05, cov,
                        pick_ori='max-power', weight_norm='nai')
    print("计算完正向解和LCMV空间滤波!")
    del fwd, data_cov, cov

    # --- Envelope correlation on the beamformed analytic signal.
    epochs.apply_hilbert()
    stcs = apply_lcmv_epochs(epochs, filters, return_generator=True)
    corr = envelope_correlation(stcs, verbose=True)
    del stcs, epochs, filters
    print("完成希尔伯特滤波,LCMV分段滤波及包络相关性连接度!")

    # Combine over epochs, then compute the degree at threshold 0.15.
    corr = corr.combine()
    degree = mne_connectivity.degree(corr, 0.15)
    stc = mne.VolSourceEstimate(degree, [src[0]['vertno']], 0, 1, 'bst_resting')
    stc.plot(
        src, clim=dict(kind='percent', lims=[75, 85, 95]), colormap='gnuplot',
        subjects_dir=subjects_dir, mode='glass_brain')
def source_volume_connecitivity(mri_path, eeg_path, chan_n, sfreq, crop_tmin, crop_tmax, low, high):
    """Compute source-space envelope-correlation degree and show it on MRI slices.

    Steps: load the EEG through ``crop_the_data1``, resample to 80 Hz,
    band-pass to ``[low, high]``, cut 5 s epochs, build an LCMV beamformer
    on a coarse volume source space, compute the envelope correlation
    between sources, turn it into a degree map and plot it with
    ``mode='stat_map'``.

    Args:
        mri_path: Directory that contains ``MNE-brainstorm-data/bst_resting``.
            fsaverage is fetched into the parent directory of this path.
        eeg_path: Path of the EEG text file (forwarded to ``crop_the_data1``).
        chan_n: Channel count of the file (forwarded to ``crop_the_data1``).
        sfreq: Sampling frequency (forwarded to ``crop_the_data1``).
        crop_tmin: Crop start (forwarded to ``crop_the_data1``).
        crop_tmax: Crop end (forwarded to ``crop_the_data1``).
        low: Lower band edge passed to ``Raw.filter`` before epoching.
        high: Upper band edge passed to ``Raw.filter`` before epoching.

    Raises:
        FileNotFoundError: From ``load_local_bst_resting_data`` when the
            local dataset layout is missing.
    """
    subject = 'bst_resting'

    # --- Anatomy paths (resolved once; fetch_fsaverage may hit the network).
    data_path = load_local_bst_resting_data(mri_path)
    subjects_dir = op.join(data_path, 'MNE-brainstorm-data', subject, 'subjects')
    print(subjects_dir)
    default_dir = op.dirname(mri_path)
    print(default_dir)
    fs_dir = fetch_fsaverage(default_dir, verbose=True)
    # NOTE(review): the BEM and head<->MRI transform come from fsaverage
    # while the source space and the plot use subject 'bst_resting' --
    # confirm this mix of anatomies is intended.
    trans = 'fsaverage'
    bem = op.join(fs_dir, 'bem', 'fsaverage-5120-5120-5120-bem-sol.fif')
    print("边界元模型的目录:", bem)

    # --- Load the EEG, attach electrode positions and an average reference.
    raw = crop_the_data1(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax)
    print(raw.ch_names)
    raw.set_montage(mne.channels.make_standard_montage('standard_1005'))
    raw.set_eeg_reference(projection=True)

    # --- Preprocessing: resample, broadband covariance, then band-pass
    # and fixed-length epochs for the data covariance.
    raw1 = raw.copy()
    raw1.load_data().resample(80)
    raw1.apply_proj()
    # Computed before band-pass filtering; used as noise covariance below.
    cov = mne.compute_raw_covariance(raw1)
    raw1.filter(low, high)
    events = mne.make_fixed_length_events(raw1, duration=5.)
    epochs = mne.Epochs(raw1, events=events, tmin=0, tmax=5.,
                        baseline=None, reject=dict(eeg=200e-6), preload=True)
    data_cov = mne.compute_covariance(epochs)
    del raw1
    print("计算完数据相关性及分段相关性!")

    # --- Forward model and LCMV filters.
    pos = 15.  # 1.5 cm grid: very coarse, chosen for speed
    src = mne.setup_volume_source_space(subject, pos, bem=bem,
                                        subjects_dir=subjects_dir, verbose=True)
    fwd = mne.make_forward_solution(epochs.info, trans, src, bem)
    filters = make_lcmv(epochs.info, fwd, data_cov, 0.05, cov,
                        pick_ori='max-power', weight_norm='nai')
    del fwd, data_cov, cov
    print("计算完正向解和LCMV空间滤波!")

    # --- Envelope correlation on the beamformed analytic signal.
    epochs.apply_hilbert()  # applied in sensor space, before beamforming
    stcs = apply_lcmv_epochs(epochs, filters, return_generator=True)
    corr = envelope_correlation(stcs, verbose=True)
    del stcs, epochs, filters
    print("完成希尔伯特滤波,LCMV分段滤波及包络相关性连接度!")

    # Combine over epochs, then compute the degree at threshold 0.15.
    corr = corr.combine()
    degree = mne_connectivity.degree(corr, 0.15)
    stc = mne.VolSourceEstimate(degree, [src[0]['vertno']], 0, 1, subject)

    lims = [75, 85, 95]
    kwargs = dict(src=src, subject=subject, subjects_dir=subjects_dir,
                  initial_time=0, verbose=True)
    stc.plot(mode='stat_map', clim=dict(kind='percent', pos_lims=lims), **kwargs)
def brain_sensor_connectivity(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax, low, high):
    """Compute sensor-level wPLI connectivity and plot it on the sensor layout.

    The recording is loaded (and displayed) through ``crop_the_data``,
    epoched around fixed-length events, and the weighted phase-lag index
    is estimated with the multitaper method, averaged over the
    ``[low, high]`` band.

    Args:
        eeg_path: Path of the EEG text file (forwarded to ``crop_the_data``).
        chan_n: Channel count of the file (forwarded to ``crop_the_data``).
        sfreq: Sampling frequency (forwarded to ``crop_the_data``).
        crop_tmin: Crop start (forwarded to ``crop_the_data``).
        crop_tmax: Crop end (forwarded to ``crop_the_data``).
        low: Lower edge of the frequency band of interest (``fmin``).
        high: Upper edge of the frequency band of interest (``fmax``).
    """
    raw = crop_the_data(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax)
    print(raw.ch_names)

    raw.set_montage(mne.channels.make_standard_montage('standard_1005'))
    raw.set_eeg_reference(projection=True)

    new_events = mne.make_fixed_length_events(raw, id=1, duration=2.0, overlap=0.5)
    # NOTE(review): no tmin/tmax is passed, so mne.Epochs uses its own
    # default window rather than the 2 s event spacing -- confirm whether
    # 2 s epochs were intended.
    epochs = mne.Epochs(raw, new_events)

    # crop_the_data already returns EEG channels only, so no further
    # channel picking is needed here.
    con = spectral_connectivity_epochs(
        epochs, method='wpli', mode='multitaper', sfreq=raw.info['sfreq'],
        fmin=low, fmax=high, faverage=True, tmin=0.0, mt_adaptive=False,
        n_jobs=1)

    # faverage=True leaves a single frequency bin; index 0 selects it.
    plot_sensors_connectivity(
        epochs.info,
        con.get_data(output='dense')[:, :, 0])
class QmyWidget(QWidget):
    """Main window that wires the connectivity-analysis form to the analyses.

    The three button slots follow Qt's ``on_<objectName>_clicked`` naming
    scheme. Each one reads the shared form fields and launches one of the
    module-level analysis functions.
    """

    # Declared on the class; nothing in this class emits it.
    numberSignal = pyqtSignal(int)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.ui = connectivity_analysis_gui.Ui_Form()
        self.ui.setupUi(self)
        # Pre-fill the path fields with the default locations.
        self.ui.eeg_path.setText("D:\\ERPMaster\\Data3422.txt")
        self.ui.mri_path.setText("D:\\ERPMaster\\mne_data\\data")

    def _read_common_params(self):
        """Read the form fields shared by every analysis.

        Returns:
            ``(eeg_path, chan_n, sfreq, crop_tmin, crop_tmax, low, high)``,
            or ``None`` after showing a warning box when a numeric field
            cannot be parsed.
        """
        try:
            chan_n = int(self.ui.ch_n.text())
            sfreq = int(self.ui.sfreq.text())
            crop_tmin = float(self.ui.crop_tmin.text())
            crop_tmax = float(self.ui.crop_tmax.text())
            low = float(self.ui.low.text())
            high = float(self.ui.high.text())
        except ValueError as err:
            # Report bad input to the user instead of letting the
            # exception escape from the slot.
            QMessageBox.warning(self, "输入错误", f"参数无效:{err}")
            return None

        eeg_path = str(self.ui.eeg_path.text())
        for value in (eeg_path, chan_n, sfreq, crop_tmin, crop_tmax, low, high):
            print(value)
        return eeg_path, chan_n, sfreq, crop_tmin, crop_tmax, low, high

    def _run_analysis(self, analysis, *args):
        """Call ``analysis(*args)`` and report any failure in a message box."""
        import traceback

        try:
            analysis(*args)
        except Exception as err:  # slot boundary: log and tell the user
            traceback.print_exc()
            QMessageBox.critical(self, "分析失败", str(err))

    @pyqtSlot()
    def on_btnElectrodeConn_clicked(self):
        """Run the sensor-level connectivity analysis."""
        params = self._read_common_params()
        if params is None:
            return
        self._run_analysis(brain_sensor_connectivity, *params)

    @pyqtSlot()
    def on_btnGlassTrans_clicked(self):
        """Run the source-level analysis with the glass-brain plot."""
        params = self._read_common_params()
        if params is None:
            return
        mri_path = str(self.ui.mri_path.text())
        self._run_analysis(source_connectivity_glass, mri_path, *params)

    @pyqtSlot()
    def on_btnMRISlice_clicked(self):
        """Run the source-level analysis with the MRI slice plot."""
        params = self._read_common_params()
        if params is None:
            return
        mri_path = str(self.ui.mri_path.text())
        self._run_analysis(source_volume_connecitivity, mri_path, *params)
# Script entry point: create the Qt application, show the main window and
# hand the event loop's exit code back to the shell.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    myWindow = QmyWidget()
    myWindow.show()
    n = app.exec()
    sys.exit(n)