特征管理

最近更新时间:2026-01-09 15:23:01

我的收藏

功能概述

WeData 的特征管理模块通过提供特征离线存储和在线存储的服务,从而实现特征的统一管理,助力数据科学家和数据工程师在模型开发过程中,可以集约化的存储、读取、分析和复用特征,避免烟囱式的开发协作模式。
WeData的特征管理主要提供如下能力:
说明:
WeData 所管理的离线特征表,目前仅支持 DLC 的 Iceberg 表和 EMR 的 Hive 表,且注册特征表时必须指定表的主键及时间戳键,后续操作时将以所指定的主键和时间戳键进行特征索引。
类型
说明
界面操作功能
1. 支持查看特征表的基础信息、特征字段信息等;
2. 支持批量导入特征表;
3. 支持创建特征同步等。
特征工程 API
支持在 Studio 中调用进行离在线特征表的增删改查、同步、消费等操作。

界面操作

特征导入和默认库设置

离线特征表批量导入/注册

WeData 支持批量的导入/注册已有的特征表,进入特征管理页面,点击批量导入按钮,支持按照提示选择引擎、数据源和数据库,并勾选主键及对应的表后即可进行批量导入。


默认库设置

WeData 支持设置默认的离线特征库和在线特征库,将会在您调用 API 未指定特征库信息时,使用默认特征库信息。

并且支持在模块名右侧查看所设置的默认库信息。


创建在线表/特征同步

支持点击离线特征表的创建在线表的操作项,从而创建离线表到在线表的同步任务。支持选择快照、周期两种同步方式。
快照方式支持一次性的特征同步,设置计算资源、任务优先级等。

周期方式支持设置同步周期,支持按天、周、小时、分钟的同步周期,并且支持设置时间区间、计算资源、任务优先级等。


特征表详情查看

特征管理页面支持索引查看离线特征表和在线特征表的元信息:
列表页支持查看特征表名、离/在线情况、责任人、主键、时间戳、上次修改时间、数据地址、描述、操作项。
列表中在线特征表信息默认收起
支持在未创建在线特征表的情况下,提供创建在线特征表的操作项
已经创建在线特征表的情况下,不提供在线特征表的操作项,且支持通过表名左侧的三角形展开对应在线特征表的信息
一个离线特征表,仅能创建一个在线特征表
删除离线特征表之前,必须先将对应的在线特征表删除,否则不提供删除按钮


离线特征表详情

点击进入离线特征表详情后,可以查看创建时间、责任人、上次修改时间、数据地址、描述,且顶部会显示离线标签。
并支持查看特征信息,包括特征名、字段类型、特征描述,并且会标记出主键和时间戳。


在线特征表详情

点击进入在线特征表详情后,可以查看创建时间、责任人、同步时间(支持立即同步)、来源离线表(支持点击跳转至相应的离线表页面)、数据地址、同步方式、工作流任务(支持点击跳转至相应的特征同步工作流的页面)、同步状态、描述。
并支持查看特征信息,包括特征名、字段类型、特征描述,并且会标记出主键和时间戳。


特征工程API

FeatureStoreClient 是 WeData 特征存储的统一客户端,提供特征全生命周期管理能力,包括特征表创建、注册、读写、训练集创建、模型记录等功能。
说明:
使用特征工程 API 时需要用云服务密钥和 ID 来进行鉴权访问。
当开启 Catalog 功能后,部分 API 需要传入 Catalog_name,请注意查看 API 文档。
使用前,需要在 Studio 的运行环境中安装特征工程 API 包。
%pip install tencent-wedata-feature-engineering

初始化特征工程客户端

init

init API 可用于初始化 FeatureStoreClient 实例。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
spark
Optional[SparkSession]
已初始化的SparkSession对象,如未提供则自动创建
SparkSession.builder.getOrCreate()
cloud_secret_id
str
云服务密钥 ID
"your_secret_id"
cloud_secret_key
str
云服务密钥
"your_secret_key"
输出参数
结果名
字段类型
字段说明
client
FeatureStoreClient
返回 FeatureStoreClient 实例
调用示例
from wedata.feature_store.client import FeatureStoreClient
# 构建特征工程客户端实例
# create FeatureStoreClient
client = FeatureStoreClient(spark, cloud_secret_id=cloud_secret_id, cloud_secret_key=cloud_secret_key)
错误码
无特定错误码,SparkSession 创建失败会抛出相关异常。

离线特征表操作

create_table

