前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >腾讯云 ES 8 向量化语义混合检索一站式体验指南

腾讯云 ES 8 向量化语义混合检索一站式体验指南

原创
作者头像
岳涛
发布2024-07-30 19:51:42
2751
发布2024-07-30 19:51:42
举报
文章被收录于专栏:大数据生态

说明

本文描述问题及解决方法同样适用于 腾讯云 Elasticsearch Service(ES)

另外使用到:腾讯云 云服务器(Cloud Virtual Machine,CVM)

声明

本文使用的商品样本数据系混元大模型生成的商品数据。

环境配置

客户端环境

版本

CVM 镜像:CentOS 7.9 64位 | img-l8og963d | 20GiB

Linux环境:Centos 7.9

Python:3.9.12

Elasticsearch 服务端环境

版本

ES 版本:8.13.3(腾讯云 Elasticsearch Service(ES) 白金版)

1. 部署客户端环境

配置建议:

客户端配置一般要求不高,2核8G足够用于简单的功能测试。

客户端数量

CPU核数

内存

硬盘

镜像

1

2

8

增强型 SSD盘云盘 50G * 1

CentOS 7.9 64位

点击购买客户端机器

2. 创建 ES 集群

配置建议:

本文样本比较少,仅有200条数据样本,故选购配置较低。向量检索性能影响因素较多,生产环境请自行根据数据量以及业务需求进行POC性能测试,以匹配最佳配置。

版本

高级特性

节点类型

节点数量

CPU核数

内存

硬盘

8.13.3

白金版

数据节点

3

2

8

增强型SSD云硬盘 * 1

机器学习点

3

2

8

/

点击购买 ES 集群

版本这里我们选择 白金版,白金版有更多的 X-PACK 高级特性,并且可以不依赖自建推理机进行数据的 embedding:

提交集群构建之后,大概需要20分钟左右可以完成。

集群创建完成之后,为了方便测试,需要移步 ES实例 > 访问快照 > 可视化访问控制 > 公网访问策略,将白名单修改为 0.0.0.0/0

注意:此操作是为了方便测试,生产环境还需谨慎操作。

访问Kibana

白名单变更需要 2 分钟左右,完成之后点击 Kibana 域名进行访问:

3. 客户端准备工作

Python 环境部署

一键安装环境:

代码语言:javascript
复制
yum install conda -y; conda init; source ~/.bashrc; echo y | conda create -n es_vector python=3.9.12; conda activate es_vector

安装 Python 依赖包

代码语言:javascript
复制
pip install elasticsearch==8.13.1
pip install eland[pytorch]==8.13.1

这一步需要下载很多依赖,安装时间会比较久,需要耐心等待。

下载整合包

已将依赖模型及脚本打包成 整合包,可下载后上传至客户端服务器家目录:/root

解压整合包

已将整合包压缩成 了 ZSTD 格式,该格式的好处是压缩/解压缩性能极高,所以解压也需要使用 ZSTD 算法解压。

安装 zstd 命令:

代码语言:javascript
复制
yum install -y zstd tree

执行解压:

代码语言:javascript
复制
zstd -T0 -d tencent-es_vector.tzst && tar -xvf tencent-es_vector.tar && rm -rf tencent-es_vector.tar

一键复制命令进行:解压 -> 解档 -> 删除归档包 ,然后我们可以得到一个整合包目录:

一共1个目录,3个文件:

bge-base-zh:预训练 Embedding 中文推理模型(其他模型可在Huggingface下载)

goods.txt:商品文本数据

insert_vector.py:推理并写入向量脚本

vector_search.py:结合业务的一个demo

4. 上传模型

代码语言:javascript
复制
cd ~/tencent-es_vector

也可以在执行模型导入时,我们需要位于模型目录的外面,在导入模型时,--hub-model-id 指定的必须是相对目录,而不是绝对路径:

