Machine Learning + Deep Learning for Quantitative Trading

Background

My recent returns in Hong Kong and US stocks have been decent, but my market-timing calls based purely on personal judgment haven't been especially accurate. I'd like a quantitative approach to help with timing. I'm not trying to do high-frequency trading; I only want my swing trades (selling high, buying back lower) to be timed more accurately.

Research

I first tried BigQuant, but the more advanced factors and strategies require a paid membership, so I looked for open-source backtesting frameworks and data sources instead.
I'm currently using the following libraries (a minimal data-fetch sketch follows the list):

  1. talib
  2. backtrader
  3. akshare
  4. pybroker
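
akshare is the data source here. The main program later calls a fetch_akshare_data helper that isn't reproduced in this post, so below is a hedged sketch of roughly what it could look like using akshare's daily-bar interface; the Chinese column names and the forward-adjustment ("qfq") setting are assumptions that may need adjusting for your akshare version.

def fetch_akshare_data(symbol, start_date, end_date):
    # Hypothetical helper (not the original): daily forward-adjusted bars via akshare
    raw = ak.stock_zh_a_hist(symbol=symbol, period="daily",
                             start_date=start_date.strftime("%Y%m%d"),
                             end_date=end_date.strftime("%Y%m%d"), adjust="qfq")
    df = raw.rename(columns={'日期': 'date', '开盘': 'open', '最高': 'high',
                             '最低': 'low', '收盘': 'close', '成交量': 'volume'})
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date').sort_index()
    # Column order matches the PandasData feed in the main program: open, high, low, close, volume
    return df[['open', 'high', 'low', 'close', 'volume']]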

So far I've used machine learning and deep learning to predict whether the stock will rise or fall over the next few days. The DNN/GNN variants are still under investigation, because building their datasets and making them play nicely with the backtesting framework is harder than for the classical ML models.
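
Concretely, "rise or fall" is a binary label built from the cumulative return over the next N days, exactly as in the feature-engineering code later in this post. A toy illustration:

import pandas as pd

# Label definition: 1 if the cumulative return over the next N days exceeds 1%, else 0
close = pd.Series([10.0, 10.1, 10.3, 10.2, 10.5, 10.4])
N = 3
future_ret = close.pct_change(periods=N).shift(-N)  # forward-looking N-day return
label = (future_ret > 0.01).astype(int)
print(label.tolist())  # [1, 1, 0, 0, 0, 0] -- trailing rows are 0 because the future return is NaN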

Results

Features were built only roughly, and a single A-share stock was picked as the sample.

Trade chart:

Summary

Execution process

  1. Model training + staged position reduction
  2. Model training + stop-loss

Performance:

  • Final capital: 1,483,439.88 CNY (initial capital: 1,000,000.00 CNY)
  • Backtest window: 2018-01-01 to 2025-03-01, on an A-share stock
  • Annualized Sharpe ratio: 0.51
  • Max drawdown: 30.80% (lasting 489 days)
  • Annualized return: 5.90% (barely beats inflation)

Code walkthrough:

Initialization and imports:

import os
import warnings
import logging
import akshare as ak
import backtrader as bt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import talib as ta
from collections import Counter
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
# ----------------------- Logging setup and warning suppression -----------------------
DEBUG = False
if DEBUG:
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s: %(message)s')
else:
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

Model construction (a small smoke-test sketch follows the two definitions):

  1. Transformer

    ## ----------------------- Transformer model definition -----------------------
    class TransformerClassifier(nn.Module):
        def __init__(self, input_dim, nhead=2, num_encoder_layers=2, dim_feedforward=64, dropout=0.1):
            super(TransformerClassifier, self).__init__()
            # If input_dim is not divisible by nhead, add a linear projection to a compatible dimension
            if input_dim % nhead != 0:
                new_input_dim = ((input_dim // nhead) + 1) * nhead
                self.input_proj = nn.Linear(input_dim, new_input_dim)
                d_model = new_input_dim
            else:
                self.input_proj = None
                d_model = input_dim
            encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                       dim_feedforward=dim_feedforward,
                                                       dropout=dropout, batch_first=True)
            self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
            # Classification head
            self.fc = nn.Linear(d_model, 1)

        def forward(self, x):
            # x: (batch_size, seq_length, input_dim)
            if self.input_proj is not None:
                x = self.input_proj(x)
            transformer_out = self.transformer_encoder(x)   # (batch_size, seq_length, d_model)
            pooled = transformer_out.mean(dim=1)            # (batch_size, d_model)
            output = self.fc(pooled)
            return torch.sigmoid(output)
  2. BiLSTM + Attention

    # ------------------ BiLSTM + Attention model definition ------------------
    class BiLSTMAttention(nn.Module):
        def __init__(self, input_dim, hidden_dim, num_layers=1):
            super(BiLSTMAttention, self).__init__()
            self.hidden_dim = hidden_dim
            self.num_layers = num_layers
            self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers,
                                batch_first=True, bidirectional=True)
            # Attention layer: maps each time step's output to a scalar score
            self.attn_layer = nn.Linear(hidden_dim * 2, 1)
            self.fc = nn.Linear(hidden_dim * 2, 1)

        def forward(self, x):
            # x: (batch_size, seq_length, input_dim)
            lstm_out, _ = self.lstm(x)                           # (batch_size, seq_length, hidden_dim*2)
            attn_scores = self.attn_layer(lstm_out)              # (batch_size, seq_length, 1)
            attn_weights = torch.softmax(attn_scores, dim=1)     # (batch_size, seq_length, 1)
            context = torch.sum(attn_weights * lstm_out, dim=1)  # (batch_size, hidden_dim*2)
            output = self.fc(context)                            # (batch_size, 1)
            return torch.sigmoid(output)
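
As a quick sanity check, both models can be fed a dummy batch to confirm they emit one probability per sample. The sizes below (8 samples, a 10-step window, 16 features) are placeholders, not the real feature count:

# Smoke test with placeholder dimensions
dummy = torch.randn(8, 10, 16)
transformer = TransformerClassifier(input_dim=16)
bilstm = BiLSTMAttention(input_dim=16, hidden_dim=32)
print(transformer(dummy).shape)  # torch.Size([8, 1]), values in (0, 1)
print(bilstm(dummy).shape)       # torch.Size([8, 1])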

Model training

  1. Transformer

    def train_transformer_model(X_train_seq, y_train_seq, X_val_seq, y_val_seq, input_dim,
                                nhead=2, num_encoder_layers=2, dim_feedforward=64, dropout=0.1,
                                epochs=20, batch_size=32, patience=5):
        X_train_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train_seq, dtype=torch.float32).unsqueeze(1)
        X_val_tensor = torch.tensor(X_val_seq, dtype=torch.float32)
        y_val_tensor = torch.tensor(y_val_seq, dtype=torch.float32).unsqueeze(1)
        model = TransformerClassifier(input_dim, nhead, num_encoder_layers, dim_feedforward, dropout)
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        best_val_loss = float('inf')
        epochs_no_improve = 0
        best_model_state = None
        for epoch in range(epochs):
            model.train()
            permutation = torch.randperm(X_train_tensor.size(0))
            train_loss = 0
            for i in range(0, X_train_tensor.size(0), batch_size):
                optimizer.zero_grad()
                indices = permutation[i:i+batch_size]
                batch_x = X_train_tensor[indices]
                batch_y = y_train_tensor[indices]
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item() * batch_x.size(0)
            train_loss /= X_train_tensor.size(0)
            # Early stopping on validation loss
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_val_tensor)
                val_loss = criterion(val_outputs, y_val_tensor).item()
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # Clone the snapshot so it is not mutated by later parameter updates
                best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= patience:
                    break
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
        model.eval()
        with torch.no_grad():
            val_preds = model(X_val_tensor)
            val_preds_binary = (val_preds >= 0.5).float()
            accuracy = val_preds_binary.eq(y_val_tensor).sum().item() / y_val_tensor.size(0)
        return model, accuracy
  2. BiLSTM+Attention

    def train_bilstm_attention_model(X_train_seq, y_train_seq, X_val_seq, y_val_seq, input_dim,
                                     hidden_dim=32, epochs=20, batch_size=32, patience=5):
        X_train_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train_seq, dtype=torch.float32).unsqueeze(1)
        X_val_tensor = torch.tensor(X_val_seq, dtype=torch.float32)
        y_val_tensor = torch.tensor(y_val_seq, dtype=torch.float32).unsqueeze(1)
        model = BiLSTMAttention(input_dim, hidden_dim)
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        best_val_loss = float('inf')
        epochs_no_improve = 0
        best_model_state = None
        for epoch in range(epochs):
            model.train()
            permutation = torch.randperm(X_train_tensor.size(0))
            train_loss = 0
            for i in range(0, X_train_tensor.size(0), batch_size):
                optimizer.zero_grad()
                indices = permutation[i:i+batch_size]
                batch_x = X_train_tensor[indices]
                batch_y = y_train_tensor[indices]
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item() * batch_x.size(0)
            train_loss /= X_train_tensor.size(0)
            # Early stopping on validation loss
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_val_tensor)
                val_loss = criterion(val_outputs, y_val_tensor).item()
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # Clone the snapshot so it is not mutated by later parameter updates
                best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= patience:
                    break
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
        model.eval()
        with torch.no_grad():
            val_preds = model(X_val_tensor)
            val_preds_binary = (val_preds >= 0.5).float()
            accuracy = val_preds_binary.eq(y_val_tensor).sum().item() / y_val_tensor.size(0)
        return model, accuracy
  3. Helper function (a shape illustration follows the list)

    # Helper: build rolling windows so each sample is (seq_length, n_features),
    # labelled by the last row in the window
    def create_sequences(X, y, seq_length):
        X_seq, y_seq = [], []
        for i in range(len(X) - seq_length + 1):
            X_seq.append(X[i:i+seq_length])
            y_seq.append(y[i+seq_length-1])
        return np.array(X_seq), np.array(y_seq)
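
To make the shapes concrete, here is a tiny illustration with synthetic data (the sizes are placeholders): 100 rows of 16 features become 91 overlapping windows of length 10.

X_demo = np.random.randn(100, 16)            # placeholder: 100 days x 16 features
y_demo = np.random.randint(0, 2, size=100)   # placeholder binary labels
X_seq, y_seq = create_sequences(X_demo, y_demo, seq_length=10)
print(X_seq.shape, y_seq.shape)  # (91, 10, 16) (91,)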

Feature engineering:

# ----------------------- Feature engineering module -----------------------
def compute_features(close_list, volume_list, prediction_window):
    """
    Build the feature DataFrame.
    - close_list: closing prices in chronological order (oldest first)
    - volume_list: volumes in the same order
    - prediction_window: prediction horizon (next N days)
    Returns the DataFrame and the list of feature column names.
    """
    df = pd.DataFrame({'close': close_list, 'volume': volume_list})
    # Only close/volume are passed in, so high/low are proxied by close for the TA-Lib calls below
    df['high'] = df['close']
    df['low'] = df['close']
    # 1. Lagged returns
    for window in [5, 10, 20]:
        df[f'rtn_lag_{window}'] = df['close'].pct_change(window)
    # 2. Daily return & volatility
    df['daily_return'] = df['close'].pct_change()
    df['volatility_10'] = df['daily_return'].rolling(window=10).std()
    # 3. Moving average and deviation from it
    df['ma_20'] = df['close'].rolling(window=20).mean()
    df['ma_diff'] = df['close'] / (df['ma_20'] + 1e-8) - 1
    # 4. RSI (hand-rolled and TA-Lib versions)
    delta = df['close'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / (avg_loss + 1e-8)
    df['rsi_14_custom'] = 100 - (100 / (1 + rs))
    df['ta_rsi'] = ta.RSI(df['close'].values, timeperiod=14)
    # 5. MACD
    df['ema_12'] = df['close'].ewm(span=12, adjust=False).mean()
    df['ema_26'] = df['close'].ewm(span=26, adjust=False).mean()
    df['macd_custom'] = df['ema_12'] - df['ema_26']
    df['macd_signal_custom'] = df['macd_custom'].ewm(span=9, adjust=False).mean()
    df['macd_hist_custom'] = df['macd_custom'] - df['macd_signal_custom']
    df['ta_macd'], df['ta_macd_signal'], df['ta_macd_hist'] = ta.MACD(
        df['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)
    # 6. Bollinger Bands
    df['ta_boll_upper'], df['ta_boll_middle'], df['ta_boll_lower'] = ta.BBANDS(
        df['close'].values, timeperiod=20)
    # 7. ADX
    df['ta_adx'] = ta.ADX(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)
    # 8. OBV
    df['ta_obv'] = ta.OBV(df['close'].values, df['volume'].values)
    # 9. Slow stochastic (KD)
    slowk, slowd = ta.STOCH(
        df['high'].values, df['low'].values, df['close'].values,
        fastk_period=14, slowk_period=3, slowk_matype=0,
        slowd_period=3, slowd_matype=0
    )
    df['ta_stoch_slowk'] = slowk
    df['ta_stoch_slowd'] = slowd
    # 10. CCI
    df['ta_cci'] = ta.CCI(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)
    # 11. Momentum
    df['ta_mom'] = ta.MOM(df['close'].values, timeperiod=10)
    # 12. ATR
    df['ta_atr'] = ta.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)
    # 13. MFI
    df['ta_mfi'] = ta.MFI(df['high'].values, df['low'].values, df['close'].values,
                          df['volume'].values, timeperiod=14)
    # 14. PPO
    df['ta_ppo'] = ta.PPO(df['close'].values, fastperiod=12, slowperiod=26, matype=0)
    # 15. ROC
    df['ta_roc'] = ta.ROC(df['close'].values, timeperiod=10)
    # 16. TRIX
    df['ta_trix'] = ta.TRIX(df['close'].values, timeperiod=15)
    # 17. Williams %R
    df['ta_willr'] = ta.WILLR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)
    # 18. StochRSI
    df['ta_stochrsi'] = ta.STOCHRSI(
        df['close'].values, timeperiod=14,
        fastk_period=3, fastd_period=3, fastd_matype=0
    )[0]
    # 19. Ultimate Oscillator
    df['ta_ultosc'] = ta.ULTOSC(
        df['high'].values, df['low'].values, df['close'].values,
        timeperiod1=7, timeperiod2=14, timeperiod3=28
    )
    # 20. Ichimoku
    def compute_ichimoku(high, low):
        high_series = pd.Series(high)
        low_series = pd.Series(low)
        tenkan = (high_series.rolling(window=9).max() + low_series.rolling(window=9).min()) / 2
        kijun = (high_series.rolling(window=26).max() + low_series.rolling(window=26).min()) / 2
        senkou_span_a = ((tenkan + kijun) / 2).shift(26)
        senkou_span_b = (high_series.rolling(window=52).max() + low_series.rolling(window=52).min()) / 2
        senkou_span_b = senkou_span_b.shift(26)
        return tenkan, kijun, senkou_span_a, senkou_span_b
    tenkan, kijun, senkou_a, senkou_b = compute_ichimoku(df['high'].values, df['low'].values)
    df['ichi_tenkan'] = tenkan
    df['ichi_kijun'] = kijun
    df['ichi_senkou_a'] = senkou_a
    df['ichi_senkou_b'] = senkou_b
    # 21. Cumulative return over the next N days (prediction target)
    df['cum_return_future'] = df['close'].pct_change(periods=prediction_window).shift(-prediction_window)
    # 22. Position within the 52-week range
    df['52w_high'] = df['close'].rolling(window=252).max()
    df['52w_low'] = df['close'].rolling(window=252).min()
    df['range_52w'] = (df['close'] - df['52w_low']) / (df['52w_high'] - df['52w_low'] + 1e-8)
    # 23. PE (placeholder constant for this experiment)
    df['pe'] = 15
    # 24. Turnover proxy (volume relative to its 20-day mean)
    df['vol_ma_20'] = df['volume'].rolling(window=20).mean()
    df['turnover'] = df['volume'] / (df['vol_ma_20'] + 1e-8)
    # Label: positive sample if the cumulative future return exceeds 1%
    df['label'] = (df['cum_return_future'] > 0.01).astype(int)
    df = df.ffill().fillna(0)
    feature_cols = [col for col in df.columns if col not in ['label', 'cum_return_future', 'daily_return']]
    df = df[feature_cols + ['label', 'cum_return_future', 'daily_return']]
    df = df.ffill().fillna(0)
    return df, feature_cols

The features used here are fairly basic statistical ones; proper time-series features haven't been developed yet. With real time-series features, the LSTM and Transformer models should perform considerably better. A rough sketch of how the pieces above could be wired together end-to-end follows.
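
This glue code is an assumption, not the original wiring (the real version lives inside MLStrategy on a rolling window): scale the features, build sequences, split chronologically, then train. SMOTE and train_test_split are imported earlier; SMOTE could be applied to the flat training matrix before sequencing to balance classes, but is omitted here for brevity.

# Hedged end-to-end sketch; seq_length=10 is a placeholder choice
def build_and_train(close_list, volume_list, prediction_window=3, seq_length=10):
    df, feature_cols = compute_features(close_list, volume_list, prediction_window)
    X = StandardScaler().fit_transform(df[feature_cols].values)  # scale features
    y = df['label'].values
    X_seq, y_seq = create_sequences(X, y, seq_length)            # (n, seq_length, n_features)
    # Chronological split to avoid look-ahead; shuffle=False keeps time order
    X_tr, X_val, y_tr, y_val = train_test_split(X_seq, y_seq, test_size=0.2, shuffle=False)
    model, val_acc = train_transformer_model(X_tr, y_tr, X_val, y_val, input_dim=X_seq.shape[2])
    return model, val_acc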

Main program

The strategy itself is just take-profit/stop-loss plus some drawdown-based profit taking; it's simple enough that I won't reproduce it here (a rough skeleton of what such a strategy might look like is sketched below). The main entry point follows.
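
For a concrete picture, here is a stripped-down backtrader skeleton of that kind of exit logic. It is an assumption, not the original MLStrategy: the rolling model retraining, staged position reduction and drawdown-based take-profit are collapsed into a placeholder predict_prob(), and the thresholds are illustrative.

class SimpleStopStrategy(bt.Strategy):  # hypothetical skeleton, not the original MLStrategy
    params = dict(probability_threshold=0.65, take_profit=0.08, stop_loss=0.05)

    def __init__(self):
        self.entry_price = None

    def predict_prob(self):
        # Placeholder for the model signal; the real strategy retrains and predicts here
        return 0.5

    def next(self):
        close = self.datas[0].close[0]
        if not self.position:
            if self.predict_prob() >= self.p.probability_threshold:
                self.buy()
                self.entry_price = close
        else:
            change = close / self.entry_price - 1
            # Exit on either a fixed take-profit or a fixed stop-loss
            if change >= self.p.take_profit or change <= -self.p.stop_loss:
                self.close()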

# ----------------------- Main program -----------------------
if __name__ == '__main__':
    cerebro = bt.Cerebro()
    symbol = '600519'
    stock_data = fetch_akshare_data(  # data-fetch helper defined elsewhere in the full script
        symbol=symbol,
        start_date=pd.to_datetime('2018-01-01'),
        end_date=pd.to_datetime('2025-03-01')
    )
    logger.info("\n[Data sample]")
    logger.info(stock_data.head(3))
    logger.info(f"\nData range: {stock_data.index.min()} ~ {stock_data.index.max()}")
    data = bt.feeds.PandasData(
        dataname=stock_data,
        open=0, high=1, low=2, close=3, volume=4, openinterest=-1
    )
    cerebro.adddata(data)
    cerebro.addstrategy(
        MLStrategy,  # ML-driven strategy class defined elsewhere in the full script
        training_period=504,
        prediction_window=3,
        probability_threshold=0.65,
    )
    cerebro.broker.setcash(1_000_000)
    cerebro.broker.setcommission(commission=0.001)
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, riskfreerate=0.0, annualize=True,
                        timeframe=bt.TimeFrame.Days, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trade_analyzer')
    cerebro.addanalyzer(bt.analyzers.TimeReturn, timeframe=bt.TimeFrame.Days, _name='time_return')
    logger.info("\nStarting backtest...")
    results = cerebro.run()
    strat = results[0]
    final_value = cerebro.broker.getvalue()
    logger.info(f"\nFinal portfolio value: {final_value:,.2f} CNY")
    sharpe = strat.analyzers.sharpe.get_analysis()
    if sharpe.get('sharperatio') is not None:
        logger.info(f"Annualized Sharpe ratio: {sharpe['sharperatio']:.2f}")
    else:
        logger.info("Annualized Sharpe ratio: N/A")
    drawdown = strat.analyzers.drawdown.get_analysis()
    logger.info(f"Max drawdown: {drawdown['max']['drawdown']:.2f}% (lasting {drawdown['max']['len']} days)")
    trade_analyzer = strat.analyzers.trade_analyzer.get_analysis()
    logger.info("\nTrade statistics: %s", trade_analyzer)
    time_return = strat.analyzers.time_return.get_analysis()
    if len(time_return) > 0:
        daily_returns = pd.Series(time_return)
        total_ret = (daily_returns + 1.0).prod() - 1.0
        days = len(daily_returns)
        annual_ret = (1 + total_ret) ** (252 / days) - 1 if days > 0 else None
        if annual_ret is not None:
            logger.info(f"Annualized return: {annual_ret:.2%}")
        else:
            logger.info("Cannot compute annualized return")
    else:
        logger.info("time_return is empty; cannot compute annualized return")
    figs = cerebro.plot(style='candlestick', volume=False, barup='red', bardown='green')
    if figs and figs[0]:
        fig = figs[0][0]
        fig.savefig("backtrader_plot.png")
        logger.info("Plot saved as backtrader_plot.png")
    plt.show()
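
One note on the PyFolio analyzer registered above: the script never reads it. If you want the daily return series out of the backtest for further analysis, backtrader exposes it like this (shown as an optional extra, using the standard get_pf_items API):

# Optional: pull the pyfolio-style outputs from the finished run
returns, positions, transactions, gross_lev = strat.analyzers.pyfolio.get_pf_items()
logger.info(f"PyFolio returns series: {len(returns)} daily entries")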

Next steps

  1. Strengthen feature construction and screen more factors; this round was only an experiment, so the factor set was kept simple.
  2. Tune the model architecture; more depth and more attention heads are worth trying.
  3. Give GNNs another try, since stocks within the same sector really are correlated (a rough graph-construction sketch follows).
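
As a hedged starting point for that GNN idea (not part of the current code), one simple way to get a graph is a correlation-based adjacency over the daily returns of stocks in the same sector; the threshold below is arbitrary.

def sector_adjacency(price_df, threshold=0.6):
    """price_df: columns = tickers in one sector, rows = daily closes (placeholder input)."""
    corr = price_df.pct_change().corr().values       # pairwise return correlations
    adj = (np.abs(corr) >= threshold).astype(float)  # keep strongly related pairs as edges
    np.fill_diagonal(adj, 0.0)                       # no self-loops
    return adj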

Of course this code is very rough and is only a learning record; suggestions and corrections are welcome.