
Building a ChatBot on Internal Documents

Original · flavorfan · Published 2023-07-21

After some big talk in a group chat, I ended up building a ChatBot over internal documents for the company's HR team. It took roughly two weeks of personal spare time, about two working days in total. Given the shortage of OpenAI ChatGPT API keys, network latency, and the weak performance of open LLMs, I decided against using an LLM to post-process the output after retrieving matching documents: search results are returned as-is.

1. Solution overview

1) System architecture

Figure 1: System architecture diagram

The overall design is shown in the figure above. The data flow splits into two parts:

  1. Building the vectorized knowledge base
  2. Query matching

2) Runtime UI

Figure 2: CallMicky

3) Technology stack

  • Vector storage: Redis/RediSearch
  • Document loader: LangChain
  • Embedding model: moka-ai/m3e-base (Hugging Face)
  • Embedding cache: SQLite3 wrapper class
  • History DB: SQLite3
  • Framework/UI: Streamlit
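
A minimal sketch of how these pieces fit together (the host, port, and sample query are my assumptions; adjust to your deployment):

import redis
from sentence_transformers import SentenceTransformer

# a RediSearch-enabled Redis instance, e.g. redis-stack
redis_client = redis.Redis(host="localhost", port=6379)

# m3e-base produces 768-dimensional embeddings for both Chinese and English
embedding_model = SentenceTransformer("moka-ai/m3e-base")
vec = embedding_model.encode("How many days of annual leave do I get?")  # numpy array, shape (768,)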

2. Building the vectorized knowledge base

1) General pipeline for building the knowledge base

  1. Document ingestion: heterogeneous sources must be supported, including the QA dataset HR has accumulated (an Excel spreadsheet), the employee handbook in PDF, and other internal policy documents;
  2. Chunk preprocessing: filter the text, split it into appropriately sized chunks, and keep source metadata (chapter, chunk index);
  3. Embedding: each chunk is passed through an embedding model to produce a multi-dimensional vector; the usual candidates are OpenAI's ada model or open-source alternatives, and here the Chinese/English moka-ai/m3e model is used;
  4. Writing to the vector database: this covers designing the schema, storing different sources separately (namespaces / separate tables), and defining multiple vector fields to support different matching strategies.

2) Document ingestion

Importing the Excel document is straightforward:

import pandas as pd

df = pd.read_excel(example_qa_data, sheet_name='Sheet1')
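
The index built later expects the columns city, question, keywords, answer, plus a combined qa field. Assuming the spreadsheet already provides the first four, the combined field is a simple concatenation (a sketch, not necessarily the author's exact preprocessing):

# combined question+answer text, embedded later as the "qa" vector field
df["qa"] = df["question"].astype(str) + "\n" + df["answer"].astype(str)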

PDF documents are loaded with LangChain's PyPDFLoader:

from langchain.document_loaders import PyPDFLoader

loader = PyPDFLoader(data_file)
pages = loader.load()
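
Each element of pages is a LangChain Document holding one page's raw text plus its metadata:

page = pages[4]
print(page.metadata)            # {'source': '<pdf path>', 'page': 4}
print(page.page_content[:200])  # one plain-text string per page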

3) Chunk preprocessing

Excel content is already structured, so preprocessing is essentially per-column work and needs no further discussion.

A PDF run through PyPDFLoader, by contrast, yields plain text strings that contain noise (page headers and footers) and carry no structural information.

The usual approach is LangChain's RecursiveCharacterTextSplitter:

from langchain.text_splitter import RecursiveCharacterTextSplitter

r_splitter = RecursiveCharacterTextSplitter(
    chunk_size=450,
    chunk_overlap=50,
    separators=["\n\n", "\n", " ", ""]
)
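
Applied to the loaded pages, the splitter keeps each page's metadata on the resulting chunks:

chunks = r_splitter.split_documents(pages)  # list of Documents, roughly 450 characters each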

This works when a ChatGPT-class LLM sits behind the retrieval step to filter, summarize, and restructure the matched text; its robustness makes the missing structure a non-issue. Since I'm not putting an LLM at the back end (no API key), whatever the search matches is shown to the user directly, so each chunk must be a cohesive, self-contained unit. The PDF documents therefore need their structural (chapter) information extracted.

Filtering page headers and footers

Headers and footers sit at fixed positions on every page, so after fetching each page it suffices to drop the first or last few lines:

pages[4].page_content.split("\n")[9:]  # drop the 9-line header block of page 5
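
The chunking code below calls two helpers, get_pages_from_file and get_lines_from_pages, that the article does not show. A minimal sketch consistent with the filtering above (the 9-line header offset is an assumption specific to this handbook):

from langchain.document_loaders import PyPDFLoader

def get_pages_from_file(pdf_file, page_range):
    # load the PDF and keep only the pages of interest
    pages = PyPDFLoader(pdf_file).load()
    return pages[page_range[0]:page_range[1]]

def get_lines_from_pages(pages, header_lines=9):
    # flatten pages into lines, dropping the fixed-position header on each page
    lines = []
    for page in pages:
        lines.extend(page.page_content.split("\n")[header_lines:])
    return lines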

Chunking and extracting chapter structure

import re
import pandas as pd

# ############### check row type #####################
def is_contain_chinese(check_str):
    cn_min = u'\u4e00'
    cn_max = u'\u9fff'
    for ch in check_str:
        if cn_min <= ch <= cn_max:
            return True
    return False
    
def is_title(text):
    title_regex = r'^\d+\.[\d+]?[\.\d+]*\s+.*[\u4e00-\u9fa5]+\s*$'
    return re.match(title_regex, text) is not None

def is_empty_line(text):        
    return text.isspace()

def get_title_from_line(line):
    title_reg = r'^(\d+\.[\d+]?[\.\d+]*)\s([/&A-Za-z\s’,-]+)[\s]?([\u4e00-\u9fa5、/\s&]+)\s*$'
    result = re.search(title_reg, line)
    if result is None:
        return None
    title_idx, title_en, title_cn = result.groups()
    # heading depth derived from the numeric index, e.g. "3.1" -> level 1
    title_lvl = len(title_idx[:-1].split(".")) - 1
    return title_idx, title_en, title_cn, title_lvl

def get_chaper_strunct_content_from_file(pdf_file, page_range):
    # the two helpers below are sketched in the header/footer-filtering step above
    pages = get_pages_from_file(pdf_file, page_range)
    lines = get_lines_from_pages(pages)

    df = pd.DataFrame(lines, columns=["text"])
    df['is_title'] = df['text'].apply(is_title)
    df['is_empty_line'] = df['text'].apply(is_empty_line)
    df['contain_chinese'] = df['text'].apply(is_contain_chinese)

    chapter_sessions=[]
    cur_session = {}
    passage_lines = []
    cur_lan = None

    for i, row in df.iterrows():
        if row['is_title']:
            result = get_title_from_line(row['text'])
            if result is None:
                print(row['text'])
                continue
            title_idx, title_en, title_cn, title_lvl = result 

            if title_lvl >= 1:
                if len(cur_session.keys()) > 0:
                    if len(passage_lines) > 0:
                        if cur_lan == 'cn':
                            cur_session['sessions_cn'].append(" ".join(passage_lines))
                        else:
                            cur_session['sessions_en'].append(" ".join(passage_lines))

                    cur_session['text_en_full'] = "\n".join(cur_session['sessions_en'])
                    cur_session['text_cn_full'] = "\n".join(cur_session['sessions_cn'])
                    chapter_sessions.append(cur_session)

                    cur_session = {}    
                    passage_lines = []        
                cur_session['title_en'] = title_en
                cur_session['title_cn'] = title_cn
                cur_session['title_idx'] = title_idx
                cur_session['title_lvl'] = title_lvl

                cur_session['sessions_cn'] = []  
                cur_session['sessions_en'] = []  
                cur_session['text_en_full'] = ""
                cur_session['text_cn_full'] = ""
        
        elif row['is_empty_line']:
            continue

        elif row['contain_chinese']:
            if cur_lan == 'en' or cur_lan is None:
                if len(passage_lines) > 0:
                    if 'sessions_en' not in cur_session.keys():
                        cur_session['sessions_en'] = []
                    cur_session['sessions_en'].append(" ".join(passage_lines))
                    passage_lines = []
                cur_lan = 'cn'
            passage_lines.append(row['text'])
        else:
            if cur_lan == 'cn' or cur_lan is None:
                if len(passage_lines) > 0:
                    if 'sessions_cn' not in cur_session.keys():
                        cur_session['sessions_cn'] = []
                    cur_session['sessions_cn'].append(" ".join(passage_lines))
                    passage_lines = []
                cur_lan = 'en'        
            passage_lines.append(row['text'])


    if len(passage_lines) > 0:
        if cur_lan == 'cn':
            if 'sessions_cn' not in cur_session.keys():
                cur_session['sessions_cn'] = []
            cur_session['sessions_cn'].append(" ".join(passage_lines))
        else:
            if 'sessions_en' not in cur_session.keys():
                cur_session['sessions_en'] = []
            cur_session['sessions_en'].append(" ".join(passage_lines))    

        cur_session['text_en_full'] = "\n".join(cur_session['sessions_en'])
        cur_session['text_cn_full'] = "\n".join(cur_session['sessions_cn'])   
        chapter_sessions.append(cur_session)
    
    return chapter_sessions
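
A hypothetical invocation (the file name and page range are placeholders), flattening the chapters into a DataFrame for the indexing step:

sessions = get_chaper_strunct_content_from_file("employee_handbook.pdf", (2, 60))
df_hb = pd.DataFrame(sessions)[["title_idx", "title_cn", "title_en", "text_cn_full", "text_en_full"]]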

4) Building the vector database

import pandas as pd
import numpy as np

import redis
from redis.commands.search.indexDefinition import (
    IndexDefinition,
    IndexType
)
from redis.commands.search.query import Query
from redis.commands.search.field import (
    TextField,
    VectorField
)
from typing import List, Iterator

def create_qa_index(redis_client: redis.Redis):
    VECTOR_DIM_ADA = 1536 # length of the vectors
    VECTOR_DIM_M3E = 768 # length of the vectors

    VECTOR_NUMBER = 1000                # initial number of vectors
    INDEX_NAME = "qa-ada-m3e"           # name of the search index
    PREFIX = "qa"                            # prefix for the document keys
    DISTANCE_METRIC = "COSINE"                # distance metric for the vectors (ex. COSINE, IP, L2)

    city = TextField("city")
    question = TextField("question")
    keywords = TextField("keywords")
    answer = TextField("answer")
    qa = TextField("qa")

    question_embedding_ada = VectorField("question_vector_ada",
        "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIM_ADA,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        }
    )
    answer_embedding_ada = VectorField("answer_vector_ada",
        "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIM_ADA,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        }
    )
    qa_embedding_ada = VectorField("qa_vector_ada",
        "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIM_ADA,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        }
    )

    question_embedding_m3e = VectorField("question_vector_m3e",
        "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIM_M3E,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        }
    )
    answer_embedding_m3e = VectorField("answer_vector_m3e",
        "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIM_M3E,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        }
    )
    qa_embedding_m3e = VectorField("qa_vector_m3e",
        "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIM_M3E,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        }
    )

    fields = [city,question,keywords,answer,qa,question_embedding_ada,answer_embedding_ada,qa_embedding_ada,question_embedding_m3e,answer_embedding_m3e,qa_embedding_m3e]

    try:
        redis_client.ft(INDEX_NAME).info()
        print("Index already exists")
    except Exception:
        # Create RediSearch Index
        redis_client.ft(INDEX_NAME).create_index(
            fields=fields,
            definition=IndexDefinition(prefix=[PREFIX], index_type=IndexType.HASH)
        )

def index_qa_documents(client: redis.Redis, prefix: str, documents: pd.DataFrame):
    records = documents.to_dict("records")
    for doc in records:
        key = f"{prefix}:{str(doc['city'])}:{str(doc['question'])}"

        # create byte vectors for title and content
        question_embedding_ada = np.array(doc["question_vector_ada"], dtype=np.float32).tobytes()
        answer_embedding_ada = np.array(doc["answer_vector_ada"], dtype=np.float32).tobytes()
        qa_embedding_ada = np.array(doc["qa_vector_ada"], dtype=np.float32).tobytes()

        # replace list of floats with byte vectors
        doc["question_vector_ada"] = question_embedding_ada
        doc["answer_vector_ada"] = answer_embedding_ada
        doc["qa_vector_ada"] = qa_embedding_ada

        # create byte vectors for title and content
        question_embedding_m3e = np.array(doc["question_vector_m3e"], dtype=np.float32).tobytes()
        answer_embedding_m3e = np.array(doc["answer_vector_m3e"], dtype=np.float32).tobytes()
        qa_embedding_m3e = np.array(doc["qa_vector_m3e"], dtype=np.float32).tobytes()

        # replace list of floats with byte vectors
        doc["question_vector_m3e"] = question_embedding_m3e
        doc["answer_vector_m3e"] = answer_embedding_m3e
        doc["qa_vector_m3e"] = qa_embedding_m3e

        client.hset(key, mapping = doc)
        
def create_hb_index(redis_client: redis.Redis):
    # analogous to create_qa_index, for the handbook (hb) chapter index
    ...

def index_hb_documents(client: redis.Redis, prefix: str, documents: pd.DataFrame):
    # analogous to index_qa_documents, writing the handbook chapters
    ...
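
Putting the pieces together for the QA set: compute the m3e embeddings per field, then create the index and write the hashes. This is a sketch assuming the redis_client and df from earlier; index_qa_documents also reads the ada columns, so zero placeholders stand in when no OpenAI key is available (my assumption, not necessarily the author's setup):

import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("moka-ai/m3e-base")

# m3e embeddings for the three searchable text fields
for col in ["question", "answer", "qa"]:
    df[f"{col}_vector_m3e"] = df[col].apply(lambda t: model.encode(t).tolist())

# placeholder 1536-dim ada vectors keep the schema intact without an OpenAI key
for col in ["question", "answer", "qa"]:
    df[f"{col}_vector_ada"] = [[0.0] * 1536 for _ in range(len(df))]

create_qa_index(redis_client)
index_qa_documents(redis_client, prefix="qa", documents=df)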

3. Query matching

1) Basic Redis search

def search_redis(
    redis_client: redis.Redis,
    user_query: str,
    index_name: str = "qa-ada-m3e",
    vector_field: str = "question_vector_m3e",
    return_fields: list = ['city','question','keywords','answer','qa',"vector_score"],
    hybrid_fields = "*",
    k: int = 3,
    embedding_model='m3e',
    print_results: bool = True
) -> List[dict]:
       
    embedded_query = get_query_embeddings(user_query,embedding_model)

    # Prepare the Query
    base_query = f'{hybrid_fields}=>[KNN {k} @{vector_field} $vector AS vector_score]'
    query = (
        Query(base_query)
         .return_fields(*return_fields)
         .sort_by("vector_score")
         .paging(0, k)
         .dialect(2)
    )
    params_dict = {"vector": np.array(embedded_query).astype(dtype=np.float32).tobytes()}

    # perform vector search
    results = redis_client.ft(index_name).search(query, params_dict)
    if print_results:
        for i, article in enumerate(results.docs):
            score = 1 - float(article.vector_score)
            print(f"{i}.  {article[return_fields[0]]} : {article[return_fields[1]]} (Score: {round(score ,3) })")
    return results.docs
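
search_redis relies on get_query_embeddings, which is not shown. Given the "Embedding cache: SQLite3 wrapper class" item in the stack, a plausible sketch is the m3e encoder fronted by a small SQLite cache (the table name and schema are my assumptions; only the m3e path is sketched):

import json
import sqlite3
from sentence_transformers import SentenceTransformer

_model = SentenceTransformer("moka-ai/m3e-base")
_cache = sqlite3.connect("embedding_cache.db")
_cache.execute("CREATE TABLE IF NOT EXISTS emb (text TEXT PRIMARY KEY, vec TEXT)")

def get_query_embeddings(text: str, embedding_model: str = "m3e"):
    # return a cached embedding if we have seen this exact query before
    row = _cache.execute("SELECT vec FROM emb WHERE text = ?", (text,)).fetchone()
    if row:
        return json.loads(row[0])
    vec = _model.encode(text).tolist()  # 768-dim for m3e
    _cache.execute("INSERT OR REPLACE INTO emb VALUES (?, ?)", (text, json.dumps(vec)))
    _cache.commit()
    return vec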

2) Multi-shot search

def do_all_search(query:str,city:str):
    qa_rlt_ls = []
    qa_search_cnt = len(redis_search_params['qa']['vector_fields'] )
    for i in range(qa_search_cnt):
        qa_rlt = search_redis(redis_client,
                            query,
                            index_name=redis_search_params['qa']['index_name'],
                            vector_field=redis_search_params['qa']['vector_fields'][i],
                            return_fields=redis_search_params['qa']['return_fields'],
                            hybrid_fields=create_hybrid_field('city',city),
                            k=2
                            )
        qa_rlt_ls.append(qa_rlt)
    
    hb_rlt_ls = []
    hb_search_cnt = len(redis_search_params['hb']['vector_fields'] )
    for i in range(hb_search_cnt):
        hb_rlt = search_redis(redis_client,
                            query,
                            index_name=redis_search_params['hb']['index_name'],
                            vector_field=redis_search_params['hb']['vector_fields'][i],
                            return_fields=redis_search_params['hb']['return_fields'],
                            k=2
                            )
        hb_rlt_ls.append(hb_rlt)
    
    df_ls = []
    for i in range(len(qa_rlt_ls)):
        df_qa = pd.DataFrame([it.__dict__ for it in qa_rlt_ls[i]])
        df_qa['score'] = df_qa['vector_score'].apply(lambda x: 1 - float(x))
        df_qa['search_by'] = 'question' if i == 0 else 'answer'
        df_qa.rename(columns={'question':'question_title','answer':'answer_text'},inplace=True)
        df_ls.append(df_qa[['score','id','answer_text','question_title','search_by']])
    
    for i in range(len(hb_rlt_ls)):
        df_hb = pd.DataFrame([it.__dict__ for it in hb_rlt_ls[i]])
        df_hb['score'] = df_hb['vector_score'].apply(lambda x: 1 - float(x))
        df_hb['search_by'] = 'title' if i == 0 else 'text'
        df_hb.rename(columns={'title_cn':'question_title','text_cn_full':'answer_text'},inplace=True)
        df_ls.append(df_hb[['score','id','answer_text','question_title','search_by']])
    
    df = pd.concat(df_ls).sort_values(by='score',ascending=False).reset_index(drop=True)
    return df
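
do_all_search depends on a redis_search_params config and a create_hybrid_field helper, neither shown above. A sketch consistent with the fields the function actually reads (the hb index and vector field names are assumptions; the hybrid-filter syntax follows RediSearch's standard query language):

redis_search_params = {
    "qa": {
        "index_name": "qa-ada-m3e",
        "vector_fields": ["question_vector_m3e", "answer_vector_m3e"],
        "return_fields": ["city", "question", "keywords", "answer", "qa", "vector_score"],
    },
    "hb": {
        "index_name": "hb-m3e",  # assumed name for the handbook index
        "vector_fields": ["title_vector_m3e", "text_vector_m3e"],  # assumed field names
        "return_fields": ["title_cn", "text_cn_full", "vector_score"],
    },
}

def create_hybrid_field(field_name: str, value: str) -> str:
    # restrict the KNN search to documents whose text field matches the value
    return f'@{field_name}:"{value}"'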

4. Other notes

With a query-history database in place, plus buttons for rating each answer, we can collect the questions most users were not satisfied with, answer them manually, and fold the results back into the QA dataset. The same goes for later bulk document imports: rather than curating every document by hand, ChatGPT (once a key is available) could extract satisfactory answers and consolidate them into the QA dataset.
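
A minimal sketch of such a history table with a feedback flag (the schema is my assumption):

import sqlite3

conn = sqlite3.connect("history.db")
conn.execute("""CREATE TABLE IF NOT EXISTS query_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    query TEXT,
    answer TEXT,
    satisfied INTEGER,  -- 1 = thumbs up, 0 = thumbs down, NULL = no feedback yet
    ts DATETIME DEFAULT CURRENT_TIMESTAMP)""")

def log_query(query: str, answer: str, satisfied=None):
    conn.execute(
        "INSERT INTO query_history (query, answer, satisfied) VALUES (?, ?, ?)",
        (query, answer, satisfied),
    )
    conn.commit()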

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited. For infringement concerns, contact cloudcommunity@tencent.com for removal.

