我正在努力弄清楚如何正确地使用Java的执行器。我意识到向ExecutorService提交任务有它自己的开销。然而,我惊讶地看到它是如此之高。
我的程序需要以尽可能低的延迟处理大量数据(股票市场数据)。大多数计算都是相当简单的算术操作。
我试着测试一些非常简单的东西:"Math.random() * Math.random()“
最简单的测试是在一个简单的循环中运行这个计算。第二个测试在匿名Runnable中执行相同的计算(这应该是用来度量创建新对象的成本)。第三个测试将Runnable传递给ExecutorService (这是用于度量引入执行器的成本)。
我在我的小型笔记本电脑(2个cpus,1.5g内存)上运行了这些测试:
(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422(大约四分中有一次,前两个数最后相等)
注意,执行程序比在单个线程上执行花费的时间要长得多。对于1到8之间的线程池大小,这些数字大致相同。
问:我是不是遗漏了一些显而易见的东西,或者这些结果是预期的?这些结果告诉我,我传递给执行者的任何任务都必须做一些非平凡的计算。如果我正在处理数百万条消息,并且需要对每条消息执行非常简单(廉价)的转换,那么我可能仍然无法使用executors...trying将计算分散到多个CPU上,结果可能比在一个线程中执行它们要花费更多。设计决策比我原先想象的要复杂得多。有什么想法吗?
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecServicePerformance {
private static int count = 100000;
public static void main(String[] args) throws InterruptedException {
//warmup
simpleCompuation();
computationWithObjCreation();
computationWithObjCreationAndExecutors();
long start = System.currentTimeMillis();
simpleCompuation();
long stop = System.currentTimeMillis();
System.out.println("simpleCompuation:"+(stop-start));
start = System.currentTimeMillis();
computationWithObjCreation();
stop = System.currentTimeMillis();
System.out.println("computationWithObjCreation:"+(stop-start));
start = System.currentTimeMillis();
computationWithObjCreationAndExecutors();
stop = System.currentTimeMillis();
System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));
}
private static void computationWithObjCreation() {
for(int i=0;i<count;i++){
new Runnable(){
@Override
public void run() {
double x = Math.random()*Math.random();
}
}.run();
}
}
private static void simpleCompuation() {
for(int i=0;i<count;i++){
double x = Math.random()*Math.random();
}
}
private static void computationWithObjCreationAndExecutors()
throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
for(int i=0;i<count;i++){
es.submit(new Runnable() {
@Override
public void run() {
double x = Math.random()*Math.random();
}
});
}
es.shutdown();
es.awaitTermination(10, TimeUnit.SECONDS);
}
}发布于 2009-10-30 10:43:57
使用执行器的
)。
编辑:我改变了你的例子,让它在我的双核x200笔记本电脑上运行。
provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9正如您在源代码中看到的,我也将批处理供应和执行器生命周期从度量中删除。这比其他两种方法更公平。
自己看看结果..。
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecServicePerformance {
private static int count = 100000;
public static void main( String[] args ) throws InterruptedException {
final int cpus = Runtime.getRuntime().availableProcessors();
final ExecutorService es = Executors.newFixedThreadPool( cpus );
final Vector< Batch > batches = new Vector< Batch >( cpus );
final int batchComputations = count / cpus;
for ( int i = 0; i < cpus; i++ ) {
batches.add( new Batch( batchComputations ) );
}
System.out.println( "provisioned " + cpus + " batches to be executed" );
// warmup
simpleCompuation();
computationWithObjCreation();
computationWithObjCreationAndExecutors( es, batches );
long start = System.currentTimeMillis();
simpleCompuation();
long stop = System.currentTimeMillis();
System.out.println( "simpleCompuation:" + ( stop - start ) );
start = System.currentTimeMillis();
computationWithObjCreation();
stop = System.currentTimeMillis();
System.out.println( "computationWithObjCreation:" + ( stop - start ) );
// Executor
start = System.currentTimeMillis();
computationWithObjCreationAndExecutors( es, batches );
es.shutdown();
es.awaitTermination( 10, TimeUnit.SECONDS );
// Note: Executor#shutdown() and Executor#awaitTermination() requires
// some extra time. But the result should still be clear.
stop = System.currentTimeMillis();
System.out.println( "computationWithObjCreationAndExecutors:"
+ ( stop - start ) );
}
private static void computationWithObjCreation() {
for ( int i = 0; i < count; i++ ) {
new Runnable() {
@Override
public void run() {
double x = Math.random() * Math.random();
}
}.run();
}
}
private static void simpleCompuation() {
for ( int i = 0; i < count; i++ ) {
double x = Math.random() * Math.random();
}
}
private static void computationWithObjCreationAndExecutors(
ExecutorService es, List< Batch > batches )
throws InterruptedException {
for ( Batch batch : batches ) {
es.submit( batch );
}
}
private static class Batch implements Runnable {
private final int computations;
public Batch( final int computations ) {
this.computations = computations;
}
@Override
public void run() {
int countdown = computations;
while ( countdown-- > -1 ) {
double x = Math.random() * Math.random();
}
}
}
}发布于 2009-10-30 04:49:25
这是一个不公平的测试线程池,因为以下原因,
考虑到除了创建对象和运行作业之外,线程池还必须执行以下额外步骤,
G 215
当您有一个真正的作业和多个线程时,线程池的好处将是显而易见的。
发布于 2014-11-19 19:41:57
您提到的“开销”与ExecutorService无关,它是由在Math.random上同步的多个线程造成的,从而创建了锁争用。
所以是的,你遗漏了一些东西(下面的“正确”答案实际上是不正确的)。
下面是一些Java 8代码来演示运行一个简单函数的8个线程,其中没有锁争用:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;
import com.google.common.base.Stopwatch;
public class ExecServicePerformance {
private static final int repetitions = 120;
private static int totalOperations = 250000;
private static final int cpus = 8;
private static final List<Batch> batches = batches(cpus);
private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };
public static void main( String[] args ) throws InterruptedException {
printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
printExecutionTime("Executor pool", ExecServicePerformance::executorPool);
}
private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
long time = 0;
for (int i = 0; i < repetitions; i++) {
Stopwatch stopwatch = Stopwatch.createStarted();
f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
}
System.out.println(msg + " exec time: " + time);
}
private static void synchronous() {
for ( int i = 0; i < totalOperations; i++ ) {
performanceFunc.apply(i);
}
}
private static void synchronousBatches() {
for ( Batch batch : batches) {
batch.synchronously();
}
}
private static void asynchronousBatches() {
CountDownLatch cb = new CountDownLatch(cpus);
for ( Batch batch : batches) {
Runnable r = () -> { batch.synchronously(); cb.countDown(); };
Thread t = new Thread(r);
t.start();
}
try {
cb.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void executorPool() {
final ExecutorService es = Executors.newFixedThreadPool(cpus);
for ( Batch batch : batches ) {
Runnable r = () -> { batch.synchronously(); };
es.submit(r);
}
es.shutdown();
try {
es.awaitTermination( 10, TimeUnit.SECONDS );
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static List<Batch> batches(final int cpus) {
List<Batch> list = new ArrayList<Batch>();
for ( int i = 0; i < cpus; i++ ) {
list.add( new Batch( totalOperations / cpus ) );
}
System.out.println("Batches: " + list.size());
return list;
}
private static class Batch {
private final int operationsInBatch;
public Batch( final int ops ) {
this.operationsInBatch = ops;
}
public void synchronously() {
for ( int i = 0; i < operationsInBatch; i++ ) {
performanceFunc.apply(i);
}
}
}
}结果对25k操作(Ms)的120次测试的时间:
F 210
获胜者:遗嘱执行人服务。
https://stackoverflow.com/questions/1647990
复制相似问题