CountDownLatch使用_countdownlatch一个线程执行100条数据-程序员宅基地

技术标签: java  多线程  

CountDownLatch

表示计数器,可以给CountDownLatch设置一个数字

一个线程调用CountDownLatch的await()方法,将会阻塞

其他线程可以调用CountDownLatch的countDown()方法来对CountDownLatch中的数字减一, 当数字被减成0后,所有await的线程都将被唤醒。

对应的底层原理就是,调用await()方法的线程会利用AQS排队,一旦数字被减为0,则会将AQS中排队的线程依次唤醒。

Semaphore

Semaphore就是信号量,Semaphore可以阻塞线程并且可以控制同时访问线程的个数,通过acquire()获取一个许可,如果没有获取到就继续等待,通过release()释放一个许可。Semaphore和锁有点类似,都可以控制对某个资源的访问权限。

使用场景

CountDownLatch可以实现类似计数器的功能。比如有一个任务A,它要等到其它3任务完成才能执行,此时就可以用CountDownLatch来实现。

Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。
假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。

案例1:多线程分段处理List集合

实际应用中,分批后线程数量过大,会导致线程阻塞,线程切换上下文,效率不高,本次是根据数据量动态设置线程数,同时控制最大并发数量(业务中有IO操作,避免过大并发导致堵塞),实现效率提高

思路:

         1根据数据量动态设置线程数
         2.使用Semaphore 控制允许并发访问线程的个数
         3.CountDownLatch计数器闭锁

如何控制某个方法允许并发访问线程的个数?
    Semaphore类有两个重要方法
    1、semaphore.acquire();
        请求一个信号量,这时候信号量个数-1,当减少到0的时候,下一次acquire不会再执行,只有当执行一个release()的时候,信号量不为0的时候才可以继续执行acquire
    2、semaphore.release();
        释放一个信号量,这时候信号量个数+1,

也就是说在线程里执行某个方法的时候,在方法里用该类对象进行控制,就能保证所有的线程中最多只有指定信号量个数个该方法在执行。

