Skip to content

Curiousily

Time Series Anomaly Detection using LSTM Autoencoders with PyTorch in Python

Deep Learning, PyTorch, Machine Learning, Neural Network, Autoencoder, Time Series, Python5 min read

Share

TL;DR Use real-world Electrocardiogram (ECG) data to detect anomalies in a patient heartbeat. We’ll build an LSTM Autoencoder, train it on a set of normal heartbeats and classify unseen examples as normal or anomalies

In this tutorial, you’ll learn how to detect anomalies in Time Series data using an LSTM Autoencoder. You’re going to use real-world ECG data from a single patient with heart disease to detect abnormal hearbeats.

By the end of this tutorial, you’ll learn how to:

  • Prepare a dataset for Anomaly Detection from Time Series Data
  • Build an LSTM Autoencoder with PyTorch
  • Train and evaluate your model
  • Choose a threshold for anomaly detection
  • Classify unseen examples as normal or anomaly

Data

The dataset contains 5,000 Time Series examples (obtained with ECG) with 140 timesteps. Each sequence corresponds to a single heartbeat from a single patient with congestive heart failure.

An electrocardiogram (ECG or EKG) is a test that checks how your heart is functioning by measuring the electrical activity of the heart. With each heart beat, an electrical impulse (or wave) travels through your heart. This wave causes the muscle to squeeze and pump blood from the heart. Source

We have 5 types of hearbeats (classes):

  • Normal (N)
  • R-on-T Premature Ventricular Contraction (R-on-T PVC)
  • Premature Ventricular Contraction (PVC)
  • Supra-ventricular Premature or Ectopic Beat (SP or EB)
  • Unclassified Beat (UB).

Assuming a healthy heart and a typical rate of 70 to 75 beats per minute, each cardiac cycle, or heartbeat, takes about 0.8 seconds to complete the cycle. Frequency: 60–100 per minute (Humans) Duration: 0.6–1 second (Humans) Source

The dataset is available on my Google Drive. Let’s get it:

1!gdown --id 16MIleqoIr1vYxlGk4GKnGmrsCPuWkkpT
1!unzip -qq ECG5000.zip
1device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

The data comes in multiple formats. We’ll load the arff files into Pandas data frames:

1with open('ECG5000_TRAIN.arff') as f:
2 train = a2p.load(f)
3
4with open('ECG5000_TEST.arff') as f:
5 test = a2p.load(f)

We’ll combine the training and test data into a single data frame. This will give us more data to train our Autoencoder. We’ll also shuffle it:

1df = train.append(test)
2df = df.sample(frac=1.0)
3df.shape
1(5000, 141)

We have 5,000 examples. Each row represents a single heartbeat record. Let’s name the possible classes:

1CLASS_NORMAL = 1
2
3class_names = ['Normal','R on T','PVC','SP','UB']

Next, we’ll rename the last column to target, so its easier to reference it:

1new_columns = list(df.columns)
2new_columns[-1] = 'target'
3df.columns = new_columns

Exploratory Data Analysis

Let’s check how many examples for each heartbeat class do we have:

1df.target.value_counts()
11 2919
22 1767
34 194
43 96
55 24
6Name: target, dtype: int64

Let’s plot the results:

png

The normal class, has by far, the most examples. This is great because we’ll use it to train our model.

Let’s have a look at an averaged (smoothed out with one standard deviation on top and bottom of it) Time Series for each class:

png

It is very good that the normal class has a distinctly different pattern than all other classes. Maybe our model will be able to detect anomalies?

LSTM Autoencoder

The Autoencoder’s job is to get some input data, pass it through the model, and obtain a reconstruction of the input. The reconstruction should match the input as much as possible. The trick is to use a small number of parameters, so your model learns a compressed representation of the data.

