forked from DreamCats/java-notes
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathProdConsumerBlockingQueue.java
More file actions
84 lines (71 loc) · 2.86 KB
/
ProdConsumerBlockingQueue.java
File metadata and controls
84 lines (71 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* @program JavaBooks
* @description: ProdConsumerBlockingQueue
* @author: mf
* @create: 2020/02/16 14:20
*/
package com.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProdConsumerBlockingQueue {
private volatile boolean flag = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public ProdConsumerBlockingQueue(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (flag) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + " 插入队列" + data + " 成功");
} else {
System.out.println(Thread.currentThread().getName() + " 插入队列" + data + " 失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + " 大老板叫停了,flag=false,生产结束");
}
public void myConsumer() throws Exception {
String result = null;
while (flag) {
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
flag = false;
System.out.println(Thread.currentThread().getName() + " 超过2s没有取到蛋糕,消费退出");
return;
}
System.out.println(Thread.currentThread().getName() + " 消费队列" + result + "成功");
}
}
public void stop() {
flag = false;
}
public static void main(String[] args) {
ProdConsumerBlockingQueue prodConsumerBlockingQueue = new ProdConsumerBlockingQueue(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 生产线程启动");
try {
prodConsumerBlockingQueue.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 消费线程启动");
try {
prodConsumerBlockingQueue.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("5s后main叫停,线程结束");
prodConsumerBlockingQueue.stop();
}
}