日本国产欧美大码A视频 _国产高颜值极品在线视频_色偷偷亚洲第一综合网_国产精品一二三社区视频_久久久青草视频

IT培訓-高端面授IT培訓機構
云和教育:云和數(shù)據(jù)集團高端IT職業(yè)教育品牌
  • 國家級
    全民數(shù)字素養(yǎng)與技能培訓基地
  • 河南省
    第一批產(chǎn)教融合型企業(yè)建設培育單位
  • 鄭州市
    數(shù)字技能人才(碼農(nóng))培養(yǎng)評價聯(lián)盟

【技術】大數(shù)據(jù)處理系列之(一)Java線程池使用

  • 發(fā)布時間:
    2014-12-17
  • 版權所有:
    云和教育
  • 分享:

前言:最近在做分布式海量數(shù)據(jù)處理項目,使用到了java的線程池,所以搜集了一些資料對它的使用做了一下總結(jié)和探究。文中最核心的東西在于后面兩節(jié)無界隊列線程池和有界隊列線程池的實例使用以及線上問題處理方案。

1. 為什么要用線程池?

在Java中,如果每當一個請求到達就創(chuàng)建一個新線程,開銷是相當大的。在實際使用中,每個請求創(chuàng)建新線程的服務器在創(chuàng)建和銷毀線程上花費的時間和消耗的系統(tǒng)資源,甚至可能要比花在實際處理實際的用戶請求的時間和資源要多的多。除了創(chuàng)建和銷毀線程的開銷之外,活動的線程也需要消耗系統(tǒng)資源。如果在一個JVM中創(chuàng)建太多的線程,可能會導致系統(tǒng)由于過度消耗內(nèi)存或者“切換過度”而導致系統(tǒng)資源不足。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數(shù)目,盡可能減少創(chuàng)建和銷毀線程的次數(shù),特別是一些資源耗費比較大的線程的創(chuàng)建和銷毀,盡量利用已有對象來進行服務,這就是“池化資源”技術產(chǎn)生的原因。

線程池主要用來解決線程生命周期開銷問題和資源不足問題,通過對多個任務重用線程,線程創(chuàng)建的開銷被分攤到多個任務上了,而且由于在請求到達時線程已經(jīng)存在,所以消除了創(chuàng)建所帶來的延遲。這樣,就可以立即請求服務,使應用程序響應更快。另外,通過適當?shù)恼{(diào)整線程池中的線程數(shù)據(jù)可以防止出現(xiàn)資源不足的情況。

網(wǎng)上找來的這段話,清晰的描述了為什么要使用線程池,使用線程池有哪些好處。工程項目中使用線程池的場景比比皆是。

本文關注的重點是如何在實戰(zhàn)中來使用好線程池這一技術,來滿足海量數(shù)據(jù)大并發(fā)用戶請求的場景。

2. ThreadPoolExecutor類

Java中的線程池技術主要用的是ThreadPoolExecutor 這個類。先來看這個類的構造函數(shù),

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize 線程池維護線程的最少數(shù)量

maximumPoolSize 線程池維護線程的最大數(shù)量

keepAliveTime 線程池維護線程所允許的空閑時間

workQueue 任務隊列,用來存放我們所定義的任務處理線程

threadFactory 線程創(chuàng)建工廠

handler 線程池對拒絕任務的處理策略

ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize 設置的邊界自動調(diào)整池大小。當新任務在方法execute(Runnable) 中提交時, 如果運行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求,即使其他輔助線程是空閑的。如果運行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當隊列滿時才創(chuàng)建新線程。 如果設置的corePoolSize 和 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。

ThreadPoolExecutor是Executors類的實現(xiàn),Executors類里面提供了一些靜態(tài)工廠,生成一些常用的線程池,主要有以下幾個:

newSingleThreadExecutor:創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執(zhí)行所有任務。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。

newFixedThreadPool:創(chuàng)建固定大小的線程池。每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。

newCachedThreadPool:創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務)的線程,當任務數(shù)增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。

在實際的項目中,我們會使用得到比較多的是newFixedThreadPool,創(chuàng)建固定大小的線程池,但是這個方法在真實的線上環(huán)境中還是會有很多問題,這個將會在下面一節(jié)中詳細講到。

當任務源源不斷的過來,而我們的系統(tǒng)又處理不過來的時候,我們要采取的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經(jīng)包含四種處理策略。

1)CallerRunsPolicy:線程調(diào)用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown()) {

r.run();

}

}

這個策略顯然不想放棄執(zhí)行任務。但是由于池中已經(jīng)沒有任何資源了,那么就直接使用調(diào)用該execute的線程本身來執(zhí)行。

2)AbortPolicy:處理程序遭到拒絕將拋出運行時 RejectedExecutionException

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

throw new RejectedExecutionException();

}

這種策略直接拋出異常,丟棄任務。

3)DiscardPolicy:不能執(zhí)行的任務將被刪除

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}

這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不拋出異常。

4)DiscardOldestPolicy:如果執(zhí)行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復此過程)

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown()) {

e.getQueue().poll();

e.execute(r);

}

}

