1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.protocol.networkhandler;
22
23 import static openr66.context.R66FiniteDualStates.AUTHENTR;
24 import goldengate.common.database.DbAdmin;
25 import goldengate.common.digest.FilesystemBasedDigest;
26 import goldengate.common.logging.GgInternalLogger;
27 import goldengate.common.logging.GgInternalLoggerFactory;
28
29 import java.net.ConnectException;
30 import java.net.SocketAddress;
31 import java.util.Collections;
32 import java.util.SortedSet;
33 import java.util.TreeSet;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import openr66.context.ErrorCode;
41 import openr66.context.R66Result;
42 import openr66.context.R66Session;
43 import openr66.context.task.exception.OpenR66RunnerErrorException;
44 import openr66.protocol.configuration.Configuration;
45 import openr66.protocol.exception.OpenR66Exception;
46 import openr66.protocol.exception.OpenR66ProtocolNetworkException;
47 import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
48 import openr66.protocol.exception.OpenR66ProtocolNoDataException;
49 import openr66.protocol.exception.OpenR66ProtocolNoSslException;
50 import openr66.protocol.exception.OpenR66ProtocolPacketException;
51 import openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
52 import openr66.protocol.exception.OpenR66ProtocolSystemException;
53 import openr66.protocol.localhandler.LocalChannelReference;
54 import openr66.protocol.localhandler.RetrieveRunner;
55 import openr66.protocol.localhandler.packet.AuthentPacket;
56 import openr66.protocol.localhandler.packet.ConnectionErrorPacket;
57 import openr66.protocol.networkhandler.packet.NetworkPacket;
58 import openr66.protocol.networkhandler.ssl.NetworkSslServerHandler;
59 import openr66.protocol.networkhandler.ssl.NetworkSslServerPipelineFactory;
60 import openr66.protocol.utils.ChannelUtils;
61 import openr66.protocol.utils.OpenR66SignalHandler;
62 import openr66.protocol.utils.R66Future;
63
64 import org.jboss.netty.bootstrap.ClientBootstrap;
65 import org.jboss.netty.channel.Channel;
66 import org.jboss.netty.channel.ChannelFactory;
67 import org.jboss.netty.channel.ChannelFuture;
68 import org.jboss.netty.channel.Channels;
69 import org.jboss.netty.channel.group.ChannelGroup;
70 import org.jboss.netty.channel.group.DefaultChannelGroup;
71 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
72 import org.jboss.netty.util.Timeout;
73 import org.jboss.netty.util.TimerTask;
74
75
76
77
78
79
80 public class NetworkTransaction {
81
82
83
84 private static final GgInternalLogger logger = GgInternalLoggerFactory
85 .getLogger(NetworkTransaction.class);
86
87
88
89
90 private static final ConcurrentHashMap<Integer, NetworkChannel> networkChannelShutdownOnSocketAddressConcurrentHashMap = new ConcurrentHashMap<Integer, NetworkChannel>();
91
92
93
94
95 private static final ConcurrentHashMap<Integer, NetworkChannel> networkChannelOnSocketAddressConcurrentHashMap = new ConcurrentHashMap<Integer, NetworkChannel>();
96
97
98
99 private static final ConcurrentHashMap<Integer, ReentrantLock> reentrantLockOnSocketAddressConcurrentHashMap = new ConcurrentHashMap<Integer, ReentrantLock>();
100
101
102
103 private static final ConcurrentHashMap<String, ClientNetworkChannels> remoteClients = new ConcurrentHashMap<String, ClientNetworkChannels>();
104
105
106
107 private static final ReentrantLock lockClient = new ReentrantLock();
108
109
110
111 private static final ConcurrentHashMap<Integer, RetrieveRunner> retrieveRunnerConcurrentHashMap =
112 new ConcurrentHashMap<Integer, RetrieveRunner>();
113
114
115
116
117 private static final ReentrantLock lock = new ReentrantLock();
118
119
120
121
122 private static final ExecutorService retrieveExecutor = Executors
123 .newCachedThreadPool();
124
125
126
127
128 private final ExecutorService execServerBoss = Executors
129 .newCachedThreadPool();
130
131
132
133
134 private final ExecutorService execServerWorker = Executors
135 .newCachedThreadPool();
136
137 private final ChannelFactory channelClientFactory = new NioClientSocketChannelFactory(
138 execServerBoss, execServerWorker,
139 Configuration.configuration.CLIENT_THREAD);
140
141 private final ClientBootstrap clientBootstrap = new ClientBootstrap(
142 channelClientFactory);
143 private final ClientBootstrap clientSslBootstrap = new ClientBootstrap(
144 channelClientFactory);
145 private final ChannelGroup networkChannelGroup = new DefaultChannelGroup(
146 "NetworkChannels");
147 private final NetworkServerPipelineFactory networkServerPipelineFactory;
148 private final NetworkSslServerPipelineFactory networkSslServerPipelineFactory;
149
150 public NetworkTransaction() {
151 networkServerPipelineFactory = new NetworkServerPipelineFactory(false);
152 clientBootstrap.setPipelineFactory(networkServerPipelineFactory);
153 clientBootstrap.setOption("tcpNoDelay", true);
154 clientBootstrap.setOption("reuseAddress", true);
155 clientBootstrap.setOption("connectTimeoutMillis",
156 Configuration.configuration.TIMEOUTCON);
157 if (Configuration.configuration.useSSL && Configuration.configuration.HOST_SSLID != null) {
158 networkSslServerPipelineFactory =
159 new NetworkSslServerPipelineFactory(true, execServerWorker);
160 clientSslBootstrap.setPipelineFactory(networkSslServerPipelineFactory);
161 clientSslBootstrap.setOption("tcpNoDelay", true);
162 clientSslBootstrap.setOption("reuseAddress", true);
163 clientSslBootstrap.setOption("connectTimeoutMillis", Configuration.configuration.TIMEOUTCON);
164 } else {
165 networkSslServerPipelineFactory = null;
166 logger.warn("No SSL support configured");
167 }
168 }
169
170 private static ReentrantLock getChannelLock(SocketAddress socketAddress) {
171 lock.lock();
172 try {
173 if (socketAddress == null) {
174
175 logger.info("SocketAddress empty here !");
176 return lock;
177 }
178 Integer hash = socketAddress.hashCode();
179 ReentrantLock socketLock = reentrantLockOnSocketAddressConcurrentHashMap.get(hash);
180 if (socketLock == null) {
181 socketLock = new ReentrantLock(true);
182 reentrantLockOnSocketAddressConcurrentHashMap.put(hash, socketLock);
183 }
184 return socketLock;
185 } finally {
186 lock.unlock();
187 }
188 }
189
190 private static void removeChannelLock(SocketAddress socketAddress) {
191 lock.lock();
192 try {
193 if (socketAddress == null) {
194
195 logger.info("SocketAddress empty here !");
196 return;
197 }
198 Integer hash = socketAddress.hashCode();
199 reentrantLockOnSocketAddressConcurrentHashMap.remove(hash);
200 } finally {
201 lock.unlock();
202 }
203 }
204
205
206
207
208
209
210
211
212 public LocalChannelReference createConnectionWithRetry(SocketAddress socketAddress,
213 boolean isSSL, R66Future futureRequest) {
214 LocalChannelReference localChannelReference = null;
215 OpenR66Exception lastException = null;
216 for (int i = 0; i < Configuration.RETRYNB; i ++) {
217 try {
218 localChannelReference =
219 createConnection(socketAddress, isSSL, futureRequest);
220 break;
221 } catch (OpenR66ProtocolNetworkException e1) {
222
223 lastException = e1;
224 localChannelReference = null;
225 try {
226 Thread.sleep(Configuration.WAITFORNETOP);
227 } catch (InterruptedException e) {
228 break;
229 }
230 } catch (OpenR66ProtocolRemoteShutdownException e1) {
231 lastException = e1;
232 localChannelReference = null;
233 break;
234 } catch (OpenR66ProtocolNoConnectionException e1) {
235 lastException = e1;
236 localChannelReference = null;
237 break;
238 }
239 }
240 if (localChannelReference == null) {
241 logger.debug("Cannot connect : {}", lastException.getMessage());
242 } else if (lastException != null) {
243 logger.debug("Connection retried since {}", lastException.getMessage());
244 }
245 return localChannelReference;
246 }
247
248
249
250
251
252
253
254
255
256
257 public LocalChannelReference createConnection(SocketAddress socketAddress, boolean isSSL,
258 R66Future futureRequest)
259 throws OpenR66ProtocolNetworkException,
260 OpenR66ProtocolRemoteShutdownException,
261 OpenR66ProtocolNoConnectionException {
262 NetworkChannel networkChannel = null;
263 LocalChannelReference localChannelReference = null;
264 boolean ok = false;
265
266 if (!Configuration.configuration.HOST_AUTH.isClient()) {
267 boolean valid = false;
268 for (int i = 0; i < Configuration.RETRYNB*2; i++) {
269 if (Configuration.configuration.constraintLimitHandler.checkConstraintsSleep(i)) {
270 logger.debug("Constraints exceeded: "+i);
271 } else {
272 logger.debug("Constraints NOT exceeded");
273 valid = true;
274 break;
275 }
276 }
277 if (!valid) {
278
279 logger.debug("Overloaded local system");
280 throw new OpenR66ProtocolNetworkException(
281 "Cannot connect to remote server due to local overload");
282 }
283 }
284 try {
285 networkChannel = createNewConnection(socketAddress, isSSL);
286 localChannelReference = createNewClient(networkChannel, futureRequest);
287 ok = true;
288 } finally {
289 if (!ok) {
290 if (networkChannel != null) {
291 removeNetworkChannel(networkChannel.channel, null, null);
292 }
293 }
294 }
295 if (localChannelReference.getFutureValidateStartup().isDone() &&
296 localChannelReference.getFutureValidateStartup().isSuccess()) {
297 sendValidationConnection(localChannelReference);
298 } else {
299 OpenR66ProtocolNetworkException exc =
300 new OpenR66ProtocolNetworkException("Startup is invalid");
301 logger.debug("Startup is Invalid", exc);
302 R66Result finalValue = new R66Result(
303 exc, null, true, ErrorCode.ConnectionImpossible, null);
304 localChannelReference.invalidateRequest(finalValue);
305 Channels.close(localChannelReference.getLocalChannel());
306 throw exc;
307 }
308 return localChannelReference;
309 }
310
311
312
313
314
315
316
317
318
319
320 private NetworkChannel createNewConnection(SocketAddress socketServerAddress, boolean isSSL)
321 throws OpenR66ProtocolNetworkException,
322 OpenR66ProtocolRemoteShutdownException,
323 OpenR66ProtocolNoConnectionException {
324 ReentrantLock socketLock = getChannelLock(socketServerAddress);
325 socketLock.lock();
326 try {
327 if (!isAddressValid(socketServerAddress)) {
328 throw new OpenR66ProtocolRemoteShutdownException(
329 "Cannot connect to remote server since it is shutting down");
330 }
331 NetworkChannel networkChannel;
332 try {
333 networkChannel = getRemoteChannel(socketServerAddress);
334 } catch (OpenR66ProtocolNoDataException e1) {
335 networkChannel = null;
336 }
337 if (networkChannel != null) {
338 networkChannel.count.incrementAndGet();
339 logger.info("Already Connected: {}", networkChannel);
340 return networkChannel;
341 }
342 ChannelFuture channelFuture = null;
343 for (int i = 0; i < Configuration.RETRYNB; i ++) {
344 if (isSSL) {
345 if (Configuration.configuration.HOST_SSLID != null) {
346 channelFuture = clientSslBootstrap.connect(socketServerAddress);
347 } else {
348 throw new OpenR66ProtocolNoConnectionException("No SSL support");
349 }
350 } else {
351 channelFuture = clientBootstrap.connect(socketServerAddress);
352 }
353 try {
354 channelFuture.await();
355 } catch (InterruptedException e1) {
356 }
357 if (channelFuture.isSuccess()) {
358 final Channel channel = channelFuture.getChannel();
359 if (isSSL) {
360 if (! NetworkSslServerHandler.isSslConnectedChannel(channel)) {
361 logger.debug("KO CONNECT since SSL handshake is over");
362 Channels.close(channel);
363 throw new OpenR66ProtocolNoConnectionException(
364 "Cannot finish connect to remote server");
365 }
366 }
367 networkChannelGroup.add(channel);
368 return putRemoteChannel(channel);
369 } else {
370 try {
371 Thread.sleep(Configuration.WAITFORNETOP);
372 } catch (InterruptedException e) {
373 }
374 if (! channelFuture.isDone()) {
375 throw new OpenR66ProtocolNoConnectionException(
376 "Cannot connect to remote server due to interruption");
377 }
378 if (channelFuture.getCause() instanceof ConnectException) {
379 logger.debug("KO CONNECT:" +
380 channelFuture.getCause().getMessage());
381 throw new OpenR66ProtocolNoConnectionException(
382 "Cannot connect to remote server", channelFuture
383 .getCause());
384 } else {
385 logger.debug("KO CONNECT but retry", channelFuture
386 .getCause());
387 }
388 }
389 }
390 throw new OpenR66ProtocolNetworkException(
391 "Cannot connect to remote server", channelFuture.getCause());
392 } finally {
393 socketLock.unlock();
394 }
395 }
396
397
398
399
400
401
402
403
404 private LocalChannelReference createNewClient(NetworkChannel networkChannel,
405 R66Future futureRequest)
406 throws OpenR66ProtocolNetworkException,
407 OpenR66ProtocolRemoteShutdownException {
408 if (!networkChannel.channel.isConnected()) {
409 throw new OpenR66ProtocolNetworkException(
410 "Network channel no more connected");
411 }
412 LocalChannelReference localChannelReference = null;
413 try {
414 localChannelReference = Configuration.configuration
415 .getLocalTransaction().createNewClient(networkChannel.channel,
416 ChannelUtils.NOCHANNEL, futureRequest);
417 } catch (OpenR66ProtocolSystemException e) {
418
419
420
421
422
423
424 throw new OpenR66ProtocolNetworkException(
425 "Cannot connect to local channel", e);
426 }
427 return localChannelReference;
428 }
429
430
431
432
433
434
435
436
437 public static LocalChannelReference createConnectionFromNetworkChannelStartup(Channel channel,
438 NetworkPacket packet)
439 throws OpenR66ProtocolRemoteShutdownException, OpenR66ProtocolSystemException {
440 NetworkTransaction.addNetworkChannel(channel);
441 LocalChannelReference localChannelReference = Configuration.configuration
442 .getLocalTransaction().createNewClient(channel,
443 packet.getRemoteId(), null);
444 return localChannelReference;
445 }
446
447
448
449
450
451
452 private void sendValidationConnection(
453 LocalChannelReference localChannelReference)
454 throws OpenR66ProtocolNetworkException,
455 OpenR66ProtocolRemoteShutdownException {
456 AuthentPacket authent;
457 try {
458 authent = new AuthentPacket(
459 Configuration.configuration.getHostId(
460 localChannelReference.getNetworkServerHandler().isSsl()),
461 FilesystemBasedDigest.passwdCrypt(
462 Configuration.configuration.HOST_AUTH.getHostkey()),
463 localChannelReference.getLocalId());
464 } catch (OpenR66ProtocolNoSslException e1) {
465 R66Result finalValue = new R66Result(
466 new OpenR66ProtocolSystemException("No SSL support", e1),
467 localChannelReference.getSession(), true, ErrorCode.ConnectionImpossible, null);
468 logger.error("Authent is Invalid due to no SSL: {}", e1.getMessage());
469 if (localChannelReference.getRemoteId() != ChannelUtils.NOCHANNEL) {
470 ConnectionErrorPacket error = new ConnectionErrorPacket(
471 "Cannot connect to localChannel since no SSL is supported", null);
472 try {
473 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
474 } catch (OpenR66ProtocolPacketException e) {
475 }
476 }
477 localChannelReference.invalidateRequest(finalValue);
478 Channels.close(localChannelReference.getLocalChannel());
479 throw new OpenR66ProtocolNetworkException(e1);
480 }
481 logger.debug("Will send request of connection validation");
482 localChannelReference.sessionNewState(AUTHENTR);
483 try {
484 ChannelUtils.writeAbstractLocalPacket(localChannelReference, authent, true);
485 } catch (OpenR66ProtocolPacketException e) {
486 R66Result finalValue = new R66Result(
487 new OpenR66ProtocolSystemException("Wrong Authent Protocol",e),
488 localChannelReference.getSession(), true, ErrorCode.ConnectionImpossible, null);
489 logger.error("Authent is Invalid due to protocol: {}", e.getMessage());
490 localChannelReference.invalidateRequest(finalValue);
491 if (localChannelReference.getRemoteId() != ChannelUtils.NOCHANNEL) {
492 ConnectionErrorPacket error = new ConnectionErrorPacket(
493 "Cannot connect to localChannel since Authent Protocol is invalid", null);
494 try {
495 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
496 } catch (OpenR66ProtocolPacketException e1) {
497 }
498 }
499 Channels.close(localChannelReference.getLocalChannel());
500 throw new OpenR66ProtocolNetworkException("Bad packet", e);
501 }
502 R66Future future = localChannelReference.getFutureValidateConnection();
503 if (future.isFailed()) {
504 logger.debug("Will close NETWORK channel since Future cancelled: {}",
505 future);
506 R66Result finalValue = new R66Result(
507 new OpenR66ProtocolSystemException("Out of time or Connection invalid during Authentication"),
508 localChannelReference.getSession(), true, ErrorCode.ConnectionImpossible, null);
509 logger.warn("Authent is Invalid due to: {} {}", finalValue.exception.getMessage(),
510 future.toString());
511 localChannelReference.invalidateRequest(finalValue);
512 if (localChannelReference.getRemoteId() != ChannelUtils.NOCHANNEL) {
513 ConnectionErrorPacket error = new ConnectionErrorPacket(
514 "Cannot connect to localChannel with Out of Time", null);
515 try {
516 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
517 } catch (OpenR66ProtocolPacketException e) {
518 }
519 }
520 Channels.close(localChannelReference.getLocalChannel());
521 throw new OpenR66ProtocolNetworkException(
522 "Cannot validate connection: " + future.getResult(), future
523 .getCause());
524 }
525 }
526 public static ExecutorService getRetrieveExecutor() {
527 return retrieveExecutor;
528 }
529 public static ConcurrentHashMap<Integer, RetrieveRunner> getRetrieveRunnerConcurrentHashMap() {
530 return retrieveRunnerConcurrentHashMap;
531 }
532
533
534
535
536
537 public static void runRetrieve(R66Session session, Channel channel) {
538 RetrieveRunner retrieveRunner = new RetrieveRunner(session, channel);
539 retrieveRunnerConcurrentHashMap.put(session.getLocalChannelReference().getLocalId(),
540 retrieveRunner);
541 retrieveRunner.setDaemon(true);
542 retrieveExecutor.execute(retrieveRunner);
543 }
544
545
546
547
548 public static void stopRetrieve(LocalChannelReference localChannelReference) {
549 RetrieveRunner retrieveRunner =
550 retrieveRunnerConcurrentHashMap.remove(localChannelReference.getLocalId());
551 if (retrieveRunner != null) {
552 retrieveRunner.stopRunner();
553 }
554 }
555
556
557
558
559 public static void normalEndRetrieve(LocalChannelReference localChannelReference) {
560 retrieveRunnerConcurrentHashMap.remove(localChannelReference.getLocalId());
561 }
562
563
564
565 public static void closeRetrieveExecutors() {
566 retrieveExecutor.shutdownNow();
567 }
568
569
570
571 public void closeAll() {
572 logger.debug("close All Network Channels");
573 try {
574 Thread.sleep(Configuration.RETRYINMS*2);
575 } catch (InterruptedException e) {
576 }
577 if (!Configuration.configuration.isServer) {
578 OpenR66SignalHandler.launchFinalExit();
579 }
580 closeRetrieveExecutors();
581 networkChannelGroup.close().awaitUninterruptibly();
582 clientBootstrap.releaseExternalResources();
583 clientSslBootstrap.releaseExternalResources();
584 channelClientFactory.releaseExternalResources();
585 try {
586 Thread.sleep(Configuration.WAITFORNETOP);
587 } catch (InterruptedException e) {
588 }
589 DbAdmin.closeAllConnection();
590 Configuration.configuration.clientStop();
591 logger.debug("Last action before exit");
592 ChannelUtils.stopLogger();
593 }
594
595
596
597
598
599 public static void addNetworkChannel(Channel channel)
600 throws OpenR66ProtocolRemoteShutdownException {
601 if (!isAddressValid(channel.getRemoteAddress())) {
602 throw new OpenR66ProtocolRemoteShutdownException(
603 "Channel is already in shutdown");
604 }
605 try {
606 putRemoteChannel(channel);
607 } catch (OpenR66ProtocolNoConnectionException e) {
608 throw new OpenR66ProtocolRemoteShutdownException(
609 "Channel is already in shutdown");
610 }
611 }
612
613
614
615
616
617
618 public static void addLocalChannelToNetworkChannel(Channel networkChannel,
619 Channel localChannel) throws OpenR66ProtocolRemoteShutdownException {
620 SocketAddress address = networkChannel.getRemoteAddress();
621 if (address != null) {
622 NetworkChannel networkChannelObject = networkChannelOnSocketAddressConcurrentHashMap
623 .get(address.hashCode());
624 if (networkChannelObject != null) {
625 networkChannelObject.add(localChannel);
626 }
627 }
628 }
629
630
631
632
633 public static void shuttingdownNetworkChannel(Channel channel) {
634 SocketAddress address = channel.getRemoteAddress();
635 shuttingdownNetworkChannel(address, channel);
636 }
637
638
639
640
641
642
643 public static boolean shuttingdownNetworkChannel(SocketAddress address, Channel channel) {
644 if (address != null) {
645 ReentrantLock socketLock = getChannelLock(address);
646 socketLock.lock();
647 try {
648 NetworkChannel networkChannel =
649 networkChannelShutdownOnSocketAddressConcurrentHashMap
650 .get(address.hashCode());
651 if (networkChannel != null) {
652
653 logger.debug("Already set as shutdown");
654 return false;
655 }
656 networkChannel = networkChannelOnSocketAddressConcurrentHashMap
657 .get(address.hashCode());
658 if (networkChannel != null) {
659 logger.debug("Set as shutdown");
660 } else {
661 if (channel != null) {
662 logger.debug("Newly Set as shutdown");
663 networkChannel = new NetworkChannel(channel);
664 }
665 }
666 if (networkChannel != null) {
667 networkChannelShutdownOnSocketAddressConcurrentHashMap.put(address
668 .hashCode(), networkChannel);
669 if (networkChannel.isShuttingDown) {
670 return false;
671 }
672 networkChannel.shutdownAllLocalChannels();
673 R66ShutdownNetworkChannelTimerTask timerTask = new R66ShutdownNetworkChannelTimerTask(address.hashCode());
674 Configuration.configuration.getTimerClose().newTimeout(timerTask,
675 Configuration.configuration.TIMEOUTCON * 3, TimeUnit.MILLISECONDS);
676 return true;
677 }
678 } finally {
679 socketLock.unlock();
680 }
681 }
682 return false;
683 }
684
685
686
687
688
689 public static boolean isShuttingdownNetworkChannel(SocketAddress address) {
690 ReentrantLock socketLock = getChannelLock(address);
691 socketLock.lock();
692 try {
693 return !isAddressValid(address);
694 } finally {
695 socketLock.unlock();
696 }
697 }
698
699
700
701
702
703 public static void removeClient(String requester, NetworkChannel networkChannel) {
704 if (networkChannel != null) {
705 lockClient.lock();
706 try {
707 ClientNetworkChannels clientNetworkChannels = remoteClients.get(requester);
708 if (clientNetworkChannels != null) {
709 clientNetworkChannels.remove(networkChannel);
710 if (clientNetworkChannels.isEmpty()) {
711 remoteClients.remove(requester);
712 }
713 }
714 } finally {
715 lockClient.unlock();
716 }
717 }
718 }
719
720
721
722
723
724 public static boolean shuttingdownNetworkChannels(String requester) {
725 lockClient.lock();
726 try {
727 ClientNetworkChannels clientNetworkChannels = remoteClients.remove(requester);
728 if (clientNetworkChannels != null) {
729 return clientNetworkChannels.shutdownAll();
730 }
731 } finally {
732 lockClient.unlock();
733 }
734 return false;
735 }
736
737
738
739
740
741 public static void addClient(Channel channel, String requester) {
742 SocketAddress address = channel.getRemoteAddress();
743 if (address != null) {
744 NetworkChannel networkChannel =
745 networkChannelOnSocketAddressConcurrentHashMap.get(address.hashCode());
746 if (networkChannel != null) {
747 lockClient.lock();
748 try {
749 ClientNetworkChannels clientNetworkChannels = remoteClients.get(requester);
750 if (clientNetworkChannels == null) {
751 clientNetworkChannels = new ClientNetworkChannels(requester);
752 remoteClients.put(requester, clientNetworkChannels);
753 }
754 clientNetworkChannels.add(networkChannel);
755 } finally {
756 lockClient.unlock();
757 }
758 }
759 }
760 }
761
762
763
764
765
766 public static int getNumberClients(String requester) {
767 ClientNetworkChannels clientNetworkChannels = remoteClients.get(requester);
768 if (clientNetworkChannels != null) {
769 return clientNetworkChannels.size();
770 }
771 return 0;
772 }
773
774
775
776
777
778 public static int removeForceNetworkChannel(SocketAddress address) {
779 ReentrantLock socketLock = getChannelLock(address);
780 socketLock.lock();
781 try {
782 if (address != null) {
783 NetworkChannel networkChannel = networkChannelOnSocketAddressConcurrentHashMap
784 .get(address.hashCode());
785 if (networkChannel != null) {
786 if (networkChannel.isShuttingDown) {
787 return networkChannel.count.get();
788 }
789 logger.debug("NC left: {}", networkChannel);
790 int count = networkChannel.count.get();
791 networkChannel.shutdownAllLocalChannels();
792 networkChannelOnSocketAddressConcurrentHashMap
793 .remove(address.hashCode());
794 return count;
795 } else {
796 logger.debug("Network not registered");
797 }
798 }
799 return 0;
800 } finally {
801 socketLock.unlock();
802 removeChannelLock(address);
803 }
804 }
805
806
807
808
809
810
811
812 static class CloseFutureChannel implements TimerTask {
813
814 private static SortedSet<Integer> inCloseRunning =
815 Collections.synchronizedSortedSet(new TreeSet<Integer>());
816 private NetworkChannel networkChannel;
817 private String requester;
818 private SocketAddress address;
819
820
821
822
823
824
825 CloseFutureChannel(NetworkChannel networkChannel, SocketAddress address, String requester)
826 throws OpenR66RunnerErrorException {
827 if (! inCloseRunning.add(networkChannel.channel.getId()))
828 throw new OpenR66RunnerErrorException("Already scheduled");
829 this.networkChannel = networkChannel;
830 this.requester = requester;
831 this.address = address;
832 }
833
834
835
836
837 @Override
838 public void run(Timeout timeout) throws Exception {
839 ReentrantLock socketLock = getChannelLock(address);
840 socketLock.lock();
841 try {
842 if (networkChannel.count.get() <= 0) {
843 long time = networkChannel.lastTimeUsed +
844 Configuration.configuration.TIMEOUTCON*2 -
845 System.currentTimeMillis();
846 if (time > Configuration.RETRYINMS) {
847
848 Configuration.configuration.getTimerClose().newTimeout(this, time, TimeUnit.MILLISECONDS);
849 return;
850 }
851 if (requester != null)
852 NetworkTransaction.removeClient(requester, networkChannel);
853 networkChannelOnSocketAddressConcurrentHashMap
854 .remove(address.hashCode());
855 logger.info("Will close NETWORK channel");
856 Channels.close(networkChannel.channel);
857 }
858 inCloseRunning.remove(networkChannel.channel.getId());
859 } finally {
860 socketLock.unlock();
861 }
862 }
863
864 }
865
866
867
868
869
870
871
872 public static int removeNetworkChannel(Channel channel, Channel localChannel,
873 String requester) {
874 SocketAddress address = channel.getRemoteAddress();
875 ReentrantLock socketLock = getChannelLock(address);
876 socketLock.lock();
877 try {
878 if (address != null) {
879 NetworkChannel networkChannel = networkChannelOnSocketAddressConcurrentHashMap
880 .get(address.hashCode());
881 if (networkChannel != null) {
882 int count = networkChannel.count.decrementAndGet();
883 logger.info("Close con: "+networkChannel);
884 if (localChannel != null) {
885 networkChannel.remove(localChannel);
886 }
887 if (count <= 0) {
888 CloseFutureChannel cfc;
889 try {
890 cfc = new CloseFutureChannel(networkChannel, address, requester);
891 Configuration.configuration.getTimerClose().
892 newTimeout(cfc, Configuration.configuration.TIMEOUTCON*2, TimeUnit.MILLISECONDS);
893 } catch (OpenR66RunnerErrorException e) {
894 }
895 }
896 logger.debug("NC left: {}", networkChannel);
897 return count;
898 } else {
899 if (channel.isConnected()) {
900 logger.debug("Should not be here",
901 new OpenR66ProtocolSystemException());
902 }
903 }
904 }
905 return 0;
906 } finally {
907 socketLock.unlock();
908 }
909 }
910
911
912
913
914
915
916 public static int existConnection(SocketAddress address, String host) {
917 return (networkChannelOnSocketAddressConcurrentHashMap.containsKey(address.hashCode())?1:0)
918 + getNumberClients(host);
919 }
920
921
922
923
924
925 public static int getNbLocalChannel(Channel channel) {
926 SocketAddress address = channel.getRemoteAddress();
927 if (address != null) {
928 NetworkChannel networkChannel = networkChannelOnSocketAddressConcurrentHashMap
929 .get(address.hashCode());
930 if (networkChannel != null) {
931 return networkChannel.count.get();
932 }
933 }
934 return -1;
935 }
936
937
938
939
940
941
942 private static boolean isAddressValid(SocketAddress address) {
943 if (OpenR66SignalHandler.isInShutdown()) {
944 logger.debug("IS IN SHUTDOWN");
945 return false;
946 }
947 if (address == null) {
948 logger.debug("ADDRESS IS NULL");
949 return false;
950 }
951 try {
952 NetworkChannel networkChannel = getRemoteChannel(address);
953 logger.debug("IS IN SHUTDOWN: " + networkChannel.isShuttingDown);
954 return !networkChannel.isShuttingDown;
955 } catch (OpenR66ProtocolRemoteShutdownException e) {
956 logger.debug("ALREADY IN SHUTDOWN");
957 return false;
958 } catch (OpenR66ProtocolNoDataException e) {
959 logger.debug("NOT FOUND SO NO SHUTDOWN");
960 return true;
961 }
962 }
963
964
965
966
967
968 public static NetworkChannel getNetworkChannel(Channel channel) {
969 SocketAddress address = channel.getRemoteAddress();
970 if (address != null) {
971 return networkChannelOnSocketAddressConcurrentHashMap.get(address
972 .hashCode());
973 }
974 return null;
975 }
976
977
978
979
980
981
982
983 private static NetworkChannel getRemoteChannel(SocketAddress address)
984 throws OpenR66ProtocolRemoteShutdownException,
985 OpenR66ProtocolNoDataException {
986 if (OpenR66SignalHandler.isInShutdown()) {
987 logger.debug("IS IN SHUTDOWN");
988 throw new OpenR66ProtocolRemoteShutdownException(
989 "Local Host already in shutdown");
990 }
991 if (address == null) {
992 throw new OpenR66ProtocolRemoteShutdownException(
993 "Remote Host already in shutdown");
994 }
995 int hashCode = address.hashCode();
996 NetworkChannel nc = networkChannelShutdownOnSocketAddressConcurrentHashMap
997 .get(hashCode);
998 if (nc != null) {
999 throw new OpenR66ProtocolRemoteShutdownException(
1000 "Remote Host already in shutdown");
1001 }
1002 nc = networkChannelOnSocketAddressConcurrentHashMap.get(hashCode);
1003 if (nc == null) {
1004 throw new OpenR66ProtocolNoDataException("Channel not found");
1005 }
1006 return nc;
1007 }
1008
1009
1010
1011
1012
1013
1014
1015 private static NetworkChannel putRemoteChannel(Channel channel)
1016 throws OpenR66ProtocolRemoteShutdownException, OpenR66ProtocolNoConnectionException {
1017 SocketAddress address = channel.getRemoteAddress();
1018 if (address != null) {
1019 NetworkChannel networkChannel;
1020 try {
1021 networkChannel = getRemoteChannel(address);
1022 networkChannel.count.incrementAndGet();
1023 logger.info("NC active: {}", networkChannel);
1024 return networkChannel;
1025 } catch (OpenR66ProtocolRemoteShutdownException e) {
1026 throw e;
1027 } catch (OpenR66ProtocolNoDataException e) {
1028 networkChannel = new NetworkChannel(channel);
1029 logger.debug("NC new active: {}", networkChannel);
1030 networkChannelOnSocketAddressConcurrentHashMap.put(address
1031 .hashCode(), networkChannel);
1032 return networkChannel;
1033 }
1034 }
1035 throw new OpenR66ProtocolNoConnectionException("Address is not correct");
1036 }
1037
1038
1039
1040
1041
1042
1043
1044 private static class R66ShutdownNetworkChannelTimerTask implements TimerTask {
1045
1046
1047
1048 private final int href;
1049
1050
1051
1052
1053
1054
1055 public R66ShutdownNetworkChannelTimerTask(int href) {
1056 super();
1057 this.href = href;
1058 }
1059
1060
1061
1062
1063 @Override
1064 public void run(Timeout timeout) throws Exception {
1065 logger.debug("DEBUG: Will remove shutdown for : "+href);
1066 NetworkChannel networkChannel =
1067 networkChannelShutdownOnSocketAddressConcurrentHashMap.remove(href);
1068 if (networkChannel != null && networkChannel.channel != null
1069 && networkChannel.channel.isConnected()) {
1070 Channels.close(networkChannel.channel);
1071 }
1072 }
1073 }
1074 }