In a sense, Autoencoders try to learn only the most important features (compressed version) of the data. Here, we’ll have a look at how to feed Time Series data to an Autoencoder. We’ll use a couple of LSTM layers (hence the LSTM Autoencoder) to capture the temporal dependencies of the data.

To classify a sequence as normal or an anomaly, we’ll pick a threshold above which a heartbeat is considered abnormal.

Reconstruction Loss

When training an Autoencoder, the objective is to reconstruct the input as best as possible. This is done by minimizing a loss function (just like in supervised learning). This function is known as reconstruction loss. Cross-entropy loss and Mean squared error are common examples.

Anomaly Detection in ECG Data

We’ll use normal heartbeats as training data for our model and record the reconstruction loss. But first, we need to prepare the data:

Data Preprocessing

Let’s get all normal heartbeats and drop the target (class) column:

1normal_df = df[df.target == str(CLASS_NORMAL)].drop(labels='target', axis=1)
2normal_df.shape
1(2919, 140)

We’ll merge all other classes and mark them as anomalies:

1anomaly_df = df[df.target != str(CLASS_NORMAL)].drop(labels='target', axis=1)
2anomaly_df.shape
1(2081, 140)

We’ll split the normal examples into train, validation and test sets:

1train_df, val_df = train_test_split(
2 normal_df,
3 test_size=0.15,
4 random_state=RANDOM_SEED
5)
6
7val_df, test_df = train_test_split(
8 val_df,
9 test_size=0.33,
10 random_state=RANDOM_SEED
11)

We need to convert our examples into tensors, so we can use them to train our Autoencoder. Let’s write a helper function for that:

1def create_dataset(df):
2
3 sequences = df.astype(np.float32).to_numpy().tolist()
4
5 dataset = [torch.tensor(s).unsqueeze(1).float() for s in sequences]
6
7 n_seq, seq_len, n_features = torch.stack(dataset).shape
8
9 return dataset, seq_len, n_features

Each Time Series will be converted to a 2D Tensor in the shape sequence length x number of features (140x1 in our case).

Let’s create some datasets:

1train_dataset, seq_len, n_features = create_dataset(train_df)
2val_dataset, _, _ = create_dataset(val_df)
3test_normal_dataset, _, _ = create_dataset(test_df)
4test_anomaly_dataset, _, _ = create_dataset(anomaly_df)

LSTM Autoencoder

Autoencoder Sample Autoencoder Architecture Image Source

The general Autoencoder architecture consists of two components. An Encoder that compresses the input and a Decoder that tries to reconstruct it.

We’ll use the LSTM Autoencoder from this GitHub repo with some small tweaks. Our model’s job is to reconstruct Time Series data. Let’s start with the Encoder:

1class Encoder(nn.Module):
2
3 def __init__(self, seq_len, n_features, embedding_dim=64):
4 super(Encoder, self).__init__()
5
6 self.seq_len, self.n_features = seq_len, n_features
7 self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim
8
9 self.rnn1 = nn.LSTM(
10 input_size=n_features,
11 hidden_size=self.hidden_dim,
12 num_layers=1,
13 batch_first=True
14 )
15
16 self.rnn2 = nn.LSTM(
17 input_size=self.hidden_dim,
18 hidden_size=embedding_dim,
19 num_layers=1,
20 batch_first=True
21 )
22
23 def forward(self, x):
24 x = x.reshape((1, self.seq_len, self.n_features))
25
26 x, (_, _) = self.rnn1(x)
27 x, (hidden_n, _) = self.rnn2(x)
28
29 return hidden_n.reshape((self.n_features, self.embedding_dim))

The Encoder uses two LSTM layers to compress the Time Series data input.

Next, we’ll decode the compressed representation using a Decoder:

