forked from apache/cloudstack
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAgentMonitor.java
More file actions
executable file
·289 lines (253 loc) · 10.8 KB
/
AgentMonitor.java
File metadata and controls
executable file
·289 lines (253 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent.manager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.alert.AlertManager;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.HostPodVO;
import com.cloud.dc.dao.ClusterDao;
import com.cloud.dc.dao.DataCenterDao;
import com.cloud.dc.dao.HostPodDao;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.Status.Event;
import com.cloud.host.dao.HostDao;
import com.cloud.resource.ResourceManager;
import com.cloud.resource.ResourceState;
import com.cloud.utils.component.Inject;
import com.cloud.utils.db.ConnectionConcierge;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.SearchCriteria2;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.SearchCriteriaService;
import com.cloud.utils.time.InaccurateClock;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.dao.VMInstanceDao;
/**
 * Background thread that tracks the last ping time of every connected agent in memory and,
 * once every {@link #CHECK_INTERVAL_MS} milliseconds:
 * <ul>
 * <li>disconnects agents whose last ping is older than the ping timeout (with investigation,
 * unless the host is Disabled / Maintenance / ErrorInMaintenance, in which case it is
 * disconnected directly), and</li>
 * <li>moves non-storage hosts in PrepareForMaintenance / ErrorInMaintenance into maintenance
 * once no VMs are left on them and none are migrating away.</li>
 * </ul>
 * As a {@link Listener} it keeps the in-memory ping map current from ping commands,
 * connects and disconnects.
 */
public class AgentMonitor extends Thread implements Listener {
    private static final Logger s_logger = Logger.getLogger(AgentMonitor.class);
    private static final Logger status_Logger = Logger.getLogger(Status.class);

    /** Sleep between two monitoring passes. */
    private static final long CHECK_INTERVAL_MS = 60 * 1000L;

    /** Ping timeout in seconds; compared against {@code InaccurateClock.getTimeInSeconds()}. */
    private long _pingTimeout;
    private HostDao _hostDao;
    /** Written by {@link #signalStop()} from another thread, read by the monitor loop. */
    private volatile boolean _stop;
    private AgentManagerImpl _agentMgr;
    private VMInstanceDao _vmDao;
    private DataCenterDao _dcDao;
    private HostPodDao _podDao;
    private AlertManager _alertMgr;
    private long _msId;

    @Inject
    ClusterDao _clusterDao;
    @Inject
    ResourceManager _resourceMgr;

    /**
     * Last ping time (seconds, from InaccurateClock) keyed by agent/host id. Initialized at
     * declaration so instances created through the protected no-arg constructor are usable too.
     */
    private final Map<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007);

    protected AgentMonitor() {
    }

    /**
     * @param msId        id of this management server, used for resource state transitions
     * @param pingTimeout seconds after the last ping before an agent is considered behind
     */
    public AgentMonitor(long msId, HostDao hostDao, VMInstanceDao vmDao, DataCenterDao dcDao, HostPodDao podDao, AgentManagerImpl agentMgr, AlertManager alertMgr, long pingTimeout) {
        super("AgentMonitor");
        _msId = msId;
        _pingTimeout = pingTimeout;
        _hostDao = hostDao;
        _agentMgr = agentMgr;
        _vmDao = vmDao;
        _dcDao = dcDao;
        _podDao = podDao;
        _alertMgr = alertMgr;
    }

    /**
     * Check if the agent is behind on ping.
     *
     * @param agentId agent or host id.
     * @return null if the agent is not tracked here; true if its last ping is older than the
     *         ping timeout; false otherwise.
     */
    public Boolean isAgentBehindOnPing(long agentId) {
        Long pingTime = _pingMap.get(agentId);
        if (pingTime == null) {
            return null;
        }
        return pingTime < pingCutoffInSeconds();
    }

    /**
     * @return the last recorded ping time (seconds) of the agent, or null if it is not tracked.
     */
    public Long getAgentPingTime(long agentId) {
        return _pingMap.get(agentId);
    }

    /** Records "now" as the last ping time of the given agent. */
    public void pingBy(long agentId) {
        _pingMap.put(agentId, InaccurateClock.getTimeInSeconds());
    }

    /** Pings recorded strictly before this time (seconds) are considered behind. */
    private long pingCutoffInSeconds() {
        return InaccurateClock.getTimeInSeconds() - _pingTimeout;
    }

    // TODO : use host machine time is not safe in clustering environment
    @Override
    public void run() {
        s_logger.info("Agent Monitor is started.");
        while (!_stop) {
            try {
                Thread.sleep(CHECK_INTERVAL_MS);
            } catch (InterruptedException e) {
                // signalStop() interrupts the sleep; do not run another pass while shutting down.
                if (_stop) {
                    break;
                }
                s_logger.info("Who woke me from my slumber?");
            }
            try {
                processPingTimeouts();
                processHostsPreparingForMaintenance();
            } catch (Throwable th) {
                s_logger.error("Caught the following exception: ", th);
            }
        }
        s_logger.info("Agent Monitor is leaving the building!");
    }

    /**
     * Disconnects every agent that is behind on ping. A failure on one agent is logged and does
     * not prevent the remaining agents from being processed.
     */
    private void processPingTimeouts() {
        List<Long> behindAgents = findAgentsBehindOnPing();
        for (Long agentId : behindAgents) {
            try {
                SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
                sc.addAnd(sc.getEntity().getId(), Op.EQ, agentId);
                HostVO h = sc.find();
                if (h == null) {
                    // No host row found (presumably the host was removed). Stop tracking it,
                    // otherwise it is reported as behind on every pass.
                    s_logger.info("Ping timeout for host " + agentId + " but the host no longer exists; dropping it from ping tracking");
                    _pingMap.remove(agentId);
                    continue;
                }
                ResourceState resourceState = h.getResourceState();
                if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance || resourceState == ResourceState.ErrorInMaintenance) {
                    /* Host is in non-operation state, so no investigation and direct put agent to Disconnected */
                    status_Logger.debug("Ping timeout but host " + agentId + " is in resource state of " + resourceState + ", so no investigation");
                    _agentMgr.disconnectWithoutInvestigation(agentId, Event.ShutdownRequested);
                } else {
                    status_Logger.debug("Ping timeout for host " + agentId + ", do invstigation");
                    _agentMgr.disconnectWithInvestigation(agentId, Event.PingTimeout);
                }
            } catch (Exception e) {
                s_logger.warn("Unable to process ping timeout for host " + agentId, e);
            }
        }
    }

    /**
     * Moves non-storage hosts in PrepareForMaintenance / ErrorInMaintenance into maintenance
     * (and raises an alert) once they have no VMs and no VMs migrating away. A failure on one
     * host is logged and does not prevent the remaining hosts from being processed.
     */
    private void processHostsPreparingForMaintenance() {
        SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
        sc.addAnd(sc.getEntity().getResourceState(), Op.IN, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
        List<HostVO> hosts = sc.list();
        for (HostVO host : hosts) {
            if (host.getType() == Host.Type.Storage) {
                continue;
            }
            long hostId = host.getId();
            try {
                List<VMInstanceVO> vos = _vmDao.listByHostId(hostId);
                List<VMInstanceVO> vosMigrating = _vmDao.listVmsMigratingFromHost(hostId);
                if (vos.isEmpty() && vosMigrating.isEmpty()) {
                    String hostDesc = describeHost(host);
                    _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, "Host [" + hostDesc + "] is ready for maintenance");
                    _resourceMgr.resourceStateTransitTo(host, ResourceState.Event.InternalEnterMaintenance, _msId);
                }
            } catch (Exception e) {
                s_logger.warn("Unable to complete maintenance transition for host " + hostId, e);
            }
        }
    }

    /** Human-readable host description for alerts; tolerates a missing zone or pod record. */
    private String describeHost(HostVO host) {
        DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
        HostPodVO podVO = _podDao.findById(host.getPodId());
        String zoneName = (dcVO != null) ? dcVO.getName() : "unknown";
        String podName = (podVO != null) ? podVO.getName() : "unknown";
        return "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + zoneName + ", pod: " + podName;
    }

    /** Asks the monitor loop to exit and wakes it up if it is sleeping. */
    public void signalStop() {
        _stop = true;
        interrupt();
    }

    @Override
    public boolean isRecurring() {
        return true;
    }

    @Override
    public boolean processAnswers(long agentId, long seq, Answer[] answers) {
        return false;
    }

    /**
     * Records a ping for every {@link PingCommand} received from the agent.
     *
     * @return always false: the commands are only observed here, not consumed.
     */
    @Override @DB
    public boolean processCommands(long agentId, long seq, Command[] commands) {
        for (Command cmd : commands) {
            if (cmd instanceof PingCommand) {
                pingBy(agentId);
            }
        }
        return false;
    }

    /**
     * @return ids of tracked agents whose last ping is older than the ping timeout (never null).
     */
    protected List<Long> findAgentsBehindOnPing() {
        List<Long> agentsBehind = new ArrayList<Long>();
        long cutoffTime = pingCutoffInSeconds();
        for (Map.Entry<Long, Long> entry : _pingMap.entrySet()) {
            if (entry.getValue() < cutoffTime) {
                agentsBehind.add(entry.getKey());
            }
        }
        if (!agentsBehind.isEmpty()) {
            s_logger.info("Found the following agents behind on ping: " + agentsBehind);
        }
        return agentsBehind;
    }

    /**
     * Database-based lookup of hosts behind on ping that also triggers investigation for hosts
     * owned by this management server (or by none).
     *
     * @deprecated Ping tracking now uses the in-memory ping map; use
     *             {@link #findAgentsBehindOnPing()} instead.
     */
    @Deprecated
    protected List<HostVO> findHostsBehindOnPing() {
        // ">> 10" divides milliseconds by 1024: a cheap approximation of seconds.
        long time = (System.currentTimeMillis() >> 10) - _pingTimeout;
        List<HostVO> hosts = _hostDao.findLostHosts(time);
        if (s_logger.isInfoEnabled()) {
            s_logger.info("Found " + hosts.size() + " hosts behind on ping. pingTimeout : " + _pingTimeout +
                    ", mark time : " + time);
        }

        for (HostVO host : hosts) {
            if (host.getType().equals(Host.Type.ExternalFirewall) ||
                    host.getType().equals(Host.Type.ExternalLoadBalancer) ||
                    host.getType().equals(Host.Type.TrafficMonitor) ||
                    host.getType().equals(Host.Type.SecondaryStorage)) {
                continue;
            }

            if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) {
                if (s_logger.isInfoEnabled()) {
                    s_logger.info("Asking agent mgr to investgate why host " + host.getId() +
                            " is behind on ping. last ping time: " + host.getLastPinged());
                }
                _agentMgr.disconnectWithInvestigation(host.getId(), Event.PingTimeout);
            }
        }

        return hosts;
    }

    @Override
    public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
        return null;
    }

    /**
     * Starts ping tracking for a newly connected host (TrafficMonitor and SecondaryStorage hosts
     * are not tracked).
     */
    @Override
    public void processConnect(HostVO host, StartupCommand cmd, boolean forRebalance) {
        if (host.getType().equals(Host.Type.TrafficMonitor) ||
                host.getType().equals(Host.Type.SecondaryStorage)) {
            return;
        }

        // Seed the map with the connect time so a new agent gets a full timeout window.
        // Written directly rather than via pingBy() because this is initialization, not a ping.
        _pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds());
    }

    /** Stops ping tracking for the disconnected agent. */
    @Override
    public boolean processDisconnect(long agentId, Status state) {
        _pingMap.remove(agentId);
        return true;
    }

    @Override
    public boolean processTimeout(long agentId, long seq) {
        return true;
    }

    @Override
    public int getTimeout() {
        return -1;
    }
}