动态详情

Java线程池扩展之关联线程池与业务

2017-8-14 10:23:53 分类:技术博客

自定义线程池工厂

Java API针对不同需求,利用Executors类提供了4种不同的线程池:newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor。我们以创建固定线程池为例,说明创建线程池的一般做法:

public class CustomExecutorService { // 用默认方式:创建固定大小为50的线程池 private static ExecutorService DEFAULT_EXECUTORS = Executors.newFixedThreadPool(50); @Test public void testExecutorService() throws Exception { // 循环500次执行任务 for (int i = 0; i < 500; i++) {
            Thread thread = new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName());
                }
            }, "default_thread");
            DEFAULT_EXECUTORS.execute(thread);
        } // 加死循环是为了不至于主线程终止,便于用VisualVM查看线程池情况,正式环境中去掉 while (true) {

        }
    }
}

在生产环境里,我们会开启Web服务器的JMX,用VisualVM来远程监控JVM的运行情况,这种方式监控到的线程池情况如下:

输入图片说明

用默认方法创建线程池生成的线程名称都以pool前缀开头。但是在高并发多线程的情况下,如果遇到内存溢出、死锁、线程阻塞等问题,要监控与具体业务相关的线程池的执行情况(睡眠、运行、等待等情况),可能就比较棘手。这时我们需要给线程池里的线程起个与具体业务相关的名字,来与其他线程池创建的线程区分开。完整测试用例代码如下:

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; public class CustomExecutorService { // 用默认方式:创建固定大小为50的线程池 private static ExecutorService DEFAULT_EXECUTORS = Executors.newFixedThreadPool(50); // 用自定义方式:给线程起与业务相关的名字 private static ExecutorService CUSTOM_EXECUTORS = Executors.newFixedThreadPool(50, new EventThreadFactory()); @Test public void testExecutorService() throws Exception { // 循环500次执行任务 for (int i = 0; i < 500; i++) {
            Thread thread = new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName());
                }
            }, "default_thread");
            DEFAULT_EXECUTORS.execute(thread);
        } // 循环500次执行任务 for (int i = 0; i < 500; i++) {
            Thread thread = new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName());
                }
            }, "custom_thread");
            CUSTOM_EXECUTORS.execute(thread);
        } // 加死循环是为了不至于主线程终止,便于用VisualVM查看线程池情况,正式环境中去掉 while (true) {

        }
    }
} /**
 * 事件线程池,只是对标准的DefaultThreadFactory修改了线程名称,便于追踪。
 */ class EventThreadFactory implements ThreadFactory { // 用AtomicInteger来为线程计数,每次加1 private static final AtomicInteger poolNumber = new AtomicInteger(1); private final AtomicInteger threadNumber = new AtomicInteger(1); // 线程所属的线程组 private final ThreadGroup group; // 线程名称 private final String namePrefix;

    EventThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); // 命名线程 namePrefix = "push-pool-" + poolNumber.getAndIncrement() + "-thread-";
    } public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); // 设置相同的线程优先级,避免线程池里的线程根据优先级争抢资源,保证任务的正常执行 if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); return t;
    }
}

可以看到,我们创建了EventThreadFactory工厂类来生成与我们业务相关的线程,需要注意的是自定义的线程工程类需要实现ThreadFactory接口,才能被Executors类使用。这里我给线程起名为push-pool,表明这是一个处理push任务,发送push给客户端的线程池。再次运行以上测试用例,查看监控结果:

输入图片说明

默认线程工厂类

默认的线程工厂类在Executors类里面,是一个静态内部类,代码如下:

/**
 * The default thread factory
 */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix;

   DefaultThreadFactory() {
       SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() :
                             Thread.currentThread().getThreadGroup();
       namePrefix = "pool-" +
                     poolNumber.getAndIncrement() + "-thread-";
   } public Thread newThread(Runnable r) {
       Thread t = new Thread(group, r,
                             namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon())
           t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY)
           t.setPriority(Thread.NORM_PRIORITY); return t;
   }
}

上面的EventThreadFactory线程工厂类的定义就源自这个类。在使用线程工厂的时候,Executors类使用defaultThreadFactory方法来获得线程工厂对象,这样避免了调用者与具体线程工厂的强耦合,提高了代码的可扩展性,该方法代码如下:

public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory();
}

线程池创建规范

良好的开端是成功的一半。以下内容摘自阿里巴巴的Java开发手册,对我们排查线程池问题有一定帮助:

1、【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者 “过度切换” 的问题。

2、线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 各个方法的弊端:

  • newFixedThreadPool 和 newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。

  • newCachedThreadPool 和 newScheduledThreadPool: 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

3、【强制】创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。

正例:

public class TimerTaskThread extends Thread {
    public TimerTaskThread(){ super.setName("TimerTaskThread"); ...
    }
}