服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 解决线程池中ThreadGroup的坑

解决线程池中ThreadGroup的坑

2022-02-16 11:07kevin-go Java教程

这篇文章主要介绍了解决线程池中ThreadGroup的坑,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

线程池中ThreadGroup的坑

在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员。简单地说,线程组(ThreadGroup)就是由线程组成的管理线程的类,这个类是java.lang.ThreadGroup类。

定义一个线程组,通过以下代码可以实现。

?
1
2
ThreadGroup group=new ThreadGroup(“groupName”);
Thread thread=new Thread(group,”the first thread of group”);

ThreadGroup类中的某些方法,可以对线程组中的线程产生作用。例如,setMaxPriority()方法可以设定线程组中的所有线程拥有最大的优先权。

所有线程都隶属于一个线程组。那可以是一个默认线程组(不指定group),亦可是一个创建线程时明确指定的组。在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。每个应用都至少有一个线程从属于系统线程组。若创建多个线程而不指定一个组,它们就会自动归属于系统线程组。

线程组也必须从属于其他线程组。必须在构建器里指定新线程组从属于哪个线程组。若在创建一个线程组的时候没有指定它的归属,则同样会自动成为系统线程组的一名属下。因此,一个应用程序中的所有线程组最终都会将系统线程组作为自己的“父”。

那么假如我们需要在线程池中实现一个带自定义ThreadGroup的线程分组,该怎么实现呢?

我们在给线程池(ThreadPoolExecutor)提交任务的时候可以通过execute(Runnable command)来将一个线程任务加入到该线程池,那么我们是否可以通过new一个指定了ThreadGroup的Thread实例来加入线程池来达到前面说到的目的呢?

ThreadGroup是否可行

