如何设计一个能根据任务优先级来执行的线程池

根据任务优先级来执行的线程池

  • 不同的线程池会选用不同的阻塞队列作为任务队列,比如FixedThreadPool 使用的是LinkedBlockingQueue(有界队列),默认构造器初始的队列长度为 Integer.MAX_VALUE ,由于队列永远不会被放满,因此FixedThreadPool最多只能创建核心线程数的线程。
  • 假如需要实现一个优先级任务线程池的话,那可以考虑使用 PriorityBlockingQueue (优先级阻塞队列)作为任务队列(ThreadPoolExecutor 的构造函数有一个 workQueue 参数可以传入任务队列)。

要想让 PriorityBlockingQueue 实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:

实现 Comparable 接口

提交到线程池的任务实现 Comparable 接口,并重写 compareTo 方法来指定任务之间的优先级比较规则。

缺点:1.任务类必须实现 Comparable 接口,硬编码不够灵活。2.如果需要多种优先级规则,任务类代码会变得复杂。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.*;

public class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private final int priority;
    private final String name;

    public PriorityTask(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("Executing task: " + name + " with priority: " + priority);
    }

    @Override
    public int compareTo(PriorityTask other) {
        return Integer.compare(this.priority, other.priority); // 优先级值越小,优先级越高
    }
}

public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>());
    }
}


//使用示例
public class Main {
    public static void main(String[] args) {
        PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES);
        executor.execute(new PriorityTask(10, "Low priority task"));
        executor.execute(new PriorityTask(1, "High priority task"));
        executor.execute(new PriorityTask(5, "Medium priority task"));
        executor.shutdown();
    }
}

Comparator

创建 PriorityBlockingQueue 时传入一个 Comparator 对象来指定任务之间的排序规则(推荐)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.*;

public class Task implements Runnable {
    private final int priority;
    private final String name;

    public Task(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("Executing task: " + name + " with priority: " + priority);
    }

    public int getPriority() {
        return priority;
    }
}

public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
              new PriorityBlockingQueue<>(11, Comparator.comparingInt(Task::getPriority)));
    }
}


//使用示例
public class Main {
    public static void main(String[] args) {
        PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES);
        executor.execute(new Task(10, "Low priority task"));
        executor.execute(new Task(1, "High priority task"));
        executor.execute(new Task(5, "Medium priority task"));
        executor.shutdown();
    }
}

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计