代码语言:javascript
复制
eland_import_hub_model --url http://10.0.xx.xx:9200  --hub-model-id bge-base-zh --es-username elastic --es-password '******' --task-type text_embedding

参数

解释

url

目标ES集群地址

hub-model-id

模型路径,会在官网,缓存,本地目录查找模型文件

es-username

目标ES集群用户名

es-password

目标ES集群密码

task-type

任务类型,比如文本向量化模型的上传就是text_embedding任务,通过eland_import_hub_model --help查看全部任务类型。

如图,模型导入成功

模型导入成功之后,还需要进行模型同步:

点击 synchronize your jobs and trained models

部署预训练模型

点击部署后,需要修改部署参数:

部署成功后可以在模型的 Stats 标签里看到资源分配情况:

模型到这里就部署完成了,下面我们开始准备数据。

5. Elasticsearch 准备工作

点击 Kibana 开发工具:

定义ES管道

管道参数解释:

处理器名称

处理器作用

参数

参数说明

set

设置字段值

field

要设置的字段名称

copy_from

要复制的源字段名称

inference

执行模型推理

model_id

要使用的模型ID

target_field

模型推理结果保存的目标字段

remove

删除字段

field_map

模型输入字段与文档字段的映射关系

field

要删除的字段名称

代码语言:javascript
复制
PUT _ingest/pipeline/bge-base-zh
{
  "processors": [
    {
      "set": {
        "field": "text_field", // 由于模型需要,增加固定字段 text_field 字段用于推理,不可修改
        "copy_from": "title"   // 需要进行推理的字段
      }
    },
    {
      "inference": {
        "model_id": "bge-base-zh",      // 模型id 
        "target_field": "title_vector", // 目标字段,用于存放推理后的向量
        "field_map": {
          "sentence": "text_field"      // 模型需要的固定字段,不可修改
        }
      }
    },
    {
      "remove": {
        "field": "text_field"           // 移除临时字段
      }
    }
  ]
}

管道创建成功之后,可以在模型的 Pipeline 标签看到关联情况:

定义模板

向量字段解释:

字段名

类型

描述

参数

参数说明

title_vector

dense_vector

密集向量

dims

向量维度,最高支持 2048 维度

index

true 表示对该字段进行索引

similarity

相似度计算方式

element_type

元素类型

index_options

type:向量检索算法m:每个节点的最大出度ef_construction:构建时的搜索深度

其他字段解释:

字段名

类型

描述

参数

参数说明

id

long

文档 ID

/

/

price

long

价格

/

/

title

text

标题

analyzersearch_analyzer

写入分词器检索分词器

specs

text

规格

colors

text

颜色

versions

text

版本

执行模板创建:

代码语言:javascript
复制
PUT _template/goods_vector
{
  "index_patterns": [
    "goods_vector*"
  ],
  "settings": {
    "number_of_shards": 3
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "price": {
        "type": "long"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "specs": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "colors": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "versions": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "title_vector": {
        "properties": {
          "predicted_value": {
            "type": "dense_vector",
            "similarity": "cosine",
            "index": true,
            "dims": 768,
            "element_type": "float",
            "index_options": {
              "type": "hnsw",
              "m": 32,
              "ef_construction": 256
            }
          }
        }
      }
    }
  }
}

返回 "acknowledged": true 即提交成功。

创建索引

代码语言:javascript
复制
PUT goods_vector

6. 数据生成

这里我们使用ES直接进行预训练模型推理,并将数据写入到ES

激活 python 虚拟环境并安装必要环境:

代码语言:javascript
复制
conda activate es_vector
pip install tqdm

将python脚本保存成 insert_sentence.py,放在客户端家目录:

代码语言:javascript
复制
cd /root/tencent-es_vector/
vim insert_vector.py

修改配置信息:es_password es_host

代码语言:javascript
复制
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError
from datetime import datetime
import json
from tqdm import tqdm

es_username = 'elastic'
es_password = '******' # 修改ES密码
es_host = '10.0.xx.xx'     # 修改ES HOST
es_port = 9200

