首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >为什么pgx在提交时返回连接繁忙?

为什么pgx在提交时返回连接繁忙?
EN

Stack Overflow用户
提问于 2022-11-30 14:26:38
回答 1查看 29关注 0票数 1

我有一个函数可以将值批量插入到表中。tx.Commit()返回conn busy。正如我从阅读代码中得到的,conn.Begin()实际上让它很忙。

那么问题是如何正确地做到这一点呢?我应该将事务与批处理查询一起使用吗?或者事务是在引擎盖下创建的?

代码语言:javascript
运行
复制
// InsertItems adds items to the table
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) error {

    conn, err := r.pool.Acquire(ctx)
    if err != nil {
        return fmt.Errorf("acquire connection: %w", err)
    }
    defer conn.Release()

    tx, err := conn.Begin(ctx)
    if err != nil {
        return fmt.Errorf("starting pgx transaction: %w", err)
    }
    defer func() { _ = tx.Rollback(ctx) }()

    batch := pgx.Batch{}

    for _, v := range values {

        query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)

        batch.Queue(query, v.ID, v.Date, v.Amount)
    }

    batchRes := tx.SendBatch(ctx, &batch)
    defer func() {
        if err := batchRes.Close(); err != nil {
            logger.Errorf("closing batch result: %v", err)
        }
    }()

    cmdTag, err := batchRes.Exec()
    if err != nil {
        return fmt.Errorf("batch res exec: %w", err)
    }

    logger.Debugf("inserted rows: %d", cmdTag.RowsAffected())

    if err := tx.Commit(ctx); err != nil {
        return fmt.Errorf("commiting pgx transaction: %w", err)
    }

    return nil
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-30 14:59:08

在对批处理结果调用close之前,您正在调用commit。您需要首先关闭批处理结果,然后才能再次使用底层连接。

要强制执行延迟操作的正确顺序,可以执行以下操作:

代码语言:javascript
运行
复制
// InsertItems adds items to the table
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) (err error) {
    conn, err := r.pool.Acquire(ctx)
    if err != nil {
        return fmt.Errorf("acquire connection: %w", err)
    }
    defer conn.Release()

    batch := new(pgx.Batch)
    for _, v := range values {
        query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)
        _ = batch.Queue(query, v.ID, v.Date, v.Amount)
    }

    tx, err := conn.Begin(ctx)
    if err != nil {
        return fmt.Errorf("starting pgx transaction: %w", err)
    }   
    result := tx.SendBatch(ctx, batch)
    defer func() {
        if e := result.Close(); e != nil {
            logger.Errorf("closing batch result: %v", e)
            err = e
        }
        
        if err != nil {
            _ = tx.Rollback(ctx)
        } else {
            if e := tx.Commit(ctx); e != nil {
                err = e
            }
        }
    }()
    tag, err := result.Exec()
    if err != nil {
        return fmt.Errorf("batch res exec: %w", err)
    }
    
    logger.Debugf("inserted rows: %d", tag.RowsAffected())
    return nil
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74629374

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档