个人知识:我从爪哇极客上读到:“.对于玩具项目来说,SimpleAsyncTaskExecutor是可以的,但是对于任何更大的项目,它都是有风险的,因为它不限制并发线程,也不重用线程。因此,为了安全起见,我们还将添加一个任务执行器bean.”在百隆中,一个非常简单的例子是如何添加我们自己的任务执行器。但我可以找到任何指导,解释什么是后果和一些值得应用的案例。
个人愿望:我正在努力为我们的微服务日志提供一个共同的架构,在Kafka主题上发表日志。对于基于日志的我的情况,“不限制并发线程和不重用它所造成的风险”的说法似乎是合理的。
我正在本地桌面上成功地运行下面的代码,但是我想知道我是否正确地提供了一个定制的任务执行器。
我的问题是:考虑到我已经在使用kafkatempla (即默认情况下是同步的、单例的和线程安全的--至少就其理解而言,用于生成/发送消息),下面的配置是否真的朝着正确的方向去重用线程,避免在使用SimpleAsyncTaskExecutor时意外地创建线程?
生产者配置
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("KafkaMsgExecutor-");
executor.initialize();
return executor;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
}
生产者
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Async
public void send(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(final SendResult<String, String> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
为演示目的:
@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Autowired
private Producer p;
@Override
public void run(String... strings) throws Exception {
p.send("test", " qualquer messagem demonstrativa");
}
}
发布于 2020-02-27 08:24:23
这是SimpleAsyncTaskExecutor
的默认实现
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
为每个任务创建新线程,用Java创建线程并不便宜:(参考文献)
线程对象使用大量内存,而在大规模应用程序中,分配和释放许多线程对象会造成大量内存管理开销。
使用此任务执行器重复执行任务将对应用程序性能产生负面影响(而且,在默认情况下,此执行器不会限制并发任务的数量)
这就是为什么建议您使用线程池实现,线程创建的开销仍然存在,但是由于线程被重用而不是创建-触发-遗忘,线程的开销大大减少了。
在配置ThreadPoolTaskExecutor
时,应该根据应用程序负载正确地定义两个值得注意的参数:
private int maxPoolSize = Integer.MAX_VALUE
;
这是池中的最大线程数。private int queueCapacity = Integer.MAX_VALUE;
这是排队的最大任务数。当队列已满时,默认值可能导致OutOfMemory异常。使用默认值(Integer.MAX_VALUE
)可能会导致服务器资源不足/崩溃。
您可以通过增加最大池大小setMaxPoolSize()
的数量来改进思想,以减少加载增加时的热度,将核心池大小设置为更高值的setCorePoolSize()
(当负载增加时,maxPoolSize - corePoolSize
之间的线程数将被初始化)。
https://stackoverflow.com/questions/60385961
复制相似问题