`
chenzehe
  • 浏览: 532751 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

记一次线程池的使用

阅读更多

本文介绍多线程在使用时由直接new Thread()到Executors.newCachedThreadPool(),再到直接new ThreadPoolExecutor()的过程。

 

背景:

项目需要在Servlet前加个Filter做数据的转发,只是数据的简单透传,为了不影响用户的体验,而采用多线程来实现。

 

最简单的实现

直接在filter中new Thread来实现,如下:

 但是访问量一大,这种方法会把系统搞死。

 

线程池实现

后面采用线程池的方式实现,虽然之前也有研究过线程池,但是对线程池的记忆最深的就是线程池是对线程的缓存,重复利用,避免线程的重复创建和销毁带来的性能损耗,还有就是ExecutorService类提供的几种线程池方式:signle线程数的线程、线程数固定的线程、线程数不固定的线程。当时也没多考虑,担心影响系统的并发数,就直接使用Executors.newCachedThreadPool()缓存的线程池,如下:

 但是上线一段时间后,运维反馈系统不太稳定,但也没有直接原因表示是该线程池导致,接着研究。

 

自己的设想

后面觉得这里就是数据的转发,其实也就是一个典型的生产者和消费者模型,可以使用一个阻塞队列来把请求数据缓冲起来,再使用一个线程池把数据消费掉,想到之前研究过的并发大师Doug Lea写的几个并发队列,不错,再比较下这几个队列使用的是显示锁还是CAS就能确定使用哪个队列了,有搞头...

 

线程池的原理

古人云:没文化,真可怕。正当我开开心心的写我的生产者消费者,使用线程池来消费队列里的任务时悲剧的发现,线程池也有队列来缓存任务,我上面的设计就是线程池的另一个作用,线程池不只是对线程的管理,避免线程的重复创建和销毁带来的性能损耗,而且还对任务的缓冲,提高系统的响应速度。

 

比较靠谱的实现

通过直接研究线程池的实现类ThreadPoolExecutor,而不是之前只关注线程池的铺助类和接口,使用线程数量固定、LinkedBlockingQueue为缓存任务的队列,如果任务队列满了就做丢弃处理,如下:

 

ThreadPoolExecutor类的构造函数为:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

 

 创建一个线程池需要输入几个参数:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
    • 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

 明白了上面线程池的构造参数定义后,再回头看前面使用Executors.newCachedThreadPool()方法的实现就是个坑:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 

线程池的合理配置

1、线程池的线程数量:一般建议2*CPU(Runtime.getRuntime().availableProcessors()),如果是I/O密集型的任务,可以再大点。

2、队列的使用: 建议使用有界队列,并且经过测试确定任务数在队列里占用内存大小,这里我测试了5W个任务时大约占用20M内存。

 

Java的线程实现

并发的实现可以通过多种方式来实现,例如:单进程-单线程模型,通过在一台服务器上启多个进程实现多任务的并行处理。但是在JAVA语言中,通过是通过单进程-多线程的模型进行多任务的并发处理。每个java.lang.Thread类的实例就代表一个线程,但是Thread类的很多接口都被声明为Native,直接调用本地方法实现,所以在new的Thread实例过多时,不但占用了JVM的堆内存,还要考虑占用系统的本地内存。

public synchronized void start() {
    if (threadStatus != 0 || this != me)
        throw new IllegalThreadStateException();
    group.add(this);
    start0();
    if (stopBeforeStart) {
	stop0(throwableFromStop);
    }
}

private native void start0();

 

 主流的操作系统都提供了线程实现,目前实现线程的方式主要有三种,分别是:

  1. 内核线程(KLT)实现,这种线程由内核来完成线程切换,内核通过线程调度器对线程进行调度,并负责将线程任务映射到不同的处理器上;
  2. 用户线程实现(UT),通常情况下,用户线程指的是完全建立在用户空间线程库上的线程,用户线程的创建、启动、运行、销毁和切换完全在用户态中完成,不需要内核的帮助,因此执行性能更高;
  3. 混合实现:将内核线程和用户线程混合在一起使用的方式。

由于虚拟机规范并没有强制规定JAVA的线程必须使用哪种方式实现,因此,不同的操作系统实现的方式也可能存在差异。对于SUN的JDK,在Windows和Linux操作系统上采用了内核线程的实现方式,在Solaris版本的JDK中,提供了一些专有的虚拟机线程参数,用于设置使用哪种线程模型。

 

JAVA内存模型

Java内存模型规定所有的变量都存储在主内存中(JVM内存的一部分),每个线程有自己独立的工作内存,它保存了被该线程使用的变量的主内存拷贝,线程对这些变量的操作都在自己的工作内存中进行,不能直接操作主内存和其它工作内存中存储的变量或者变量副本,线程间的变量访问需通过主内存来完成,三者的关系如下图所示:

 

JAVA内存模型定义了八种操作来完成主内存和工作内存的变量访问,具体如下:

  1. lock:主内存变量,把一个变量标识为某个线程独占的状态;
  2. unlock:主内存变量,把一个处于锁定状态变量释放出来,被释放后的变量才可以被其它线程锁定;
  3. read:主内存变量,把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用;
  4. load:工作内存变量,把read读取到的主内存中的变量值放入工作内存的变量拷贝中;
  5. use:工作内存变量,把工作内存中变量的值传递给java虚拟机执行引擎,每当虚拟机遇到一个需要使用到变量值的字节码指令时将会执行该操作;
  6. assign:工作内存变量,把从执行引擎接收到的变量的值赋值给工作变量,每当虚拟机遇到一个给变量赋值的字节码时将会执行该操作;
  7. store:工作内存变量,把工作内存中一个变量的值传送到主内存中,以便随后的write操作使用;
  8. write:主内存变量,把store操作从工作内存中得到的变量值放入主内存的变量中。

 

总结

线程池的作用不只是线程复用,还能缓冲任务进行流控,new Thread()不但占用JVM内存,还占用本地内存。

 

 

 

分享到:
评论

相关推荐

    记一次使用线程池出现的问题(线程池异常).pdf

    记一次使用线程池出现的问题(线程池异常).pdf记一次使用线程池出现的问题(线程池异常).pdf记一次使用线程池出现的问题(线程池异常).pdf记一次使用线程池出现的问题(线程池异常).pdf记一次使用线程池出现的...

    java线程池概念.txt

    这里线程池线程大小还需要判断一次;前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于...

    10个线程的Python多线程爬虫(采集新浪数据).rar

     使用python编写一个网站爬虫程序,支持参数如下:  spider.py -u url -d deep -f logfile -l loglevel(1-5) --testself -thread number --dbfile filepath --key="HTML5"  参数说明:  -u 指定爬虫开始地址  -...

    SpringBoot学生信息管理系统

    6、用户并发选课控制,采用异步处理,并且使用缓存层Redis记录相关信息,同时采用aop编程思想,在第一次请求选课接口时,加载redis中lua脚本文件(只会加载一次)。| 学生信息管理系统/教务系统管理系统

    基于Spring打造简单高效通用的异步任务处理系统

    l Spring封装Job调度:当任务信息都持久化在DB中之后,我们需要将这些信息读取出来执行具体的业务逻辑操作,这里我们通过ScheduledExecutorFactoryBean来实现对任务的循环调度,比如说可采取每隔5min扫描一次待处理...

    一次Docker中Redis连接暴增的问题排查实战记录

    周六生产服务器出现redis服务器不可用状态,错误信息为: 状态不可用,等待后台检查程序恢复方可使用。Unexpected end of stream; expected type ‘Status’ ...也就是应用程序启动时才被执行一次

    基于C++实现的轻量级Web服务器源码+项目说明.zip

    epoll使用EPOLLONESHOT保证一个socket连接在任意时刻都只被一个线程处理 使用线程池提高并发度,并降低频繁创建线程的开销 同步互斥的介绍 使用RAII手法封装互斥器(pthrea_mutex_t)、 条件变量(pthread_cond_t)等...

    javaSE代码实例

    13.2.2 “一次投入,终身回报”的String内存机制 249 13.2.3 String对象特殊机制付出的代价 252 13.3 StringBuffer类 253 13.3.1 弥补String不足的StringBuffer类 253 13.3.2 编写方法链以及StringBuffer类...

    newrelic-elasticsearch:用于监控Elasticsearch集群的新Relic插件

    每分钟轮询一次集群/节点统计信息并记录指标,包括RPM,线程池,合并,JVM和OS统计信息。 安装 使用New Relic NPI进行交互式安装 请参阅 ./npi install me.snov.newrelic-elasticsearch 码头工人 docker run -e ...

    C/C++笔试题(附答案,华为面试题系列)

     第一次握手:建立连接时,客户端发送syn包(syn=j)到服务器,并进入SYN_SEND状 态,等待服务器确认; 第二次握手:服务器收到syn包,必须确认客户的SYN(ack=j+1),同时自己也发送一个 SYN包(syn=k),即SYN+ACK...

    Python Cookbook

    1.18 一次完成多个替换 36 1.19 检查字符串中的结束标记 39 1.20 使用Unicode来处理国际化文本 40 1.21 在Unicode和普通字符串之间转换 43 1.22 在标准输出中打印Unicode字符 45 1.23 对Unicode数据编码并用于...

    Visual.C#.编程精彩百例

    实例42 线程池(ThreadPool)的应用 实例43 多线程互斥运行 实例44 多线程时钟应用程序 实例45 监视多线程 实例46 防止多线程应用程序死锁 实例47 文件同步操作与应用 实例48 在COM程序设计中使用.NET组件 ...

    Job Plus项目是基于SpringBoot+Vue的轻量级定时任务管理系统+源代码+文档说明

    30. 一致性:基于Redis分布式锁保证集群分布式调度的最终一致性, 一次任务调度只会触发一次执行; 31. 全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论...

    Java范例开发大全(全书源程序)

    实例85 寻找指定字符第一次出现的位置 114 实例86 寻找指定字符最后出现的位置 115 实例87 我究竟有多长 116 实例88 替换指定的字符 117 实例89 分割字符串 117 实例90 如何使用substring()方法截取子串 118 ...

    vc++ 应用源码包_6

    另外有只打开一个应用程序、CRichEdit的使用、最小到托盘、自动检测在线用户(多播组)等。 freeeim_FreeEIM_企业即时通讯软件源代码2010年8月份最新版 FTP、HTTP 多线程断点续传下载文件 源码 gdiplus应用实例 ...

    vc++ 应用源码包_5

    另外有只打开一个应用程序、CRichEdit的使用、最小到托盘、自动检测在线用户(多播组)等。 freeeim_FreeEIM_企业即时通讯软件源代码2010年8月份最新版 FTP、HTTP 多线程断点续传下载文件 源码 gdiplus应用实例 ...

    vc++ 应用源码包_1

    另外有只打开一个应用程序、CRichEdit的使用、最小到托盘、自动检测在线用户(多播组)等。 freeeim_FreeEIM_企业即时通讯软件源代码2010年8月份最新版 FTP、HTTP 多线程断点续传下载文件 源码 gdiplus应用实例 ...

    vc++ 应用源码包_3

    另外有只打开一个应用程序、CRichEdit的使用、最小到托盘、自动检测在线用户(多播组)等。 freeeim_FreeEIM_企业即时通讯软件源代码2010年8月份最新版 FTP、HTTP 多线程断点续传下载文件 源码 gdiplus应用实例 ...

    vc++ 应用源码包_2

    另外有只打开一个应用程序、CRichEdit的使用、最小到托盘、自动检测在线用户(多播组)等。 freeeim_FreeEIM_企业即时通讯软件源代码2010年8月份最新版 FTP、HTTP 多线程断点续传下载文件 源码 gdiplus应用实例 ...

Global site tag (gtag.js) - Google Analytics