使用keras的LSTM模型预测时间序列的简单步骤

LSTM简单代码案例

[Record] 使用keras的LSTM模型预测时间序列的操作步骤(模板)

  • 导入库

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import pandas as pd 
    import numpy as np
    from keras.models import Sequential
    from keras.layers import Dense,LSTM,Dropout
    import matplotlib.pyplot as plt
    import keras
    %matplotlib inline
    import glob, os
    import seaborn as sns
    import sys
    from sklearn.preprocessing import MinMaxScaler # 归一化
    import matplotlib as mpl
    mpl.rcParams['figure.figsize']=12,8
  • 导入数据并查看

    1
    2
    3
    colomns=['awefew','wefwfd',...]
    data = pd.read_csv(file_path, numes=columns) #更改列名
    data.head()
  • 简单可视化数据,便于察觉特征

    1
    2
    3
    4
    5
    6
    plt.figure(figsize=(24,24))
    for i in range(xxx):
    plt.subplot(xxx,1,i+1)
    plt.plot(data.values[:,i]);
    plt.title(....)
    plt.show()
  • 找到需要预测的数据和可使用的特征,并将时间序列数据转化为监督学习问题数据

    定义如下函数即可:

    原理:sequence to sequence

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    def series_to_supervised(data, n_in=1, n_out=1, dropna=True):
    '''
    data: origin data
    n_in:
    '''
    n_vars = 1 if type(data) is list else data.shape[1]
    df = pd.DataFrame(data)
    cols, names = list(),list()

    for i in range(n_in,0,-1):
    cols.append(df.shift(i))
    names+=[('var%d(t-%d)'%(j+1, i)) for j in range(n_vars)]
    for i in range(0, n_out):
    cols.append(df.shift(-i))
    if i==0:
    names += [('var%d(t)'%(j+1)) for j in range(n_vars)]
    else:
    names += [('var%d(t+%d)'%(j+1, i)) for j in range(n_vars)]
    agg = pd.concat(cols, axis=1)
    agg.columns = names
    if dropna: # 是否去除缺失值的行
    agg.dropna(inplace=True)
    return agg
  • 归一化数据到 [0,1] 之间

    比如可以采用sklearn中的MinMaxScaler函数,公式为

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    scaler = MinMaxScaler(feature_range=(0,1))
    scaled_data = scaler.fit_transform(data[['afe','wef',...]].values)

    # 转化为监督学习问题
    reframed = series_to_supervised(scaled_data,1,1)

    # 处理需要预测的数据,把不需要预测的数据去除
    reframed.drop(reframed.columns[[..,..,...]], axis=1,inplace=True)

    reframed.info()
    reframed.head()
  • 设置训练集、验证集、测试集大小,并开始分割

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    train_days = xxx
    valid_days = xxx
    values = reframed.values

    train = values[:train_days, :]
    valid = values[train_days:train_days+valid_days,:]
    test = values[train_days+valid_days:, :]
    #比如:最后一列是y,前面所有列都是x
    train_x, train_y = train[:, :-1], train[:,-1]
    valid_x, valid_y = valid[:, :-1], valid[:,-1]
    test_x, test_y = test[:, :-1], test[:-1]
  • 将数据重构为符合keras内LSTM模型要求的数据格式

    [样本数,时间步,特征数]

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 以 时间步==1 为例
    train_x = train_x.reshape((train_x.shape[0], 1, train_x.shape[1]))
    valid_x = valid_x.reshape((valid_x.shape[0], 1, valid_x.shape[1]))
    test_x = test_x.shape((test_x.shape[0], 1, test_x.shape[1]))

    print(train_x.shape,
    trian_y.shape,
    valid_x.shape,
    valid_y.shape,
    test_x.shape,
    test_y.shape)
  • 建立模型

    注意keras.layers.LSTM中input_shape的输入格式为(时间步,特征数)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 搭一个LSTM网络,这里用最简单的三层结构
    model = Sequential([
    keras.layers.LSTM(50, activation='relu', input_shape=(train_x.shape[1],train_x.shape[2]))
    keras.layers.Dense(1, activarion='linear')
    ])

    # 配置一下
    model.compile(opirmizer='adam', # 这里采用adam优化算法
    loss='mean_squared_error') # 优化的损失函数为mse

    # 看一下结构
    model.summary()
  • 训练模型

    1
    2
    3
    4
    5
    6
    model_history = model.fit(train_x, train_y,
    epochs=100,
    batch_size=32,
    validation_data=(valid_x, valid_y),
    verbose=1,
    shuffle=False)
  • 根据需要,绘出loss对照图,观察收敛情况

    1
    2
    3
    4
    5
    plt.figure(figsize=(8,6))
    plt.plot(model_history.epoch, model_history.history['loss'])
    plt.plot(model_history.epoch, model_history.history['val_loss'])
    plt.legend()
    plt.show()
  • 模型预测&评估&可视化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # 模型预测
    train_predict = model.predict(train_x)
    valid_predict = model.predict(valid_x)
    test_predict = model.predict(test_x)

    # 模型评估
    evaluation = model.evaluate(test_x, test_y)

    # 把原始数据、训练、验证、测试 都画在一张图上 便于比较分析
    plt.figure(figsize=(24,8))
    plt.plot(values[:,-1], c='b', label='Whole Data')
    plt.plot([x for x in train_predict], c='g', label='Train Predict')
    plt.plot([None for _ in train_predict]+[x for x in valid_predict], label='Valid Predict')
    plt.plot([None for _ in train_predict]+[None for _ int valid_predict]+[x for x in test_predict], label='Test Predict')
    plt.legend(fontsize=15)
    plt.show()
  • 将预测结果反归一化并绘图比较分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    origin_data = np.array(data[train_days+valid_days:]['DATA_COL'])
    # 由于预测数据是1维的,但之前的scaler是5维的,所以我们用零填充剩余维度
    for i in range(4):
    test_predict = np.column_stack((test_predict, np.zeros(27)))
    # 反归一化
    inverse_test_predict = scalar.inverse_transform(test_predict)
    # 绘图
    plt.figure(figsize=(8,6))
    plt.plot(inverse_test_predict[:,0], c='r', label='predict')
    plt.plot(origin_data, c='g', label='origin')
    plt.legend()
    plt.show()