创建特征表(支持批流数据写入)。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
name
str
特征表全称(格式:<table>)
"user_features"
primary_keys
Union[str, List[str]]
主键列名(支持复合主键)
"user_id" 或 ["user_id", "session_id"]
timestamp_key
str
时间戳键(用于时态特征)
"timestamp"
engine_type

wedata.feature_store.constants.engine_types.EngineTypes
引擎类型
EngineTypes.HIVE_ENGINE -- HIVE引擎

EngineTypes.
ICEBERG_ENGINE -- ICEBERG引擎
EngineTypes.HIVE_ENGINE
data_source_name
str
数据源名称
"hive_datasource"
database_name
Optional[str]
数据库名
"feature_db"
df
Optional[DataFrame]
初始数据(用于推断schema)
spark.createDataFrame([...])
partition_columns
Union[str, List[str], None]
分区列(优化存储查询)
"date" 或 ["date", "region"]
schema
Optional[StructType]
表结构定义(当不提供df时必需)
StructType([...])
description
Optional[str]
业务描述
"用户特征表"
tags
Optional[Dict[str, str]]
业务标签
{"domain": "user", "version": "v1"}
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
feature_table
FeatureTable
包含特征表元数据的FeatureTable对象
调用示例
from wedata.feature_store.constants.engine_types import EngineTypes
# 创建用户特征表--EMR on hive
# create user's feature table--EMR on hive
feature_table = client.create_table(
name=table_name, # 表名
primary_keys=["wine_id"], # 主键
df=wine_features_df, # 数据
engine_type=EngineTypes.HIVE_ENGINE,
data_source_name=data_source_name,
timestamp_key="event_timestamp",
tags={ # 业务标签
"purpose": "demo",
"create_by": "wedata"
}
)

# 创建用户特征表--DLC on iceberg(不开启catalog)
# create user's feature table--DLC on iceberg(disable catalog)
feature_table = client.create_table(
name=table_name, # 表名
database_name=database_name,
primary_keys=["wine_id"], # 主键
df=wine_features_df, # 数据
engine_type=EngineTypes.ICEBERG_ENGINE,
data_source_name=data_source_name,
timestamp_key="event_timestamp",
tags={ # 业务标签
"purpose": "demo",
"create_by": "wedata"
}
)

# 创建用户特征表--DLC on iceberg(开启catalog)
# create user's feature table--DLC on iceberg(enable catalog)
feature_table = client.create_table(
name=table_name, # 表名
primary_keys=["wine_id"], # 主键
df=wine_features_df, # 数据
engine_type=EngineTypes.ICEBERG_ENGINE,
data_source_name=data_source_name,
timestamp_key="event_timestamp",
tags={ # 业务标签
"purpose": "demo",
"create_by": "wedata"
},
catalog_name=catalog_name
)
错误码
ValueError:当 schema 与数据不匹配时会进行报错。

register_table

将普通的表注册为特征表,并返回特征表数据。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
name
str
特征表名称
"user_features"
timestamp_key
str
时间戳键(用于后续离在线特征同步)
"timestamp"
engine_type

wedata.feature_store.constants.engine_types.EngineTypes
引擎类型
EngineTypes.HIVE_ENGINE -- HIVE引擎

EngineTypes.
ICEBERG_ENGINE -- ICEBERG引擎
EngineTypes.HIVE_ENGINE
data_source_name
str
数据源名称
"hive_datasource"
database_name
Optional[str]
特征库名称
"feature_db"
primary_keys
Union[str, List[str]]
主键列名(仅当engine_type为EngineTypes.HIVE_ENGINE时有效)
"user_id"
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
dataframe
pyspark.DataFrame
包含特征表数据的DataFrame对象
调用示例
from wedata.feature_store.constants.engine_types import EngineTypes
# 注册特征表--EMR on hive
# register feature table--EMR on hive
client.register_table(database_name=database_name, name=register_table_name, timestamp_key="event_timestamp",engine_type=EngineTypes.HIVE_ENGINE, data_source_name=data_source_name, primary_keys=["wine_id",])

# 注册特征表--DLC on iceberg(不开启catalog)
# register feature table--DLC on iceberg(disable catalog)
client.register_table(database_name=database_name, name=register_table_name, timestamp_key="event_timestamp",engine_type=EngineTypes.ICEBERG_ENGINE, data_source_name=data_source_name, primary_keys=["wine_id",])

