
如今数字经济飞速发展,数据已成为企业的核心资产。然而,大多数企业在数据利用方面面临着一个突出的矛盾:数据量呈指数级增长,但专业数据分析师资源却严重匮乏。业务人员往往需要等待数天甚至数周才能获得一个简单的数据查询结果,严重影响了决策效率和市场响应速度。
传统数据查询流程的痛点:
早期的解决方案包括可视化BI工具和SQL查询界面优化,但这些方案仍需要用户具备一定的数据知识和技能。真正的突破来自于自然语言处理技术,特别是大型语言模型的发展,使Text2SQL技术从实验室走向实用阶段。
Text2SQL技术通过将自然语言转换为结构化查询语言(SQL),彻底改变了数据访问方式。这项技术不仅能够理解用户意图,还能生成符合语法规范、可高效执行的数据库查询,让业务人员直接用日常语言与数据库"对话"。实现Text2SQL技术后,企业能够建立真正的自助式数据分析平台,将数据团队从重复劳动中解放出来,同时赋予业务人员直接探索数据的能力,大幅提升数据驱动决策的效率和广度。

Text2SQL是自然语言处理(NLP)与数据库管理系统的交叉领域技术,其主要任务是将用户输入的自然语言问题转换为符合语法规范、可正确执行的SQL查询语句。这个过程涉及多个子任务:
2.1 语义理解
自然语言中存在大量歧义和模糊表达,如"显示最新数据"中的"最新"可能指时间最新或ID最大。
解决方案:
2.2 模式对齐
将自然语言中的提及正确关联到数据库表结构,如用户说"客户数量"需要映射到customer表的id列。
解决方案:
2.3 SQL语法检测
确保生成的SQL符合目标数据库的语法规范,特别是复杂查询涉及多表连接、子查询、聚合函数等。
解决方案:
2.4 查询效率优化
避免生成性能低下的SQL,如缺少索引的全表扫描、不必要的嵌套查询等。
解决方案:
一个完整的Text2SQL系统包含以下核心组件:
1. 自然语言理解模块
2. schema管理模块
class SchemaManager:
def __init__(self, db_config):
self.db_config = db_config
self.connection = None
self.schema_metadata = None
def fetch_metadata(self):
# 获取数据库表结构信息
pass
def build_vector_index(self):
# 将表结构向量化用于快速检索
passSchema管理模块是Text2SQL系统的"大脑"和"词典",承担着将数据库物理结构转化为语言模型可理解的知识体系的关键任务。
元数据采集与解析:
知识表示与向量化:
语义检索与上下文构建:
版本管理与变更同步:
Schema管理模块是Text2SQL系统中最具技术深度和业务价值的组件之一。它不仅仅是一个被动的元数据存储库,而是一个主动的知识管理和语义理解系统。优秀的Schema管理能够显著提升Text2SQL系统的准确性和用户体验,真正实现从"技术语言"到"业务语言"的平滑转换。
3. SQL生成模块
4. 执行与返回模块
5. 安全与权限控制
企业级应用必须考虑安全性:
OpenAI GPT系列:
国内大模型:
Qwen系列:
其他选择:
选择模型时需要考虑多个因素:
Function Calling 是大语言模型与外部系统交互的一种标准化方式。在 Text2SQL 场景中,它允许我们将复杂的 SQL 生成任务分解为两个清晰的步骤:
优势与特点:
这种方法比直接让大模型生成 SQL 更加安全、可控和可靠。
下面是对上述代码的详细解释,涵盖了每个部分的功能和作用:
import dashscope
from dashscope import Generation
import json
import os # 设置通义千问API密钥
dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')# 定义可用的函数
functions = [
{
"name": "execute_sql_query",
"description": "执行SQL查询并返回结果",
"parameters": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "要执行的SQL查询语句"
}
},
"required": ["sql"]
}
}
]# 模拟的数据库表结构信息
database_schema = """
数据库中有以下表:
1. 用户表(users): 包含id, name, email, city字段
2. 订单表(orders): 包含id, user_id, product, amount, order_date字段
3. 产品表(products): 包含id, name, price, category字段
"""def text_to_sql(user_query):
"""将自然语言转换为SQL并执行"""
try:
# 调用通义千问Qwen-Plus模型,支持Function Calling
response = Generation.call(
model='qwen-plus',
messages=[
{
"role": "system",
"content": f"你是一个SQL专家。请根据以下数据库结构生成SQL查询:\n{database_schema}"
},
{
"role": "user",
"content": user_query
}
],
functions=functions,
result_format='message'
) # 解析模型响应
message = response.output.choices[0].message
# 检查是否要求调用函数
if hasattr(message, 'function_call') and message.function_call:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
# 执行相应的函数
if function_name == "execute_sql_query":
return execute_sql_query(function_args["sql"]) else:
# 如果没有函数调用,尝试直接提取SQL
sql = extract_sql_from_text(message.content)
if sql:
return execute_sql_query(sql, "从模型响应中提取SQL")
else:
return "未能生成合适的SQL查询"
except Exception as e:
return f"API调用失败: {str(e)}"def extract_sql_from_text(text):
"""从文本中提取SQL语句"""
import re
# 查找SELECT语句
select_pattern = r"SELECT\s+.*?FROM\s+.*?(?:WHERE\s+.*?)?(?:ORDER BY\s+.*?)?(?:LIMIT\s+.*?)?;?"
matches = re.findall(select_pattern, text, re.IGNORECASE | re.DOTALL)
if matches:
return matches[0].strip()
# 查找其他类型的SQL语句
other_patterns = [
r"INSERT INTO\s+.*?VALUES\s*\(.*?\);?",
r"UPDATE\s+.*?SET\s+.*?WHERE\s+.*?;?",
r"DELETE FROM\s+.*?WHERE\s+.*?;?"
]
for pattern in other_patterns:
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
if matches:
return matches[0].strip()
return Nonedef execute_sql_query(sql, thought=""):
"""执行SQL查询的函数"""
# 在实际应用中,这里会连接真实数据库执行查询
# 这里只是返回模拟结果
print(f"思考过程: {thought}")
print(f"执行的SQL: {sql}")
# 模拟不同查询的结果
sql_lower = sql.lower()
if "select name, city from users" in sql_lower:
return [
{"name": "张三", "city": "北京"},
{"name": "李四", "city": "上海"},
{"name": "王五", "city": "广州"}
]
# ... 其他条件判断# 测试示例
if __name__ == "__main__":
# 示例查询
queries = [
"显示所有用户的姓名和所在城市",
"查询订单中的产品和数量",
"删除所有测试用户",
"显示所有产品名称和价格",
"查询北京的用户"
]
for query in queries:
print(f"\n{'='*50}")
print(f"用户查询: {query}")
result = text_to_sql(query)
print(f"查询结果: {result}")
print(f"{'='*50}") ==================================================
用户查询: 显示所有用户的姓名和所在城市
思考过程: {"role": "assistant", "content": "", "function_call": {"name": "execute_sql_query", "arguments": "{\"sql\": \"SELECT name, city FROM users;\"}"}}
执行的SQL: SELECT name, city FROM users;
查询结果: [{'name': '张三', 'city': '北京'}, {'name': '李四', 'city': '上海'}, {'name': '王五', 'city': '广州'}]
==================================================
==================================================
用户查询: 查询订单中的产品和数量
思考过程: {"role": "assistant", "content": "", "function_call": {"name": "execute_sql_query", "arguments": "{\"sql\": \"SELECT product, amount FROM orders;\"}"}}
执行的SQL: SELECT product, amount FROM orders;
查询结果: [{'product': '笔记本电脑', 'amount': 2}, {'product': '智能手机', 'amount': 5}, {'product': '平板电脑', 'amount': 3}]
==================================================
==================================================
用户查询: 删除所有测试用户
查询结果: API调用失败: 'function_call'
==================================================
==================================================
用户查询: 显示所有产品名称和价格
思考过程: {"role": "assistant", "content": "", "function_call": {"name": "execute_sql_query", "arguments": "{\"sql\": \"SELECT name, price FROM products;\"}"}}
执行的SQL: SELECT name, price FROM products;
查询结果: [{'name': '笔记本电脑', 'price': 5999.0}, {'name': '智能手机', 'price': 3999.0}, {'name': '平板电脑', 'price': 2999.0}]
==================================================
==================================================
用户查询: 查询北京的用户
思考过程: {"role": "assistant", "content": "", "function_call": {"name": "execute_sql_query", "arguments": "{\"sql\": \"SELECT * FROM users WHERE city = '北
京';\"}"}}
执行的SQL: SELECT * FROM users WHERE city = '北京';
查询结果: [{'id': 1, 'name': '张三', 'email': 'zhangsan@example.com', 'city': '北京'}, {'id': 2, 'name': '李四', 'email': 'lisi@example.com', 'city': '上海'}, {'id': 3, 'name': '王五', 'email': 'wangwu@example.com', 'city': '广州'}]
==================================================import os
import json
import pymysql
import dashscope
from dashscope import Generation
# 设置通义千问API密钥
dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')# 数据库配置
db_config = {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "your_database",
"charset": "utf8mb4"
}
# 初始化Text2SQL系统
text2sql = SimpleText2SQL(db_config)def get_database_schema(self):
"""获取数据库表结构信息"""
try:
connection = pymysql.connect(**self.db_config)
schema = {}
with connection.cursor() as cursor:
# 获取所有表名
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
for table in tables:
table_name = table[0]
schema[table_name] = {"columns": []}
# 获取表的列信息
cursor.execute(f"DESCRIBE {table_name}")
columns = cursor.fetchall()def generate_sql(self, user_query):
"""使用Qwen生成SQL查询"""
if not self.schema_info:
self.get_database_schema()
# 构建schema描述
schema_desc = "数据库表结构:\n"
for table_name, table_info in self.schema_info.items():
schema_desc += f"- {table_name}: "
schema_desc += ", ".join([f"{col['name']}({col['type']})" for col in table_info["columns"]])
schema_desc += "\n"
# 构建提示词
prompt = f"""你是一个SQL专家。请根据以下数据库结构,将用户的自然语言查询转换为MySQL SQL语句。
{schema_desc}
请只输出SQL语句,不要输出其他任何内容。
用户查询: {user_query}
SQL语句:"""# 调用通义千问API
response = Generation.call(
model='qwen-turbo', # 使用qwen-turbo以节省成本
messages=[{'role': 'user', 'content': prompt}],
temperature=0.1
)
# 提取SQL语句
sql = response.output.choices[0].message.content.strip()def execute_query(self, sql):
"""执行SQL查询并返回结果"""
# 安全检查:只允许SELECT查询
if not sql.lower().strip().startswith('select'):
return {"error": "只允许执行SELECT查询"}
try:
connection = pymysql.connect(**self.db_config)
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return results==================================================
用户查询: 显示所有用户
Qwen API响应: {"status_code": 200, "request_id": "05aeba62-9038-415d-be08-0b47a66e7463", "code": "", "message": "", "output": {"text": "SELECT * FROM users;", "finish_reason": "stop", "choices": null}, "usage": {"input_tokens": 497, "output_tokens": 5, "total_tokens": 502, "prompt_tokens_details": {"cached_tokens": 256}}}
生成的SQL: SELECT * FROM users;
查询结果 (显示前5行):
1. {'user_id': 1, 'username': 'zhangsan', 'email': 'zhangsan@example.com', 'password_hash': 'hashed_password_1', 'full_name': '张三', 'date_of_birth': datetime.date(1990, 5, 15), 'registration_date': datetime.datetime(2023, 1, 10, 9, 30), 'last_login': datetime.datetime(2023, 6, 1, 14, 25), 'status': 'active', 'created_at': datetime.datetime(2025, 9, 10, 11, 41, 23), 'updated_at': datetime.datetime(2025, 9, 10, 11, 41, 23)}
2. {'user_id': 2, 'username': 'lisi', 'email': 'lisi@example.com', 'password_hash': 'hashed_password_2', 'full_name': '李四', 'date_of_birth': datetime.date(1985, 8, 22), 'registration_date': datetime.datetime(2023, 2, 15, 11, 20), 'last_login': datetime.datetime(2023, 6, 2, 10, 15), 'status': 'active', 'created_at': datetime.datetime(2025, 9, 10, 11, 41, 23), 'updated_at': datetime.datetime(2025, 9, 10, 11, 41, 23)}
3. {'user_id': 3, 'username': 'wangwu', 'email': 'wangwu@example.com', 'password_hash': 'hashed_password_3', 'full_name': '王五', 'date_of_birth': datetime.date(1992, 12, 10), 'registration_date': datetime.datetime(2023, 3, 20, 14, 45), 'last_login': datetime.datetime(2023, 6, 1, 16, 30), 'status': 'active',
'created_at': datetime.datetime(2025, 9, 10, 11, 41, 23), 'updated_at': datetime.datetime(2025, 9, 10, 11, 41, 23)}
4. {'user_id': 4, 'username': 'zhaoliu', 'email': 'zhaoliu@example.com', 'password_hash': 'hashed_password_4', 'full_name': '赵六', 'date_of_birth': datetime.date(1988, 3, 25), 'registration_date': datetime.datetime(2023, 4, 5, 10, 10), 'last_login': datetime.datetime(2023, 5, 28, 9, 45), 'status': 'inactive', 'created_at': datetime.datetime(2025, 9, 10, 11, 41, 23), 'updated_at': datetime.datetime(2025, 9, 10, 11, 41, 23)}
5. {'user_id': 5, 'username': 'qianqi', 'email': 'qianqi@example.com', 'password_hash': 'hashed_password_5', 'full_name': '钱七', 'date_of_birth': datetime.date(1995, 7, 18), 'registration_date': datetime.datetime(2023, 5, 12, 16, 20), 'last_login': datetime.datetime(2023, 6, 3, 11, 30), 'status': 'active', 'created_at': datetime.datetime(2025, 9, 10, 11, 41, 23), 'updated_at': datetime.datetime(2025, 9, 10, 11, 41, 23)}
==================================================
==================================================
用户查询: 查询订单数量最多的前5个产品
Qwen API响应: {"status_code": 200, "request_id": "93bbb16a-41ca-4610-ab7a-95ddd483f2bd", "code": "", "message": "", "output": {"text": "SELECT p.product_id, p.product_name, COUNT(oi.order_item_id) AS order_count\nFROM products p\nJOIN order_items oi ON p.product_id = oi.product_id\nGROUP BY p.product_id, p.product_name\nORDER BY order_count DESC\nLIMIT 5;", "finish_reason": "stop", "choices": null}, "usage": {"input_tokens": 501, "output_tokens": 57, "total_tokens": 558, "prompt_tokens_details": {"cached_tokens": 0}}}
生成的SQL: SELECT p.product_id, p.product_name, COUNT(oi.order_item_id) AS order_count
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
GROUP BY p.product_id, p.product_name
ORDER BY order_count DESC
LIMIT 5;
查询结果 (显示前5行):
1. {'product_id': 6, 'product_name': '男士休闲衬衫', 'order_count': 2}
2. {'product_id': 1, 'product_name': 'iPhone 14 Pro', 'order_count': 1}
3. {'product_id': 2, 'product_name': '华为Mate 50', 'order_count': 1}
4. {'product_id': 3, 'product_name': '小米13', 'order_count': 1}
5. {'product_id': 7, 'product_name': '女士连衣裙', 'order_count': 1}
==================================================表数据参考:USERS (用户表)

products (产品表)


1. 系统初始化
2. 数据库结构获取
3. 查询处理
4. 执行与结果处理
5. 异常处理流程
Text2SQL 技术通过大语言模型的能力,打破了技术语言与业务需求之间的障碍,让数据查询变得更加直观和高效。这种技术不仅提升了数据使用的便利性,也让用户能够用日常语言与数据库交互,无需掌握专业的 SQL 语法。
随着大语言模型能力的不断提升,Schema管理模块的重要性将更加凸显。未来的发展方向包括更加智能的关系推理、基于实际查询模式的自适应优化、以及跨数据源的统一schema管理等。这个模块的成熟度将在很大程度上决定Text2SQL技术在企业环境中的落地效果和应用范围。
Text2SQL 集成 Function Calling 示例展示了如何将自然语言查询转换为数据库操作的核心原理。通过 Function Calling,我们可以让大模型专注于理解用户意图和生成结构化参数,而将实际的数据库操作交给应用程序控制,从而提高系统的安全性、可靠性和灵活性。
import dashscope
from dashscope import Generation
import json
import os
# 设置通义千问API密钥
dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')
# 定义可用的函数
functions = [
{
"name": "execute_sql_query",
"description": "执行SQL查询并返回结果",
"parameters": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "要执行的SQL查询语句"
}
},
"required": ["sql"]
}
}
]
# 模拟的数据库表结构信息
database_schema = """
数据库中有以下表:
1. 用户表(users): 包含id, name, email, city字段
2. 订单表(orders): 包含id, user_id, product, amount, order_date字段
3. 产品表(products): 包含id, name, price, category字段
"""
def text_to_sql(user_query):
"""将自然语言转换为SQL并执行"""
try:
# 调用通义千问Qwen-Plus模型,支持Function Calling
response = Generation.call(
model='qwen-plus',
messages=[
{
"role": "system",
"content": f"你是一个SQL专家。请根据以下数据库结构生成SQL查询:\n{database_schema}"
},
{
"role": "user",
"content": user_query
}
],
functions=functions,
result_format='message'
)
# 解析模型响应
message = response.output.choices[0].message
print(message)
# 检查是否要求调用函数
if hasattr(message, 'function_call') and message.function_call:
# 检查function_call是对象还是字典
if isinstance(message.function_call, dict):
function_name = message.function_call.get('name')
function_args = json.loads(message.function_call.get('arguments', '{}'))
else:
try:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
except AttributeError:
# 如果访问属性失败,尝试使用字典方式访问
function_name = message.function_call.get('name')
function_args = json.loads(message.function_call.get('arguments', '{}'))
# 执行相应的函数
if function_name == "execute_sql_query":
return execute_sql_query(function_args["sql"])
else:
# 如果没有函数调用,尝试直接提取SQL
sql = extract_sql_from_text(message.content)
if sql:
return execute_sql_query(sql, "从模型响应中提取SQL")
else:
return "未能生成合适的SQL查询"
except Exception as e:
return f"API调用失败: {str(e)}"
def extract_sql_from_text(text):
"""从文本中提取SQL语句"""
import re
# 查找SELECT语句
select_pattern = r"SELECT\s+.*?FROM\s+.*?(?:WHERE\s+.*?)?(?:ORDER BY\s+.*?)?(?:LIMIT\s+.*?)?;?"
matches = re.findall(select_pattern, text, re.IGNORECASE | re.DOTALL)
if matches:
return matches[0].strip()
# 查找其他类型的SQL语句
other_patterns = [
r"INSERT INTO\s+.*?VALUES\s*\(.*?\);?",
r"UPDATE\s+.*?SET\s+.*?WHERE\s+.*?;?",
r"DELETE FROM\s+.*?WHERE\s+.*?;?"
]
for pattern in other_patterns:
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
if matches:
return matches[0].strip()
return None
def execute_sql_query(sql, thought=""):
"""执行SQL查询的函数"""
# 在实际应用中,这里会连接真实数据库执行查询
# 这里只是返回模拟结果
print(f"思考过程: {thought}")
print(f"执行的SQL: {sql}")
# 模拟不同查询的结果
sql_lower = sql.lower()
if "select name, city from users" in sql_lower:
return [
{"name": "张三", "city": "北京"},
{"name": "李四", "city": "上海"},
{"name": "王五", "city": "广州"}
]
elif "select product, amount from orders" in sql_lower:
return [
{"product": "笔记本电脑", "amount": 2},
{"product": "智能手机", "amount": 5},
{"product": "平板电脑", "amount": 3}
]
elif "select * from users" in sql_lower:
return [
{"id": 1, "name": "张三", "email": "zhangsan@example.com", "city": "北京"},
{"id": 2, "name": "李四", "email": "lisi@example.com", "city": "上海"},
{"id": 3, "name": "王五", "email": "wangwu@example.com", "city": "广州"}
]
elif "select name, price from products" in sql_lower:
return [
{"name": "笔记本电脑", "price": 5999.00},
{"name": "智能手机", "price": 3999.00},
{"name": "平板电脑", "price": 2999.00}
]
elif "delete" in sql_lower:
return [{"message": "删除操作已执行", "rows_affected": 5}]
elif "update" in sql_lower:
return [{"message": "更新操作已执行", "rows_affected": 3}]
elif "insert" in sql_lower:
return [{"message": "插入操作已执行", "rows_affected": 1}]
else:
return [{"message": "查询执行成功", "rows_affected": 10}]
# 测试示例
if __name__ == "__main__":
# 示例查询
queries = [
"显示所有用户的姓名和所在城市",
"查询订单中的产品和数量",
"删除所有测试用户",
"显示所有产品名称和价格",
"查询北京的用户"
]
for query in queries:
print(f"\n{'='*50}")
print(f"用户查询: {query}")
result = text_to_sql(query)
print(f"查询结果: {result}")
print(f"{'='*50}")import os
import json
import pymysql
import dashscope
from dashscope import Generation
# 设置通义千问API密钥
dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')
class SimpleText2SQL:
def __init__(self, db_config):
self.db_config = db_config
self.schema_info = None
def get_database_schema(self):
"""获取数据库表结构信息"""
try:
connection = pymysql.connect(**self.db_config)
schema = {}
with connection.cursor() as cursor:
# 获取所有表名
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
for table in tables:
table_name = table[0]
schema[table_name] = {"columns": []}
# 获取表的列信息
cursor.execute(f"DESCRIBE {table_name}")
columns = cursor.fetchall()
for col in columns:
schema[table_name]["columns"].append({
"name": col[0],
"type": col[1],
"nullable": col[2],
"key": col[3],
"default": col[4],
"extra": col[5]
})
self.schema_info = schema
return schema
except Exception as e:
print(f"获取数据库结构失败: {str(e)}")
return None
finally:
if connection:
connection.close()
def generate_sql(self, user_query):
"""使用Qwen生成SQL查询"""
if not self.schema_info:
self.get_database_schema()
# 构建schema描述
schema_desc = "数据库表结构:\n"
for table_name, table_info in self.schema_info.items():
schema_desc += f"- {table_name}: "
schema_desc += ", ".join([f"{col['name']}({col['type']})" for col in table_info["columns"]])
schema_desc += "\n"
# 构建提示词
prompt = f"""你是一个SQL专家。请根据以下数据库结构,将用户的自然语言查询转换为MySQL SQL语句。
{schema_desc}
请只输出SQL语句,不要输出其他任何内容。
用户查询: {user_query}
SQL语句:"""
try:
# 调用通义千问API
response = Generation.call(
model='qwen-turbo', # 使用qwen-turbo以节省成本
messages=[{'role': 'user', 'content': prompt}],
temperature=0.1
)
print(f"Qwen API响应: {response}")
# 检查API响应是否有效 or not response.output.choices
if not response or not hasattr(response, 'output') :
print(f"API返回无效响应: {response}")
return None
try:
# 提取SQL语句
# sql = response.output.choices[0].message.content.strip()
sql = response.output.text.strip()
# 清理SQL语句(移除可能的代码标记)
if sql.startswith('```sql'):
sql = sql[6:]
if sql.startswith('```'):
sql = sql[3:]
if sql.endswith('```'):
sql = sql[:-3]
except Exception as e:
print(f"解析API响应失败: {str(e)}")
return None
return sql.strip()
except Exception as e:
print(f"生成SQL失败: {str(e)}")
return None
def execute_query(self, sql):
"""执行SQL查询并返回结果"""
# 安全检查:只允许SELECT查询
if not sql.lower().strip().startswith('select'):
return {"error": "只允许执行SELECT查询"}
try:
connection = pymysql.connect(**self.db_config)
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return results
except Exception as e:
return {"error": f"执行查询失败: {str(e)}"}
finally:
if connection:
connection.close()
def query(self, user_query):
"""处理用户查询的完整流程"""
print(f"用户查询: {user_query}")
# 生成SQL
sql = self.generate_sql(user_query)
if not sql:
return {"error": "无法生成SQL语句"}
print(f"生成的SQL: {sql}")
# 执行查询
result = self.execute_query(sql)
return result
# 使用示例
if __name__ == "__main__":
# 数据库配置
db_config = {
"host": "localhost",
"user": "root",
"password": "Aa123456!",
"database": "ecommerce_db",
"charset": "utf8mb4"
}
# 初始化Text2SQL系统
text2sql = SimpleText2SQL(db_config)
# 示例查询
queries = [
"显示所有用户",
"查询订单数量最多的前5个产品",
"查找最近一个月注册的用户",
"统计每个城市的用户数量"
]
for query in queries:
print("\n" + "="*50)
result = text2sql.query(query)
if "error" in result:
print(f"错误: {result['error']}")
else:
print(f"查询结果 (显示前5行):")
for i, row in enumerate(result[:5]):
print(f" {i+1}. {row}")
print("="*50)原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。