openGauss 6.0.0 TLS企业版
python3 .9.13
psycopg2 2.9.10
# -*- coding:utf-8 -*-
import psycopg2
if __name__ == '__main__':
# 连接方式1
# connection_str = 'host=192.168.88.139 port=15400 dbname=testdb user=testacc password=test1234#'
# conn = psycopg2.connect(connection_str)
# 连接方式2
# 注意:
# host: 如果未提供,默认为UNIX socket
# port: 如果未提供,默认为5432
# database 仅用于关键词参数,不能用于连接参数字符串中
conn = psycopg2.connect(host='192.168.88.139', port=15400, database='testdb', user='testacc', password='test1234#')
cursor = conn.cursor()
cursor.execute('SELECT datname AS "Database", pg_catalog.pg_get_userbyid(datdba) AS "Owner" FROM pg_database')
res = cursor.fetchall()
print(res) # 输出形如 [('template1', 'omm'), ('template0', 'omm'), ('testdb', 'omm'), ('postgres', 'omm')]
# 删除表
cursor.execute('DROP TABLE IF EXISTS t_user')
#print(cursor.fetchall()) # 会报错:psycopg2.ProgrammingError: no results to fetch
create_tb_sql = '''CREATE TABLE IF NOT EXISTS t_user (
user_id INT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
password VARCHAR(60) NOT NULL,
email VARCHAR(100) UNIQUE
) WITH (
fillfactor = 85 -- 行存表填充因子(预留15%空间用于更新)
)'''
cursor.execute(create_tb_sql)
cursor.execute('commit') # 注意,如果没有这个,不会提交到数据库,即执行完上述语句后,数据库public模式下依然看不到数据表
# 插入数据--使用 %s 占位符
cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',
(1, '赖某某', '123456', 'testemail1@163.com'))
print(cursor.rowcount) # 获取execute 产生记录数 输出:1
cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',
[2, '王某某', '123456', 'testemail2@163.com'])
# 插入数据--使用 %(field_name)s 占位符
cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%(user_id)s, %(username)s, %(password)s, %(email)s)',
{'user_id':3, 'username': '肖某某', 'password': '123456', 'email':'testemail3@163.com'})
# 插入数据-插入多条
cursor.executemany('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',
[(4, 'testacc1', '123456', 'testemail5@163.com'),
(5, 'testacc2', '123456', 'testemail6@163.com')])
# 最后提交
cursor.execute('commit') # 注意,如果没有这个,不会提交到数据库,即执行完上述语句后,数据库表中依然查不到对应数据
cursor.execute('SELECT * FROM testdb.public.t_user')
print(cursor.fetchall()) # 输出包含对应记录的list
# cursor.execute("SELECT (%s % 2) = 0 AS even", (10,)) # WRONG
cursor.execute("SELECT (%s %% 2) = 0 AS even", (10,)) # correct
print(cursor.fetchall()) # 输出:[(True,)]
cursor.execute('SELECT * FROM t_user WHERE user_id=1')
print(cursor.fetchone()) # 输出:(1, '赖某某', '123456', 'testemail1@163.com')
print(cursor.fetchone()) # 输出:None
cursor.execute('SELECT * FROM t_user')
print(cursor.rowcount) # 输出:5
# 只取部分记录
print(cursor.fetchmany(3))
# 输出:[(1, '赖某某', '123456', 'testemail1@163.com'), (2, '林某某', '123456', 'testemail2@163.com'), (3, '王某某', '123456', 'testemail3@163.com')]
cursor.execute('SELECT * FROM t_user WHERE user_id=10')
res = cursor.fetchall()
print(res) # 输出:[]
cursor.close()
conn.close()
问题
psycopg2.OperationalError: connection to server at "192.168.88.139", port 15400 failed: none of the server's SASL authentication mechanisms are supported
前提:
opengausspg_hba.conf
关键配置
# IPv4 local connections:
...
host all all 0.0.0.0/0 sha256
解决方法
1、编辑opengauss服务器postgresql.conf
配置文件,修改password_encryption_type
为1
password_encryption_type = 1 #Password storage type, 0 is md5 for PG, 1 is sha256 + md5, 2 is sha256 only
2、重启数据库服务器
3、修改连接数据库所用用户的密码
测试用数据表
CREATE TABLE IF NOT EXISTS test (
id INT PRIMARY KEY,
log_message VARCHAR(60) NOT NULL
) WITH (
fillfactor = 85 -- 行存表填充因子(预留15%空间用于更新)
)
# -*- coding:utf-8 -*-
import re
import traceback
import psycopg2
from utils.log import logger
class PostgreSQLCli:
def __init__(self, db_name='', db_host='', port=3306, user='', password='', connect_timeout=15):
try:
self.dbconn = None
self.host = db_host
self.port = port
self.user = user
self.passwd = password
self.db_name = db_name
self.connect_timeout = connect_timeout
self.connect_config = {'host': self.host, 'port': self.port, 'user': self.user, 'password': self.passwd, 'database': self.db_name}
self.__connect_database()
logger.debug('初始化数据库连接成功(数据库:%s)' % self.db_name)
except Exception as e:
raise Exception('初始化数据库(%s)连接失败:%s' % (self.db_name, traceback.format_exc()))
def __connect_database(self):
self.dbconn = psycopg2.connect(**self.connect_config)
def insert(self, query, params=None):
'''插入单条数据
示例:
:query "INSERT INTO test (x) VALUES(%(x)s)"
:params {'x': 100}
:query "INSERT INTO test (x) VALUES(%s)"
:params [100] 或者(100,)
'''
try:
db_cursor = self.dbconn.cursor()
except Exception:
logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
self.__connect_database()
db_cursor = self.dbconn.cursor()
try:
db_cursor.execute(query, params)
db_cursor.execute('commit')
db_cursor.close()
except Exception:
db_cursor.execute('rollback')
db_cursor.close()
raise Exception(f'执行数据库插入操作({query})失败:{traceback.format_exc()}')
def insert_many(self, query, params=None):
'''插入多条数据
示例:
:query "INSERT INTO test (x) VALUES(%(x)s)"
:params [{'x': 100}, {'x': 101}]
:query "INSERT INTO test (x) VALUES(%s)"
:params [[100], [101]] 或者[(100,), (101, )]
'''
try:
db_cursor = self.dbconn.cursor()
except Exception:
logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
self.__connect_database()
db_cursor = self.dbconn.cursor()
try:
db_cursor.executemany(query, params)
db_cursor.execute('commit')
db_cursor.close()
except Exception:
db_cursor.execute('rollback')
db_cursor.close()
raise Exception(f'执行数据库批量插入操作({query})失败:{traceback.format_exc()}')
def delete(self, query, params=None):
'''例子:
:query DELETE FROM test WHERE id = %(id)s'
:params {'id': 1}
当然,也可以把参数放到query中 DELETE FROM test WHERE id = 2
'''
try:
db_cursor = self.dbconn.cursor()
except Exception:
logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
self.__connect_database()
db_cursor = self.dbconn.cursor()
try:
db_cursor.execute(query, params)
db_cursor.execute('commit')
db_cursor.close()
except Exception:
db_cursor.execute('rollback')
db_cursor.close()
raise Exception(f'执行数据库删除操作({query})失败:{traceback.format_exc()}')
def update(self, query, params=None):
'''
例子:
:query "UPDATE test SET log_message=%(log_message)s WHERE id = %(id)s"
:params {'log_message':'log message', 'id': 2}
当然,也可以把参数放到query中 UPDATE test SET log_message='log message' WHERE id = 2
'''
try:
db_cursor = self.dbconn.cursor()
except Exception:
logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
self.__connect_database()
db_cursor = self.dbconn.cursor()
try:
db_cursor.execute(query, params)
db_cursor.execute('commit')
db_cursor.close()
except Exception:
db_cursor.execute('rollback')
db_cursor.close()
raise Exception(f'执行数据库更新操作({query})失败:{traceback.format_exc()}')
def select(self, query, params=None):
'''查询结果最多只包含一条记录
示例:查询获取获取id为2的记录
:query 'SELECT * FROM test WHERE id = %(id)s'
:param {'id': 2}
当然,也可以把参数放到query中 SELECT * FROM test WHERE id = 2
'''
result = []
try:
db_cursor = self.dbconn.cursor()
except Exception:
logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
self.__connect_database()
db_cursor = self.dbconn.cursor()
try:
db_cursor.execute(query, params)
query_result = db_cursor.fetchall()
if query_result:
result = query_result[0]
db_cursor.close()
except Exception:
db_cursor.close()
raise Exception(f'执行数据库查询操作({query})失败:{traceback.format_exc()}')
return result
def select_many(self, query, params=None):
'''查询,查询结果包含多条记录'''
try:
db_cursor = self.dbconn.cursor()
except Exception:
logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库(%s)' % (traceback.format_exc(), self.db_name))
self.__connect_database()
db_cursor = self.dbconn.cursor()
try:
db_cursor.execute(query, params)
query_result = db_cursor.fetchall()
db_cursor.close()
except Exception:
db_cursor.close()
raise Exception(f'执行数据库查询操作({query})失败:{traceback.format_exc()}')
return query_result
def close(self):
if self.dbconn:
self.dbconn.close()
self.dbconn = None
def __del__(self):
self.close()
# 测试
if __name__ == '__main__':
db_cli = PostgreSQLCli(db_name='testdb', db_host='192.168.88.139', port=15400, user='testacc', password='test1234#')
db_cli.insert('INSERT INTO test (id, log_message) VALUES(%s, %s)', [1, 'test message 1'])
db_cli.insert_many('INSERT INTO test (id, log_message) VALUES(%s, %s)', [[2, 'test message 2'],[3, 'test message 3']])
res = db_cli.select_many('SELECT * FROM test')
print(res) #输出:[(1, 'test message 1'), (2, 'test message 2'), (3, 'test message 3')]
res = db_cli.select('SELECT * FROM test WHERE id = %(id)s', {'id': 2})
print(res) # 输出:(2, 'test message 2')
res = db_cli.select('SELECT * FROM test WHERE id = 2')
print(res)
db_cli.update("UPDATE test SET log_message = %(log_message)s WHERE id = %(id)s", {'log_message':'log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S'), 'id': 2})
res = db_cli.select('SELECT * FROM test WHERE id = 2')
print(res)
db_cli.update("UPDATE test SET log_message = '%s' WHERE id = 2" % ('log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S')))
res = db_cli.select('SELECT * FROM test WHERE id = 2')
print(res)
db_cli.delete('DELETE FROM test WHERE id IN (%(id)s, %(id2)s)', {'id': 2, 'id2': 3})
res = db_cli.select_many('SELECT * FROM test')
print(res)
注意:误区
当前驱动版本下验证,使用类似以下代码,尝试切换当前数据库至目标数据库test_db,然后获取获取test_db数据库中所有表
db_cli.select('USE `test_db`')
tables = db_cli.select('SHOW TABLES')
实际执行结果,db_cli.select('SHOW TABLES')
总是返回初始化连接时连接的数据库的中的表。解决方法如下:
tables = db_cli.select('SHOW TABLES FROM test_db')
注意:如果表名中存在特殊字符比如 / 时,表名需要加双引号,否则会报错
db_cli.select('SELECT * FROM db_name.schema_name."agent/defualt");
db_cli.select('SELECT * FROM "agent/defualt");