My recent returns in HK and US equities have been decent, but my market timing, based purely on human judgment, hasn't been particularly accurate. I'd like a quantitative approach to help with timing. I'm not aiming for high-frequency trading; I just want my buy-low/sell-high round trips (做T) to be more accurate.
I first tried BigQuant, but found that the more advanced factors and strategies require a paid membership, so I went looking for open-source backtesting frameworks and data sources.
The libraries I'm currently using (matching the imports below): akshare for market data, backtrader for backtesting, TA-Lib for technical indicators, scikit-learn and imbalanced-learn for the classical ML side, and PyTorch for the deep-learning models.
What I have working so far is using machine learning and deep learning to predict whether the price will rise or fall over the next few days. The DNN/GNN variants are still under investigation, because building their datasets and keeping them compatible with the backtesting framework is harder than for classical machine learning.
The features are only roughly engineered, and the sample is just a single A-share ticker used as an example.
Initialization and imports:
import os
import warnings
import logging

import akshare as ak
import backtrader as bt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import talib as ta
from collections import Counter
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim

# ----------------------- Logging setup and warning suppression -----------------------
DEBUG = False
if DEBUG:
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s: %(message)s')
else:
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
Transformer
# ----------------------- Transformer model definition -----------------------
class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, nhead=2, num_encoder_layers=2, dim_feedforward=64, dropout=0.1):
        super(TransformerClassifier, self).__init__()
        # If input_dim is not divisible by nhead, add a linear projection to a compatible dimension
        if input_dim % nhead != 0:
            new_input_dim = ((input_dim // nhead) + 1) * nhead
            self.input_proj = nn.Linear(input_dim, new_input_dim)
            d_model = new_input_dim
        else:
            self.input_proj = None
            d_model = input_dim
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=dim_feedforward,
                                                   dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        # Classification head
        self.fc = nn.Linear(d_model, 1)

    def forward(self, x):
        # x: (batch_size, seq_length, input_dim)
        if self.input_proj is not None:
            x = self.input_proj(x)
        transformer_out = self.transformer_encoder(x)  # (batch_size, seq_length, d_model)
        pooled = transformer_out.mean(dim=1)           # (batch_size, d_model)
        output = self.fc(pooled)
        return torch.sigmoid(output)
BiLSTM+Attention
# ------------------ BiLSTM+Attention model definition ------------------
class BiLSTMAttention(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers=1):
        super(BiLSTMAttention, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers,
                            batch_first=True, bidirectional=True)
        # Attention layer: map each time step's output to a scalar score
        self.attn_layer = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Linear(hidden_dim * 2, 1)

    def forward(self, x):
        # x: (batch_size, seq_length, input_dim)
        lstm_out, _ = self.lstm(x)                            # (batch_size, seq_length, hidden_dim*2)
        attn_scores = self.attn_layer(lstm_out)               # (batch_size, seq_length, 1)
        attn_weights = torch.softmax(attn_scores, dim=1)      # (batch_size, seq_length, 1)
        context = torch.sum(attn_weights * lstm_out, dim=1)   # (batch_size, hidden_dim*2)
        output = self.fc(context)                             # (batch_size, 1)
        output = torch.sigmoid(output)
        return output
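Before wiring these into the backtest, a quick standalone shape check helps confirm both models emit one probability per sample. The batch size, window length, and feature count below are arbitrary numbers I picked for illustration, not values from the real pipeline:

# Hypothetical shape check with random input sequences
batch_size, seq_length, input_dim = 8, 10, 33    # arbitrary illustration values
dummy = torch.randn(batch_size, seq_length, input_dim)

transformer = TransformerClassifier(input_dim)   # input_dim=33 is projected to 34 internally (nhead=2)
bilstm = BiLSTMAttention(input_dim, hidden_dim=32)

print(transformer(dummy).shape)  # torch.Size([8, 1]), sigmoid outputs in (0, 1)
print(bilstm(dummy).shape)       # torch.Size([8, 1]), sigmoid outputs in (0, 1)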
Transformer training function:
def train_transformer_model(X_train_seq, y_train_seq, X_val_seq, y_val_seq, input_dim,
                            nhead=2, num_encoder_layers=2, dim_feedforward=64, dropout=0.1,
                            epochs=20, batch_size=32, patience=5):
    X_train_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train_seq, dtype=torch.float32).unsqueeze(1)
    X_val_tensor = torch.tensor(X_val_seq, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val_seq, dtype=torch.float32).unsqueeze(1)

    model = TransformerClassifier(input_dim, nhead, num_encoder_layers, dim_feedforward, dropout)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        permutation = torch.randperm(X_train_tensor.size(0))
        train_loss = 0
        for i in range(0, X_train_tensor.size(0), batch_size):
            optimizer.zero_grad()
            indices = permutation[i:i+batch_size]
            batch_x = X_train_tensor[indices]
            batch_y = y_train_tensor[indices]
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * batch_x.size(0)
        train_loss /= X_train_tensor.size(0)

        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_tensor)
            val_loss = criterion(val_outputs, y_val_tensor).item()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    model.eval()
    with torch.no_grad():
        val_preds = model(X_val_tensor)
        val_preds_binary = (val_preds >= 0.5).float()
        accuracy = (val_preds_binary.eq(y_val_tensor).sum().item() / y_val_tensor.size(0))
    return model, accuracy
BiLSTM+Attention training function:
def train_bilstm_attention_model(X_train_seq, y_train_seq, X_val_seq, y_val_seq, input_dim,
                                 hidden_dim=32, epochs=20, batch_size=32, patience=5):
    X_train_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train_seq, dtype=torch.float32).unsqueeze(1)
    X_val_tensor = torch.tensor(X_val_seq, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val_seq, dtype=torch.float32).unsqueeze(1)

    model = BiLSTMAttention(input_dim, hidden_dim)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        permutation = torch.randperm(X_train_tensor.size(0))
        train_loss = 0
        for i in range(0, X_train_tensor.size(0), batch_size):
            optimizer.zero_grad()
            indices = permutation[i:i+batch_size]
            batch_x = X_train_tensor[indices]
            batch_y = y_train_tensor[indices]
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * batch_x.size(0)
        train_loss /= X_train_tensor.size(0)

        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_tensor)
            val_loss = criterion(val_outputs, y_val_tensor).item()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    model.eval()
    with torch.no_grad():
        val_preds = model(X_val_tensor)
        val_preds_binary = (val_preds >= 0.5).float()
        accuracy = (val_preds_binary.eq(y_val_tensor).sum().item() / y_val_tensor.size(0))
    return model, accuracy
Helper function
# Helper function: build sliding-window sequence data
def create_sequences(X, y, seq_length):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_length + 1):
        X_seq.append(X[i:i+seq_length])
        y_seq.append(y[i+seq_length-1])
    return np.array(X_seq), np.array(y_seq)
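To make the windowing concrete, here is a toy run with made-up numbers (not from the real pipeline): with seq_length=3, each sample is a window of 3 consecutive rows, labeled with the last row's label.

# Toy example of create_sequences: 5 rows of 2 features, window length 3
X_demo = np.arange(10).reshape(5, 2)   # rows 0..4
y_demo = np.array([0, 0, 1, 1, 0])
X_seq, y_seq = create_sequences(X_demo, y_demo, seq_length=3)
print(X_seq.shape)  # (3, 3, 2): windows X[0:3], X[1:4], X[2:5]
print(y_seq)        # [1 1 0]: the label of the last row in each window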
# ----------------------- Feature engineering module -----------------------
def compute_features(close_list, volume_list, prediction_window):
    """
    Build the feature data and return a DataFrame, where:
    - close_list: closing prices in chronological order (earliest first)
    - volume_list: trading volumes
    - prediction_window: prediction horizon (next N days)
    """
    df = pd.DataFrame({'close': close_list, 'volume': volume_list})
    df['high'] = df['close']
    df['low'] = df['close']

    # 1. Lagged returns
    for window in [5, 10, 20]:
        df[f'rtn_lag_{window}'] = df['close'].pct_change(window)

    # 2. Daily return & volatility
    df['daily_return'] = df['close'].pct_change()
    df['volatility_10'] = df['daily_return'].rolling(window=10).std()

    # 3. Moving average and deviation from it
    df['ma_20'] = df['close'].rolling(window=20).mean()
    df['ma_diff'] = df['close'] / (df['ma_20'] + 1e-8) - 1

    # 4. RSI (custom and TA-Lib versions)
    delta = df['close'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / (avg_loss + 1e-8)
    df['rsi_14_custom'] = 100 - (100 / (1 + rs))
    df['ta_rsi'] = ta.RSI(df['close'].values, timeperiod=14)

    # 5. MACD
    df['ema_12'] = df['close'].ewm(span=12, adjust=False).mean()
    df['ema_26'] = df['close'].ewm(span=26, adjust=False).mean()
    df['macd_custom'] = df['ema_12'] - df['ema_26']
    df['macd_signal_custom'] = df['macd_custom'].ewm(span=9, adjust=False).mean()
    df['macd_hist_custom'] = df['macd_custom'] - df['macd_signal_custom']
    df['ta_macd'], df['ta_macd_signal'], df['ta_macd_hist'] = ta.MACD(
        df['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)

    # 6. Bollinger Bands
    df['ta_boll_upper'], df['ta_boll_middle'], df['ta_boll_lower'] = ta.BBANDS(
        df['close'].values, timeperiod=20)

    # 7. ADX
    df['ta_adx'] = ta.ADX(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)

    # 8. OBV
    df['ta_obv'] = ta.OBV(df['close'].values, df['volume'].values)

    # 9. Slow stochastic (KD)
    slowk, slowd = ta.STOCH(
        df['high'].values, df['low'].values, df['close'].values,
        fastk_period=14, slowk_period=3, slowk_matype=0, slowd_period=3, slowd_matype=0
    )
    df['ta_stoch_slowk'] = slowk
    df['ta_stoch_slowd'] = slowd

    # 10. CCI
    df['ta_cci'] = ta.CCI(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)

    # 11. Momentum
    df['ta_mom'] = ta.MOM(df['close'].values, timeperiod=10)

    # 12. ATR
    df['ta_atr'] = ta.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)

    # 13. MFI
    df['ta_mfi'] = ta.MFI(df['high'].values, df['low'].values, df['close'].values,
                          df['volume'].values, timeperiod=14)

    # 14. PPO
    df['ta_ppo'] = ta.PPO(df['close'].values, fastperiod=12, slowperiod=26, matype=0)

    # 15. ROC
    df['ta_roc'] = ta.ROC(df['close'].values, timeperiod=10)

    # 16. TRIX
    df['ta_trix'] = ta.TRIX(df['close'].values, timeperiod=15)

    # 17. Williams %R
    df['ta_willr'] = ta.WILLR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14)

    # 18. StochRSI
    df['ta_stochrsi'] = ta.STOCHRSI(
        df['close'].values, timeperiod=14, fastk_period=3, fastd_period=3, fastd_matype=0
    )[0]

    # 19. ULTOSC
    df['ta_ultosc'] = ta.ULTOSC(
        df['high'].values, df['low'].values, df['close'].values,
        timeperiod1=7, timeperiod2=14, timeperiod3=28
    )

    # 20. Ichimoku
    def compute_ichimoku(high, low):
        high_series = pd.Series(high)
        low_series = pd.Series(low)
        tenkan = (high_series.rolling(window=9).max() + low_series.rolling(window=9).min()) / 2
        kijun = (high_series.rolling(window=26).max() + low_series.rolling(window=26).min()) / 2
        senkou_span_a = ((tenkan + kijun) / 2).shift(26)
        senkou_span_b = (high_series.rolling(window=52).max() + low_series.rolling(window=52).min()) / 2
        senkou_span_b = senkou_span_b.shift(26)
        return tenkan, kijun, senkou_span_a, senkou_span_b

    tenkan, kijun, senkou_a, senkou_b = compute_ichimoku(df['high'].values, df['low'].values)
    df['ichi_tenkan'] = tenkan
    df['ichi_kijun'] = kijun
    df['ichi_senkou_a'] = senkou_a
    df['ichi_senkou_b'] = senkou_b

    # 21. Cumulative return over the next N days (prediction target)
    df['cum_return_future'] = df['close'].pct_change(periods=prediction_window).shift(-prediction_window)

    # 22. 52-week range
    df['52w_high'] = df['close'].rolling(window=252).max()
    df['52w_low'] = df['close'].rolling(window=252).min()
    df['range_52w'] = (df['close'] - df['52w_low']) / (df['52w_high'] - df['52w_low'] + 1e-8)

    # 23. PE (placeholder example)
    df['pe'] = 15

    # 24. Turnover proxy (volume relative to its 20-day average)
    df['vol_ma_20'] = df['volume'].rolling(window=20).mean()
    df['turnover'] = df['volume'] / (df['vol_ma_20'] + 1e-8)

    # Label: positive sample if the cumulative future return exceeds 1%
    df['label'] = (df['cum_return_future'] > 0.01).astype(int)

    df = df.fillna(method='ffill').fillna(0)
    feature_cols = [col for col in df.columns if col not in ['label', 'cum_return_future', 'daily_return']]
    df = df[feature_cols + ['label', 'cum_return_future', 'daily_return']]
    df = df.fillna(method='ffill').fillna(0)
    return df, feature_cols
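The glue between compute_features and the two training functions isn't shown above, so here is a minimal sketch of how they can fit together, under my own assumptions: synthetic random-walk bars stand in for the real close/volume series, the 80/20 chronological split and seq_length=10 are arbitrary choices for illustration, and SMOTE (imported earlier) is left out because oversampling after windowing would scramble the temporal order; it fits the flat tabular ML path better.

# Sketch only (assumptions, not the exact in-strategy code):
# scale the features, split chronologically, window them, and train the Transformer classifier.

# Synthetic random-walk bars purely so this sketch runs standalone;
# in the real pipeline these would be the fetched daily close/volume series.
rng = np.random.default_rng(0)
close_list = list(100 * np.cumprod(1 + rng.normal(0, 0.01, 600)))
volume_list = list(rng.integers(1_000_000, 5_000_000, 600).astype(float))

feat_df, feature_cols = compute_features(close_list, volume_list, prediction_window=3)
X = feat_df[feature_cols].values
y = feat_df['label'].values

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

split = int(len(X_scaled) * 0.8)            # chronological split, no shuffling
X_train, X_val = X_scaled[:split], X_scaled[split:]
y_train, y_val = y[:split], y[split:]

seq_length = 10                              # assumed window length
X_train_seq, y_train_seq = create_sequences(X_train, y_train, seq_length)
X_val_seq, y_val_seq = create_sequences(X_val, y_val, seq_length)

model, val_acc = train_transformer_model(
    X_train_seq, y_train_seq, X_val_seq, y_val_seq,
    input_dim=X_train_seq.shape[2]
)
logger.info(f"Transformer validation accuracy: {val_acc:.2%}")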
The features used so far are all fairly basic statistical ones; I haven't started on proper time-series features yet. In practice, once richer time-series features are developed, the LSTM and Transformer models should perform noticeably better.
The strategy itself is just stop-loss/take-profit plus some drawdown-based (trailing) take-profit; it's simple enough that I won't paste the full code, though a rough sketch of that exit logic follows. After that comes the main entry point of the code.
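As an illustration of that exit logic, here is a minimal sketch of a backtrader strategy combining a fixed stop-loss/take-profit with a trailing take-profit that triggers on a drawdown from the post-entry peak. The class name SimpleExitStrategy, the thresholds, and the self.signal_prob attribute are all hypothetical; this is not the MLStrategy used in the main program below.

# Hypothetical sketch: fixed stop-loss/take-profit plus a trailing take-profit,
# not the actual MLStrategy referenced in the main program.
class SimpleExitStrategy(bt.Strategy):
    params = dict(
        stop_loss=0.05,          # exit if price falls 5% below entry
        take_profit=0.10,        # exit if price rises 10% above entry
        trail_drawdown=0.03,     # once in profit, exit if price drops 3% from its peak
        probability_threshold=0.65,
    )

    def __init__(self):
        self.entry_price = None
        self.peak_price = None
        self.signal_prob = 0.0   # would be refreshed each bar by the ML/DL model

    def next(self):
        price = self.data.close[0]
        if not self.position:
            if self.signal_prob >= self.p.probability_threshold:
                self.buy()
                self.entry_price = price
                self.peak_price = price
            return

        self.peak_price = max(self.peak_price, price)
        if price <= self.entry_price * (1 - self.p.stop_loss):
            self.close()   # hard stop-loss
        elif price >= self.entry_price * (1 + self.p.take_profit):
            self.close()   # hard take-profit
        elif price >= self.entry_price and price <= self.peak_price * (1 - self.p.trail_drawdown):
            self.close()   # trailing take-profit on drawdown from the peak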
# ----------------------- Main program -----------------------
if __name__ == '__main__':
    cerebro = bt.Cerebro()
    symbol = '600519'
    stock_data = fetch_akshare_data(
        symbol=symbol,
        start_date=pd.to_datetime('2018-01-01'),
        end_date=pd.to_datetime('2025-03-01')
    )
    logger.info("\n[Data sample]")
    logger.info(stock_data.head(3))
    logger.info(f"\nData range: {stock_data.index.min()} ~ {stock_data.index.max()}")

    data = bt.feeds.PandasData(
        dataname=stock_data,
        open=0, high=1, low=2, close=3, volume=4,
        openinterest=-1
    )
    cerebro.adddata(data)
    cerebro.addstrategy(
        MLStrategy,
        training_period=504,
        prediction_window=3,
        probability_threshold=0.65,
    )
    cerebro.broker.setcash(1_000_000)
    cerebro.broker.setcommission(commission=0.001)

    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, riskfreerate=0.0, annualize=True,
                        timeframe=bt.TimeFrame.Days, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trade_analyzer')
    cerebro.addanalyzer(bt.analyzers.TimeReturn, timeframe=bt.TimeFrame.Days, _name='time_return')

    logger.info("\nStarting backtest...")
    results = cerebro.run()
    strat = results[0]

    final_value = cerebro.broker.getvalue()
    logger.info(f"\nFinal portfolio value: {final_value:,.2f} CNY")

    sharpe = strat.analyzers.sharpe.get_analysis()
    if sharpe.get('sharperatio') is not None:
        logger.info(f"Annualized Sharpe ratio: {sharpe['sharperatio']:.2f}")
    else:
        logger.info("Annualized Sharpe ratio: N/A")

    drawdown = strat.analyzers.drawdown.get_analysis()
    logger.info(f"Max drawdown: {drawdown['max']['drawdown']:.2f}% (length {drawdown['max']['len']} days)")

    trade_analyzer = strat.analyzers.trade_analyzer.get_analysis()
    logger.info("\nTrade statistics: %s", trade_analyzer)

    time_return = strat.analyzers.time_return.get_analysis()
    if len(time_return) > 0:
        daily_returns = pd.Series(time_return)
        total_ret = (daily_returns + 1.0).prod() - 1.0
        days = len(daily_returns)
        annual_ret = (1 + total_ret)**(252/days) - 1 if days > 0 else None
        if annual_ret is not None:
            logger.info(f"Annualized return: {annual_ret:.2%}")
        else:
            logger.info("Unable to compute annualized return")
    else:
        logger.info("time_return is empty; unable to compute annualized return")

    figs = cerebro.plot(style='candlestick', volume=False, barup='red', bardown='green')
    if figs and figs[0]:
        fig = figs[0][0]
        fig.savefig("backtrader_plot.png")
        logger.info("Plot saved as backtrader_plot.png")
    plt.show()
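One gap in the listing above: fetch_akshare_data, which the main program calls, isn't shown. Here is a minimal sketch of what it might look like, assuming akshare's stock_zh_a_hist interface, forward-adjusted (qfq) prices, and the positional column order open/high/low/close/volume expected by the PandasData feed; this is my own assumption, not the original implementation.

# Hypothetical sketch of fetch_akshare_data: pull forward-adjusted daily bars via akshare,
# rename the Chinese column headers, and return open/high/low/close/volume on a DatetimeIndex.
def fetch_akshare_data(symbol, start_date, end_date):
    raw = ak.stock_zh_a_hist(
        symbol=symbol,
        period="daily",
        start_date=start_date.strftime("%Y%m%d"),
        end_date=end_date.strftime("%Y%m%d"),
        adjust="qfq",
    )
    raw = raw.rename(columns={
        "日期": "date", "开盘": "open", "最高": "high",
        "最低": "low", "收盘": "close", "成交量": "volume",
    })
    raw["date"] = pd.to_datetime(raw["date"])
    raw = raw.set_index("date").sort_index()
    return raw[["open", "high", "low", "close", "volume"]]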
Of course, this code is quite rough and is only meant as a learning record. Suggestions and corrections are very welcome.