es = Elasticsearch(
    hosts=[{'host': es_host, 'port': es_port, 'scheme': 'http'}],
    basic_auth=(es_username, es_password),
)

# 读取文本
file_path = 'goods.txt'
index_name = 'goods_vector'

def parse_date(date_str):
    return datetime.strptime(date_str, "%Y 年 %m 月").isoformat()

def read_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            data = json.loads(line.strip())
            specs = data["规格参数"]
            product_id = data["商品ID"]
            brand = specs["主体"]["品牌"]
            title = data["标题"]
            price = data["商品价格"]
            launch_date = specs["主体"]["上市时间"]
            colors = data["商品颜色"]
            versions = data["商品版本"]
            product_type = data["类型"]

            yield {
                '_index': index_name,
                '_id': product_id,
                '_source': {
                    'id': product_id,
                    'brand': brand,
                    'title': title,
                    'price': price,
                    'specs': str(specs),
                    'launch_date': parse_date(launch_date),
                    'colors': colors,
                    'versions': versions,
                    'type': product_type
                }
            }


# 执行批量插入
def bulk_insert(file_path, chunk_size=4):
    total = sum(1 for _ in open(file_path))
    data = tqdm(read_data(file_path), total=total, desc="Indexing documents")
    try:
        # 指定推理管道进行写入
        success, _ = bulk(es, data, chunk_size=chunk_size, stats_only=True, pipeline='bge-base-zh')
        print(f"\nSuccessfully indexed {success} documents.")
    except BulkIndexError as e:
        print(f"{len(e.errors)} document(s) failed to index.")
        for error in e.errors:
            print("Error details:", error)

bulk_insert(file_path)

执行向量化导入:

代码语言:javascript
复制
cd /root/tencent-es_vector/
python insert_sentence.py

数据导入过程中,可以在预训练模型界面看到推理吞吐:

导入完成后可以在 kibana 中检索到数据:

代码语言:javascript
复制
// 查看一条数据
GET goods_vector/_search
{
  "size": 1,
  "_source": {
    "excludes": "title_vector"
  }
}

// 统计条数
GET goods_vector/_count

7. Segment合并优化

优化segment可以提升查询吞吐,一定程度上降低IO和CPU开销。

在客户端服务器上一键安装段合并工具:

代码语言:javascript
复制
rpm -vih https://tools-release-1253240642.cos.ap-shanghai.myqcloud.com/elasticsearch/packages/es-merge-segment-2.0-1.el7.x86_64.rpm

执行段合并:

代码语言:javascript
复制
password='******' es-merge-segment --ip 127.0.0.1 --port 9200 --pattern 'goods_*'

更多说明详见:ES_MergeSegment 工具使用指导

8. 向量化检索

纯向量检索:

代码语言:javascript
复制
GET goods_vector/_search //语义检索
{
  "knn": {
    "field": "title_vector.predicted_value",
    "query_vector_builder": {
      "text_embedding": {
        "model_id": "bge-base-zh",
        "model_text": "小米 12 pro max"
      }
    },
    "k": 2,
    "num_candidates": 100
  },
  "_source": "title"
}

混合检索:

