import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from math import sqrt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import matplotlib.pyplot as plt
import akshare as ak
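# Overview (added): this script pulls the daily unit net value of fund 008166 via
# akshare, fits an LSTM and an ARIMA model on the same train/validation/test split,
# and compares their RMSE, MAE and R2 on the held-out test period.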
# Fetch the fund data
df = ak.fund_open_fund_info_em(symbol="008166", indicator="单位净值走势", period="成立以来")
df['净值日期'] = pd.to_datetime(df['净值日期'])
df.set_index('净值日期', inplace=True)
# Keep only the specified date range
start_date = '2020-03-05'
end_date = '2024-03-05'
df = df[(df.index >= start_date) & (df.index <= end_date)]
# Make sure the data is sorted by date
df.sort_index(inplace=True)
# Extract the numeric column; the unit net value column is assumed to be named '单位净值'
net_value = df['单位净值'].values
# Standardize the data
scaler = StandardScaler()
standardized_data = scaler.fit_transform(net_value.reshape(-1, 1))
# Build a supervised time-series dataset with a sliding window
def create_dataset(data, look_back=5):
    X, Y = [], []
    for i in range(len(data) - look_back - 1):
        a = data[i:(i + look_back), 0]
        X.append(a)
        Y.append(data[i + look_back, 0])
    return np.array(X), np.array(Y)
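# Example (added for illustration): with look_back=3 the series [1, 2, 3, 4, 5, 6]
# yields X = [[1, 2, 3], [2, 3, 4]] and Y = [4, 5]; the `- 1` in the loop bound means
# the final available window is not used as a sample.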
look_back = 5
X, Y = create_dataset(standardized_data, look_back)
# Split into training, validation and test sets
train_size = 700
val_size = 100
test_size = 158
# Training set
X_train, Y_train = X[:train_size], Y[:train_size]
# Validation set
X_val, Y_val = X[train_size:(train_size + val_size)], Y[train_size:(train_size + val_size)]
# Test set
X_test, Y_test = X[(train_size + val_size):], Y[(train_size + val_size):]
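# Sanity check (added): the hard-coded split sizes assume the akshare download for this
# date range yields exactly train_size + val_size + test_size windows; verify that here.
assert train_size + val_size + test_size == len(X), (
    f"Split sizes sum to {train_size + val_size + test_size}, but len(X) is {len(X)}"
)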
# Reshape inputs to [samples, time steps, features]; this is only needed for the LSTM model
X_train_lstm = X_train.reshape((X_train.shape[0], look_back, 1))
X_val_lstm = X_val.reshape((X_val.shape[0], look_back, 1))
X_test_lstm = X_test.reshape((X_test.shape[0], look_back, 1))
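# (Added) quick shape check: the LSTM expects input shaped (samples, time steps, features)
print('LSTM input shapes:', X_train_lstm.shape, X_val_lstm.shape, X_test_lstm.shape)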
# Build the LSTM model
model = Sequential()
model.add(LSTM(128, input_shape=(look_back, 1), return_sequences=True))
model.add(Dropout(0.2))  # Dropout layer to reduce overfitting
model.add(LSTM(64, return_sequences=False))
model.add(Dropout(0.2))  # Dropout layer to reduce overfitting
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
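# (Added) print the layer structure and parameter counts for reference
model.summary()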
# Early stopping, learning-rate reduction and checkpointing of the best model
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5)  # min_lr below Adam's default 1e-3 so the schedule can actually lower the rate
model_checkpoint = ModelCheckpoint('best_model.keras', save_best_only=True)
# Train the model
history = model.fit(X_train_lstm, Y_train, epochs=100, batch_size=32, verbose=2,
                    validation_data=(X_val_lstm, Y_val), callbacks=[early_stopping, reduce_lr, model_checkpoint])
# Predict on the test set
Y_pred_scaled = model.predict(X_test_lstm)
# Invert the standardization
Y_pred = scaler.inverse_transform(Y_pred_scaled)
Y_true = scaler.inverse_transform(Y_test.reshape(-1, 1))
# Compute LSTM performance metrics
rmse = sqrt(mean_squared_error(Y_true, Y_pred))
mae = mean_absolute_error(Y_true, Y_pred)
r2 = r2_score(Y_true, Y_pred)
# Build the ARIMA model
# First-difference the training series to check for stationarity
Y_train_diff = np.diff(Y_train)
# ADF test on the differenced series
adf_result = adfuller(Y_train_diff)
print('ADF Statistic:', adf_result[0])
print('p-value:', adf_result[1])
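# Interpretation (added): a p-value below 0.05 rejects the unit-root null, i.e. the
# first-differenced series can be treated as stationary, which supports using d=1 below.
if adf_result[1] < 0.05:
    print('Differenced series appears stationary (unit root rejected at the 5% level).')
else:
    print('Differenced series may still be non-stationary; consider further differencing.')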
# ACF and PACF plots of the differenced series
plot_acf(Y_train_diff, lags=50)
plot_pacf(Y_train_diff, lags=50)
plt.show()
# Parameter selection
# Standardize the target series with a separate scaler, so the original `scaler`
# can still be used later to map predictions back to net-value units
scaler_arima = StandardScaler()
Y_train_scaled = scaler_arima.fit_transform(Y_train.reshape(-1, 1))
Y_test_scaled = scaler_arima.transform(Y_test.reshape(-1, 1))
# Grid-search ARIMA orders, tracking the best order by AIC and by BIC
def grid_search_arima(Y, start_p, max_p, start_q, max_q, d, seasonal=False):
    best_aic = float('inf')
    best_bic = float('inf')
    best_order_aic = None
    best_order_bic = None
    for p in range(start_p, max_p + 1):
        for q in range(start_q, max_q + 1):
            try:
                model = ARIMA(Y, order=(p, d, q))
                results = model.fit()
                if results.aic < best_aic:
                    best_aic = results.aic
                    best_order_aic = (p, d, q)
                if results.bic < best_bic:
                    best_bic = results.bic
                    best_order_bic = (p, d, q)
            except Exception:
                continue
    return best_order_aic, best_order_bic
# Run the grid search
best_order, best_order_bic = grid_search_arima(Y_train_scaled, 0, 5, 0, 5, 1)
print(f"Best ARIMA order according to AIC: {best_order}")
print(f"Best ARIMA order according to BIC: {best_order_bic}")
# Fit the final ARIMA model with the AIC-selected order
model_arima = ARIMA(Y_train_scaled, order=best_order)
model_arima_fit = model_arima.fit()
# Forecast over the test horizon
Y_pred_arima_scaled = model_arima_fit.forecast(steps=len(Y_test_scaled))
# Reshape the forecast to a 2-D array
Y_pred_arima_scaled = np.reshape(Y_pred_arima_scaled, (-1, 1))
# Invert both standardizations so the ARIMA forecast is in net-value units, like the LSTM output
Y_pred_arima = scaler.inverse_transform(scaler_arima.inverse_transform(Y_pred_arima_scaled))
# Compute ARIMA performance metrics on the same scale as the LSTM metrics
rmse_arima = sqrt(mean_squared_error(Y_true, Y_pred_arima))
mae_arima = mean_absolute_error(Y_true, Y_pred_arima)
r2_arima = r2_score(Y_true, Y_pred_arima)
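# Note (added): model_arima_fit.forecast() produces one static multi-step projection from
# the end of the training sample, whereas the LSTM predicts each test point from its actual
# lagged window, so the two forecast setups are not strictly equivalent.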
# Print performance metrics for both models
print(f'LSTM Root Mean Squared Error: {rmse}')
print(f'LSTM Mean Absolute Error: {mae}')
print(f'LSTM R2 Score: {r2}')
print(f'ARIMA Root Mean Squared Error: {rmse_arima}')
print(f'ARIMA Mean Absolute Error: {mae_arima}')
print(f'ARIMA R2 Score: {r2_arima}')
# Plot predictions from both models against the actual values
plt.figure(figsize=(12, 6))
plt.plot(Y_true, label='Actual')
plt.plot(Y_pred, label='LSTM Predicted')
plt.plot(Y_pred_arima, label='ARIMA Predicted')
plt.xlabel('Date')
plt.ylabel('Unit Net Value')
plt.legend()
plt.show()
# Plot the training history
plt.figure(figsize=(12, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()
# Plot the original unit net value series
plt.figure(figsize=(12, 6))
plt.plot(df.index, df['单位净值'], label='Actual Unit Net Value')
plt.xlabel('Date')
plt.ylabel('Unit Net Value')
plt.legend()
plt.show()
# Plot the original series again and overlay the test-set predictions
plt.figure(figsize=(12, 6))
plt.plot(df.index, df['单位净值'], label='Actual Unit Net Value', color='grey')  # actual values in grey
# Dates for the test targets: each Y value is offset by look_back from the raw series
test_start = train_size + val_size + look_back
test_dates = df.index[test_start:test_start + len(Y_test)]
plt.plot(test_dates, Y_pred.flatten(), label='LSTM Predicted', color='red')
plt.plot(test_dates, Y_pred_arima.flatten(), label='ARIMA Predicted', color='green')
plt.axvline(x=test_dates[0], color='red', linestyle='--')  # dashed line marks the start of the forecast period
plt.xlabel('Date')
plt.ylabel('Unit Net Value')
plt.legend()
plt.show()