forked from variflight/feeyo-redisproxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNIOConnector.java
More file actions
138 lines (120 loc) · 3.38 KB
/
NIOConnector.java
File metadata and controls
138 lines (120 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.feeyo.redis.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * NIO connector: a dedicated thread that establishes outbound, non-blocking
 * connections to remote servers.
 *
 * <p>Connections are queued through {@link #postConnect(Connection)}. The
 * connector thread registers each queued connection with its own selector for
 * {@code OP_CONNECT}, starts the connect, and once the connect has completed
 * hands the connection over to a reactor taken from the {@link NIOReactorPool}.
 *
 * @author wuzh
 */
public final class NIOConnector extends Thread {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOConnector.class);

    private final String name;
    private final Selector selector;
    private final BlockingQueue<Connection> connectQueue;
    private final NIOReactorPool reactorPool;

    /**
     * Number of passes made through the select loop in {@link #run()}.
     * Written only by the connector thread; volatile so that callers of
     * {@link #getConnectCount()} on other threads observe a current, untorn value.
     */
    private volatile long connectCount;

    /**
     * Creates a connector and opens its selector. The thread is not started here.
     *
     * @param name        thread name, also used as the message when logging loop errors
     * @param reactorPool pool supplying the reactor each established connection is posted to
     * @throws IOException if the selector cannot be opened
     */
    public NIOConnector(String name, NIOReactorPool reactorPool) throws IOException {
        super(name);
        this.name = name;
        this.selector = Selector.open();
        this.reactorPool = reactorPool;
        this.connectQueue = new LinkedBlockingQueue<Connection>();
    }

    /**
     * Returns the loop counter. Note that it is incremented once per pass of the
     * select loop, not once per connection attempt.
     *
     * @return number of select-loop iterations executed so far
     */
    public long getConnectCount() {
        return connectCount;
    }

    /**
     * Queues a connection that needs to be connected asynchronously and wakes the
     * selector so the connector thread picks it up without waiting for the
     * select timeout.
     *
     * @param c the connection to connect
     */
    public void postConnect(Connection c) {
        connectQueue.offer(c);
        selector.wakeup();
    }

    /**
     * Connector loop: waits up to one second for connect-ready keys, starts any
     * queued connects, then completes the connects that are ready. Exceptions
     * are logged and the loop continues; the loop has no exit condition.
     */
    @Override
    public void run() {
        final Selector selector = this.selector;
        for (;;) {
            ++connectCount;
            try {
                // Wait for channels whose connect is ready (or for a wakeup / timeout).
                selector.select(1000L);
                connect(selector);
                Set<SelectionKey> keys = selector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        Object att = key.attachment();
                        if (att != null && key.isValid() && key.isConnectable()) {
                            finishConnect(key, att);
                        } else {
                            // No attachment, invalid, or not a connect event: drop the key.
                            key.cancel();
                        }
                    }
                } finally {
                    // The selected-key set is never cleared by the selector itself.
                    keys.clear();
                }
            } catch (Exception e) {
                LOGGER.warn(name, e);
            }
        }
    }

    /**
     * Drains the connect queue, registering each connection's channel for
     * {@code OP_CONNECT} and initiating the connect. A connection whose
     * registration or connect call fails is logged and closed.
     *
     * @param selector the selector to register the channels with
     */
    private void connect(Selector selector) {
        Connection c = null;
        while ((c = connectQueue.poll()) != null) {
            try {
                SocketChannel channel = (SocketChannel) c.getChannel();
                // Register OP_CONNECT to learn when the backend connection is really established.
                channel.register(selector, SelectionKey.OP_CONNECT, c);
                // Initiate the connect.
                channel.connect(new InetSocketAddress(c.host, c.port));
            } catch (Exception e) {
                LOGGER.error("error:", e);
                c.close("connect failed:" + e.toString());
            }
        }
    }

    /**
     * Completes the connect for a connect-ready key. On success the key is
     * detached and cancelled and the connection is posted to the next reactor
     * from the pool. On failure the key is cleared, the connection is closed and
     * its handler is notified through {@code onConnectFailed}.
     *
     * @param key the connect-ready selection key
     * @param att the key's attachment, expected to be the {@link Connection}
     */
    @SuppressWarnings("unchecked")
    private void finishConnect(SelectionKey key, Object att) {
        Connection c = (Connection) att;
        try {
            // Check and complete the native NIO connect.
            if (finishConnect(c, (SocketChannel) c.channel)) {
                clearSelectionKey(key);
                // Bind to a specific NIOReactor, which takes over read/write handling.
                NIOReactor reactor = reactorPool.getNextReactor();
                reactor.postRegister(c);
            }
        } catch (Throwable e) {
            LOGGER.warn("caught err ", e);
            // On error, clear the key.
            clearSelectionKey(key);
            c.close(e.toString());
            // Chain the original throwable so the handler keeps the cause and stack trace.
            c.getHandler().onConnectFailed(c, new Exception(e.getMessage(), e));
        }
    }

    /**
     * Finishes a pending connect on the channel and records the local port on
     * the connection.
     *
     * @param c       the connection owning the channel
     * @param channel the socket channel being connected
     * @return {@code true} if a pending connect was completed; {@code false} if
     *         no connect was pending or it has not completed yet
     * @throws IOException if the connect attempt failed
     */
    private boolean finishConnect(Connection c, SocketChannel channel) throws IOException {
        // finishConnect() may return false on a non-blocking channel; only report
        // success when the connection is actually established.
        if (channel.isConnectionPending() && channel.finishConnect()) {
            c.setLocalPort(channel.socket().getLocalPort());
            return true;
        }
        return false;
    }

    /**
     * Detaches the attachment from the key and cancels it, if the key is still valid.
     *
     * @param key the selection key to clear
     */
    private void clearSelectionKey(SelectionKey key) {
        if (key.isValid()) {
            key.attach(null);
            key.cancel();
        }
    }
}