1class Decoder(nn.Module):
2
3 def __init__(self, seq_len, input_dim=64, n_features=1):
4 super(Decoder, self).__init__()
5
6 self.seq_len, self.input_dim = seq_len, input_dim
7 self.hidden_dim, self.n_features = 2 * input_dim, n_features
8
9 self.rnn1 = nn.LSTM(
10 input_size=input_dim,
11 hidden_size=input_dim,
12 num_layers=1,
13 batch_first=True
14 )
15
16 self.rnn2 = nn.LSTM(
17 input_size=input_dim,
18 hidden_size=self.hidden_dim,
19 num_layers=1,
20 batch_first=True
21 )
22
23 self.output_layer = nn.Linear(self.hidden_dim, n_features)
24
25 def forward(self, x):
26 x = x.repeat(self.seq_len, self.n_features)
27 x = x.reshape((self.n_features, self.seq_len, self.input_dim))
28
29 x, (hidden_n, cell_n) = self.rnn1(x)
30 x, (hidden_n, cell_n) = self.rnn2(x)
31 x = x.reshape((self.seq_len, self.hidden_dim))
32
33 return self.output_layer(x)

Our Decoder contains two LSTM layers and an output layer that gives the final reconstruction.

Time to wrap everything into an easy to use module:

1class RecurrentAutoencoder(nn.Module):
2
3 def __init__(self, seq_len, n_features, embedding_dim=64):
4 super(RecurrentAutoencoder, self).__init__()
5
6 self.encoder = Encoder(seq_len, n_features, embedding_dim).to(device)
7 self.decoder = Decoder(seq_len, embedding_dim, n_features).to(device)
8
9 def forward(self, x):
10 x = self.encoder(x)
11 x = self.decoder(x)
12
13 return x

Our Autoencoder passes the input through the Encoder and Decoder. Let’s create an instance of it:

1model = RecurrentAutoencoder(seq_len, n_features, 128)
2model = model.to(device)

Training

Let’s write a helper function for our training process:

1def train_model(model, train_dataset, val_dataset, n_epochs):
2 optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
3 criterion = nn.L1Loss(reduction='sum').to(device)
4 history = dict(train=[], val=[])
5
6 best_model_wts = copy.deepcopy(model.state_dict())
7 best_loss = 10000.0
8
9 for epoch in range(1, n_epochs + 1):
10 model = model.train()
11
12 train_losses = []
13 for seq_true in train_dataset:
14 optimizer.zero_grad()
15
16 seq_true = seq_true.to(device)
17 seq_pred = model(seq_true)
18
19 loss = criterion(seq_pred, seq_true)
20
21 loss.backward()
22 optimizer.step()
23
24 train_losses.append(loss.item())
25
26 val_losses = []
27 model = model.eval()
28 with torch.no_grad():
29 for seq_true in val_dataset:
30
31 seq_true = seq_true.to(device)
32 seq_pred = model(seq_true)
33
34 loss = criterion(seq_pred, seq_true)
35 val_losses.append(loss.item())
36
37 train_loss = np.mean(train_losses)
38 val_loss = np.mean(val_losses)
39
40 history['train'].append(train_loss)
41 history['val'].append(val_loss)
42
43 if val_loss < best_loss:
44 best_loss = val_loss
45 best_model_wts = copy.deepcopy(model.state_dict())
46
47 print(f'Epoch {epoch}: train loss {train_loss} val loss {val_loss}')
48
49 model.load_state_dict(best_model_wts)
50 return model.eval(), history

At each epoch, the training process feeds our model with all training examples and evaluates the performance on the validation set. Note that we’re using a batch size of 1 (our model sees only 1 sequence at a time). We also record the training and validation set losses during the process.

Note that we’re minimizing the L1Loss, which measures the MAE (mean absolute error). Why? The reconstructions seem to be better than with MSE (mean squared error).

We’ll get the version of the model with the smallest validation error. Let’s do some training:

1model, history = train_model(
2 model,
3 train_dataset,
4 val_dataset,
5 n_epochs=150
6)

png

Our model converged quite well. Seems like we might’ve needed a larger validation set to smoothen the results, but that’ll do for now.

