聊聊pg jdbc statement的maxRows参数

本文主要解析一下pg jdbc statement的maxRows参数

Statement.setMaxRows

void setMaxRows(int max)
         throws SQLException
Sets the limit for the maximum number of rows that any ResultSet object generated by this Statement object can contain to the given number. If the limit is exceeded, the excess rows are silently dropped.
Parameters:
max - the new max rows limit; zero means there is no limit
Throws:
SQLException - if a database access error occurs, this method is called on a closed Statement or the condition max >= 0 is not satisfied
See Also:
getMaxRows()

Statement.executeQuery

connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
          fetchSize, flags);

最后是调用QueryExecutorImpl.execute(),然后它又是调用了

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java

// sendOneQuery sends a single statement via the extended query protocol.
  // Per the FE/BE docs this is essentially the same as how a simple query runs
  // (except that it generates some extra acknowledgement messages, and we
  // can send several queries before doing the Sync)
  //
  // Parse S_n from "query string with parameter placeholders"; skipped if already done previously
  // or if oneshot
  // Bind C_n from S_n plus parameters (or from unnamed statement for oneshot queries)
  // Describe C_n; skipped if caller doesn't want metadata
  // Execute C_n with maxRows limit; maxRows = 1 if caller doesn't want results
  // (above repeats once per call to sendOneQuery)
  // Sync (sent by caller)
  //
  private void sendOneQuery(SimpleQuery query, SimpleParameterList params, int maxRows,
      int fetchSize, int flags) throws IOException {
    boolean asSimple = (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0;
    if (asSimple) {
      assert (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) == 0
          : "Simple mode does not support describe requests. sql = " + query.getNativeSql()
          + ", flags = " + flags;
      sendSimpleQuery(query, params);
      return;
    }

    assert !query.getNativeQuery().multiStatement
        : "Queries that might contain ; must be executed with QueryExecutor.QUERY_EXECUTE_AS_SIMPLE mode. "
        + "Given query is " + query.getNativeSql();

    // nb: if we decide to use a portal (usePortal == true) we must also use a named statement
    // (oneShot == false) as otherwise the portal will be closed under us unexpectedly when
    // the unnamed statement is next reused.

    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean noMeta = (flags & QueryExecutor.QUERY_NO_METADATA) != 0;
    boolean describeOnly = (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) != 0;
    boolean usePortal = (flags & QueryExecutor.QUERY_FORWARD_CURSOR) != 0 && !noResults && !noMeta
        && fetchSize > 0 && !describeOnly;
    boolean oneShot = (flags & QueryExecutor.QUERY_ONESHOT) != 0 && !usePortal;
    boolean noBinaryTransfer = (flags & QUERY_NO_BINARY_TRANSFER) != 0;
    boolean forceDescribePortal = (flags & QUERY_FORCE_DESCRIBE_PORTAL) != 0;

    // Work out how many rows to fetch in this pass.

    int rows;
    if (noResults) {
      rows = 1; // We're discarding any results anyway, so limit data transfer to a minimum
    } else if (!usePortal) {
      rows = maxRows; // Not using a portal -- fetchSize is irrelevant
    } else if (maxRows != 0 && fetchSize > maxRows) {
      // fetchSize > maxRows, use maxRows (nb: fetchSize cannot be 0 if usePortal == true)
      rows = maxRows;
    } else {
      rows = fetchSize; // maxRows > fetchSize
    }

    sendParse(query, params, oneShot);

    // Must do this after sendParse to pick up any changes to the
    // query's state.
    //
    boolean queryHasUnknown = query.hasUnresolvedTypes();
    boolean paramsHasUnknown = params.hasUnresolvedTypes();

    boolean describeStatement = describeOnly
        || (!oneShot && paramsHasUnknown && queryHasUnknown && !query.isStatementDescribed());

    if (!describeStatement && paramsHasUnknown && !queryHasUnknown) {
      int queryOIDs[] = query.getStatementTypes();
      int paramOIDs[] = params.getTypeOIDs();
      for (int i = 0; i < paramOIDs.length; i++) {
        // Only supply type information when there isn't any
        // already, don't arbitrarily overwrite user supplied
        // type information.
        if (paramOIDs[i] == Oid.UNSPECIFIED) {
          params.setResolvedType(i + 1, queryOIDs[i]);
        }
      }
    }

    if (describeStatement) {
      sendDescribeStatement(query, params, describeOnly);
      if (describeOnly) {
        return;
      }
    }

    // Construct a new portal if needed.
    Portal portal = null;
    if (usePortal) {
      String portalName = "C_" + (nextUniqueID++);
      portal = new Portal(query, portalName);
    }

    sendBind(query, params, portal, noBinaryTransfer);

    // A statement describe will also output a RowDescription,
    // so don't reissue it here if we've already done so.
    //
    if (!noMeta && !describeStatement) {
      /*
       * don't send describe if we already have cached the row description from previous executions
       *
       * XXX Clearing the fields / unpreparing the query (in sendParse) is incorrect, see bug #267.
       * We might clear the cached fields in a later execution of this query if the bind parameter
       * types change, but we're assuming here that they'll still be valid when we come to process
       * the results of this query, so we don't send a new describe here. We re-describe after the
       * fields are cleared, but the result of that gets processed after processing the results from
       * earlier executions that we didn't describe because we didn't think we had to.
       *
       * To work around this, force a Describe at each execution in batches where this can be a
       * problem. It won't cause more round trips so the performance impact is low, and it'll ensure
       * that the field information available when we decoded the results. This is undeniably a
       * hack, but there aren't many good alternatives.
       */
      if (!query.isPortalDescribed() || forceDescribePortal) {
        sendDescribePortal(query, portal);
      }
    }

    sendExecute(query, portal, rows);
  }

重点看这段

// Work out how many rows to fetch in this pass.

    int rows;
    if (noResults) {
      rows = 1; // We're discarding any results anyway, so limit data transfer to a minimum
    } else if (!usePortal) {
      rows = maxRows; // Not using a portal -- fetchSize is irrelevant
    } else if (maxRows != 0 && fetchSize > maxRows) {
      // fetchSize > maxRows, use maxRows (nb: fetchSize cannot be 0 if usePortal == true)
      rows = maxRows;
    } else {
      rows = fetchSize; // maxRows > fetchSize
    }

rows参数对maxRows和fetchSize两个取最小值,最后调用sendExecute

private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException {
    //
    // Send Execute.
    //

    if (logger.logDebug()) {
      logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")");
    }

    byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName());
    int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length);

    // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
    pgStream.sendChar('E'); // Execute
    pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
    if (encodedPortalName != null) {
      pgStream.send(encodedPortalName); // portal name
    }
    pgStream.sendChar(0); // portal name terminator
    pgStream.sendInteger4(limit); // row limit

    pendingExecuteQueue.add(new ExecuteRequest(query, portal, false));
  }

