首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >用自己的执行器替换默认的SimpleAsyncTaskExecutor有哪些缺点和风险?

用自己的执行器替换默认的SimpleAsyncTaskExecutor有哪些缺点和风险?
EN

Stack Overflow用户
提问于 2020-02-25 00:34:12
回答 1查看 1.6K关注 0票数 0

个人知识:我从爪哇极客上读到:“.对于玩具项目来说,SimpleAsyncTaskExecutor是可以的,但是对于任何更大的项目,它都是有风险的,因为它不限制并发线程,也不重用线程。因此,为了安全起见,我们还将添加一个任务执行器bean.”在百隆中,一个非常简单的例子是如何添加我们自己的任务执行器。但我可以找到任何指导,解释什么是后果和一些值得应用的案例。

个人愿望:我正在努力为我们的微服务日志提供一个共同的架构,在Kafka主题上发表日志。对于基于日志的我的情况,“不限制并发线程和不重用它所造成的风险”的说法似乎是合理的。

我正在本地桌面上成功地运行下面的代码,但是我想知道我是否正确地提供了一个定制的任务执行器。

我的问题是:考虑到我已经在使用kafkatempla (即默认情况下是同步的、单例的和线程安全的--至少就其理解而言,用于生成/发送消息),下面的配置是否真的朝着正确的方向去重用线程,避免在使用SimpleAsyncTaskExecutor时意外地创建线程?

生产者配置

代码语言:javascript
运行
复制
@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

}

生产者

代码语言:javascript
运行
复制
@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Async
    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(final SendResult<String, String> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

为演示目的:

代码语言:javascript
运行
复制
@SpringBootApplication
public class KafkaDemoApplication  implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);

    }

    @Autowired
    private Producer p;

    @Override
    public void run(String... strings) throws Exception {
        p.send("test", " qualquer messagem demonstrativa");
    }

}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-02-27 08:24:23

这是SimpleAsyncTaskExecutor的默认实现

代码语言:javascript
运行
复制
protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}

为每个任务创建新线程,用Java创建线程并不便宜:(参考文献)

线程对象使用大量内存,而在大规模应用程序中,分配和释放许多线程对象会造成大量内存管理开销。

使用此任务执行器重复执行任务将对应用程序性能产生负面影响(而且,在默认情况下,此执行器不会限制并发任务的数量)

这就是为什么建议您使用线程池实现,线程创建的开销仍然存在,但是由于线程被重用而不是创建-触发-遗忘,线程的开销大大减少了。

在配置ThreadPoolTaskExecutor时,应该根据应用程序负载正确地定义两个值得注意的参数:

  1. private int maxPoolSize = Integer.MAX_VALUE; 这是池中的最大线程数。
  2. private int queueCapacity = Integer.MAX_VALUE; 这是排队的最大任务数。当队列已满时,默认值可能导致OutOfMemory异常。

使用默认值(Integer.MAX_VALUE)可能会导致服务器资源不足/崩溃。

您可以通过增加最大池大小setMaxPoolSize()的数量来改进思想,以减少加载增加时的热度,将核心池大小设置为更高值的setCorePoolSize() (当负载增加时,maxPoolSize - corePoolSize之间的线程数将被初始化)。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60385961

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档