# 注册特征表--DLC on iceberg(开启catalog)
# register feature table--DLC on iceberg(enable catalog)
client.register_table(database_name=database_name, name=register_table_name, timestamp_key="event_timestamp",engine_type=EngineTypes.HIVE_ENGINE, data_source_name=data_source_name, primary_keys=["wine_id",], catalog_name=catalog_name)
错误码
无特定错误码。

read_table

读取特征表数据。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
name
str
特征表名称
"user_features"
database_name
Optional[str]
特征库名
"feature_db"
is_online
bool
是否读取在线特征表(默认False)
True
online_config
Optional[RedisStoreConfig]
在线特征表配置(仅当is_online为True时有效)
RedisStoreConfig(...)
entity_row
Optional[List[Dict[str, Any]]]
实体行数据(仅当is_online为True时有效)
[{"user_id": ["123", "456"]}]
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
dataframe
pyspark.DataFrame
包含特征表数据的DataFrame对象
调用示例
# 读取离线特征表--EMR on hive--&--DLC on iceberg(不开启catalog)
# read offline feature table--EMR on hive--&--DLC on iceberg(disable catalog)
get_df = client.read_table(name=table_name, database_name=database_name)
get_df.show(30)

# 读取在线特征表--EMR on hive--&--DLC on iceberg(不开启catalog)
# read online feature table--EMR on hive--&--DLC on iceberg(disable catalog)
online_config = RedisStoreConfig(host=redis_host, port=redis_password, db=redis_db, password=redis_password)
primary_keys_rows = [
{"wine_id": 1},
{"wine_id": 3}
]
result = client.read_table(name=table_name,database_name=database_name, is_online=True, online_config=online_config, entity_row=primary_keys_rows)
result.show()

# 读取离线特征表--DLC on iceberg(开启catalog)
# read offline feature table--DLC on iceberg(enable catalog)
get_df = client.read_table(name=table_name, database_name=database_name, catalog_name=catalog_name)
get_df.show(30)

# 读取在线特征表--DLC on iceberg(开启catalog)
# read online feature table--DLC on iceberg(enable catalog)
primary_keys_rows = [
{"wine_id": 1},
{"wine_id": 3}
]
result = client.read_table(name=table_name,database_name=database_name, is_online=True, online_config=online_config, entity_row=primary_keys_rows, catalog_name=catalog_name)
result.show()
错误码
无特定错误码。

get_table

获取特征表数据。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
name
str
特征表名称
"user_features"
database_name
Optional[str]
特征库名
"feature_db"
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
feature_table
FeatureTable
包含特征表元数据的FeatureTable对象
调用示例
# 获取特征表数据--EMR on hive--&--DLC on iceberg(不开启catalog)
# get feature table data--EMR on hive--&--DLC on iceberg(disable catalog)
featureTable = client.get_table(name=table_name, database_name=database_name)

# 读取在线特征表--DLC on iceberg(开启catalog)
# get feature table data--DLC on iceberg(enable catalog)
featureTable = client.get_table(name=table_name, database_name=database_name, catalog_name=catalog_name)
错误码
无特定错误码。

drop_table

删除特征表。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
name
str
要删除的特征表名称
"user_features"
database_name
Optional[str]
特征库名
"feature_db"
catalog_name
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
None
无返回值
调用示例
# 删除特征表--EMR on hive--&--DLC on iceberg(不开启catalog)
# drop feature table data--EMR on hive--&--DLC on iceberg(disable catalog)
client.drop_table(table_name)

# 删除特征表--DLC on iceberg(开启catalog)
# drop feature table data--DLC on iceberg(enable catalog)
client.drop_table(table_name,catalog_name="DataLakeCatalog")
错误码
无特定错误码。

write_table

写入数据到特征表(支持批处理和流式处理)。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
name
str
特征表全称(格式:<table>)
"user_features"
df
Optional[DataFrame]
要写入的数据DataFrame
spark.createDataFrame([...])
database_name
Optional[str]
特征库名
"feature_db"
mode
Optional[str]
写入模式(默认APPEND)
"overwrite"
checkpoint_location
Optional[str]
流式处理的检查点位置
"/checkpoints/user_features"
trigger
Dict[str, Any]
流式处理触发器配置
{"processingTime": "10 seconds"}
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
streaming_query
Optional[StreamingQuery]
如果是流式写入返回StreamingQuery对象,否则返回None
调用示例
from wedata.feature_store.constants.constants import APPEND

# 批处理写入--EMR on hive--&--DLC on iceberg(不开启catalog)
# batch write--EMR on hive--&--DLC on iceberg(disable catalog)
client.write_table(
name="user_features",
df=user_data_df,
database_name="feature_db",
mode="append"
)