从sendExecute的方法参数命名可以看到rows就是limit参数。

PgResultSet.next

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java

public boolean next() throws SQLException {
    checkClosed();

    if (onInsertRow) {
      throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
          PSQLState.INVALID_CURSOR_STATE);
    }

    if (current_row + 1 >= rows.size()) {
      if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
        current_row = rows.size();
        this_row = null;
        rowBuffer = null;
        return false; // End of the resultset.
      }

      // Ask for some more data.
      row_offset += rows.size(); // We are discarding some data.

      int fetchRows = fetchSize;
      if (maxRows != 0) {
        if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
          // Fetch would exceed maxRows, limit it.
          fetchRows = maxRows - row_offset;
        }
      }

      // Execute the fetch and update this resultset.
      connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

      current_row = 0;

      // Test the new rows array.
      if (rows.isEmpty()) {
        this_row = null;
        rowBuffer = null;
        return false;
      }
    } else {
      current_row++;
    }

    initRowBuffer();
    return true;
  }

可以看到next方法里头也会根据maxRows参数限制fetchRows值,这里的row_offset是值下一批数据第一个元素在整个查询方法结果集中的下标位置。如果fetchSize+row_offset大于了maxRows,则表示下一批拉取的数据如果按fetchSize去拉取,则总拉取数据量会超过maxRows,因此需要修正fetchRows参数,保证总共拉取的数据不超过maxRows

小结

  • 同时开启fetchSize和maxRows参数时,取最小作为limit来executeQuery
  • maxRows是指executeQuery拉取的数据以及next方法拉取的数据量总和的上限值
  • 对于通用服务而言,设置这个值可以避免因为sql不当导致加载太过数据量最终导致OOM

doc

  • setMaxRows
  • JDBC: LIMIT vs setmaxrows(Resultset)
  • JDBC的fetchsize和maxrows
  • What is the difference between Statement.setMaxRows vs Statement.setFetchsize in Hive

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-01-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏木宛城主

Thinking In Design Pattern——Query Object模式

什么是Query Object模式 Query Object的架构设计 Query Object在服务层的应用 测试 Query Obj...

2126
来自专栏积累沉淀

Java使用JDBC连接Hive(新版本)API封装

网上找了很多封装的API,发现都是过时了的,运行报各种错误,经过了几天的调错,终于可以使用java代码操作hive了 首先看看所需的包 ? 所有的分析都在代码里...

47910
来自专栏鸿的学习笔记

python源码阅读笔记之GC(二)

这是指使用glibc的malloc()方法去申请内存,当然这不是啥对象都用这个,而是取决于分配的内存大小

922
来自专栏青蛙要fly的专栏

项目需求讨论: 文字显示排版— Html格式

我们看到,我用红框框出来的地方 1.直接使用系统自带的AlertDialog的提示框,我们看到了我们更新提示里面的具体内容是(-Bug修改 -新增更新提示);...

1592
来自专栏成长道路

JDBC动态SQL语句连接orcale数据库的工具类

import java.sql.Connection; import java.sql.DriverManager; import java.sql.P...

3130
来自专栏技术随笔

医学影像中Dicom的常用Tag分类与说明

4557
来自专栏码匠的流水账

聊聊pg jdbc的queryTimeout及next方法

本文主要介绍一下pg jdbc statement的queryTimeout及resultSet的next方法

2801
来自专栏跟着阿笨一起玩NET

ASP.NET 存储过程操作

存储过程是存放在数据库服务器上的预先编译好的sql语句。使用存储过程,可以直接在数据库中存储并运行功能强大的任务。存储过程在第一应用程序执行时进行语法检查和编...

1061
来自专栏分布式系统进阶

Influxdb中Select查询请求结果涉及到的一些数据结构

相当于c里面的链表元素,itr指向下一个元素的指针,buf表示当前元素,即FloatPoint类型的链表的迭代器.

1252
来自专栏码匠的流水账

聊聊mysql jdbc的queryTimeout及next方法

本文主要介绍一下mysql jdbc statement的queryTimeout及resultSet的next方法

1132

扫码关注云+社区