举例:我开启了100个线程,执行一个()方法,但是我只想要所有线程中,最多有五个线程在执行该方法,其他的线程就必须排队等待。
则可以使用Semaphore对象进行控制,该对象new初始化的时候有个int参数,即指定最多信号量个数。
 

  /**
     * <p>
     * 实例3:多线程分段处理List集合
     * </P>
     */
    @Test
    public void multiThreadedListHandler3() {

        // 开始时间
        long start = System.currentTimeMillis();
        List<String> list = new ArrayList<>(10000);
        for (long i = 1; i <= 20000000; i++) {
            list.add(i + "");
        }
        int listSize = list.size();
        //执行数量
        int limitNum = 1000;
        //线程数(也就是执行次数)
        int threadNum = listSize % limitNum == 0 ? listSize / limitNum : listSize / limitNum + 1;
        int pcount = Runtime.getRuntime().availableProcessors();
        //最大线程数控制
        int maxthreadNum = 5;

        ExecutorService executor = new ThreadPoolExecutor(5, maxthreadNum, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        //最大并发线程数控制
        final Semaphore semaphore = new Semaphore(maxthreadNum);
        List handleList = null;
        for (int i = 0; i < threadNum; i++) {
            if ((i + 1) == threadNum) {
                int startIndex = i * limitNum;
                int endIndex = list.size();
                handleList = list.subList(startIndex, endIndex);
            } else {
                int startIndex = i * limitNum;
                int endIndex = (i + 1) * limitNum;
                handleList = list.subList(startIndex, endIndex);
            }
            SyncTask task = new SyncTask(handleList, countDownLatch, semaphore);
            executor.execute(task);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
            System.out.println("线程任务执行结束");
            System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        }
    }





@Slf4j
static class SyncTask implements Runnable {
    private List<String> list;
    private CountDownLatch countDownLatch;
    private Semaphore semaphore;

    public SyncTask(List<String> list, CountDownLatch countDownLatch, Semaphore semaphore) {
        this.list = list;
        this.countDownLatch = countDownLatch;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        if (!CollectionUtils.isEmpty(list)) {
            try {
                semaphore.acquire();
                list.stream().forEach(t -> {
                    //业务处理
                });
                System.out.println(Thread.currentThread().getName() + "线程:" + list);
                //  log.debug(String.format("%s", Thread.currentThread().getName() + "线程:" + list));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
        //线程任务完成
        countDownLatch.countDown();
    }
}

案例2:

将集合按指定数量分组,list中的元素被平均分配到n个集合中(平均分配给坐席)

/**
     * 将集合按指定数量分组,list中的元素被平均分配到n个集合中(平均分配给坐席)
     *
     * @param list             数据集合
     * @param currentSeatsList 客服名单集合
     * @return 分组结果
     */
    @Test
    public void collectionElementsAreGroupedByAverage() {

        // 开始时间
        long start = System.currentTimeMillis();
        //数据
        List<String> list = new ArrayList<>(10);
        for (long i = 1; i <= 123; i++) {
            list.add(i + "");
        }
        //客服名单
        List<String> currentSeatsList = new ArrayList<>(10);
        for (long i = 1; i <= 12; i++) {
            currentSeatsList.add("customer" + i);
        }

        int listSize = list.size();
        //根据坐席名单计算分批数量,分批数量=listSize/坐席数量
        int limitNum = listSize % currentSeatsList.size() == 0 ? listSize / currentSeatsList.size() : listSize / currentSeatsList.size() + 1;
        //执行次数
        int batchNum = listSize % limitNum == 0 ? listSize / limitNum : listSize / limitNum + 1;
        //cpu
        int pcount = Runtime.getRuntime().availableProcessors();
        //最大线程数控制
        int maxthreadNum = 5;

        ExecutorService executor = new ThreadPoolExecutor(pcount, maxthreadNum, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(batchNum), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

        //CountDownLatch计数器闭锁
        CountDownLatch countDownLatch = new CountDownLatch(batchNum);
        //控制最大并发线程数量
        final Semaphore semaphore = new Semaphore(maxthreadNum);

        List handleList = null;
        for (int i = 0; i < batchNum; i++) {
            if ((i + 1) == batchNum) {
                int startIndex = i * limitNum;
                int endIndex = list.size();
                handleList = list.subList(startIndex, endIndex);
            } else {
                int startIndex = i * limitNum;
                int endIndex = (i + 1) * limitNum;
                handleList = list.subList(startIndex, endIndex);
            }
            Task1 task = new Task1(handleList, countDownLatch, semaphore, currentSeatsList.get(i));
            executor.execute(task);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
            System.out.println("线程任务执行结束");
            System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        }
    }



@Slf4j
class Task1 implements Runnable {
    private List<String> list;
    private CountDownLatch countDownLatch;
    private Semaphore semaphore;
    private String currentSeats;


    public Task1(List<String> list, CountDownLatch countDownLatch, Semaphore semaphore, String currentSeats) {
        this.list = list;
        this.countDownLatch = countDownLatch;
        this.semaphore = semaphore;
        this.currentSeats = currentSeats;
    }

    @Override
    public void run() {
        if (!CollectionUtils.isEmpty(list)) {
            try {
                semaphore.acquire();
                list.stream().forEach(t -> {
                    //业务处理
                });
                String strs = String.format("客服:%s  线程:%s", currentSeats, Thread.currentThread().getName() + list);
                System.out.println(strs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
        //线程任务完成
        countDownLatch.countDown();
    }

}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_40428665/article/details/121751482

智能推荐

[AIGC] Java List和Map常用API以及其Python实现方式对照介绍-程序员宅基地

文章浏览阅读477次,点赞9次,收藏8次。Java和Python作为当今非常浅显易懂的编程语言,其数据结构中对于List和Map(Java)或List和Dict(Python)的操作无疑是每个程序员都非常必需的知识。本文将介绍在Java中对List和Map常用的一些操作,并给出在Python中对应的实现方式。

meson 概述-程序员宅基地

文章浏览阅读1.2w次,点赞8次,收藏42次。meson是一个编译系统,类似于 CMake 或者GNU Autotools. meson只是负责配置构建,后台默认是用ninja来编译的(当然也支持其它后台)。ninja是一个小型的致力于编译速度优化的编译系统,相当于make的替代物。所以meson+ninja相当于Cmake+make。meson设计目标是好用,同时保持高性能。它采用了一种自定义语言,号称简单、清晰和简洁性。很多灵感来自于Python编程语言,具有较好的易读性。meson的另一个设计目标,是为现代编程工具提供辅助的支持,包括单元测试_meson

oracle查看编码以及修改编码-程序员宅基地

文章浏览阅读87次。oracle的编码一直是个很重要的问题,以前也总结的写过,但都忘了,今天再在这写一下。首先查看oracle数据库的编码SQL>select * from nls_database_parameters where parameter ='NLS_CHARACTERSET';PARAMETER--------------------VALUE---..._orcid邮政编号怎么修改

【设计模式】享元模式的使用场景及与其他共享技术的对比-程序员宅基地

文章浏览阅读905次,点赞23次,收藏19次。享元模式(Flyweight Pattern)是一种非常常用的结构型设计模式,通过共享对象的方式,减少系统中的重复对象,提高内存使用效率。本文主要讲了享元模式的概念、使用场景以及与其他技术的对比。在使用方式上,与缓存、池化技术是高度类似的,都是创建好对象并存储起来,在后续想要使用的时候直接从存储的数据结构中获取,而不用重新创建。它与缓存、池化技术之间的区别,更多的是在于使用目的上的区别,只要能判断出,当前的对象是在通过共享对象的方式,减少系统中的重复对象,提高内存使用效率。

3D点云系列(一)点云介绍_点云数据-程序员宅基地

文章浏览阅读4.9k次。点云数据简介 点云数据(point cloud data)是指在一个三维坐标系统中的一组向量的集合。扫描资料以点的形式记录,每一个点包含有三维坐标,有些可能含有颜色信息(RGB)或反射强度信息(Intensity)。 我们常用的点云数据为激光雷达采集的数据,激光雷达的非接触式测量特点,具有测量速度快、精度高、识别准确等优点,成为移动机器人定位导航的核心传感器。在激光雷达技术领域中,目前主要通过三角测距法与TOF方法来进行测..._点云数据

python匿名函数的好处_Python之匿名函数如何理解?-程序员宅基地

文章浏览阅读393次。常规我们定义函数时,函数都是有名字的,比如:def add(x,y):return x+y这个函数是有名字的,叫add,我们在使用的时候直接调用即可:print(add(3,2))那么匿名函数呢,经常看一些代码会出现类似lambda(x,y:x+y),那么我们该如何去理解匿名函数呢?我们先看定义:匿名函数:根据名字就可以知道,被定义的函数是没有名字的;lambda是一个表达式而不是一个语句。它能够..._匿名函数的优点

随便推点

chatgpt生成的一些qt进度条样式_qt进度条绿色-程序员宅基地

文章浏览阅读344次。【代码】chatgpt生成的一些qt进度条样式。_qt进度条绿色

深读 |《人人都是产品经理2.0》上篇_人人都是产品经理2.0电子书-程序员宅基地

文章浏览阅读711次。本文首发于微信公众号:胡先生的理想国《人人都是产品经理2.0》(以下简称“《人人》”)是写给-1至3年级泛互联网产品经理的一本书,苏杰老师的写作思路集中于一个互联网产品从无至有的全过程和产品经理职业生涯规划两条线,并穿插对于互联网创业的一些思考。(《人人》高清思维导图见文章末尾)产品过程:“想清楚”→“做出来”→“推出去”职业生涯:“大话产品经理” →“产品经理的工作” →“产品经理的七层修炼与蜕变”创业建议:“组建团队”→“产品规划”→“大公司与创业公司” →“泛创业”本篇读_人人都是产品经理2.0电子书

Thread Pool Engine, and Work-Stealing scheduling algorithm-程序员宅基地

文章浏览阅读138次。http://pages.videotron.com/aminer/threadpool.htmhttp://pages.videotron.com/aminer/zip/threadpool.zip FPC Pascal v2.2.0+ / Delphi 5+http://pages.videotron.com/aminer/zip/pthreadpool_xe4.zip (for D..._work stealing的不同任务steal方案(窃取一半vs窃取一个大的任务粒度vs小的任务粒

kicad绿油开窗_KiCad里Pcbnew中各层的使用说明-程序员宅基地

文章浏览阅读581次。Kicad里Pcbnew提供了至多50个层供电路板设计师使用。总计32个铜层供导线走线(可覆铜)总计14个固定用途技术层12个技术层对(上技术层和下技术层对称),包括Adhesive,Solder Paste,Silk Screen,Solder Mask,Courtyard,Fabrication,共计6对。在KiCad里Pcbnew的层描述中,F.代表电路板上层(Front),B.代..._kicad铜开窗

浅谈逆向——OD断点简介(OD的使用3)_od断电需要过检测吗-程序员宅基地

文章浏览阅读7.6k次,点赞4次,收藏27次。浅谈逆向-OD断点简介常用断点INT 3断点硬件断点常用断点INT 3断点,硬件断点,内存断点,消息断点…INT 3断点常用,OD中可以使用bp命令或F2快捷键来设置/取消。当执行一个INT 3断点时,该地址处的内容被调试器用INT 3替换了。此时OD将INT 3隐藏起来,显示中断前的指令。由于INT 3指令的机器码时0xCC故此 也常称之为CC指令。当被调试进程执行INT 3指令导致一..._od断电需要过检测吗

Introduction to Causal Inference:Chapter 1因果推断概论-程序员宅基地

文章浏览阅读1.1k次,点赞2次,收藏12次。本文是学习brady neal于2020年开设的因果推断课程Introduction to Causal Inference的记录概述本chapter主要分四个部分:辛普森悖论为什么相关性不是因果关系什么展示了因果关系在观测性研究中如何发现因果关系1 因果推断的动机:辛普森悖论1.1 辛普森悖论案例辛普森悖论(Simpson‘s paradox)是广泛存在于统计学事件的一个现象,指的是分组下的统计表现与总体统计表现相悖。这里举了一个例子,假设有一个新的疾病:COVID-27有两种_introduction to causal inference

推荐文章

热门文章

相关标签