前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程三 并发容器简单使用

多线程三 并发容器简单使用

作者头像
用针戳左手中指指头
发布2021-01-29 11:06:31
2750
发布2021-01-29 11:06:31
举报
文章被收录于专栏:学习计划

1.threadLocal

本地线程变量,在每个线程会独立开辟内存空间。

在高并发先不要使用。

代码语言:javascript
复制
private static ThreadLocal local = new ThreadLocal();

public static void main(String[] args) {
    new Thread(()->{
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        local.set("A");
        System.out.println(Thread.currentThread().getName()+" "+local.get());
    },"线程 1").start();
    new Thread(()->{
        local.set("B");
        System.out.println(Thread.currentThread().getName()+" "+local.get());
    },"线程 2").start();
    System.out.println("main "+local.get());
}

2.ArrayList、Vector、CopyOnWriteArrayList

这里三个容器,ArrayList、vector和CopyOnwriteArrayList

从测试结果中可以看出:ArrayList不是线程安全,会进行很多重复的操作,致使在所有线程运行完后,大小达不到10000,而Vector是线程安全的,不会出现重复读,重复写的问题,而copyOnWriterArrayList呢,是读快,写慢,因为他写的时候要进行复制,大大拉低了效率;

代码语言:javascript
复制
 public static void main(String[] args) {
//        List<String> list1 = new ArrayList<>();// 耗时:12  大小:9754
//        Vector<String> list1 = new Vector<>(); // 耗时:10 大小:10000
        List<String> list1 = new CopyOnWriteArrayList<>(); // 写耗时:75 写大小:10000 读耗时:22
        Thread[] threads = new Thread[100];

        Random r = new Random();
        for (int i = 0; i < 100; i++) {
            threads[i] =  new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    int i1 = r.nextInt(10000);
                    list1.add("A"+i1);
                    ;
                }
            });
        }
        long time = System.currentTimeMillis();
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("大小 "+list1.size());
        System.out.println("耗时:"+(end - time));

        Thread[] threads2 = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threads2[i] = new Thread(()->{
                for (int j = 0; j < 10000; j++) {
                    list1.get(j);
                }
            });
        }
        long time2 = System.currentTimeMillis();
        for (Thread thread : threads2) {
            thread.start();
        }
        for (Thread thread : threads2) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("耗时:"+(System.currentTimeMillis() - time2));
    }

Collections提供了一个方法synchronizedList可以将非线程安全的容器转换为线程安全的容器,看它的实现就是在方法上加了一个synchronized关键字,但是使用iterator的时候就不行了,因为他调用的还是List本身的迭代器,没有做同步处理,所以在这种情况下要手动上锁;注意:我们常使用的增加for也是利用了迭代器。

代码语言:javascript
复制
List<String> list = new ArrayList<>();
List<String> strings = Collections.synchronizedList(list);

Synchronized 和Vecotr最主要的区别:

  1. vector扩容为原来的2倍长度,ArrayList扩容为原来的1.5倍
  2. synchronized有很好的扩展和兼容功能,它可以将所有的list的子类转成线程安全的类
  3. 使用synchronizedList的时候,进行遍历需要手动进行同步处理
  4. synchronizedlist可以指定锁定的对象

3.map相关并发容器

代码语言:javascript
复制
public static void main(String[] args) {
//    Map<String, String> map = new ConcurrentHashMap<>(); // 302
//        Map<String, String> map = new ConcurrentSkipListMap<>(); // 358
//    Map<String, String> map = new Hashtable<>(); // 400

      Map<String, String> m = new HashMap<>();
      Map<String, String> map = Collections.synchronizedMap(m); // 465

        Random random = new Random();
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(()->{
                for(int j=0; j<10000;j++) {
                    map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
//             map1.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
                }
                latch.countDown();
            });
        }
        Arrays.asList(threads).forEach(t->t.start());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }

hashMap

hashTable

concurrentHashMap

key/value = null

可以存储null键,nullvalue

key和value都不能存储

key和value都不能存储

初始容量

16

11

16

线程安全

效率

< concurrentHashMap

>hashTable

ConcurrentSkipListMap 和上面不同,他的实现方式不一样,且key是有序的,支持更高的并发,存取是是log(N),和线程数无关,线程越多越能体现出优势。

4.ConcurrentLinkedQueue

模拟场景:1000张票,10个售票窗口

代码语言:javascript
复制
public class ThreadContainer2 {

    private static List<Integer> list = new ArrayList<Integer>();
    private Integer tickets = 100;