# 流式处理写入--EMR on hive--&--DLC on iceberg(不开启catalog)
# strem write--EMR on hive--&--DLC on iceberg(disable catalog)
streaming_query = client.write_table(
name="user_features",
df=streaming_df,
database_name="feature_db",
checkpoint_location="/checkpoints/user_features",
trigger={"processingTime": "10 seconds"}
)

# 批处理写入--DLC on iceberg(开启catalog)
# batch write--EMR on hive--&--DLC on iceberg(enable catalog)
client.write_table(
name="user_features",
df=user_data_df,
database_name="feature_db",
mode="append",
catalog_name=catalog_name
)

# 流式处理写入--DLC on iceberg(开启catalog)
# strem write--EMR on hive--&--DLC on iceberg(enable catalog)
streaming_query = client.write_table(
name="user_features",
df=streaming_df,
database_name="feature_db",
checkpoint_location="/checkpoints/user_features",
trigger={"processingTime": "10 seconds"},
catalog_name=catalog_name
)
错误码
无特定错误码。

离线特征消费

Feature Lookup

对特征表进行特征查找。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
table_name
str
特征表的名称
"user_features"
lookup_key
Union[str, List[str]]
用于在特征表和训练集之间进行联接的键,一般使用主键
"user_id" 或 ["user_id", "session_id"]
is_online
bool
是否读取在线特征表(默认False)
True
online_config
Optional[RedisStoreConfig]
在线特征表配置(仅当is_online为True时有效)
RedisStoreConfig(...)
feature_names
Union[str, List[str], None]
要从特征表中查找的特征名称
["age", "gender", "preferences"]
rename_outputs
Optional[Dict[str, str]]
特征重命名映射
{"age": "user_age"}
timestamp_lookup_key
Optional[str]
用于时间点查找的时间戳键
"event_timestamp"
lookback_window
Optional[datetime.timedelta]
时间点查找的回溯窗口
datetime.timedelta(days=1)
输出参数
结果名
字段类型
字段说明
FeatureLookup对象
FeatureLookup
构造完成的FeatureLookup实例
调用示例
from wedata.feature_store.entities.feature_lookup import FeatureLookup

# 特征查找--EMR on hive--&--DLC on iceberg(不开启catalog)--&--DLC on iceberg(开启catalog)
# feature lookup--EMR on hive--&--DLC on iceberg(disable catalog)--&--DLC on iceberg(enable catalog)
wine_feature_lookup = FeatureLookup(
table_name=table_name,
lookup_key="wine_id",
timestamp_lookup_key="event_timestamp"
)
错误码
无特定错误码。

create_training_set

对特征表进行特征查找。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
df
Optional[DataFrame]
要写入的数据DataFrame
spark.createDataFrame([...])
feature_lookups
List[Union[FeatureLookup, FeatureFunction]]
特征查询列表
[FeatureLookup(...), FeatureFunction(...)]
label
Union[str, List[str], None]
标签列名
"is_churn" 或 ["label1", "label2"]
exclude_columns
Optional[List[str]]
排除列名
["user_id", "timestamp"]
database_name
Optional[str]
特征库名
"feature_db"
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
training_set
TrainingSet
构造完成的TrainingSet实例
调用示例
from wedata.feature_store.entities.feature_lookup import FeatureLookup
from wedata.feature_store.entities.feature_function import FeatureFunction
# 定义特征查找
# define feature lookup
wine_feature_lookup = FeatureLookup(
table_name=table_name,
lookup_key="wine_id",
timestamp_lookup_key="event_timestamp"
)

# 构建训练数据
# prepare training data
inference_data_df = wine_df.select(f"wine_id", "quality", "event_timestamp")

# 创建训练集--EMR on hive--&--DLC on iceberg(不开启catalog)
# create trainingset--EMR on hive--&--DLC on iceberg(disable catalog)
training_set = client.create_training_set(
df=inference_data_df, # 基础数据
feature_lookups=[wine_feature_lookup], # 特征查找配置
label="quality", # 标签列
exclude_columns=["wine_id", "event_timestamp"] # 排除不需要的列
)

# 创建训练集--DLC on iceberg(开启catalog)
# create trainingset--DLC on iceberg(enable catalog)
training_set = client.create_training_set(
df=inference_data_df, # 基础数据
feature_lookups=[wine_feature_lookup], # 特征查找配置
label="quality", # 标签列
exclude_columns=["wine_id", "event_timestamp"], # 排除不需要的列
database_name=database_name,
catalog_name=catalog_name
)