代码语言:javascript
复制
GET goods_vector/_search //混合检索
{
  "knn": {
        "field": "title_vector.predicted_value",
        "query_vector_builder": {
          "text_embedding": {
            "model_id": "bge-base-zh",
            "model_text": "小米 12 pro max"
          }
        },
        "k": 40,
        "num_candidates": 200,
        "filter": {
            "bool": {
                "must": [
                    {
                        "bool": {
                            "should": [
                                {
                                    "match": {
                                        "specs": {
                                            "query": "小米"
                                        }
                                    }
                                },
                                {
                                    "match": {
                                        "specs": {
                                            "query": "XIAOMI"
                                        }
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "boost": 0.6
      },
  "_source": "title"
}

混合检索

代码语言:javascript
复制
// 混合多路检索
GET dpcq_verctor_bbz768/_search
{
  "size": 2, 
  "query": {
    "match": {
      "text_field": {
        "query": "你这坏人",
        "boost": 0.1
      }
    }
  },
  "knn": {
    "field": "vector.predicted_value",
    "query_vector_builder": {
      "text_embedding": {
        "model_id": "bge-base-zh-v1.5",
        "model_text": "你这坏人"
      }
    },
    "k": 2,
    "num_candidates": 500,
    "boost": 0.9
  },
  "_source": "text_field"
}

// 纯向量检索
GET dpcq_verctor_bbz768/_search
{
  "knn": {
    "field": "vector.predicted_value",
    "query_vector_builder": {
      "text_embedding": {
        "model_id": "bge-base-zh-v1.5",
        "model_text": "你这坏人"
      }
    },
    "k": 2,
    "num_candidates": 500
  },
  "_source": "text_field"
}

9. 检索效果对比

所有准备工作就绪,下面将演示向量检索,我们分别用向量检索和分词检索测试两者的检索效果:

代码语言:javascript
复制
cd /root/tencent-es_vector/
vim vector_search.py

修改配置信息:password host

代码语言:javascript
复制
import torch, streamlit as st
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
from collections import OrderedDict
import json
from datetime import datetime
from pypinyin import lazy_pinyin

username = 'elastic'
password = '******' # 修改为ES密码
host = 'http://10.0.xx.xx:9200' # 修改为ES URL
index = 'goods_vector'

def parse_date(date_str):
    date = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
    return date.strftime("%Y年%m月")

# 使用k-NN搜索
def knn_search(es, index_name, text):
    knn = [
      {
        "field": "title_vector.predicted_value",
        "query_vector_builder": {
          "text_embedding": {
            "model_id": "bge-base-zh",
            "model_text": text
          }
        },
        "k": 10,
        "num_candidates": 100
      }
    ]
    resp = es.search(
        index=index_name,
        knn=knn,
        source={"excludes": "title_vector"})
    return resp

# 使用混合搜索
def mix_search(es, index_name, knn):
    resp = es.search(
        index=index_name,
        knn=knn,
        size=10,
        source={"excludes": "title_vector"})
    return resp

# 使用混合搜索聚合
def mix_aggs_search(es, index_name, knn):
    aggs = {
        "count_group_by": {
            "terms": {
                "field": "brand"
            }
        }
    }

    resp = es.search(
        index=index_name,
        knn=knn,
        aggs=aggs,
        size=0,
        source={"excludes": "title_vector"})
    return resp

# 创建界面
st.set_page_config(layout="wide")
st.markdown("<h1 style='text-align:center;'>腾讯云 Elaticsearch 8 向量检索</h1>", unsafe_allow_html=True)

with st.form("chat_form"):
    query = st.text_input("请输入文本:")
    submit_button = st.form_submit_button("查询")

# 连接ES
es = Elasticsearch(hosts=[host],
                   basic_auth=(username, password))

def find_keyword_in_text(text, keywords):
    for keyword in keywords:
        if keyword in text:
            return keyword
    return ""

# 品牌标签
keywords = [
    "华为", "苹果", "小米", "OPPO", "vivo", "三星", "一加", "诺基亚", "Realme", "荣耀",
    "谷歌", "锤子科技", "HUAWEI", "LG", "ONEPLUS", "SAMSUNG", "VIVO", "XIAOMI",
    "MOTOROLA", "NOKIA", "Redmi", "SONY", "魅族"
]

# 当点击查询按钮时
if submit_button:
    found_keyword = find_keyword_in_text(query, keywords)
    query_pinyin = ''.join(lazy_pinyin(found_keyword))

    knn = [
    {
        "field": "title_vector.predicted_value",
        "query_vector_builder": {
          "text_embedding": {
            "model_id": "bge-base-zh",
            "model_text": query
          }
        },
        "k": 40,
        "num_candidates": 200,
        "filter": {
            "bool": {
                "must": [
                    {
                        "bool": {
                            "should": [
                                {
                                    "match": {
                                        "specs": {
                                            "query": found_keyword
                                        }
                                    }
                                },
                                {
                                    "match": {
                                        "specs": {
                                            "query": query_pinyin
                                        }
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "boost": 0.6
      }
    ]

    # 调用knn检索
    knn_resp = knn_search(es, index, query)

    # 调用混合检索
    mix_resp = mix_search(es, index, knn)

    # 调用混合聚合检索
    mix_aggs_resp = mix_aggs_search(es, index, knn)


    # 创建三列
    col1, col2, col3 = st.columns(3)

    counter = 1
    with col1:
        st.write("### 向量检索结果")
        for hit in knn_resp['hits']['hits']:
            fields = hit['_source']
            ordered_fields = OrderedDict()
            ordered_fields['title'] = fields['title']
            ordered_fields['price'] = fields['price']
            ordered_fields['launch_date'] = parse_date(fields['launch_date'])
            ordered_fields['type'] = fields['type']
            ordered_fields['versions'] = fields['versions']
            ordered_fields['specs'] = fields['specs']

            json_output = json.dumps(ordered_fields, ensure_ascii=False)

            with st.container():
                st.text(f"{counter}. {json_output}")
            counter += 1

    counter = 1
    with col2:
        st.write("### 混合检索结果")
        for hit in mix_resp['hits']['hits']:
            fields = hit['_source']
            ordered_fields = OrderedDict()
            ordered_fields['title'] = fields['title']
            ordered_fields['price'] = fields['price']
            ordered_fields['launch_date'] = parse_date(fields['launch_date'])
            ordered_fields['type'] = fields['type']
            ordered_fields['versions'] = fields['versions']
            ordered_fields['specs'] = fields['specs']

            json_output = json.dumps(ordered_fields, ensure_ascii=False)

            with st.container():
                st.text(f"{counter}. {json_output}")
            counter += 1

    counter = 1
    with col3:
        st.write("### 混合检索聚合结果")
        for bucket in mix_aggs_resp['aggregations']['count_group_by']['buckets']:
            key = bucket['key']
            doc_count = bucket['doc_count']
            with st.container():
                st.text(f"{counter}. {key}: {doc_count} 条")
            counter += 1

激活 python 虚拟环境并安装必要环境:

代码语言:javascript
复制
conda activate es_vector
pip install pypinyin streamlit

启动 streamlit 页面服务:

代码语言:javascript
复制
cd /root/tencent-es_vector/
streamlit run vector_search.py

访问返回的公网地址,进行向量测试。

检索效果测试

我们模拟用户在商城搜索栏输入一个手机型号:小米 12 pro max

● 向量检索结果可能会召回不相关的内容

● 而使用 ES 的混合检索,利用前置过滤,在提高效率的同时,可以大幅提升召回率

● ES 也支持在在混合检索场景使用聚合查询

10. 总结

从检索效果可以直观看出,使用纯向量检索,往往是达不到业务需求的。如果想提升召回率,则需要配合混合检索,不仅可以提前过滤一些不相关的内容,对性能也有一定提升。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 说明
  • 声明
  • 环境配置
    • 客户端环境
      • Elasticsearch 服务端环境
      • 1. 部署客户端环境
      • 2. 创建 ES 集群
        • 访问Kibana
        • 3. 客户端准备工作
          • Python 环境部署
            • 安装 Python 依赖包
              • 下载整合包
                • 解压整合包
                • 4. 上传模型
                • 5. Elasticsearch 准备工作
                  • 定义ES管道
                    • 定义模板
                      • 创建索引
                        • 检索效果测试
                    • 6. 数据生成
                    • 7. Segment合并优化
                    • 8. 向量化检索
                    • 9. 检索效果对比
                    • 10. 总结
                    相关产品与服务
                    Elasticsearch Service
                    腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档