但由于上市公司的分红送股,股价除权除息,导致除权除息前后的行情数据不连续,影响了指标计算的连续性。需要先计算股票交易接口源码,复权行情数据,再做指标计算。
单次股票交易接口源码,复权因子
单次复权因子的计算有两种计算方法:
根据交易所行情数据计算
这种计算方式与交易所价格一致,但策略回测时的收益计算不包含分红再投资收益,对收益率有一定影响。
根据除权除息数据计算
比例=送股比例+转增比例+缩减比例
这种计算方式与交易所价格不一致,但策略回测时的收益计算包含分红再投资收益。
第一种方式简单实用,第二种方式收益计算更准确,个人认为第一种完全可以适用,但也对两种方式都做出说明和代码示例。
前复权
最近的交易日作为基点,原始行情数据乘以前复权因子,得到前复权行情数据,从使得最新的真实行情数据与前复权行情数据相等;
前复权的优点在于,指标计算的最新行情数据与委托价格相等;
后复权
最早的交易日作为基点,原始行情数据乘以后前复权因子,得到前复权行情数据,从使得最新的真实行情数据与前复权行情数据相等;
后复权的优点在于,策略回测可避免一部分因分红配股产生的未来数据;
累计复权因子计算
为方便与原始行情数据相乘,累计复权因子的数据结构为矩阵,column为股票代码,index为交易日历。
后复权因子的计算步骤如下:
单次复权因子累乘;
矩阵的index,从只有股份变动日扩充为全历史交易日历,并赋值nan延续前值;
由于上市初期的一段时间可能没有分红配股,所以赋值这段时间的为1;
前复权因子等于后复权因子除以最新的单次复权因子;
根据交易所行情数据
tgw提供的复权因子接口为QueryExFactorTable,计算方法是根据交易所行情数据计算的;
获取并规整dataframe数据格式的核心代码如下
class UpdateAdjFactor(object):
def __init__(self):
pass
def get_backward_factor(self, code_sh_list, code_sz_list, calendar_index):
backward_factor = pd.DataFrame(index=calendar_index)
market = "SH"
for market_type in [tgw.MarketType.kSSE, tgw.MarketType.kSZSE]:
code_list = code_sh_list
if market_type == tgw.MarketType.kSZSE:
code_list = code_sz_list
market = "SZ"
for code in code_list[:5]:
adj_factor, _ = tgw.QueryExFactorTable(code)
adj_factor.set_index(["ex_date"], inplace=True)
adj_factor.sort_index(inplace=True)
backward_factor[code+"."+market] = adj_factor["cum_factor"]
backward_factor.replace([np.inf, 0], np.nan, inplace=True)
backward_factor.fillna(method="ffill", inplace=True)
backward_factor.fillna(1, inplace=True)
return backward_factor
根据除权除息数据计算
tgw提供的除权除息数据接口为QueryThirdInfo,
根据除权除息数据,结合日线收盘价,实现后复权因子计算的核心代码如下
class UpdateAdjFactor(object):
def __init__(self):
pass
def get_backward_factor_ratio(self, close_df, code_sh_list, code_sz_list, calendar_index):
"""
取当日收盘价,作为转、送的股价,
再计算复权因子更新到AShareExRightDividend, 复权因子adj_factor
比例 = 送股比例 + 转增比例 + 缩减比例
单次复权因子 = 股权登记日收盘价 * (1 + 比例 + 配股比例 + 增发比例) /
(股权登记日收盘价 - 派息比例 + 股权登记日收盘价 * 比例 + 配股价格 * 配股比例 + 增发价格 * 增发比例)
:return:
"""
ex_right_pidend_df = None
market = "SH"
for market_type in [tgw.MarketType.kSSE, tgw.MarketType.kSZSE]:
code_list = code_sh_list
if market_type == tgw.MarketType.kSZSE:
code_list = code_sz_list
market = "SZ"
for code in code_list[:5]:
task_id = tgw.GetTaskID()
tgw.SetThirdInfoParam(task_id, "function_id", "A010030003")
tgw.SetThirdInfoParam(task_id, "start_date", "20130101")
tgw.SetThirdInfoParam(task_id, "end_date", "20991231")
tgw.SetThirdInfoParam(task_id, "market_code", code+"."+market)
df, _ = tgw.QueryThirdInfo(task_id)
if ex_right_pidend_df is None:
ex_right_pidend_df = df
else:
ex_right_pidend_df = ex_right_pidend_df.append(df)
ex_right_pidend_df["close"] = ex_right_pidend_df.apply(
lambda x: self.get_adj_day_close(x["MARKET_CODE"], int(x["EX_RD_DATE"]), close_df), axis=1)
ex_right_pidend_df = ex_right_pidend_df.fillna(0)
ratio = ex_right_pidend_df["BONUS_SHARE_RATIO"] + ex_right_pidend_df["CONVER_INCR_RATIO"] + ex_right_pidend_df["REDUCED_RATIO"]
ex_right_pidend_df["adj_factor"] = ex_right_pidend_df["close"] * (
1 + ratio + ex_right_pidend_df["RIGHT_ISSUE_RATIO"] + ex_right_pidend_df["SEO_RATIO"]) / (
ex_right_pidend_df["close"] - ex_right_pidend_df["DIV_PAYOUT_RATIO"] + ex_right_pidend_df["close"]
* ratio + ex_right_pidend_df["RIGHT_ISSUE_PRICE"] * ex_right_pidend_df["RIGHT_ISSUE_RATIO"] +
ex_right_pidend_df["SEO_PRICE"] * ex_right_pidend_df["SEO_RATIO"])
ex_right_pidend_df = ex_right_pidend_df.reindex(columns=["MARKET_CODE", "EX_RD_DATE", "adj_factor","close"])
ex_right_pidend_df.set_index(["EX_RD_DATE"], inplace=True)
ex_right_pidend_df.sort_index(inplace=True)
ex_right_pidend_df.fillna(method="ffill", inplace=True)
backward_factor_ratio = pd.DataFrame(index=calendar_index)
data_dict = dict(list(ex_right_pidend_df.groupby(ex_right_pidend_df["MARKET_CODE"])))
for security_code, adj_data in data_dict.items():
backward_factor_ratio[security_code] = adj_data["adj_factor"].cumprod(axis=0)
print(backward_factor_ratio[security_code])
print(adj_data["adj_factor"])
backward_factor_ratio.replace([np.inf, 0], np.nan, inplace=True)
backward_factor_ratio.fillna(method="ffill", inplace=True)
backward_factor_ratio.fillna(1, inplace=True)
backward_factor_ratio.sort_index(inplace=True)
return backward_factor_ratio, ex_right_pidend_df, data_dict
def get_adj_day_close(self, security_code, date, close_df):
security_code_market_data = 0
try:
security_code_market_data = close_df.loc[date, security_code]/1000000
except KeyError:
print(security_code, date, security_code_market_data)
return security_code_market_data
前复权因子计算
前复权因子,需要每日计算更新,代码如下:
def cal_forward_factor(self, backward_factor):
return backward_factor.p(backward_factor.iloc[-1])
文章为作者独立观点,不代表股票配资公司观点