以下是几种Java实现并发请求的方式:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * Demonstrates running concurrent tasks on a fixed-size thread pool and
 * collecting their results through {@link Future}.
 */
public class ConcurrentRequestExample {
    /**
     * Submits ten simulated requests to a pool of five threads and prints each
     * result in submission order.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create a fixed-size thread pool.
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Future<String>> futures = new ArrayList<>();
            // Simulate 10 concurrent requests.
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                futures.add(executor.submit(() -> {
                    // Simulate an HTTP request.
                    Thread.sleep(1000);
                    return "Result from task " + taskId;
                }));
            }
            // Collect the results in submission order.
            for (Future<String> future : futures) {
                try {
                    System.out.println(future.get());
                } catch (ExecutionException e) {
                    // One failed task must not prevent reading the others.
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting: every further
                    // get() would fail immediately while the flag is set.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    break;
                }
            }
        } finally {
            // Always release the pool threads, even if something above throws.
            executor.shutdown();
        }
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
 * Demonstrates composing asynchronous tasks with {@link CompletableFuture}
 * running on a dedicated executor.
 */
public class CompletableFutureExample {
    /**
     * Starts ten simulated requests, waits for all of them, and prints the
     * results in submission order.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create a custom thread pool for the asynchronous tasks.
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            // Submit the asynchronous tasks.
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                futures.add(CompletableFuture.supplyAsync(() -> {
                    // Simulate an HTTP request.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the pool thread's owner.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    return "Result from task " + taskId;
                }, executor));
            }
            // Combine all tasks into one future that completes when they all have.
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0]));
            // Gather every result; join() does not block here because allOf is done.
            CompletableFuture<List<String>> allResults = allFutures.thenApply(v ->
                    futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
            // Print the results and wait for the whole pipeline to finish.
            allResults.thenAccept(results -> results.forEach(System.out::println)).join();
        } finally {
            // Shut down here rather than inside the callback: the callback is
            // skipped when any stage completes exceptionally.
            executor.shutdown();
        }
    }
}
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
/**
 * Demonstrates running blocking work concurrently with Project Reactor by
 * moving each element onto a scheduler inside {@code flatMap}.
 */
public class ReactorExample {
    /**
     * Emits ten request ids, processes each on a scheduler thread, prints every
     * result as it arrives, and blocks until the last one (30 second limit).
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create a stream of 10 requests.
        Flux.range(1, 10)
                .flatMap(i -> Flux.just(i)
                        // boundedElastic() replaces the deprecated elastic() scheduler
                        // and is the one intended for blocking calls.
                        .publishOn(Schedulers.boundedElastic())
                        .map(id -> {
                            // Simulate an HTTP request.
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag instead of swallowing it.
                                Thread.currentThread().interrupt();
                                e.printStackTrace();
                            }
                            return "Result from task " + id;
                        }))
                .doOnNext(System.out::println)
                .blockLast(Duration.ofSeconds(30)); // Wait for all tasks to complete.
    }
}
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * Demonstrates sending several HTTP requests concurrently with
 * {@link HttpClient}, running each blocking call on a thread pool.
 */
public class HttpClientConcurrentExample {
    /**
     * Sends three GET requests in parallel and prints each response body (or an
     * error marker) in submission order once all of them have finished.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared for callers; not thrown directly by this body
     */
    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder().build();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            // Simulate sending multiple requests.
            for (int i = 0; i < 3; i++) {
                final int requestId = i;
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        HttpRequest request = HttpRequest.newBuilder()
                                .uri(URI.create("https://jsonplaceholder.typicode.com/todos/" + (requestId + 1)))
                                .build();
                        HttpResponse<String> response =
                                client.send(request, HttpResponse.BodyHandlers.ofString());
                        return "Request " + requestId + " response: " + response.body();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return "Error in request " + requestId;
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool thread's owner sees it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return "Error in request " + requestId;
                    }
                }, executor));
            }
            // Wait for all requests to finish, then print the results.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenRun(() -> futures.forEach(f -> System.out.println(f.join())))
                    .join();
        } finally {
            // Shut down here rather than inside thenRun: that callback is skipped
            // when any request future completes exceptionally.
            executor.shutdown();
        }
    }
}
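这些示例展示了不同的并发模式:基于线程池的 ExecutorService 与 Future、使用 CompletableFuture 的异步编排、基于 Reactor Flux 的响应式流,以及结合 HttpClient 发送并发 HTTP 请求。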
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。