Java 自學筆記 07 - Concurrent Queue (w/ Producer Consumer Problem)
在 Java 的多線程操作中,Concurrent Queue 是一種非常重要的容器,在共享資源的同時也能夠幫我們進行流程控制。
Java 的 Concurrent Queue 可以分為兩類,分別是 Blocking Queue 以及 Non Blocking Queue,兩種都是線程安全的,文章的最後會用 Blocking Queue 來實作 Producer Consumer Problem !
Non Blocking Queue
Non Blocking Queue 其實就是一般的 Queue,對 Queue 進行操作要馬成功要馬失敗,無論如何會立刻知道結果。
ConcurrentLinkedQueue 使用 CAS (Compare-And-Swap) 的機制來實現同步功能,對於響應式的應用來說,等待時間通常是不被允許的,這時候 Non Blocking Queue 會是好選擇。
什麼是 CAS (Compare-And-Swap) ?
CAS 是原子操作的一種 (atomic),它的基本思想是比較目標內存位置的值與預期值,只有在相等的情況下才將該位置的值更新為新值。
以 queue 的插入來說,首先要記錄 queue 尾部位置,每次操作時去檢查現在的位置跟所記錄的是否一樣,如果改變了就重來一遍,沒變的話就插入並記錄下新的尾部位置。
Blocking Queue
Blocking Queue 相比於 Non Blocking Queue 多了阻塞等待的動作,大部分的 Concurrent Queue 都是屬於 Blocking Queue,如 ArrayBlockingQueue、LinkedBlockingQueue 等,其阻塞是基於 Lock 來實現的,具體來說 Blocking Queue 多了四種操作:
- put(e): 從尾部放入元素,如果佇列已滿則阻塞等待,直到出現空位並放入。
- take(e): 從頭部取出元素,如果佇列為空則阻塞等待,直到出現元素並取出。
- offer(e, 時間, 時間單位): 從尾部放入元素,如果佇列已滿則阻塞等待一段時間,超過時間則停止等待並回傳 false。
- poll(e, 時間, 時間單位): 從頭部取出元素,如果佇列已滿則阻塞等待一段時間,超過時間則停止等待並回傳 false。
Producer Consumer Problem
用一個簡單的 Producer Consumer Problem 來練習:
6 Producers * 5 messages --> 3 Consumers * 10 messages
- 使用 LinkedBlockingQueue 作為傳遞訊息的容器
- Producer 使用
put()
將訊息放入 queue,因為沒有限制長度所以基本不會阻塞 - Consumer 使用
take()
將訊息從 queue 中取出,因為 要等 Producer 產出訊息所以會阻塞
1 | import java.util.*; |
執行結果如下,每次跑都會不太一樣:1
2
3Consumer 2 received: [1, 3, 4, 2, 4, 5, 4, 1, 3, 3]
Consumer 1 received: [6, 1, 3, 5, 1, 2, 4, 6, 4, 2]
Consumer 3 received: [2, 6, 5, 5, 5, 1, 2, 6, 3, 6]
參考資料
A Guide to Concurrent Queues in Java
LinkedBlockingQueue与ConcurrentLinkedQueue的区别