通过new Thread(threadGroup,runnable)实现线程池中任务分组

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final ThreadGroup group = new ThreadGroup("Main_Test_Group");
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(group, new Runnable() {
                @Override
                public void run() {
                    int sleep = (int)(Math.random() * 10);
                    try {
                        Thread.sleep(1000 * 3);
                        System.out.println(Thread.currentThread().getName()+"执行完毕");
                        System.out.println("当前线程组中的运行线程数"+group.activeCount());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, group.getName()+" #"+i+"");
            pool.execute(thread);
        }
    }

运行结果

pool-1-thread-3执行完毕
pool-1-thread-1执行完毕
当前线程组中的运行线程数0
pool-1-thread-2执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0
pool-1-thread-4执行完毕
pool-1-thread-5执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0

运行结果中可以看到group中的线程并没有因为线程池启动了这个线程任务而运行起来.因此通过线程组来对线程池中的线层任务分组不可行.

从java.util.concurrent.ThreadPoolExecutor源码中可以看到如下构造函数:

?
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

如果我们在实例化ThreadPoolExecutor时不指定ThreadFactory,那么将以默认的ThreadFactory来创建Thread.

Executors内部类DefaultThreadFactory

下面的源码即是默认的Thread工厂

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

从唯一的构造函数可以看到DefaultThreadFactory以SecurityManager 实例中的ThreadGroup来指定线程的group,如果SecurityManager 获取到的ThreadGroup为null才默认以当前线程的group来指定.public Thread newThread(Runnable r) 则以group来new 一个Thead.这样我们可以在实例化ThreadPoolExecutor对象的时候在其构造函数内传入自定义的ThreadFactory实例即可达到目的.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyTheadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private ThreadGroup defaultGroup;
    public MyTheadFactory() {
        SecurityManager s = System.getSecurityManager();
        defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    public MyTheadFactory(ThreadGroup group) {
       this.defaultGroup = group;
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

ThreadGroup的使用及手写线程池

监听线程异常关闭

以下代码在window下不方便测试,需在linux 上 测试

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 以下线程如果强制关闭的话,是无法打印`线程被杀掉了`
// 模拟关闭 kill PID
public static void main(String[] args)  {
        Runtime.getRuntime().addShutdownHook(new Thread( () -> {
            System.out.println("线程被杀掉了");
        }));
        while(true){
            System.out.println("i am working ...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

如何拿到Thread线程中异常

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000);
                int i = 10/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.setUncaughtExceptionHandler((t,e)->{
            System.out.println("线程的名字"+ t.getName());
            System.out.println(e);
        });  // 通过注入接口的方式
        thread.start();
    }

ThreadGroup

注意: threadGroup 设置为isDaemon 后,会随最后一个线程结束而销毁,如果没有设置isDaemon ,则需要手动调用 destory()

线程池使用

自己搭建的简单线程池实现

其中ThreadGroup 的应用没有写,但是我们可以观察线程关闭后,检查ThreadGroup 中是否还有活跃的线程等,具体参考ThreadGroup API

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
/**
 * @Author: shengjm
 * @Date: 2020/2/10 9:52
 * @Description:
 */
public class SimpleThreadPool extends Thread{
    /**
     * 线程数量
     */
    private int size;
    private final int queueSize;
    /**
     * 默认线程队列数量
     */
    private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;
    private static volatile int seq = 0;
    private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";
    private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
    private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
    private final DiscardPolicy discardPolicy;
    private volatile boolean destory = false;
    private int min;
    private int max;
    private int active;
    /**
     * 定义异常策略的实现
     */
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("线程池已经被撑爆了,后继多余的人将丢失");
    };
    /**
     *
     */
    public SimpleThreadPool(){
        this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);
    }
    /**
     *
     */
    public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {
        this.min = min;
        this.active = active;
        this.max = max;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        init();
    }
 /**
  * 初始化
  */
    private void init() {
        for(int i = 0; i < min; i++){
            createWorkTask();
        }
        this.size = min;
        this.start();
    }
    private void createWorkTask(){
        WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
        task.start();
        THREAD_QUEUE.add(task);
    }
 /**
  * 线程池自动扩充
  */
    @Override
    public void run() {
        while(!destory){
            System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+  TASK_QUEUE.size());
            try {
                Thread.sleep(1000);
                if(TASK_QUEUE.size() > active && size < active){
                    for (int i = size; i < active;i++){
                        createWorkTask();
                    }
                    size = active;
                }else if(TASK_QUEUE.size() > max && size < max){
                    for (int i = size; i < max;i++){
                        createWorkTask();
                    }
                    size = max;
                }
                synchronized (THREAD_QUEUE){
                    if(TASK_QUEUE.isEmpty() && size > active){
                        int release = size - active;
                        for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){
                            if(release <=0){
                                break;
                            }
                            WorkerTask task = it.next();
                            task.close();
                            task.interrupt();
                            it.remove();
                            release--;
                        }
                        size = active;
                    }
                }
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    public void submit(Runnable runnable){
        synchronized (TASK_QUEUE){
            if(destory){
                throw new DiscardException("线程池已经被摧毁了...");
            }
            if(TASK_QUEUE.size() > queueSize){
                discardPolicy.discard();
            }
            TASK_QUEUE.addLast(runnable);
            TASK_QUEUE.notifyAll();
        }
    }
 /**
  * 关闭
  */
    public void shutdown(){
        while(!TASK_QUEUE.isEmpty()){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        synchronized (THREAD_QUEUE) {
            int initVal = THREAD_QUEUE.size();
            while (initVal > 0) {
                for (WorkerTask workerTask : THREAD_QUEUE) {
                    if (workerTask.getTaskState() == TaskState.BLOCKED) {
                        workerTask.interrupt();
                        workerTask.close();
                        initVal--;
                    } else {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            this.destory = true;
        }
    }
    public int getSize() {
        return size;
    }
    public int getMin() {
        return min;
    }
    public int getMax() {
        return max;
    }
    public int getActive() {
        return active;
    }
    /**
     * 线程状态
     */
    private enum TaskState{
        FREE , RUNNING , BLOCKED , DEAD
    }
    /**
     * 自定义异常类
     */
    public static class DiscardException extends RuntimeException{
        public DiscardException(String message){
            super(message);
        }
    }
    /**
     * 定义异常策略
     */
    @FunctionalInterface
    public interface DiscardPolicy{
        void discard() throws DiscardException;
    }
    private static class WorkerTask extends Thread{
        private volatile TaskState taskState = TaskState.FREE;
        public TaskState getTaskState(){
            return this.taskState;
        }
        public WorkerTask(ThreadGroup group , String name){
            super(group , name);
        }
        @Override
        public void run(){
            OUTER:
            while(this.taskState != TaskState.DEAD){
                Runnable runnable;
                synchronized (TASK_QUEUE){
                    while(TASK_QUEUE.isEmpty()){
                        try {
                            taskState = TaskState.BLOCKED;
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            break OUTER;
                        }
                    }
                    runnable = TASK_QUEUE.removeFirst();
                }
                if(runnable != null){
                    taskState = TaskState.RUNNING;
                    runnable.run();
                    taskState = TaskState.FREE;
                }
            }
        }
        public void close(){
            this.taskState = TaskState.DEAD;
        }
    }
    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
//        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);
        IntStream.rangeClosed(0,40).forEach(i -> {
            simpleThreadPool.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("the runnable " + i + "be servered by " + Thread.currentThread());
            });
        });
//        try {
//            Thread.sleep(15000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        simpleThreadPool.shutdown();
    }
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/tyBaoErGe/article/details/50196379

延伸 · 阅读

精彩推荐