服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|JavaScript|易语言|

服务器之家 - 编程语言 - Java教程 - 详解Java中CountDownLatch异步转同步工具类

详解Java中CountDownLatch异步转同步工具类

2021-09-22 00:57lingzhi_ying Java教程

今天给大家带来的是关于Java的相关知识,文章围绕着CountDownLatch异步转同步工具类展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下

使用场景

由于公司业务需求,需要对接socket、MQTT等消息队列。
众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线。无法像http请求有回复。
下发指令给硬件时,需要校验此次数据下发是否成功。
用户体验而言,点击按钮就要知道此次的下发成功或失败。

详解Java中CountDownLatch异步转同步工具类

如上图模型,

第一种方案使用Tread.sleep
优点:占用资源小,放弃当前cpu资源
缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响应第二种方案使用CountDownLatch

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.lzy.demo.delay;
 
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class CountDownLatchPool {
 
    //countDonw池
    private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
    //延迟队列
    private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>();
 
    private volatile static boolean flag =false;
    //单线程池
    private final static ExecutorService t = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1));
 
    public static void addCountDownLatch(Integer messageId) {
        CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId,new CountDownLatch(1) );
        if(countDownLatch == null){
            countDownLatch = countDownLatchMap.get(messageId);
        }
        try {
            addDelayQueue(messageId);
            countDownLatch.await(3L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("阻塞等待结束~~~~~~");
    }
 
    public static void removeCountDownLatch(Integer messageId){
        CountDownLatch countDownLatch = countDownLatchMap.get(messageId);
        if(countDownLatch == null)
            return;
        countDownLatch.countDown();
        countDownLatchMap.remove(messageId);
        System.out.println("清除Map数据"+countDownLatchMap);
    }
 
    private static void addDelayQueue(Integer messageId){
        delayQueue.add(new MessageDelayQueueUtil(messageId));
        clearMessageId();
    }
 
    private static void clearMessageId(){
        synchronized (CountDownLatchPool.class){
            if(flag){
                return;
            }
            flag = true;
        }
        t.execute(()->{
            while (delayQueue.size() > 0){
                System.out.println("进入线程并开始执行");
                try {
                    MessageDelayQueueUtil take = delayQueue.take();
                    Integer messageId1 = take.getMessageId();
                    removeCountDownLatch(messageId1);
                    System.out.println("清除队列数据"+messageId1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            flag = false;
            System.out.println("结束end----");
        });
    }
 
    public static void main(String[] args) throws InterruptedException {
        /*
        测试超时清空map
        new Thread(()->addCountDownLatch(1)).start();
        new Thread(()->addCountDownLatch(2)).start();
        new Thread(()->addCountDownLatch(3)).start();
        */
        //提前创建线程,清空countdown
        new Thread(()->{
            try {
                Thread.sleep(500L);
                removeCountDownLatch(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //开始阻塞
        addCountDownLatch(1);
        //通过调整上面的sleep我们发现阻塞市场取决于countDownLatch.countDown()执行时间
        System.out.println("阻塞结束----");
    }
}
class MessageDelayQueueUtil implements Delayed {
 
    private Integer messageId;
    private long avaibleTime;
 
    public Integer getMessageId() {
        return messageId;
    }
 
    public void setMessageId(Integer messageId) {
        this.messageId = messageId;
    }
 
    public long getAvaibleTime() {
        return avaibleTime;
    }
 
    public void setAvaibleTime(long avaibleTime) {
        this.avaibleTime = avaibleTime;
    }
 
    public MessageDelayQueueUtil(Integer messageId){
        this.messageId = messageId;
        //avaibleTime = 当前时间+ delayTime
        //重试3次,每次3秒+1秒的延迟
        this.avaibleTime=3000*3+1000 + System.currentTimeMillis();
    }
 
    @Override
    public long getDelay(TimeUnit unit) {
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }
 
    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime());
    }
}

由于socket并不确定每次都会有数据返回,所以map的数据会越来越大,最终导致内存溢出
需定时清除map内的无效数据。
可以使用DelayedQuene延迟队列来处理,相当于给对象添加一个过期时间

使用方法 addCountDownLatch 等待消息,异步回调消息清空removeCountDownLatch

到此这篇关于详解Java中CountDownLatch异步转同步工具类的文章就介绍到这了,更多相关CountDownLatch异步转同步工具类内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/qq_37256345/article/details/117808156

延伸 · 阅读

精彩推荐