1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.protocol.networkhandler;
22
23 import goldengate.common.database.DbSession;
24 import goldengate.common.database.exception.GoldenGateDatabaseNoConnectionException;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27
28 import java.net.BindException;
29 import java.net.SocketAddress;
30
31 import openr66.database.DbConstant;
32 import openr66.protocol.configuration.Configuration;
33 import openr66.protocol.exception.OpenR66Exception;
34 import openr66.protocol.exception.OpenR66ExceptionTrappedFactory;
35 import openr66.protocol.exception.OpenR66ProtocolBusinessNoWriteBackException;
36 import openr66.protocol.exception.OpenR66ProtocolNetworkException;
37 import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
38 import openr66.protocol.exception.OpenR66ProtocolPacketException;
39 import openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
40 import openr66.protocol.exception.OpenR66ProtocolSystemException;
41 import openr66.protocol.localhandler.LocalChannelReference;
42 import openr66.protocol.localhandler.packet.AbstractLocalPacket;
43 import openr66.protocol.localhandler.packet.ConnectionErrorPacket;
44 import openr66.protocol.localhandler.packet.KeepAlivePacket;
45 import openr66.protocol.localhandler.packet.LocalPacketCodec;
46 import openr66.protocol.localhandler.packet.LocalPacketFactory;
47 import openr66.protocol.networkhandler.packet.NetworkPacket;
48 import openr66.protocol.utils.ChannelCloseTimer;
49 import openr66.protocol.utils.ChannelUtils;
50
51 import org.jboss.netty.channel.Channel;
52 import org.jboss.netty.channel.ChannelHandlerContext;
53 import org.jboss.netty.channel.ChannelStateEvent;
54 import org.jboss.netty.channel.Channels;
55 import org.jboss.netty.channel.ExceptionEvent;
56 import org.jboss.netty.channel.MessageEvent;
57 import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
58 import org.jboss.netty.handler.timeout.IdleStateEvent;
59 import org.jboss.netty.handler.timeout.ReadTimeoutException;
60
61
62
63
64
65 public class NetworkServerHandler extends IdleStateAwareChannelHandler {
66
67
68
69
70 private static final GgInternalLogger logger = GgInternalLoggerFactory
71 .getLogger(NetworkServerHandler.class);
72
73
74
75
76 private volatile Channel networkChannel;
77
78
79
80 private volatile SocketAddress remoteAddress;
81
82
83
84
85 protected volatile DbSession dbSession;
86
87
88
89 protected volatile boolean isSSL = false;
90
91
92
93 protected boolean isServer = false;
94
95
96
97 protected volatile boolean keepAlivedSent = false;
98
99
100
101
102 public NetworkServerHandler(boolean isServer) {
103 this.isServer = isServer;
104 }
105
106
107
108
109
110
111
112
113 @Override
114 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
115 if (NetworkTransaction.getNbLocalChannel(e.getChannel()) > 0) {
116 logger.info("Network Channel Closed: {} LocalChannels Left: {}",
117 e.getChannel().getId(),
118 NetworkTransaction.getNbLocalChannel(e.getChannel()));
119
120 try {
121 Thread.sleep(Configuration.WAITFORNETOP);
122 } catch (InterruptedException e1) {
123 }
124 Configuration.configuration.getLocalTransaction()
125 .closeLocalChannelsFromNetworkChannel(e.getChannel());
126 }
127 if (remoteAddress == null) {
128 remoteAddress = e.getChannel().getRemoteAddress();
129 }
130 NetworkTransaction.removeForceNetworkChannel(remoteAddress);
131
132 if (dbSession != null && dbSession.internalId != DbConstant.admin.session.internalId) {
133 dbSession.disconnect();
134 }
135 }
136
137
138
139
140
141
142
143
144
145 @Override
146 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws OpenR66ProtocolNetworkException {
147 this.networkChannel = e.getChannel();
148 this.remoteAddress = this.networkChannel.getRemoteAddress();
149 try {
150 if (DbConstant.admin.isConnected) {
151 this.dbSession = new DbSession(DbConstant.admin, false);
152 }
153 } catch (GoldenGateDatabaseNoConnectionException e1) {
154
155 logger.warn("Use default database connection");
156 this.dbSession = DbConstant.admin.session;
157 }
158 logger.debug("Network Channel Connected: {} ", e.getChannel().getId());
159 }
160
161
162
163
164
165 @Override
166 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
167 throws Exception {
168 if (Configuration.configuration.isShutdown)
169 return;
170 if (keepAlivedSent) {
171 logger.error("Not getting KAlive: closing channel");
172 if (Configuration.configuration.r66Mib != null) {
173 Configuration.configuration.r66Mib.notifyWarning(
174 "KeepAlive get no answer", "Closing network connection");
175 }
176 ChannelCloseTimer.closeFutureChannel(e.getChannel());
177 } else {
178 keepAlivedSent = true;
179 KeepAlivePacket keepAlivePacket = new KeepAlivePacket();
180 NetworkPacket response =
181 new NetworkPacket(ChannelUtils.NOCHANNEL,
182 ChannelUtils.NOCHANNEL, keepAlivePacket);
183 logger.info("Write KAlive");
184 Channels.write(e.getChannel(), response);
185 }
186 }
187
188 public void setKeepAlivedSent() {
189 keepAlivedSent = false;
190 }
191
192
193
194
195
196
197
198
199
200 @Override
201 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
202 final NetworkPacket packet = (NetworkPacket) e.getMessage();
203 if (packet.getCode() == LocalPacketFactory.NOOPPACKET) {
204
205 return;
206 } else if (packet.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
207 logger.debug("NetworkRecv: {}",packet);
208
209 if (packet.getLocalId() == ChannelUtils.NOCHANNEL) {
210
211
212 logger
213 .error("Will close NETWORK channel, Cannot continue connection with remote Host: " +
214 packet.toString() +
215 " : " +
216 e.getChannel().getRemoteAddress());
217 Channels.close(e.getChannel());
218 return;
219 }
220 } else if (packet.getCode() == LocalPacketFactory.KEEPALIVEPACKET) {
221 keepAlivedSent = false;
222 try {
223 KeepAlivePacket keepAlivePacket = (KeepAlivePacket)
224 LocalPacketCodec.decodeNetworkPacket(packet.getBuffer());
225 if (keepAlivePacket.isToValidate()) {
226 keepAlivePacket.validate();
227 NetworkPacket response =
228 new NetworkPacket(ChannelUtils.NOCHANNEL,
229 ChannelUtils.NOCHANNEL, keepAlivePacket);
230 logger.info("Answer KAlive");
231 Channels.write(e.getChannel(), response);
232 } else {
233 logger.info("Get KAlive");
234 }
235 } catch (OpenR66ProtocolPacketException e1) {
236 }
237 return;
238 }
239 LocalChannelReference localChannelReference = null;
240 if (packet.getLocalId() == ChannelUtils.NOCHANNEL) {
241 logger.debug("NetworkRecv Create: {} {}",packet,
242 e.getChannel().getId());
243 try {
244 localChannelReference =
245 NetworkTransaction.createConnectionFromNetworkChannelStartup(
246 e.getChannel(), packet);
247 } catch (OpenR66ProtocolSystemException e1) {
248 logger.error("Cannot create LocalChannel for: " + packet+" due to "+ e1.getMessage());
249 final ConnectionErrorPacket error = new ConnectionErrorPacket(
250 "Cannot connect to localChannel since cannot create it", null);
251 writeError(e.getChannel(), packet.getRemoteId(), packet
252 .getLocalId(), error);
253 NetworkTransaction.removeNetworkChannel(e.getChannel(), null, null);
254 return;
255 } catch (OpenR66ProtocolRemoteShutdownException e1) {
256 logger.warn("Will Close Local from Network Channel");
257 Configuration.configuration.getLocalTransaction()
258 .closeLocalChannelsFromNetworkChannel(e.getChannel());
259 Channels.close(e.getChannel());
260
261
262 return;
263 }
264 } else {
265 if (packet.getCode() == LocalPacketFactory.ENDREQUESTPACKET) {
266
267 try {
268 localChannelReference = Configuration.configuration
269 .getLocalTransaction().getClient(packet.getRemoteId(),
270 packet.getLocalId());
271 } catch (OpenR66ProtocolSystemException e1) {
272
273 try {
274 logger.debug("Cannot get LocalChannel while an end of request comes: {}",
275 LocalPacketCodec.decodeNetworkPacket(packet.getBuffer()));
276 } catch (OpenR66ProtocolPacketException e2) {
277 logger.debug("Cannot get LocalChannel while an end of request comes: {}",
278 packet.toString());
279 }
280 return;
281 }
282
283 } else if (packet.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
284
285 try {
286 localChannelReference = Configuration.configuration
287 .getLocalTransaction().getClient(packet.getRemoteId(),
288 packet.getLocalId());
289 } catch (OpenR66ProtocolSystemException e1) {
290
291 try {
292 logger.debug("Cannot get LocalChannel while an external error comes: {}",
293 LocalPacketCodec.decodeNetworkPacket(packet.getBuffer()));
294 } catch (OpenR66ProtocolPacketException e2) {
295 logger.debug("Cannot get LocalChannel while an external error comes: {}",
296 packet.toString());
297 }
298 return;
299 }
300
301 } else {
302 try {
303 localChannelReference = Configuration.configuration
304 .getLocalTransaction().getClient(packet.getRemoteId(),
305 packet.getLocalId());
306 } catch (OpenR66ProtocolSystemException e1) {
307 if (remoteAddress == null) {
308 remoteAddress = e.getChannel().getRemoteAddress();
309 }
310 if (NetworkTransaction.isShuttingdownNetworkChannel(remoteAddress)) {
311
312 return;
313 }
314 logger.debug("Cannot get LocalChannel: " + packet + " due to " +
315 e1.getMessage());
316 final ConnectionErrorPacket error = new ConnectionErrorPacket(
317 "Cannot get localChannel since cannot retrieve it", null);
318 writeError(e.getChannel(), packet.getRemoteId(), packet
319 .getLocalId(), error);
320 return;
321 }
322 }
323 }
324 Channels.write(localChannelReference.getLocalChannel(), packet
325 .getBuffer());
326 }
327
328
329
330
331
332
333
334
335
336 @Override
337 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
338 logger.debug("Network Channel Exception: {}",e.getChannel().getId(), e
339 .getCause());
340 if (e.getCause() instanceof ReadTimeoutException) {
341 ReadTimeoutException exception = (ReadTimeoutException) e.getCause();
342
343 logger.error("ReadTimeout so Will close NETWORK channel {}", exception.getMessage());
344 ChannelCloseTimer.closeFutureChannel(e.getChannel());
345 return;
346 }
347 if (e.getCause() instanceof BindException) {
348
349 logger.debug("BindException");
350 ChannelCloseTimer.closeFutureChannel(e.getChannel());
351 return;
352 }
353 OpenR66Exception exception = OpenR66ExceptionTrappedFactory
354 .getExceptionFromTrappedException(e.getChannel(), e);
355 if (exception != null) {
356 if (exception instanceof OpenR66ProtocolBusinessNoWriteBackException) {
357 if (NetworkTransaction.getNbLocalChannel(e.getChannel()) > 0) {
358 logger.debug(
359 "Network Channel Exception: {} {}", e.getChannel().getId(),
360 exception.getMessage());
361 }
362 logger.debug("Will close NETWORK channel");
363 ChannelCloseTimer.closeFutureChannel(e.getChannel());
364 return;
365 } else if (exception instanceof OpenR66ProtocolNoConnectionException) {
366 logger.debug("Connection impossible with NETWORK channel {}",
367 exception.getMessage());
368 Channels.close(e.getChannel());
369 return;
370 } else {
371 logger.debug(
372 "Network Channel Exception: {} {}", e.getChannel().getId(),
373 exception.getMessage());
374 }
375 final ConnectionErrorPacket errorPacket = new ConnectionErrorPacket(
376 exception.getMessage(), null);
377 writeError(e.getChannel(), ChannelUtils.NOCHANNEL,
378 ChannelUtils.NOCHANNEL, errorPacket);
379 logger.debug("Will close NETWORK channel: {}", exception.getMessage());
380 ChannelCloseTimer.closeFutureChannel(e.getChannel());
381 } else {
382
383 return;
384 }
385 }
386
387
388
389
390
391
392
393
394 void writeError(Channel channel, Integer remoteId, Integer localId,
395 AbstractLocalPacket error) {
396 NetworkPacket networkPacket = null;
397 try {
398 networkPacket = new NetworkPacket(localId, remoteId, error);
399 } catch (OpenR66ProtocolPacketException e) {
400 }
401 try {
402 Channels.write(channel, networkPacket).await();
403 } catch (InterruptedException e) {
404 }
405 }
406
407
408
409
410 public DbSession getDbSession() {
411 return dbSession;
412 }
413
414
415
416
417 public boolean isSsl() {
418 return isSSL;
419 }
420 }