# 获取最终的训练DataFrame
# get final training DataFrame
training_df = training_set.load_df()

# 打印训练集数据
# print trainingset data
print(f"\\n=== 训练集数据 ===")
training_df.show(10, True)
错误码
ValueError: FeatureLookup必须指定table_name。

log_model

记录MLflow模型并关联特征查找信息。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
model
Any
要记录的模型对象
sklearn.RandomForestClassifier()
artifact_path
str
模型存储路径
"churn_model"
flavor
ModuleType
MLflow模型类型模块
mlflow.sklearn
training_set
Optional[TrainingSet]
训练模型使用的TrainingSet对象
training_set
registered_model_name
Optional[str]
要注册的模型名称(开启catalog后需要写catalog的模型名称)
"churn_prediction_model"
model_registry_uri
Optional[str]
模型注册中心地址
"databricks://model-registry"
await_registration_for
int
等待模型注册完成的秒数(默认300秒)
600
infer_input_example
bool
是否自动记录输入示例(默认False)
True
输出参数
调用示例
# 训练模型
# train model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
import mlflow.sklearn
import pandas as pd
import os
project_id=os.environ["WEDATA_PROJECT_ID"]
mlflow.set_experiment(experiment_name=expirement_name)

# 设置mlflow信息上报地址
# set mlflow tracking_uri
mlflow.set_tracking_uri("http://30.22.36.75:5000")

# 将Spark DataFrame转换为Pandas DataFrame用于训练
# convert Spark DataFrame to Pandas DataFrame to train
train_pd = training_df.toPandas()

# 删除时间戳列
# delete timestamp
# train_pd.drop('event_timestamp', axis=1)

# 准备特征和标签
# prepare features and tags
X = train_pd.drop('quality', axis=1)
y = train_pd['quality']

# 划分训练集和测试集
# split traningset and testset
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)

# 或者把日期转成时间戳(秒)
# or convert datatime to timestamp(second)
for col in X_train.select_dtypes(include=['datetime', 'datetimetz']):
X_train[col] = X_train[col].astype('int64') // 10**9 # 纳秒→秒

# 确认没有缺失值导致 dtype 被降级为 object
# Verify that no missing values cause the dtype to be downgraded to object.
X_train = X_train.fillna(X_train.median(numeric_only=True))

# 初始化并训练模型和记录模型--EMR on hive--&--DLC on iceberg(不开启catalog)--&--DLC on iceberg(开启catalog)
# Initialize, train and log the model.--EMR on hive--&--DLC on iceberg(disable catalog)--&--DLC on iceberg(enable catalog)
model = RandomForestClassifier(n_estimators=100, max_depth=3, random_state=42)
model.fit(X_train, y_train)

with mlflow.start_run():
client.log_model(
model=model,
artifact_path="wine_quality_prediction", # 模型文件路径名model artifact path
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=model_name, # 模型名称(开启catalog后需要写catalog的模型名称)model name(if enable catalog, must be catalog model name)
)
错误码
无特定错误码。

score_batch

执行批量推理,如果输入特征缺失(但注意需要传入对应的主键),推理过程中会按照模型所记录的feature_spec文件来自动的执行特征补全。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
model_uri
str
MLflow模型URI位置
"models:/churn_model/1"
df
DataFrame
要推理的DataFrame
spark.createDataFrame([...])
result_type
str
模型返回类型(默认"double")
"string"
输出参数
结果名
字段类型
字段说明
predictions
DataFrame
包含预测结果的DataFrame
调用示例
# 运行score_batch--EMR on hive--&--DLC on iceberg(不开启catalog)--&--DLC on iceberg(开启catalog)
# run score_batch--EMR on hive--&--DLC on iceberg(disable catalog)--&--DLC on iceberg(enable catalog)
result = client.score_batch(model_uri=f"models:/{model_name}/1", df=wine_df)
result.show()
错误码
无特定错误码。

在线特征表操作

publish_table

发布离线特征表为一个在线特征表。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
table_name
str
离线特征表全称(格式:<table>)
"user_features"
data_source_name
str
数据源名称
"hive_datasource"
database_name
Optional[str]
数据库名
"feature_db"
is_cycle
bool
是否启用周期性发布(默认False)
True
cycle_obj
TaskSchedulerConfiguration
周期性任务配置对象
TaskSchedulerConfiguration(...)
is_use_default_online
bool
是否使用默认在线存储配置(默认True)
False
online_config
RedisStoreConfig
自定义在线存储配置
RedisStoreConfig(...)
catalog_name
如果未开启catalog,则无需填入
如果开启catalog,则必填
str
特征表所在的catalog名
"DataLakeCatalog"
输出参数
结果名
字段类型
字段说明
None
无返回值
调用示例
from wedata.feature_store.common.store_config.redis import RedisStoreConfig
from wedata.feature_store.cloud_sdk_client.models import TaskSchedulerConfiguration