    public synchronized void sall(){
        if (tickets > 0) {
            list.add(tickets--);
            System.out.println(Thread.currentThread().getName()+" sall "+tickets);
        }
    }

    public Integer getTicket(){
        return tickets;
    }

    public static void main(String[] args) {
        ThreadContainer2 t = new ThreadContainer2();

        System.out.println("开始售票。。。");
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while (true) {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        t.sall();
                    }
            },"售票口 "+(i+1)).start();
        }
    }
}

使用ConcurrentLinkedQueue来实现看看:

代码语言:javascript
复制
private static Queue<Integer> list = new ConcurrentLinkedQueue<Integer>();

static{
    for (int i = 0; i < 1000; i++) {
        list.add(i);
    }
}

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        new Thread(()->{
            while (true) {
                // 这个方法是线程安全的
                Integer poll = list.poll();
                if (poll == null) {
                    break;
                }
                // 就算多个线程都同时来到这里,因为上一个代码是线程安全的,所以就算来到这,也是空
                System.out.println(Thread.currentThread().getName()+" ticket "+poll);
            }
        }," 售票口 "+i).start();
    }
}

代码简化了,没有之前的那种复杂

这个容器比较有用的方法:

代码语言:javascript
复制
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

for (int i = 0; i < 10; i++) {
    queue.add("a"+i);
}
System.out.println("容器"+queue.size());
System.out.println("poll取值:"+queue.poll());
System.out.println("poll 取值后的容器"+queue.size());
System.out.println("peek取值:"+queue.peek());
System.out.println("peek取值后的容量:"+queue.size());

这个容器多用于生产者和消费者问题

offer(e) // 队列可以放入,则返回true,不能返回false

poll() // 取不到数据,返回null,取到的对象会被丢弃

peek() //和poll一样,但是取出的对象不会被丢弃

add() // 底层调用offer,不会抛出异常

5.BlockingQueue提供的方法

抛异常

特定值

阻塞

超时

插入

add(E e)

offer(E e)

put(E e)

offer(E e, long timeout, TimeUnit unit)

移除

remove(Object o)

E poll()

take()

poll(long timeout, TimeUnit unit)

offer(E e); // 队列可以放入,则返回true,不能返回false

offer(E e, long timeout, TimeUnit unit) // 指定时间内,还不能放入就返回false

E poll() // 取不到数据,返回null,取到的对象会被丢弃

poll(long timeout, TimeUnit unit) // 指定时间内,能取到数据就取队头,没有则返回异常

E peek() // 和poll一样,但是取出的对象不会被丢弃

5.1ArrayBlockingQueue

基于数组的有界队列,创建后无法改变容量

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
   BlockingQueue<String> strings = new ArrayBlockingQueue<>(10);
   for (int i = 0; i < 10; i++) {
      strings.put("a" + i);
   }
   strings.add("aaaa");
   strings.offer("aaaa",1, TimeUnit.SECONDS);
   System.out.println(strings);
}

5.2LinkedBlockingQeque

无界,最大长度为Integer.MAX_VALUE

