前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Quant102】50 个形态学指标的 Pandas 代码

【Quant102】50 个形态学指标的 Pandas 代码

作者头像
ApacheCN_飞龙
发布2024-05-24 17:49:46
1350
发布2024-05-24 17:49:46
举报
文章被收录于专栏:信数据得永生信数据得永生

早晨之星(黎明之星)

代码语言:javascript
复制
def morning_star(df, inplace=False):
    if not inplace:
        df = df.copy()
        
    # 计算三日移动平均线
    df['ma3'] = df['close'].rolling(3).mean()
    
    # 计算昨天的收盘价
    df['prev_close'] = df['close'].shift(1)

    # 找到所有符合条件的早晨之星形态
    df['morning_star'] = (df['close'] < df['open']) & (df['close'] > df['open']*1.005) & \
                        (df['open'] > df['prev_close']) & (df['open'].shift(1) > df['close'].shift(1)) & \
                        (df['prev_close'] < df['ma3']) & (df['open'] > df['ma3']) & \
                        (df['volume'] > df['volume'].rolling(5).mean()*1.5)

    return df

黄昏之星(夜星)

代码语言:javascript
复制
def evening_star(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算第一天和第二天的趋势
    df['trend_1'] = df['close'].shift(-1) - df['open'].shift(-1)
    df['trend_2'] = df['open'] - df['close'].shift(-1)
    
    # 判断符合黄昏之星的条件
    df['evening_star'] = ((df['close'].shift(-1) > df['open'].shift(-1)) & 
                          (df['open'] > df['close'].shift(-1)) & 
                          (df['close'] < df['open'] - 0.5 * (df['high'].shift(-1) - df['low'].shift(-1))) & 
                          (df['close'] < df['close'].shift(-1)) & 
                          (df['open'] > df['open'].shift(-1)))
    
    # 在原数据帧上进行更新或者返回新的数据帧
    if inplace:
        return None
    else:
        return df

倾盆大雨

以下是实现形态学指标【倾盆大雨】的Python函数代码:

代码语言:javascript
复制
import pandas as pd

def downpour(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['downpour'] = (df['close'] - df['open']) / df['close'].shift(1)
    
    if not inplace:
        return df

# 示例用法
data = {
    'open': [10, 11, 12, 13],
    'high': [11, 12, 13, 14],
    'low': [9, 10, 11, 12],
    'close': [11, 12, 13, 14],
    'volume': [100, 200, 300, 400]
}
df = pd.DataFrame(data)

df = downpour(df)
print(df)

在这个示例中,downpour函数计算了倾盆大雨指标,并将结果保存在名为downpour的新列中。你可以将这个函数和示例数据一起运行来查看计算结果。

旭日东升

代码语言:javascript
复制
import numpy as np

def xuridongsheng(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['MA5'] = df['close'].rolling(window=5).mean()
    df['MA10'] = df['close'].rolling(window=10).mean()
    
    df['XRL'] = np.where(df['MA5'] > df['MA10'], 1, 0)
    df['XRH'] = np.where(df['MA5'] < df['MA10'], 1, 0)
    
    df['XURDS'] = df['MA5'] * (1 + (df['volume'] / df['volume'].shift(1) - 1) * df['XRL'])
    
    if not inplace:
        return df

淡友反攻

代码语言:javascript
复制
def bullish_engulfing(df, inplace=True):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open']) 
    df['prev_body'] = abs(df['close'].shift(1) - df['open'].shift(1))

    df['bullish_engulfing'] = (df['open'] < df['close']) & (df['open'].shift(1) > df['close'].shift(1)) & (df['close'] > df['open'].shift(1)) & (df['open'] < df['close'].shift(1)) & (df['body'] > df['prev_body'])
    
    if not inplace:
        return df

好友反攻

代码语言:javascript
复制
import pandas as pd

def friends_rebound(df, inplace=False):
    if not inplace:
        df = df.copy()

    # 计算最高价的移动平均值
    df['high_ma'] = df['high'].rolling(window=5).mean()
    # 计算最低价的移动平均值
    df['low_ma'] = df['low'].rolling(window=5).mean()

    # 判断是否符合好友反攻条件
    df['is_friends_rebound'] = (df['close'] > df['open']) & (df['close'] > df['high_ma']) & (df['open'] > df['low']) & (df['volume'] > df['volume'].rolling(window=5).mean())

    if not inplace:
        return df

射击之星形态

代码语言:javascript
复制
import pandas as pd

def star_shooting(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算射击之星形态指标
    df['body'] = abs(df['open'] - df['close'])
    df['wick'] = df['high'] - df[['open', 'close']].max(axis=1)
    
    # 标记射击之星形态
    df['shooting_star'] = ((df['body'] < df['body'].shift()) &
                           (df['body'] < df['body'].shift(-1)) &
                           (df['wick'] > df['wick'].shift()) &
                           (df['wick'] > df['wick'].shift(-1)))
    
    # 返回结果
    return df

# 示例用法
data = {'open': [10, 11, 9, 10, 8],
        'high': [12, 13, 10, 11, 9],
        'low': [8, 10, 8, 9, 7],
        'close': [11, 10, 9, 8, 8],
        'volume': [1000, 1500, 800, 1200, 900]}
df = pd.DataFrame(data)
df = star_shooting(df)
print(df)

墓碑十字线(墓碑线)

代码语言:javascript
复制
def gravestone_doji(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['is_gravestone_doji'] = ((df['open'] == df['high']) & 
                                (df['close'] == df['low']) &
                                ((df['open'] - df['low']) > 2 * (df['high'] - df['close'])))
    
    return df

顶部尽头线(下山虎)

代码语言:javascript
复制
def down_tiger(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['lowest_low'] = df['low'].rolling(9).min()
    df['highest_close'] = df['close'].rolling(26).max()
    
    df['down_tiger'] = df['lowest_low'] - df['highest_close']
    
    return df

底部尽头线(上山虎)

代码语言:javascript
复制
def bottom_reversal(df, inplace=False):
    if not inplace:
        df = df.copy()

    # 计算底部尽头线指标
    df['low_close'] = df['low'] - df['close'].shift(1)
    df['close_low'] = df['close'] - df['low']
    df['bottom_reversal'] = (df['low_close'] < 0) & (df['close_low'] > 0)

    if not inplace:
        return df

双针探底

代码语言:javascript
复制
def double_bottom(df, inplace=False):
    if not inplace:
        df = df.copy()

    # 计算波谷
    df['valley'] = (df['low'] < df['low'].shift(-1)) & (df['low'] < df['low'].shift(1))

    # 计算双针探底
    df['double_bottom'] = df['valley'] & df['valley'].shift(2) & ~df['valley'].shift(1)

    if inplace:
        return
    else:
        return df

吊颈线(上吊线)

代码语言:javascript
复制
def hanging_man(df, inplace=False):
    if not inplace:
        df = df.copy()
    df['body'] = abs(df['open'] - df['close'])  # 实体
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线
    df['is_hanging_man'] = (df['body'] < df['body'].shift(1)) & (df['lower_shadow'] > 2 * df['body']) & (df['lower_shadow'] >= 2 * df['upper_shadow'])

    return df

乌云盖顶

代码语言:javascript
复制
def dark_cloud_cover(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['previous_close'] = df['close'].shift(1)
    df['body'] = df['open'] - df['close']
    df['shadow'] = (df['high'] - df['low']) / df['close']
    
    df['is_dark_cloud'] = ((df['close'] > df['open']) & 
                           (df['open'] > df['previous_close']) & 
                           (df['open'] > df['high']) &
                           ((df['open'] + df['close']) / 2 < df['high']) & 
                           (df['close'] > df['previous_close']) &
                           (df['body'] > 0.25 * (df['previous_close'] - df['open'])) &
                           (df['body'] < 0.75 * (df['previous_close'] - df['open'])) &
                           (df['shadow'] > 0.5))
    
    if not inplace:
        return df

曙光初现形态

代码语言:javascript
复制
import pandas as pd
import numpy as np

def dawn_appear(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['open_close_range'] = abs(df['open'] - df['close']) / df['open']
    
    df['dawn_appear'] = np.where(
        (df['open_close_range'] <= 0.01) & 
        (df['close'] > df['open']) & 
        (df['close'] > df['high'].shift(1)) & 
        (df['close'] > df['open'].shift(1)), 
        True, False
    )
    
    return df

# 示例用法
data = {'open': [10, 9, 11, 12, 10],
        'high': [12, 11, 12, 13, 11],
        'low': [9, 8, 10, 11, 9],
        'close': [11, 10, 11, 12, 11],
        'volume': [1000, 1200, 800, 1500, 1000]}

df = pd.DataFrame(data)
df = dawn_appear(df)
print(df)

上涨身怀六甲

代码语言:javascript
复制
import pandas as pd

def up_trend_six_pregnancies(df, inplace=True):
    if not inplace:
        df = df.copy()

    df['open-close'] = df['open'] - df['close']  # 计算开盘价和收盘价的差
    df['high-low'] = df['high'] - df['low']  # 计算最高价和最低价的差
    df['up_trend'] = (df['open-close'] > 0) & (df['open-close'].shift(1) > 0)  # 判断是否处于上涨趋势
    df['pregnancy_1'] = df['high-low'].rolling(3).sum() / 3  # 第一持有期
    df['pregnancy_2'] = df['high-low'].rolling(6).sum() / 6  # 第二持有期
    df['pregnancy_3'] = df['high-low'].rolling(9).sum() / 9  # 第三持有期
    df['pregnancy_4'] = df['high-low'].rolling(12).sum() / 12  # 第四持有期
    df['pregnancy_5'] = df['high-low'].rolling(18).sum() / 18  # 第五持有期
    df['pregnancy_6'] = df['high-low'].rolling(24).sum() / 24  # 第六持有期

    if not inplace:
        return df

下跌身怀六甲

代码语言:javascript
复制
def downtrend_six_pregnancies(df, inplace=False):
    if not inplace:
        df = df.copy()
        
    df['body'] = abs(df['close'] - df['open'])  # 实体大小
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线
    df['pregnancies'] = ((df['close'] > df['open']) & (df['body'] > df['body'].shift(1)) & (df['body'] > df['body'].shift(2)) &
                         (df['upper_shadow'] < df['upper_shadow'].shift(1)) & (df['upper_shadow'] < df['upper_shadow'].shift(2)) &
                         (df['lower_shadow'] < df['lower_shadow'].shift(1)) & (df['lower_shadow'] < df['lower_shadow'].shift(2))).astype(int)
    
    return df

上涨孕十字星

下面是实现形态学指标【上涨孕十字星】的Python函数代码:

代码语言:javascript
复制
import pandas as pd

def bullish_engulfing(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])
    df['is_bullish_engulfing'] = False
    
    for i in range(1, len(df)):
        if df['close'][i] > df['open'][i] and df['open'][i-1] > df['close'][i-1] and df['close'][i-1] > df['open'][i] and df['open'][i] < df['close'][i-1] and df['body'][i] > df['body'][i-1]:
            df.at[i, 'is_bullish_engulfing'] = True
    
    return df

# 测试
data = {'open': [10, 8, 9, 7, 12],
        'high': [12, 9, 10, 8, 13],
        'low': [8, 7, 8, 6, 10],
        'close': [12, 9, 10, 8, 12],
        'volume': [1000, 2000, 1500, 3000, 2500]}
df = pd.DataFrame(data)

df = bullish_engulfing(df)
print(df)

该函数会在数据帧中添加两列:body表示K线实体长度,is_bullish_engulfing表示是否符合上涨孕十字星形态。在测试中,我们创建了一个示例数据帧并调用了bullish_engulfing函数,最终输出数据帧包含了新增的两列。

下跌孕十字星

代码语言:javascript
复制
import pandas as pd

def down_abandoned_baby(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_down_abandoned_baby'] = (df['body'] < df['body'].shift(1)) & (df['body'].shift(1) > df['body'].shift(2)) & (df['close'].shift(1) > df['open'].shift(1)) & (df['open'] < df['close']) & (df['close'] < df['open'].shift(1)) & (df['low'] > df['high'].shift(1)) & (df['high'] < df['low'].shift(1))

    if inplace:
        return
    else:
        return df

上涨孤独十字星

代码语言:javascript
复制
def upward_lone_cross_star(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_upward_lone_cross_star'] = ((df['body'] < 0.1 * df['open']) & 
                                       (df['upper_shadow'] < 0.3 * df['open']) &
                                       (df['lower_shadow'] > 2 * df['body']) & 
                                       (df['open'] < df['close']))

    if not inplace:
        return df

使用示例:

代码语言:javascript
复制
import pandas as pd
# 创建包含必要列的示例数据
data = {'open': [10, 8, 12, 8],
        'high': [11, 9, 13, 9],
        'low': [9, 7, 11, 7],
        'close': [10, 8, 11, 8],
        'volume': [1000, 2000, 1500, 1800]}
df = pd.DataFrame(data)

# 应用函数
upward_lone_cross_star(df, inplace=True)

# 查看结果
print(df)

下跌孤独十字星

代码语言:javascript
复制
def falling_lone_cross_star(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['is_falling_lone_cross_star'] = False
    
    for i in range(2, len(df)):
        if df['close'][i] < df['open'][i] and df['close'][i-1] < df['open'][i-1] and df['close'][i-2] < df['open'][i-2] \
            and df['close'][i-1] < df['close'][i-2] and df['close'][i-1] < df['open'][i-2]:
            
            df.at[i, 'is_falling_lone_cross_star'] = True
    
    if not inplace:
        return df

您可以将这段代码添加到您的量化交易策略中,用于识别下跌孤独十字星形态,从而辅助决策。

上涨Pinbar组合

代码语言:javascript
复制
def bullish_pinbar(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body_size'] = abs(df['close'] - df['open'])
    df['upper_shadow_size'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow_size'] = df[['open', 'close']].min(axis=1) - df['low']
    
    df['bullish_pinbar'] = (
        (df['body_size'] < df['upper_shadow_size']) & 
        (df['body_size'] < df['lower_shadow_size']) & 
        (df['close'] > df['open'])
    )
    
    return df

下跌Pinbar组合

代码语言:javascript
复制
def down_pinbar(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])  # 计算蜡烛实体
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 计算上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 计算下影线
    
    # 判断下跌Pinbar
    df['down_pinbar'] = ((df['close'] < df['open']) &  # 实体为下跌
                         (df['body'] < 0.5 * df['low'] * 0.01) &  # 实体小于0.5%的最低价
                         (df['upper_shadow'] < 0.3 * df['low'] * 0.01) &  # 上影线小于0.3%的最低价
                         (df['lower_shadow'] < 0.3 * df['low'] * 0.01))  # 下影线小于0.3%的最低价
    
    return df

下跌螺旋桨

代码语言:javascript
复制
import numpy as np

def falling_screwing_propeller(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['Close-Open'] = df['close'] - df['open']
    df['High-Low'] = df['high'] - df['low']

    df['Body'] = np.abs(df['Close-Open'])
    df['Upper Shadow'] = np.where(df['Close-Open'] > 0, df['high'] - df['close'], df['high'] - df['open'])
    df['Lower Shadow'] = np.where(df['Close-Open'] > 0, df['open'] - df['low'], df['close'] - df['low'])
    
    if not inplace:
        return df

这段代码实现了计算下跌螺旋桨指标的函数,包括对数据帧中的收盘价、开盘价、最高价、最低价等列进行处理,计算出指标所需的各个数据并保存到数据帧中。最后返回更新后的数据帧。

上涨螺旋桨

下面是一个实现上涨螺旋桨指标的Python函数:

代码语言:javascript
复制
import numpy as np

def rising_screw_propeller(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['HL'] = df['high'] - df['low']
    df['HC'] = np.abs(df['high'] - df['close'].shift())
    df['LC'] = np.abs(df['low'] - df['close'].shift())

    df['TR'] = df[['HC', 'HL', 'LC']].max(axis=1)
    df['ATR'] = df['TR'].rolling(window=14).mean()

    df['RSPL'] = 100 * (df['close'] - df['close'].shift(1)) / df['ATR']

    if not inplace:
        return df

# 调用函数
df = rising_screw_propeller(df)

这个函数首先计算了当日的最高价和最低价的差值HL,以及当日最高价和昨日收盘价的差值HC、当日最低价和昨日收盘价的差值LC。然后通过这三个差值计算真实范围TR,并通过TR的移动平均计算平均真实范围ATR。最后根据公式计算上涨螺旋桨指标RSPL。如果inplace为False,则返回更新后的数据帧df,否则直接在原数据帧上进行更新。

阴包阳形态

代码语言:javascript
复制
def candlestick_pattern(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['bearish_engulfing'] = ((df['close'] < df['open']) & 
                                (df['close'].shift(1) > df['open'].shift(1)) & 
                                (df['open'] > df['close'].shift(1)) & 
                                (df['close'] > df['open'].shift(1)))
    
    df['bullish_engulfing'] = ((df['close'] > df['open']) & 
                                (df['close'].shift(1) < df['open'].shift(1)) & 
                                (df['open'] < df['close'].shift(1)) & 
                                (df['close'] < df['open'].shift(1)))
    
    if not inplace:
        return df

# 测试
import pandas as pd

data = {'open': [10, 11, 20, 15, 30],
        'high': [15, 15, 25, 25, 35],
        'low': [5, 10, 18, 12, 25],
        'close': [8, 12, 22, 18, 28],
        'volume': [100000, 120000, 150000, 110000, 200000]}

df = pd.DataFrame(data)

df = candlestick_pattern(df)

print(df)

阳包阴形态

代码语言:javascript
复制
def yang_bao_yin(df, inplace=True):
    if not inplace:
        df = df.copy()

    df['yang_bao_yin'] = (df['close'] > df['open']) & (df['close'] < df['open'] * 1.01) & (df['low'] < df['open']) & (df['high'] > df['close'])

    if not inplace:
        return df

仙人指路

代码语言:javascript
复制
import pandas as pd
import numpy as np

def williams_ad(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    high = df['high']
    low = df['low']
    close = df['close']
    volume = df['volume']

    ad = np.zeros(len(df))
    for i in range(1, len(df)):
        ad[i] = ad[i-1] + (close[i] - low[i]) - (high[i] - close[i])

    df['williams_ad'] = ad
    return df

中流砥柱形态

代码语言:javascript
复制
def morphology_indicator(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算中流砥柱形态指标
    df['body_height'] = abs(df['close'] - df['open'])  # 蜡烛实体高度
    df['upper_shadow_height'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线高度
    df['lower_shadow_height'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线高度
    df['morphology'] = (df['upper_shadow_height'] <= 2 * df['body_height']) & (df['lower_shadow_height'] <= 2 * df['body_height'])
    
    if not inplace:
        return df

使用示例:

代码语言:javascript
复制
import pandas as pd

# 创建示例数据
data = {
    'open': [10, 15, 12, 20],
    'high': [12, 18, 17, 22],
    'low': [9, 14, 11, 18],
    'close': [11, 16, 13, 21],
    'volume': [1000, 1500, 1200, 2000]
}
df = pd.DataFrame(data)

# 计算形态学指标
morphology_indicator(df, inplace=True)

print(df)

单针探底(定海神针)

代码语言:javascript
复制
import numpy as np

def single_needle_bottom(df, inplace=False):
    if not inplace:
        df = df.copy()

    def is_single_needle_bottom(row):
        if row['close'] < row['open'] and row['close'] < row['low'] + 0.5 * (row['high'] - row['low']):
            return True
        else:
            return False

    df['single_needle_bottom'] = df.apply(is_single_needle_bottom, axis=1)

    return df

锤头线(锤子线)

代码语言:javascript
复制
def hammer_candlestick(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
  
    df['is_hammer'] = ((df['body'] / df['body'].rolling(5).mean() < 0.02) & 
                      (df['upper_shadow'] / df['body'] < 0.15) &
                      (df['lower_shadow'] / df['body'] < 0.15))
  
    return df

看跌(高位倒锤头)流星线

以下是一个可以实现形态学指标【看跌(高位倒锤头)流星线】的Python函数:

代码语言:javascript
复制
def bearish_inverted_hammer(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_bearish_inverted_hammer'] = False
    for i in range(len(df)):
        if (df.loc[i]['body'] < 0.1 * df.loc[i]['high'] and
            df.loc[i]['upper_shadow'] > 2.5 * df.loc[i]['body'] and
            df.loc[i]['lower_shadow'] < 0.1 * df.loc[i]['high']):
            df.at[i, 'is_bearish_inverted_hammer'] = True

    return df

这个函数会计算每日的实体(body)、上影线(upper_shadow)和下影线(lower_shadow),然后检查是否符合看跌(高位倒锤头)流星线的形态。如果符合条件,则将is_bearish_inverted_hammer列设置为True,表示这一天出现了高位倒锤头形态。最后返回更新后的数据帧df

看涨(低位)倒锤头线

代码语言:javascript
复制
import pandas as pd

def bullish_inverted_hammer(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['open'] - df['close'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_bullish_inverted_hammer'] = ((df['close'] > df['open']) & 
                                         (df['open'] > df['low']) &
                                         (df['close'] > ((df['high'] + df['low']) / 2)) &
                                         (df['body'] < 0.2 * df['low']) &
                                         (df['upper_shadow'] < 2 * df['body']) &
                                         (df['lower_shadow'] > 2 * df['body']))

    if not inplace:
        return df

上涨镊子线(U形磁铁)

代码语言:javascript
复制
def u_shaped_magnet(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['u_shaped_magnet'] = 0
    
    for i in range(2, len(df)):
        if df['close'][i-2] > df['open'][i-2] and df['close'][i-1] < df['close'][i-2] and df['close'][i] > df['open'][i]:
            df.loc[i, 'u_shaped_magnet'] = 1
            
    return df

下跌镊子线(n形磁铁)

代码语言:javascript
复制
def falling_engulfing(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body_size'] = abs(df['close'] - df['open'])
    df['body_top'] = df[['open', 'close']].max(axis=1)
    df['body_bottom'] = df[['open', 'close']].min(axis=1)
    df['body_range'] = df['body_top'] - df['body_bottom']
    
    df['prev_body_size'] = df['body_size'].shift(1)
    df['prev_body_top'] = df['body_top'].shift(1)
    df['prev_body_bottom'] = df['body_bottom'].shift(1)
    
    df['falling_engulfing'] = (df['close'] < df['open']) & (df['open'] > df['prev_body_top']) & (df['close'] < df['prev_body_bottom'])
    
    if not inplace:
        return df

三空阴线

代码语言:javascript
复制
def morphology_indicator(df, inplace=False):
    # 计算三空阴线指标
    df['close_open'] = df['close'] - df['open']
    df['close_low'] = df['close'] - df['low']
    df['open_close'] = df['open'] - df['close']
    
    df['three_black_crows'] = (df['close_open'] < 0) & (df['close_low'] < 0) & (df['open_close'] < 0)
    
    if inplace:
        return df
    else:
        return df.copy()

三空阳线

代码语言:javascript
复制
def three_white_soldiers(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body_size'] = df['close'] - df['open']
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
    
    df['is_up_day'] = df['close'] > df['open']
    
    for i in range(2, len(df)):
        if df.loc[i-2, 'is_up_day'] and df.loc[i-1, 'is_up_day'] and df.loc[i, 'is_up_day']:
            if df.loc[i-2, 'body_size'] <= 0 or df.loc[i-1, 'body_size'] <= 0 or df.loc[i, 'body_size'] <= 0:
                continue
            if df.loc[i-2, 'lower_shadow'] / df.loc[i-2, 'body_size'] >= 0.5 or df.loc[i-1, 'lower_shadow'] / df.loc[i-1, 'body_size'] >= 0.5 or df.loc[i, 'lower_shadow'] / df.loc[i, 'body_size'] >= 0.5:
                continue
            if df.loc[i-2, 'close'] <= df.loc[i-1, 'close'] and df.loc[i-1, 'close'] <= df.loc[i, 'close']:
                df.loc[i, 'three_white_soldiers'] = 1
    
    df.drop(['body_size', 'upper_shadow', 'lower_shadow', 'is_up_day'], axis=1, inplace=True)
    
    return df

# 调用示例
import pandas as pd

data = {
    'open': [10, 11, 12, 13, 14],
    'high': [12, 13, 14, 15, 16],
    'low': [9, 10, 11, 12, 13],
    'close': [11, 12, 13, 14, 15],
    'volume': [1000, 1200, 1300, 1100, 1000]
}
df = pd.DataFrame(data)
df = three_white_soldiers(df, inplace=False)
print(df)

红三兵

代码语言:javascript
复制
def red_three_soldiers(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = df['close'] - df['open']
    df['up_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['down_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['up_body'] = df['body'].apply(lambda x: x if x > 0 else 0)
    df['down_body'] = df['body'].apply(lambda x: -x if x < 0 else 0)

    df['red_soldier'] = False

    for i in range(2, len(df)):
        if df.loc[i-2, 'up_body'] > 0 and df.loc[i-1, 'up_body'] > 0 and df.loc[i, 'up_body'] > 0:
            df.loc[i, 'red_soldier'] = True

    return df

# 示例用法
import pandas as pd

data = {
    'open': [10, 11, 12, 13, 14],
    'high': [15, 16, 17, 18, 19],
    'low': [9, 10, 11, 12, 13],
    'close': [14, 15, 16, 17, 18],
    'volume': [1000, 2000, 1500, 1800, 1200]
}

df = pd.DataFrame(data)

df = red_three_soldiers(df)
print(df)

三只乌鸦

代码语言:javascript
复制
import pandas as pd

def three_black_crows(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['close_shifted'] = df['close'].shift(1)
    
    def is_three_black_crows(row):
        if row['close'] < row['open'] and row['close'] < row['close_shifted']:
            return True
        else:
            return False
    
    df['is_three_black_crows'] = df.apply(is_three_black_crows, axis=1)
    
    return df

# 使用示例
data = {'open': [10, 9, 8, 7],
        'high': [11, 10, 9, 8],
        'low': [9, 8, 7, 6],
        'close': [8, 7, 6, 5],
        'volume': [100, 200, 150, 180]}
df = pd.DataFrame(data)

df = three_black_crows(df)
print(df)

低位并排阳线

代码语言:javascript
复制
def low_parallel_up(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['low_parallel_up'] = (df['open'] < df['close']) & (df['low'] < df['open']) & (df['low'] < df['close'])

    return df

高位并排阳线

代码语言:javascript
复制
def high_concurrent_positive(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['high_close'] = df['high'] == df['close']
    df['prev_close'] = df['close'].shift(1)
    df['prev_high'] = df['high'].shift(1)
    df['prev_close_high'] = df['prev_close'] == df['prev_high']
    df['high_concurrent_positive'] = df['high_close'] & df['prev_close_high']

    if not inplace:
        return df

# 示例用法
import pandas as pd

data = {
    'open': [10, 15, 20, 18, 25],
    'high': [12, 18, 22, 20, 27],
    'low': [8, 14, 18, 16, 22],
    'close': [11, 17, 21, 19, 26],
    'volume': [1000, 2000, 1500, 1800, 1200]
}

df = pd.DataFrame(data)

df = high_concurrent_positive(df)
print(df)

上升三部曲(升势三鸦)

代码语言:javascript
复制
def upward_three_crows(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = df['open'] - df['close']
    df['body_rel'] = df['body'] / (df['high'] - df['low'])
    
    for i in range(2, len(df)):
        if df['body'][i] < 0 and df['body'][i-1] < 0 and df['body'][i-2] < 0 \
        and df['body_rel'][i] >= 0.5 and df['body_rel'][i-1] >= 0.5 and df['body_rel'][i-2] >= 0.5:
            df.loc[i, 'upward_three_crows'] = 1
        else:
            df.loc[i, 'upward_three_crows'] = 0
    
    return df

# 示例用法
import pandas as pd

data = {
    'open': [10, 9, 8, 7, 8, 7],
    'high': [12, 11, 10, 9, 9, 8],
    'low': [9, 8, 7, 6, 7, 6],
    'close': [9, 8, 7, 6, 7, 6],
    'volume': [100, 200, 150, 120, 300, 250]
}

df = pd.DataFrame(data)
df = upward_three_crows(df)
print(df)

下降三部曲(降势三鹤)

代码语言:javascript
复制
import pandas as pd

def descending_three_methods(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['short_ma'] = df['close'].rolling(window=5).mean()  # 计算5日均价
    df['long_ma'] = df['close'].rolling(window=20).mean()  # 计算20日均价
    df['volume_ma'] = df['volume'].rolling(window=5).mean()  # 计算5日成交量均值

    # 判断下降三部曲条件
    df['descending_three_methods'] = (df['short_ma'] < df['long_ma']) & (df['short_ma'].shift(1) > df['long_ma'].shift(1)) & (df['close'] < df['short_ma']) & (df['close'] < df['close'].shift(1)) & (df['volume'] < df['volume_ma'])

    if not inplace:
        return df

高档五阴线

代码语言:javascript
复制
import pandas as pd

def high_grade_five_black_crows(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算每根K线的实体大小
    df['body'] = abs(df['open'] - df['close'])
    
    # 计算每根K线的上影线和下影线
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
    
    # 判断是否为高档五阴线
    conditions = (df['body'] / df['open'] < 0.01) & (df['lower_shadow'] / df['open'] < 0.005) & \
                 (df['close'] < df['open']) & (df['close'].shift(1) < df['close'].shift(2)) & \
                 (df['close'].shift(1) < df['close'].shift(3)) & (df['close'].shift(1) < df['close'].shift(4))

    df['high_grade_five_black_crows'] = conditions.astype(int)
    
    return df

低档五阳线

代码语言:javascript
复制
def low_gang_wuyang(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['low_gang_wuyang'] = 0
    for i in range(4, len(df)):
        if df['close'][i] > df['close'][i-1] and df['close'][i-1] > df['close'][i-2] \
        and df['close'][i-2] > df['close'][i-3] and df['close'][i-3] > df['close'][i-4]:
            df.at[i, 'low_gang_wuyang'] = 1
    
    return df

两黑夹一红

代码语言:javascript
复制
def two_black_one_red(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])  # 计算实体大小
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 计算上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 计算下影线

    df['is_black'] = (df['close'] < df['open']).astype(int)  # 判断是否为黑蜡烛
    df['is_red'] = (df['close'] > df['open']).astype(int)  # 判断是否为红蜡烛

    df['pattern'] = (df['is_black'] + df['is_black'].shift(1) + df['is_red']) == 2  # 判断是否为两黑夹一红形态

    if not inplace:
        return df

这段代码定义了一个名为two_black_one_red的函数,该函数接受一个数据帧df,并包含一个inplace参数用于指示是否原地更新df。函数计算并添加了实体大小、上影线、下影线、蜡烛颜色、以及两黑夹一红形态的指标到df中。如果inplaceFalse,则函数会返回更新后的df;否则将直接在原地更新df

两红夹一黑

代码语言:javascript
复制
def two_red_one_black(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['close_shift1'] = df['close'].shift(1)
    df['close_shift2'] = df['close'].shift(2)
    df['open_shift1'] = df['open'].shift(1)
    df['open_shift2'] = df['open'].shift(2)
    
    df['is_red1'] = df['close'] > df['open']
    df['is_red2'] = df['close_shift1'] > df['open_shift1']
    df['is_black'] = df['close_shift2'] < df['open_shift2']

    df['two_red_one_black'] = ((df['is_red1'] & df['is_red2'] & df['is_black']).shift(-2)).astype(int)

    if not inplace:
        return df

多方炮

代码语言:javascript
复制
import pandas as pd

def bull_power(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['bull_power'] = df['high'] - (df['open'] + df['close']) / 2
    
    return df

空方炮

代码语言:javascript
复制
def short_selling_cannon(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])  # 实体的大小
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线
    
    df['short_selling_cannon'] = (df['upper_shadow'] > 2 * df['body']) & (df['lower_shadow'] < 0.5 * df['body'])

    return df

下降插入线(坠落线)

你可以使用以下代码来实现形态学指标【下降插入线(坠落线)】:

代码语言:javascript
复制
def descent_insert_line(df, inplace=False):
    if not inplace:
        df = df.copy()
    df['insert_line'] = (df['open'] + df['close']) / 2
    df['descent_insert_line'] = (df['high'] - df['insert_line']).rolling(window=3).max()
    if not inplace:
        return df

使用这个函数,你可以对数据帧df应用形态学指标【下降插入线(坠落线)】,并选择是否原地更新df

高开跳空缺口

代码语言:javascript
复制
def high_open_jump_gap(df, inplace=False):
    if not inplace:
        df = df.copy()
        
    df['high_open_gap'] = df['open'] - df['close'].shift(1)
    
    return df

低开跳空缺口

代码语言:javascript
复制
def low_open_gap(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算当日开盘价与前一日收盘价的差值
    df['open_close_diff'] = df['open'] - df['close'].shift(1)
    
    # 判断是否出现低开跳空缺口
    df['low_open_gap'] = df['open_close_diff'] > 0
    
    if not inplace:
        return df
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-05-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 早晨之星(黎明之星)
  • 黄昏之星(夜星)
  • 倾盆大雨
  • 旭日东升
  • 淡友反攻
  • 好友反攻
  • 射击之星形态
  • 墓碑十字线(墓碑线)
  • 顶部尽头线(下山虎)
  • 底部尽头线(上山虎)
  • 双针探底
  • 吊颈线(上吊线)
  • 乌云盖顶
  • 曙光初现形态
  • 上涨身怀六甲
  • 下跌身怀六甲
  • 上涨孕十字星
  • 下跌孕十字星
  • 上涨孤独十字星
  • 下跌孤独十字星
  • 上涨Pinbar组合
  • 下跌Pinbar组合
  • 下跌螺旋桨
  • 上涨螺旋桨
  • 阴包阳形态
  • 阳包阴形态
  • 仙人指路
  • 中流砥柱形态
  • 单针探底(定海神针)
  • 锤头线(锤子线)
  • 看跌(高位倒锤头)流星线
  • 看涨(低位)倒锤头线
  • 上涨镊子线(U形磁铁)
  • 下跌镊子线(n形磁铁)
  • 三空阴线
  • 三空阳线
  • 红三兵
  • 三只乌鸦
  • 低位并排阳线
  • 高位并排阳线
  • 上升三部曲(升势三鸦)
  • 下降三部曲(降势三鹤)
  • 高档五阴线
  • 低档五阳线
  • 两黑夹一红
  • 两红夹一黑
  • 多方炮
  • 空方炮
  • 下降插入线(坠落线)
  • 高开跳空缺口
  • 低开跳空缺口
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档