online_config = RedisStoreConfig(host=redis_host, port=redis_password, db=redis_db, password=redis_password)

# 将离线特征表发布成为在线特征表--EMR on hive--&--DLC on iceberg(不开启catalog)
# publish a offline feature table to online feature table--EMR on hive--&--DLC on iceberg(disable catalog)

# Case 1 将离线表table_name通过指定离线数据源传入到默认在线数据源,并采用快照方式
# Case 1 Pass the offline table table_name from the specified offline data source to the default online data source in snapshot mode.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
is_cycle=False, is_use_default_online=True)

# Case 2 将离线表table_name通过指定离线数据源传入到指定数据源(非默认),并采用快照方式, 需注意这里的IP要与录入的数据源中登陆的数据源IP信息一致。
# Case 2 Sync the offline table table_name from the specified offline data source to the designated non-default data source in snapshot mode. Note that the IP address here must be consistent with the login IP information of the entered data source.
result = client.publish_table(name=table_name, data_source_name=data_source_name, database_name=database_name,
is_cycle=False, is_use_default_online=False, online_config=online_config)

# Case 3 将离线表table_name通过指定离线数据源传入到默认在线数据源,并采用周期的方式,每五分钟运行一次
# Sync the offline table table_name from the specified offline data source to the default online data source in cycle mode, with a run interval of every five minutes.
cycle_obj = TaskSchedulerConfiguration()
cycle_obj.CrontabExpression = "0 0/5 * * * ?"
result = client.publish_table(name=table_name, data_source_name=data_source_name, database_name=database_name,
is_cycle=True, cycle_obj=cycle_obj, is_use_default_online=True)

# 将离线特征表发布成为在线特征表--DLC on iceberg(开启catalog)
# publish a offline feature table to online feature table--DLC on iceberg(enable catalog)

# Case 1 将离线表table_name通过指定离线数据源传入到默认在线数据源,并采用快照方式
# Case 1 Pass the offline table table_name from the specified offline data source to the default online data source in snapshot mode.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
is_cycle=False, is_use_default_online=True, catalog_name=catalog_name)

# Case 2 将离线表table_name通过指定离线数据源传入到指定数据源(非默认),并采用快照方式, 需注意这里的IP要与录入的数据源中登陆的数据源IP信息一致。
# Case 2 Sync the offline table table_name from the specified offline data source to the designated non-default data source in snapshot mode. Note that the IP address here must be consistent with the login IP information of the entered data source.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
is_cycle=False, is_use_default_online=False, online_config=online_config, catalog_name=catalog_name)

# Case 3 将离线表table_name通过指定离线数据源传入到默认在线数据源,并采用周期的方式,每五分钟运行一次
# Sync the offline table table_name from the specified offline data source to the default online data source in cycle mode, with a run interval of every five minutes.
cycle_obj = TaskSchedulerConfiguration()
cycle_obj.CrontabExpression = "0 0/5 * * * ?"
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
is_cycle=True, cycle_obj=cycle_obj, is_use_default_online=True, catalog_name=catalog_name)
错误码
无特定错误码。

drop_online_table

删除在线特征表。
输入参数
参数名
是否必填
字段类型
字段说明
入参示例
table_name
str
离线特征表全称(格式:<table>)
"user_features"
online_config
RedisStoreConfig
自定义在线存储配置
RedisStoreConfig(...)
database_name
Optional[str]
数据库名
"feature_db"
输出参数
结果名
字段类型
字段说明
None
无返回值
调用示例
from wedata.feature_store.common.store_config.redis import RedisStoreConfig
# 删除在线表--EMR on hive--&--DLC on iceberg(不开启catalog)--&--DLC on iceberg(开启catalog)
# delete online feature table--EMR on hive--&--DLC on iceberg(disable catalog)--&--DLC on iceberg(enable catalog)
online_config = RedisStoreConfig(host=redis_host, port=redis_password, db=redis_db, password=redis_password)
client.drop_online_table(table_name=table_name, database_name=database_name, online_config=online_config)
错误码
无特定错误码。