代码语言:javascript
复制
public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread[] threads = new Thread[1];
        Thread[] threads2 = new Thread[10];

        for (int i = 0; i < 1; i++) {
            threads[i] = new Thread(()->{
//                while (true) {
                    for (int j = 0; j < 100; j++) {
                        try {
                            queue.put(j);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
//                }
            },"线程 "+i);
        }

        for (int i = 0; i < 10; i++) {
            threads2[i] = new Thread(()->{
                while (true) {
                    try {
                        System.out.println(Thread.currentThread().getName()+" 获取 "+queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        Arrays.asList(threads).forEach(a->a.start());
        Arrays.asList(threads2).forEach(a->a.start());
    }

5.3DelayQueue

队列中的元素需要实现Delayed接口,实现排序和延时的定义

代码语言:javascript
复制
public class ThreadContainer11 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<MyQ> queue = new DelayQueue<>();
        long start = System.currentTimeMillis();
        MyQ m1 = new MyQ(start+1000,"延迟1秒");
        MyQ m2 = new MyQ(start+2000,"延迟2秒");
        MyQ m3 = new MyQ(start+3000,"延迟3秒");
        MyQ m4 = new MyQ(start+4000,"延迟4秒");
        MyQ m5 = new MyQ(start+5000,"延迟5秒");
        queue.put(m1);
        queue.put(m2);
        queue.put(m3);
        queue.put(m4);
        queue.put(m5);

        for (int i = 0; i < 5; i++) {
            System.out.println(queue.take().getName());
        }
    }

    static class MyQ implements Delayed {
        long time ;
        String name;
        public MyQ(long time,String name){
            this.time = time;
            this.name = name;
        }
        // 定义剩余到期时间
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }

        // 定义排序规则
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                return 1;
            }else if(this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)){
                return -1;
            }else {
                return 0;
            }
        }
        public String getName(){
            return name;
        }
    }
}

5.4LinkedTransferQueue

比其他队列多一个transfer ;使用transfer 的方式例如有多个消费者,和一个生产者,当存在消费者的情况下,他会将对象给消费者,不会放到队列里,再让消费取,比较快

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

    Thread t1 = new Thread(()->{
        try {
            System.out.println("线程1 "+queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Thread t2 = new Thread(()->{
        try {
            System.out.println("线程2 "+queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    t1.start();
    t2.start();
    
    queue.transfer("aaaaaa");
    System.out.println(queue.size());
}

5.5 PriorityBlockingQueue

这个队列,会对放入的对象进行优先级排序,而优先级排序是有compareTo方法定义的,必须实现接口Comparable,但是他的排序只会将排序后的第一个放到队列第一个,其他仍是乱的

代码语言:javascript
复制
public class ThreadContainer8 implements Comparable {

    private Integer i;

    public ThreadContainer8(){}

    public ThreadContainer8(Integer i) {
        this.i = i;
    }

    @Override
    public int compareTo(Object o) {
        return i - ((ThreadContainer8) o).getI();
    }

    public Integer getI() {
        return i;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<ThreadContainer8> q = new PriorityBlockingQueue();
        Random r = new Random();
        Thread[] threads = new Thread[10];

      for (int i = 0; i < 10; i++) {
          threads[i] = new Thread(()->{
              for (int j = 0; j < 10; j++) {
                  q.put(new ThreadContainer8(r.nextInt(100)));
              }
          });
      }
        Arrays.asList(threads).forEach(t->t.start());
        Arrays.asList(threads).forEach(t->
                {
                    try {
                        t.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        );
        q.forEach(e-> System.out.print(e.getI()+" "));
        System.out.println();
        System.out.println("s="+q.peek().getI());
    }
}

5.6 SynchronousQueue

同步队列;容量为1,放入一个就要消费掉,不要用add

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // 报queue full异常
//        for (int i = 0; i < 5; i++) {
//            queue.add(i);
//        }
        System.out.println(queue.size());

        Thread t1 = new Thread(()->{
            try {
                System.out.println("线程1 "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread t2 = new Thread(()->{
            try {
                System.out.println("线程2 "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();
        t2.start();
        for (int i = 0; i < 2; i++) {
            queue.put(i);
        }
    }

总结:

名称

类名

说明

List相关并发容器

CopyOnWriteArrayList

读快写慢;写前复制,写完替换

Map/Set相关并发容器

ConcurrentHashMap

key和value都不能存储

ConcurrentSkipListMap

key和value都不能存储;key有序;它的存取时间log(N)

队列相关容器

ConcurrentLinkedQeque

FIFO(先入先出);cas实现

ConcurrentLinkedDueue

双向队列

ArrayBlockingQueue

有界队列,固定容量,不支持空元素

DelayQueue

提供过期时间功能;队列头是最接近过期的元素,没有过期元素则会取出null

LinkedBlockingQeque

无界队列,最大是Integer.MAX_VALUE;添加删除不是互斥的

LinkedBlockingDueue

双向阻塞队列

LinkedTransferQueue

比其他队列多transfer方法,这个方法是会将元素给一个正在等待的消费者,不会放入队列,没有消费者则等待

PriorityBlockingQueue

放入对象必须实现Comparable接口,按compareTo实现方法,将优先级高的放在第一个,其他对象无序

SynchronousQueue

容量为1,即时消费,提供了put的阻塞方法,底层使用transfer

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/04/06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.threadLocal
  • 2.ArrayList、Vector、CopyOnWriteArrayList
  • 3.map相关并发容器
  • 4.ConcurrentLinkedQueue
  • 5.BlockingQueue提供的方法
    • 5.1ArrayBlockingQueue
      • 5.2LinkedBlockingQeque
        • 5.3DelayQueue
          • 5.4LinkedTransferQueue
            • 5.5 PriorityBlockingQueue
              • 5.6 SynchronousQueue
              相关产品与服务
              容器服务
              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档