我有以下代码:
from sqlalchemy import create_engine
SCHEMA = 'dev_gba'
TABLE = 'dev_l1c_v2'
USER = 'db-user'
PASSWORD = '-'
ENDPOINT = '-.us-east-1.rds.amazonaws.com'
process_start = 'SOME_VAL'
process_end = 'SOME_VAL'
granule_id = 'A006202_20160829T191558'
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
connection = engine.raw_connection()
try:
cursor_obj = connection.cursor()
cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')
cursor_obj.close()
finally:
connection.close()
如果从数据库中选择“全部”,则可以看到未更新的行。但是,如果我像这样打印语句:
print(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')
产出如下:
UPDATE dev_gba.dev_l1c_v2 SET PROCESS_START_TIME = "SOME_VAL", PROCESS_END_TIME = "SOME_VAL" WHERE dev_gba.dev_l1c_v2.GRANULE_ID = "A006202_20160829T191558"
如果我将其复制并粘贴到MySQL工作台中,它将执行,并且我可以看到行已被更新。
我在工作台中禁用了安全更新,并尝试在执行语句之前将其添加为
cursor_obj.execute('SET SQL_SAFE_UPDATES = 0')
但这也行不通。下面是另一件让人困惑的事情,在我前面的代码中,我运行了以下代码:
connection = engine.raw_connection()
try:
cursor_obj = connection.cursor()
cursor_obj.execute(f'CREATE TEMPORARY TABLE {TEMP_TABLE} SELECT {TABLE}.index FROM {SCHEMA}.{TABLE} WHERE IN_PROGRESS = 0 AND PROCESSED = 0 ORDER BY RAND() LIMIT {CPU_COUNT}')
cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET IN_PROGRESS = 1, INSTANCE_ID = "{INSTANCE_ID}" WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
cursor_obj.execute(f'SELECT BASE_URL FROM {SCHEMA}.{TABLE} WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
result = cursor_obj.fetchall()
cursor_obj.execute(f'DROP TABLE {TEMP_TABLE}')
cursor_obj.close()
finally:
connection.close()
此代码中的update语句工作正常,没有任何问题。我还尝试将echo=True添加到create中:
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}", echo = True)
产出如下:
2021-12-31 10:17:09,613 INFO sqlalchemy.engine.Engine显示像'sql_mode‘这样的变量。
2021-12-31 10:17:09,616 INFO sqlalchemy.engine.Engine原始sql {}
2021-12-31 10:17:09,700 INFO sqlalchemy.engine.Engine显示像'lower_case_table_names‘这样的变量。
2021-12-31 10:17:09,701信息sqlalchemy.engine.Engine生成于0.00143 s {}
2021-12-31 10:17:09,858 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2021-12-31 10:17:09,859 INFO sqlalchemy.engine.Engine原始sql {}
这不是很有用。
我也尝试过:
from sqlalchemy.sql import text
cursor_obj.execute(text(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"'))
这会产生以下错误:
TypeError:类型为'TextClause‘的对象没有len()
不太确定从这里往哪里走。
发布于 2022-01-01 08:53:02
使用字符串格式在Python中创建SQL语句很容易出错,如果有更好的工具,应该避免使用。
您可以使用像这样的SQLAlchemy 核心运行这样的原始查询,而不必进入原始连接:
import sqlalchemy as sa
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
# Reflect the database table into an object
tbl = sa.Table(TABLE, sa.MetaData(), autoload_with=engine)
# Create an update object
upd = sa.update(tbl).where(tbl.c.GRANULE_ID == granule_id).values(PROCESS_START_TIME=process_start_time, PROCESS_END_TIME=process_end_time)
# The "begin" context manager will automatically commit on exit
with engine.begin() as conn:
conn.execute(upd)
如果需要使用原始SQL,可以这样做(请参阅使用文本SQL):
# We need to use string formatting to set the table; SQLAlchemy will automatically qualify it with the schema name.
stmt = f'UPDATE {TABLE} SET PROCESS_START_TIME = :process_start_time, PROCESS_END_TIME = :process_end_time WHERE {TABLE}.GRANULE_ID = :granule_id'
values = {
'process_start_time': process_start_time,
'process_end_time': process_end_time,
'granule_id': granule_id,
}
with engine.begin() as conn:
conn.execute(sa.text(stmt), values)
https://stackoverflow.com/questions/70544877
复制相似问题