Saving the model

Let’s store the model for later use:

1MODEL_PATH = 'model.pth'
2
3torch.save(model, MODEL_PATH)

Uncomment the next lines, if you want to download and load the pre-trained model:

1# !gdown --id 1jEYx5wGsb7Ix8cZAw3l5p5pOwHs3_I9A
2# model = torch.load('model.pth')
3# model = model.to(device)

Choosing a threshold

With our model at hand, we can have a look at the reconstruction error on the training set. Let’s start by writing a helper function to get predictions from our model:

1def predict(model, dataset):
2 predictions, losses = [], []
3 criterion = nn.L1Loss(reduction='sum').to(device)
4 with torch.no_grad():
5 model = model.eval()
6 for seq_true in dataset:
7 seq_true = seq_true.to(device)
8 seq_pred = model(seq_true)
9
10 loss = criterion(seq_pred, seq_true)
11
12 predictions.append(seq_pred.cpu().numpy().flatten())
13 losses.append(loss.item())
14 return predictions, losses

Our function goes through each example in the dataset and records the predictions and losses. Let’s get the losses and have a look at them:

1_, losses = predict(model, train_dataset)
2
3sns.distplot(losses, bins=50, kde=True);

png

1THRESHOLD = 26

Evaluation

Using the threshold, we can turn the problem into a simple binary classification task:

  • If the reconstruction loss for an example is below the threshold, we’ll classify it as a normal heartbeat
  • Alternatively, if the loss is higher than the threshold, we’ll classify it as an anomaly

Normal hearbeats

Let’s check how well our model does on normal heartbeats. We’ll use the normal heartbeats from the test set (our model haven’t seen those):

1predictions, pred_losses = predict(model, test_normal_dataset)
2sns.distplot(pred_losses, bins=50, kde=True);

png

We’ll count the correct predictions:

1correct = sum(l <= THRESHOLD for l in pred_losses)
2print(f'Correct normal predictions: {correct}/{len(test_normal_dataset)}')
1Correct normal predictions: 142/145

Anomalies

We’ll do the same with the anomaly examples, but their number is much higher. We’ll get a subset that has the same size as the normal heartbeats:

1anomaly_dataset = test_anomaly_dataset[:len(test_normal_dataset)]

Now we can take the predictions of our model for the subset of anomalies:

1predictions, pred_losses = predict(model, anomaly_dataset)
2sns.distplot(pred_losses, bins=50, kde=True);

png

Finally, we can count the number of examples above the threshold (considered as anomalies):

1correct = sum(l > THRESHOLD for l in pred_losses)
2print(f'Correct anomaly predictions: {correct}/{len(anomaly_dataset)}')
1Correct anomaly predictions: 142/145

We have very good results. In the real world, you can tweak the threshold depending on what kind of errors you want to tolerate. In this case, you might want to have more false positives (normal heartbeats considered as anomalies) than false negatives (anomalies considered as normal).

Looking at Examples

We can overlay the real and reconstructed Time Series values to see how close they are. We’ll do it for some normal and anomaly cases:

png

Summary

In this tutorial, you learned how to create an LSTM Autoencoder with PyTorch and use it to detect heartbeat anomalies in ECG data.

You learned how to:

  • Prepare a dataset for Anomaly Detection from Time Series Data
  • Build an LSTM Autoencoder with PyTorch
  • Train and evaluate your model
  • Choose a threshold for anomaly detection
  • Classify unseen examples as normal or anomaly

While our Time Series data is univariate (we have only 1 feature), the code should work for multivariate datasets (multiple features) with little or no modification. Feel free to try it!

References

Share

Want to be a Machine Learning expert?

Join the weekly newsletter on Data Science, Deep Learning and Machine Learning in your inbox, curated by me! Chosen by 10,000+ Machine Learning practitioners. (There might be some exclusive content, too!)

You'll never get spam from me