前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >TTL的CRR操作

TTL的CRR操作

作者头像
阿超
发布2022-08-21 11:13:23
7500
发布2022-08-21 11:13:23
举报
文章被收录于专栏:快乐阿超快乐阿超

前段时间遇到的TTL(TransmittableThreadLocal)在异步编程中的上下文丢失问题,我是采用了直接更换线程池的方式

但今天抽空看了下官方文档,发现了:

所有TTL值的抓取、回放和恢复方法(即CRR操作)

CRRcapture(快照)replay(回放)restore(备份)

自己简单写了个测试用例,用于在CompletableFuture和并行流场景下解决ThreadLocal的上下文丢失问题

大伙一定要复制到本地跑一下,需要的GAV是这个:

代码语言:javascript
复制
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.12.4</version>
</dependency>

代码:

代码语言:javascript
复制
import com.alibaba.ttl.TransmittableThreadLocal;
import lombok.SneakyThrows;
import org.junit.Ignore;
import org.junit.jupiter.api.Assertions;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

/**
 * TTL单元测试
 *
 * @author <achao1441470436@gmail.com>
 * @since 2022/1/24 18:41
 */
public class TtlTest {

    @Test
    @SneakyThrows
    public void testCompletableFuture() {
        ThreadLocal<Integer> threadLocal = new InheritableThreadLocal<>();
        Stream.iterate(0, i -> ++i).limit(2).forEach(i -> {
            threadLocal.set(i);
            CompletableFuture.runAsync(() -> Assertions.assertEquals(i, threadLocal.get())).join();
            threadLocal.remove();
        });
    }

    @Test
    @SneakyThrows
    public void testCompletableFutureReplayRestore() {
        ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();
        Stream.iterate(0, i -> ++i).limit(2).forEach(i -> {
            threadLocal.set(i);
            // (1) 抓取当前线程的所有TTL值
            final Object captured = TransmittableThreadLocal.Transmitter.capture();
            CompletableFuture.runAsync(() -> {
                // 异步
                // (2) 在线程 B中回放在capture方法中抓取的TTL值,并返回 回放前TTL值的备份
                final Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
                try {
                    // 你的业务逻辑,这里你可以获取到外面设置的TTL值
                    Assertions.assertEquals(i, threadLocal.get());
                } finally {
                    // (3) 恢复线程 B执行replay方法之前的TTL值(即备份)
                    TransmittableThreadLocal.Transmitter.restore(backup);
                }
            }).join();
            threadLocal.remove();
        });
    }

    @Test
    @SneakyThrows
    public void testCompletableFutureTransmitter() {
        ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();
        Stream.iterate(0, i -> ++i).limit(2).forEach(i -> {
            threadLocal.set(i);
            // (1) 抓取当前线程的所有TTL值
            final Object captured = TransmittableThreadLocal.Transmitter.capture();
            CompletableFuture.runAsync(() ->
                    // 异步
                    TransmittableThreadLocal.Transmitter.runSupplierWithCaptured(captured, () -> {
                        // 你的业务逻辑,这里你可以获取到外面设置的TTL值
                        Assertions.assertEquals(i, threadLocal.get());
                        return null;
                    })).join();
            threadLocal.remove();
        });
    }

    @Test
    public void testParallelStream() {
        ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();
        Stream.iterate(0, i -> ++i).limit(1).peek(threadLocal::set)
                .flatMap(item -> Stream.of(item, item).parallel().peek(i -> Assertions.assertEquals(i, threadLocal.get())))
                .peek(t -> threadLocal.remove()).forEach(System.out::println);
    }

    @Test
    public void testParallelStreamReplayRestore() {
        ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();
        Stream.iterate(0, i -> ++i).limit(1).peek(threadLocal::set)
                .flatMap(item -> {
                    // (1) 抓取当前线程的所有TTL值
                    final Object captured = TransmittableThreadLocal.Transmitter.capture();
                    return Stream.of(item, item).parallel().peek(i -> {
                        // 异步
                        // (2) 在线程 B中回放在capture方法中抓取的TTL值,并返回 回放前TTL值的备份
                        final Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
                        try {
                            // 你的业务逻辑,这里你可以获取到外面设置的TTL值
                            Assertions.assertEquals(i, threadLocal.get());
                        } finally {
                            // (3) 恢复线程 B执行replay方法之前的TTL值(即备份)
                            TransmittableThreadLocal.Transmitter.restore(backup);
                        }
                    });
                })
                .peek(t -> threadLocal.remove()).forEach(System.out::println);
    }

    @Test
    public void testParallelStreamTransmitter() {
        ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();
        Stream.iterate(0, i -> ++i).limit(1).peek(threadLocal::set)
                .flatMap(item -> {
                    // (1) 抓取当前线程的所有TTL值
                    final Object captured = TransmittableThreadLocal.Transmitter.capture();
                    return Stream.of(item, item).parallel().peek(i ->
                            // 异步
                            TransmittableThreadLocal.Transmitter.runSupplierWithCaptured(captured, () -> {
                                // 你的业务逻辑,这里你可以获取到外面设置的TTL值
                                Assertions.assertEquals(i, threadLocal.get());
                                return null;
                            }));
                })
                .peek(t -> threadLocal.remove()).forEach(System.out::println);
    }

}

最后测试结果:

image-20220124161010303
image-20220124161010303
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-01-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档