該策略就稍微復雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最早的任務,然后重新嘗試運行該任務。這個策略需要適當小心。

3. ThreadPoolExecutor無界隊列使用

public class ThreadPool {

private final static String poolName = “mypool”;

static private ThreadPool threadFixedPool = new ThreadPool(2);

private ExecutorService executor;

static public ThreadPool getFixedInstance() {

return threadFixedPool;

}

private ThreadPool(int num) {

executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));

}

public void execute(Runnable r) {

executor.execute(r);

}

public static void main(String[] params) {

class MyRunnable implements Runnable {

public void run() {

System.out.println(“OK!”);

try {

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

for (int i = 0; i < 10; i++) {

ThreadPool.getFixedInstance().execute(new MyRunnable());

}

try {

Thread.sleep(2000);

System.out.println(“Process end.”);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

在這段代碼中,我們發(fā)現(xiàn)我們用到了Executors.newFixedThreadPool()函數(shù),這個函數(shù)的實現(xiàn)是這樣子的:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

它實際上是創(chuàng)建了一個無界隊列的固定大小的線程池。執(zhí)行這段代碼,我們發(fā)現(xiàn)所有的任務都正常處理了。但是在真實的線上環(huán)境中會存在這樣的一個問題,前端的用戶請求源源不斷的過來,后端的處理線程如果處理時間變長,無法快速的將用戶請求處理完返回結(jié)果給前端,那么任務隊列中將堵塞大量的請求。這些請求在前端都是有超時時間設置的,假設請求是通過套接字過來,當我們的后端處理進程處理完一個請求后,從隊列中拿下一個任務,發(fā)現(xiàn)這個任務的套接字已經(jīng)無效了,這是因為在用戶端已經(jīng)超時,將套接字建立的連接關閉了。這樣一來我們這邊的處理程序再去讀取套接字時,就會發(fā)生I/0 Exception. 惡性循環(huán),導致我們所有的處理服務線程讀的都是超時的套接字,所有的請求過來都拋I/O異常,這樣等于我們整個系統(tǒng)都掛掉了,已經(jīng)無法對外提供正常的服務了。

對于海量數(shù)據(jù)的處理,現(xiàn)在業(yè)界都是采用集群系統(tǒng)來進行處理,當請求的數(shù)量不斷加大的時候,我們可以通過增加處理節(jié)點,反正現(xiàn)在硬件設備相對便宜。但是要保證系統(tǒng)的可靠性和穩(wěn)定性,在程序方面我們還是可以進一步的優(yōu)化的,我們下一節(jié)要講述的就是針對線上出現(xiàn)的這類問題的一種處理策略。

4. ThreadPoolExecutor有界隊列使用

public class ThreadPool {

private final static String poolName = “mypool”;

static private ThreadPool threadFixedPool = null;

public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

private ExecutorService executor;

static public ThreadPool getFixedInstance() {

return threadFixedPool;

}

private ThreadPool(int num) {

executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory

(poolName), new ThreadPoolExecutor.AbortPolicy());

}

public void execute(Runnable r) {

executor.execute(r);

}

public static void main(String[] params) {

class MyRunnable implements Runnable {

public void run() {

System.out.println(“OK!”);

try {

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

int count = 0;

for (int i = 0; i < 10; i++) {

try {

ThreadPool.getFixedInstance().execute(new MyRunnable());

} catch (RejectedExecutionException e) {

e.printStackTrace();

count++;

}

}

try {

log.info(“queue size:” + ThreadPool.getFixedInstance().queue.size());

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(“Reject task: ” + count);

}

}

首先我們來看下這段代碼幾個重要的參數(shù),corePoolSize 為2,maximumPoolSize為4,任務隊列大小為2,每個任務平均處理時間為10ms,一共有10個并發(fā)任務。

執(zhí)行這段代碼,我們會發(fā)現(xiàn),有4個任務失敗了。這里就驗證了我們在上面提到有界隊列時候線程池的執(zhí)行順序。當新任務在方法 execute(Runnable) 中提交時, 如果運行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求。 如果運行的線程多于corePoolSize 而少于 maximumPoolSize,則僅當隊列滿時才創(chuàng)建新線程,如果此時線程數(shù)量達到maximumPoolSize,并且隊列已經(jīng)滿,就會拒絕繼續(xù)進來的請求。

現(xiàn)在我們調(diào)整一下代碼中的幾個參數(shù),將并發(fā)任務數(shù)改為200,執(zhí)行結(jié)果Reject task: 182,說明有18個任務成功了,線程處理完一個請求后會接著去處理下一個過來的請求。在真實的線上環(huán)境中,會源源不斷的有新的請求過來,當前的被拒絕了,但只要線程池線程把當下的任務處理完之后還是可以處理下一個發(fā)送過來的請求。

通過有界隊列可以實現(xiàn)系統(tǒng)的過載保護,在高壓的情況下,我們的系統(tǒng)處理能力不會變?yōu)?,還能正常對外進行服務,雖然有些服務可能會被拒絕,至于如何減少被拒絕的數(shù)量以及對拒絕的請求采取何種處理策略我將會在下一篇文章《系統(tǒng)的過載保護》中繼續(xù)闡述。