当我使用 Vert.x Mongo 客户端对 MongoDB 集合执行聚合时,出现了上述错误(堆栈跟踪如下)。有人能帮我看看这里出了什么问题吗?
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:31)
at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:294)
at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:276)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:205)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:100)
at com.mongodb.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:458)
at com.mongodb.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:110)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:383)
at com.mongodb.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:359)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:651)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:618)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:487)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:484)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:233)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:216)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:562)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
at com.mongodb.connection.AsynchronousSocketChannelStream.readAsync(AsynchronousSocketChannelStream.java:128)
at com.mongodb.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:484)
at com.mongodb.connection.InternalStreamConnection.access$1100(InternalStreamConnection.java:74)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:608)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:593)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:487)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:484)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:233)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:216)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:439)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
下面是我的代码:
// Build the Mongo client configuration from config(). Every key is read with
// getString and falls back to "" when missing (the port falls back to "0").
JsonObject pricing_config = new JsonObject()
.put("host", config().getString("pricing_host", ""))
// The port is stored as a string and parsed here; Integer.parseInt throws
// NumberFormatException if the configured value is not a valid integer.
.put("port", Integer.parseInt(config().getString("pricing_port", "0")))
.put("username", config().getString("pricing_username", ""))
.put("password", config().getString("pricing_password", ""))
.put("db_name", config().getString("pricing_db_name", ""))
//.put("authSource", config().getString("pricing_authSource", ""))
.put(MongoConstants.USE_OBJECT_ID_KEY, true);
// Create a client from the configuration above via MongoClient.createNonShared.
// NOTE(review): this assigns a local variable named mongoClient, while the
// aggregation call further down uses this.mongoClient — confirm the field is
// assigned from this instance.
MongoClient mongoClient = MongoClient.createNonShared(this.vertx, pricing_config);
// Aggregation options: batch size of 20 with disk use allowed. The maxTime and
// maxAwaitTime setters are commented out, so those two values are not set
// explicitly here.
AggregateOptions options = new AggregateOptions()
//.setMaxTime(20000)
//.setMaxAwaitTime(20000)
.setBatchSize(20)
.setAllowDiskUse(true);
// Run the pipeline against aggregationCollection with the options above,
// convert the result to a Flowable, map each result document to the string
// value of its "_id" field, and gather the values with toList().
return this.mongoClient.aggregateWithOptions(aggregationCollection, pipeline, options)
.toFlowable()
.map(res -> res.getString("_id"))
.toList();
我尝试了 AggregateOptions 的各种选项,但都不起作用。问题在于它无法获取后续的批次,因为游标在获取第一批数据之后就超时了;如果我将其增加到 Integer.MAX_VALUE,它只能在单个批次中处理 16MB 的数据,否则会再次失败。
发布于 2020-04-07 00:11:56
我已经找到了这个问题的答案。正如错误信息所说,当聚合操作返回的是非 awaitData 游标时,不能为其设置 MaxTimeMs 参数;而这里该超时参数的默认值是 1000ms,因此通过 setMaxAwaitTime(0) 显式地将其设置为零,即可解决此问题。
因此,您的聚合选项应如下所示:
// Options proposed by the answer: batch size of 10000, max await time
// explicitly set to 0, and disk use allowed.
AggregateOptions options = new AggregateOptions()
.setBatchSize(10000)
.setMaxAwaitTime(0)
.setAllowDiskUse(true);
https://stackoverflow.com/questions/59742687
复制相似问题