Source code for braindecode.models.sleep_stager_eldele_2021

# Authors: Divyesh Narayanan <divyesh.narayanan@gmail.com>
#
# License: BSD (3-clause)

import math
import copy
from copy import deepcopy
import warnings

import numpy as np

import torch
from torch import nn
import torch.nn.functional as F


class SleepStagerEldele2021(nn.Module):
    """Sleep Staging Architecture from Eldele et al 2021.

    Attention-based neural network for sleep staging as described in
    [Eldele2021]_. The code for the paper and this model is also available at
    [1]_.

    Takes single-channel EEG as input. The feature extraction module is based
    on a multi-resolution convolutional neural network (MRCNN) and adaptive
    feature recalibration (AFR). The second module is the temporal context
    encoder (TCE), which leverages a multi-head attention mechanism to capture
    the temporal dependencies among the extracted features.

    Warning - This model was designed for signals of 30 seconds at 100 Hz or
    125 Hz (in which case the reference architecture from [1]_, which was
    validated on the SHHS dataset [2]_, is used). Using any other input is
    likely to make the model perform in unintended ways.

    Parameters
    ----------
    sfreq : float
        EEG sampling frequency.
    n_tce : int
        Number of TCE clones.
    d_model : int
        Input dimension for the TCE. Also the input dimension of the first FC
        layer in the feed forward and the output of the second FC layer in the
        same. Increase for higher sampling rate/signal length. It should be
        divisible by n_attn_heads.
    d_ff : int
        Output dimension of the first FC layer in the feed forward and the
        input dimension of the second FC layer in the same.
    n_attn_heads : int
        Number of attention heads. It should be a factor of d_model.
    dropout : float
        Dropout rate in the PositionWiseFeedforward layer and the TCE layers.
    input_size_s : float
        Size of the input, in seconds.
    n_classes : int
        Number of classes.
    after_reduced_cnn_size : int
        Number of output channels produced by the convolution in the AFR
        module.
    return_feats : bool
        If True, return the features, i.e. the output of the feature extractor
        (before the final linear layer). If False, pass the features through
        the final linear layer.

    References
    ----------
    .. [Eldele2021] E. Eldele et al., "An Attention-Based Deep Learning
        Approach for Sleep Stage Classification With Single-Channel EEG," in
        IEEE Transactions on Neural Systems and Rehabilitation Engineering,
        vol. 29, pp. 809-818, 2021, doi: 10.1109/TNSRE.2021.3076234.
    .. [1] https://github.com/emadeldeen24/AttnSleep
    .. [2] https://sleepdata.org/datasets/shhs
    """

    def __init__(self, sfreq, n_tce=2, d_model=80, d_ff=120, n_attn_heads=5,
                 dropout=0.1, input_size_s=30, n_classes=5,
                 after_reduced_cnn_size=30, return_feats=False):
        super(SleepStagerEldele2021, self).__init__()

        input_size = np.ceil(input_size_s * sfreq).astype(int)

        if not ((input_size_s == 30 and sfreq == 100 and d_model == 80) or
                (input_size_s == 30 and sfreq == 125 and d_model == 100)):
            warnings.warn(
                "This model was designed originally for input windows of "
                "30 s at 100 Hz (with d_model=80) or at 125 Hz (with "
                "d_model=100). Using any other configuration may cause "
                "errors or make the model perform in unintended ways.",
                UserWarning)

        # the usual kernel size for the MRCNN, for sfreq 100
        kernel_size = 7
        if sfreq == 125:
            kernel_size = 6

        mrcnn = _MRCNN(after_reduced_cnn_size, kernel_size)
        attn = _MultiHeadedAttention(n_attn_heads, d_model,
                                     after_reduced_cnn_size)
        ff = _PositionwiseFeedForward(d_model, d_ff, dropout)
        tce = _TCE(_EncoderLayer(d_model, deepcopy(attn), deepcopy(ff),
                                 after_reduced_cnn_size, dropout), n_tce)

        self.feature_extractor = nn.Sequential(mrcnn, tce)
        self.len_last_layer = self._len_last_layer(input_size)
        self.return_feats = return_feats

        if not return_feats:
            self.fc = nn.Linear(d_model * after_reduced_cnn_size, n_classes)

    def _len_last_layer(self, input_size):
        self.feature_extractor.eval()
        with torch.no_grad():
            out = self.feature_extractor(torch.Tensor(1, 1, input_size))
        self.feature_extractor.train()
        return len(out.flatten())
    def forward(self, x):
        """Forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Batch of EEG windows of shape (batch_size, n_channels, n_times).
        """
        encoded_features = self.feature_extractor(x)
        encoded_features = encoded_features.contiguous().view(
            encoded_features.shape[0], -1)

        if self.return_feats:
            return encoded_features
        else:
            final_output = self.fc(encoded_features)
            return final_output
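# A minimal usage sketch, not part of the original braindecode module. It
# assumes the reference configuration of 30-second, single-channel windows
# sampled at 100 Hz; the function name, batch size and random input below are
# illustrative assumptions. Wrapped in a helper so it only runs when called
# explicitly, after the whole module has been imported.
def _example_forward_pass():
    model = SleepStagerEldele2021(sfreq=100)
    # (batch_size, n_channels, n_times) = (4, 1, 30 s * 100 Hz)
    x = torch.randn(4, 1, 3000)
    with torch.no_grad():
        out = model(x)
    # With the default n_classes=5, `out` has shape (4, 5): one score per
    # sleep stage for each window in the batch.
    return out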
class _SELayer(nn.Module):
    def __init__(self, channel, reduction=16):
        super(_SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1)
        return x * y.expand_as(x)


class _SEBasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        super(_SEBasicBlock, self).__init__()
        self.conv1 = nn.Conv1d(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(planes, planes, 1)
        self.bn2 = nn.BatchNorm1d(planes)
        self.se = _SELayer(planes, reduction)
        self.downsample = downsample
        self.stride = stride
        self.features = nn.Sequential(self.conv1, self.bn1, self.relu,
                                      self.conv2, self.bn2, self.se)

    def forward(self, x):
        residual = x
        out = self.features(x)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class _MRCNN(nn.Module):
    def __init__(self, after_reduced_cnn_size, kernel_size=7):
        super(_MRCNN, self).__init__()
        drate = 0.5
        self.GELU = nn.GELU()
        self.features1 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=50, stride=6, bias=False,
                      padding=24),
            nn.BatchNorm1d(64),
            self.GELU,
            nn.MaxPool1d(kernel_size=8, stride=2, padding=4),
            nn.Dropout(drate),

            nn.Conv1d(64, 128, kernel_size=8, stride=1, bias=False,
                      padding=4),
            nn.BatchNorm1d(128),
            self.GELU,

            nn.Conv1d(128, 128, kernel_size=8, stride=1, bias=False,
                      padding=4),
            nn.BatchNorm1d(128),
            self.GELU,

            nn.MaxPool1d(kernel_size=4, stride=4, padding=2)
        )

        self.features2 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=400, stride=50, bias=False,
                      padding=200),
            nn.BatchNorm1d(64),
            self.GELU,
            nn.MaxPool1d(kernel_size=4, stride=2, padding=2),
            nn.Dropout(drate),

            nn.Conv1d(64, 128, kernel_size=kernel_size, stride=1, bias=False,
                      padding=3),
            nn.BatchNorm1d(128),
            self.GELU,

            nn.Conv1d(128, 128, kernel_size=kernel_size, stride=1, bias=False,
                      padding=3),
            nn.BatchNorm1d(128),
            self.GELU,

            nn.MaxPool1d(kernel_size=2, stride=2, padding=1)
        )
        self.dropout = nn.Dropout(drate)
        self.inplanes = 128
        self.AFR = self._make_layer(_SEBasicBlock, after_reduced_cnn_size, 1)

    def _make_layer(self, block, planes, blocks, stride=1):
        # makes residual SE block
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        x1 = self.features1(x)
        x2 = self.features2(x)
        x_concat = torch.cat((x1, x2), dim=2)
        x_concat = self.dropout(x_concat)
        x_concat = self.AFR(x_concat)
        return x_concat


##########################################################################


def _attention(query, key, value, dropout=None):
    """Implementation of Scaled dot product attention."""
    # d_k - dimension of the query and key vectors
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn


class _CausalConv1d(torch.nn.Conv1d):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                 dilation=1, groups=1, bias=True):
        self.__padding = (kernel_size - 1) * dilation

        super(_CausalConv1d, self).__init__(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=self.__padding,
            dilation=dilation,
            groups=groups,
            bias=bias)

    def forward(self, input):
        result = super(_CausalConv1d, self).forward(input)
        if self.__padding != 0:
            return result[:, :, :-self.__padding]
        return result


class _MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, after_reduced_cnn_size, dropout=0.1):
        """Take in model size and number of heads."""
        super(_MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        self.d_per_head = d_model // h
        self.h = h

        self.convs = _clones(
            _CausalConv1d(after_reduced_cnn_size, after_reduced_cnn_size,
                          kernel_size=7, stride=1), 3)
        self.linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value):
        """Implements Multi-head attention."""
        nbatches = query.size(0)

        query = query.view(nbatches, -1, self.h,
                           self.d_per_head).transpose(1, 2)
        key = self.convs[1](key).view(nbatches, -1, self.h,
                                      self.d_per_head).transpose(1, 2)
        value = self.convs[2](value).view(nbatches, -1, self.h,
                                          self.d_per_head).transpose(1, 2)

        x, self.attn = _attention(query, key, value, dropout=self.dropout)

        x = x.transpose(1, 2).contiguous() \
            .view(nbatches, -1, self.h * self.d_per_head)

        return self.linear(x)


class _SublayerOutput(nn.Module):
    """A residual connection followed by a layer norm."""

    def __init__(self, size, dropout):
        super(_SublayerOutput, self).__init__()
        self.norm = nn.LayerNorm(size, eps=1e-6)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        """Apply residual connection to any sublayer with the same size."""
        return x + self.dropout(sublayer(self.norm(x)))


def _clones(module, n):
    """Produce n identical layers."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(n)])


class _TCE(nn.Module):
    """Transformer Encoder.

    It is a stack of n layers.
    """

    def __init__(self, layer, n):
        super(_TCE, self).__init__()
        self.layers = _clones(layer, n)
        self.norm = nn.LayerNorm(layer.size, eps=1e-6)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)


class _EncoderLayer(nn.Module):
    """An encoder layer.

    Made up of self-attention and a feed-forward layer. Each of these
    sublayers has a residual connection and layer norm, implemented by
    _SublayerOutput.
    """

    def __init__(self, size, self_attn, feed_forward, after_reduced_cnn_size,
                 dropout):
        super(_EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer_output = _clones(_SublayerOutput(size, dropout), 2)
        self.size = size
        self.conv = _CausalConv1d(after_reduced_cnn_size,
                                  after_reduced_cnn_size, kernel_size=7,
                                  stride=1, dilation=1)

    def forward(self, x_in):
        """Transformer Encoder."""
        query = self.conv(x_in)
        # Encoder self-attention
        x = self.sublayer_output[0](
            query, lambda x: self.self_attn(query, x_in, x_in))
        return self.sublayer_output[1](x, self.feed_forward)


class _PositionwiseFeedForward(nn.Module):
    """Positionwise feed-forward network."""

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(_PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Implements FFN equation."""
        return self.w_2(self.dropout(F.relu(self.w_1(x))))
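# A minimal sketch, not part of the original module, illustrating the scaled
# dot-product attention computed by `_attention` above:
# softmax(Q K^T / sqrt(d_k)) V. The shapes (batch=2, heads=5, sequence=30,
# d_k=16) mirror the default configuration (after_reduced_cnn_size=30,
# d_model=80, n_attn_heads=5) but are otherwise arbitrary assumptions.
def _example_scaled_dot_product_attention():
    q = torch.randn(2, 5, 30, 16)
    k = torch.randn(2, 5, 30, 16)
    v = torch.randn(2, 5, 30, 16)
    out, weights = _attention(q, k, v)
    # The output keeps the query shape, and the attention weights are
    # softmax-normalised, so each row sums to 1.
    assert out.shape == (2, 5, 30, 16)
    assert torch.allclose(weights.sum(dim=-1), torch.ones(2, 5, 30),
                          atol=1e-5)
    return out, weights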
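# A second illustrative sketch, also not part of the original module: it
# checks two properties of `_CausalConv1d` that follow from padding by
# (kernel_size - 1) * dilation and trimming the tail, namely that the output
# length equals the input length and that each output sample depends only on
# current and past inputs. The channel count, kernel size and input below are
# assumptions for demonstration.
def _example_causal_conv():
    conv = _CausalConv1d(in_channels=1, out_channels=1, kernel_size=3)
    x = torch.randn(1, 1, 10)
    y = conv(x)
    assert y.shape == x.shape  # length is preserved

    # Perturb only the last time step: all earlier outputs must be unchanged.
    x_perturbed = x.clone()
    x_perturbed[..., -1] += 1.0
    y_perturbed = conv(x_perturbed)
    assert torch.allclose(y[..., :-1], y_perturbed[..., :-1])
    return y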