forked from variflight/feeyo-redisproxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNIOReactor.java
More file actions
169 lines (133 loc) · 3.72 KB
/
NIOReactor.java
File metadata and controls
169 lines (133 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package com.feeyo.redis.nio;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 网络事件反应器
*
* @author wuzh
* @author zhuam
*/
public final class NIOReactor {
private static Logger LOGGER = LoggerFactory.getLogger( NIOReactor.class );
private final String name;
private final RW reactorR;
public NIOReactor(String name) throws IOException {
this.name = name;
this.reactorR = new RW();
}
public String getName() {
return name;
}
final void startup() {
new Thread(reactorR, name + "-RW").start();
}
final void postRegister(Connection c) {
c.setReactor( this.name );
reactorR.registerQueue.offer(c);
reactorR.selector.wakeup();
}
final Queue<Connection> getRegisterQueue() {
return reactorR.registerQueue;
}
final long getReactCount() {
return reactorR.reactCount;
}
// IO/RW 线程
private final class RW implements Runnable {
private final Selector selector;
private final ConcurrentLinkedQueue<Connection> registerQueue;
private long reactCount;
private RW() throws IOException {
this.selector = Selector.open();
this.registerQueue = new ConcurrentLinkedQueue<Connection>();
}
@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> keys = null;
for (;;) {
++reactCount;
try {
// 查看有无连接就绪
selector.select(500L);
// 处理注册队列
register(selector);
keys = selector.selectedKeys();
for (final SelectionKey key : keys) {
Connection con = null;
try {
Object att = key.attachment();
if ( att != null ) {
con = (Connection) att;
// 处理读
if (key.isValid() && key.isReadable()) {
try {
con.asynRead();
} catch (IOException e) {
con.close("program err:" + e.toString());
continue;
} catch (Exception e) {
LOGGER.warn("caught err:", e);
con.close("program err:" + e.toString());
continue;
}
}
// 处理写
if (key.isValid() && key.isWritable()) {
con.doNextWriteCheck();
}
} else {
key.cancel();
}
} catch (CancelledKeyException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} catch (Exception e) {
LOGGER.warn(con + " " + e);
} catch (final Throwable e) {
// Catch exceptions such as OOM and close connection if exists
//so that the reactor can keep running!
if (con != null) {
con.close("Bad: " + e);
}
LOGGER.error("caught err: ", e);
continue;
}
}
} catch (Exception e) {
LOGGER.error(name, e);
} catch (Throwable e) {
// Catch exceptions such as OOM so that the reactor can keep running!
LOGGER.error(name +" caught err: ", e);
} finally {
if (keys != null) {
keys.clear();
}
}
}
}
// 注册 IO 读写事件
private void register(Selector selector) {
if ( registerQueue.isEmpty() ) {
return;
}
Connection c = null;
while ((c = registerQueue.poll()) != null) {
try {
c.register(selector);
} catch (Exception e) {
LOGGER.warn("register error ", e);
c.close("register err");
}
}
}
}
}