View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.protocol.networkhandler;
22  
23  import static openr66.context.R66FiniteDualStates.AUTHENTR;
24  import goldengate.common.database.DbAdmin;
25  import goldengate.common.digest.FilesystemBasedDigest;
26  import goldengate.common.logging.GgInternalLogger;
27  import goldengate.common.logging.GgInternalLoggerFactory;
28  
29  import java.net.ConnectException;
30  import java.net.SocketAddress;
31  import java.util.Collections;
32  import java.util.SortedSet;
33  import java.util.TreeSet;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import openr66.context.ErrorCode;
41  import openr66.context.R66Result;
42  import openr66.context.R66Session;
43  import openr66.context.task.exception.OpenR66RunnerErrorException;
44  import openr66.protocol.configuration.Configuration;
45  import openr66.protocol.exception.OpenR66Exception;
46  import openr66.protocol.exception.OpenR66ProtocolNetworkException;
47  import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
48  import openr66.protocol.exception.OpenR66ProtocolNoDataException;
49  import openr66.protocol.exception.OpenR66ProtocolNoSslException;
50  import openr66.protocol.exception.OpenR66ProtocolPacketException;
51  import openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
52  import openr66.protocol.exception.OpenR66ProtocolSystemException;
53  import openr66.protocol.localhandler.LocalChannelReference;
54  import openr66.protocol.localhandler.RetrieveRunner;
55  import openr66.protocol.localhandler.packet.AuthentPacket;
56  import openr66.protocol.localhandler.packet.ConnectionErrorPacket;
57  import openr66.protocol.networkhandler.packet.NetworkPacket;
58  import openr66.protocol.networkhandler.ssl.NetworkSslServerHandler;
59  import openr66.protocol.networkhandler.ssl.NetworkSslServerPipelineFactory;
60  import openr66.protocol.utils.ChannelUtils;
61  import openr66.protocol.utils.OpenR66SignalHandler;
62  import openr66.protocol.utils.R66Future;
63  
64  import org.jboss.netty.bootstrap.ClientBootstrap;
65  import org.jboss.netty.channel.Channel;
66  import org.jboss.netty.channel.ChannelFactory;
67  import org.jboss.netty.channel.ChannelFuture;
68  import org.jboss.netty.channel.Channels;
69  import org.jboss.netty.channel.group.ChannelGroup;
70  import org.jboss.netty.channel.group.DefaultChannelGroup;
71  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
72  import org.jboss.netty.util.Timeout;
73  import org.jboss.netty.util.TimerTask;
74  
75  /**
76   * This class handles Network Transaction connections
77   *
78   * @author frederic bregier
79   */
80  public class NetworkTransaction {
    /**
     * Internal Logger
     */
    private static final GgInternalLogger logger = GgInternalLoggerFactory
            .getLogger(NetworkTransaction.class);

    /**
     * NetworkChannels of remote hosts currently flagged as shutting down,
     * keyed by the hashCode() of the remote SocketAddress
     */
    private static final ConcurrentHashMap<Integer, NetworkChannel> networkChannelShutdownOnSocketAddressConcurrentHashMap = new ConcurrentHashMap<Integer, NetworkChannel>();

    /**
     * NetworkChannels of currently active remote hosts,
     * keyed by the hashCode() of the remote SocketAddress
     */
    private static final ConcurrentHashMap<Integer, NetworkChannel> networkChannelOnSocketAddressConcurrentHashMap = new ConcurrentHashMap<Integer, NetworkChannel>();
    /**
     * Per remote address locks, keyed by the hashCode() of the SocketAddress;
     * entries are created on demand by getChannelLock and dropped by
     * removeChannelLock
     */
    private static final ConcurrentHashMap<Integer, ReentrantLock> reentrantLockOnSocketAddressConcurrentHashMap = new ConcurrentHashMap<Integer, ReentrantLock>();
    /**
     * Remote Client NetworkChannels, keyed by the requester name
     */
    private static final ConcurrentHashMap<String, ClientNetworkChannels> remoteClients = new ConcurrentHashMap<String, ClientNetworkChannels>();
    /**
     * Lock for Client NetworkChannels operations (guards compound updates of remoteClients)
     */
    private static final ReentrantLock lockClient = new ReentrantLock();
    /**
     * Currently active Retrieve Runners (sender side), keyed by the local id
     * of the LocalChannelReference they serve
     */
    private static final ConcurrentHashMap<Integer, RetrieveRunner> retrieveRunnerConcurrentHashMap =
        new ConcurrentHashMap<Integer, RetrieveRunner>();

    /**
     * Lock for NetworkChannel operations; it serializes creation and removal
     * of the per address locks and is itself returned as the lock to use when
     * the address is null
     */
    private static final ReentrantLock lock = new ReentrantLock();

    /**
     * ExecutorService running the RetrieveRunner instances
     */
    private static final ExecutorService retrieveExecutor = Executors
            .newCachedThreadPool();

    /**
     * Boss ExecutorService given to the client channel factory
     */
    private final ExecutorService execServerBoss = Executors
            .newCachedThreadPool();

    /**
     * Worker ExecutorService given to the client channel factory and to the
     * SSL pipeline factory
     */
    private final ExecutorService execServerWorker = Executors
            .newCachedThreadPool();

    /**
     * Channel factory shared by the plain and the SSL client bootstraps
     */
    private final ChannelFactory channelClientFactory = new NioClientSocketChannelFactory(
            execServerBoss, execServerWorker,
            Configuration.configuration.CLIENT_THREAD);

    /**
     * Bootstrap used for non SSL outgoing connections
     */
    private final ClientBootstrap clientBootstrap = new ClientBootstrap(
            channelClientFactory);
    /**
     * Bootstrap used for SSL outgoing connections (only configured by the
     * constructor when SSL support is available)
     */
    private final ClientBootstrap clientSslBootstrap = new ClientBootstrap(
            channelClientFactory);
    /**
     * Group of the network channels opened by this instance, closed as a whole by closeAll
     */
    private final ChannelGroup networkChannelGroup = new DefaultChannelGroup(
            "NetworkChannels");
    /**
     * Pipeline factory of the non SSL bootstrap
     */
    private final NetworkServerPipelineFactory networkServerPipelineFactory;
    /**
     * Pipeline factory of the SSL bootstrap, null when no SSL support is configured
     */
    private final NetworkSslServerPipelineFactory networkSslServerPipelineFactory;
149     
150     public NetworkTransaction() {
151         networkServerPipelineFactory = new NetworkServerPipelineFactory(false);
152         clientBootstrap.setPipelineFactory(networkServerPipelineFactory);
153         clientBootstrap.setOption("tcpNoDelay", true);
154         clientBootstrap.setOption("reuseAddress", true);
155         clientBootstrap.setOption("connectTimeoutMillis",
156                 Configuration.configuration.TIMEOUTCON);
157         if (Configuration.configuration.useSSL && Configuration.configuration.HOST_SSLID != null) {
158             networkSslServerPipelineFactory =
159                 new NetworkSslServerPipelineFactory(true, execServerWorker);
160             clientSslBootstrap.setPipelineFactory(networkSslServerPipelineFactory);
161             clientSslBootstrap.setOption("tcpNoDelay", true);
162             clientSslBootstrap.setOption("reuseAddress", true);
163             clientSslBootstrap.setOption("connectTimeoutMillis", Configuration.configuration.TIMEOUTCON);
164         } else {
165             networkSslServerPipelineFactory = null;
166             logger.warn("No SSL support configured");
167         }
168     }
169     
170     private static ReentrantLock getChannelLock(SocketAddress socketAddress) {
171         lock.lock();
172         try {
173             if (socketAddress == null) {
174                 // should not
175                 logger.info("SocketAddress empty here !");
176                 return lock;
177             }
178             Integer hash = socketAddress.hashCode();
179             ReentrantLock socketLock = reentrantLockOnSocketAddressConcurrentHashMap.get(hash);
180             if (socketLock == null) {
181                 socketLock = new ReentrantLock(true);
182                 reentrantLockOnSocketAddressConcurrentHashMap.put(hash, socketLock);
183             }
184             return socketLock;
185         } finally {
186             lock.unlock();
187         }
188     }
189 
190     private static void removeChannelLock(SocketAddress socketAddress) {
191         lock.lock();
192         try {
193             if (socketAddress == null) {
194                 // should not
195                 logger.info("SocketAddress empty here !");
196                 return;
197             }
198             Integer hash = socketAddress.hashCode();
199             reentrantLockOnSocketAddressConcurrentHashMap.remove(hash);
200         } finally {
201             lock.unlock();
202         }
203     }
204 
205     /**
206      * Create a connection to the specified socketAddress with multiple retries
207      * @param socketAddress
208      * @param isSSL
209      * @param futureRequest
210      * @return the LocalChannelReference
211      */
212     public LocalChannelReference createConnectionWithRetry(SocketAddress socketAddress,
213             boolean isSSL, R66Future futureRequest) {
214         LocalChannelReference localChannelReference = null;
215         OpenR66Exception lastException = null;
216         for (int i = 0; i < Configuration.RETRYNB; i ++) {
217             try {
218                 localChannelReference =
219                         createConnection(socketAddress, isSSL, futureRequest);
220                 break;
221             } catch (OpenR66ProtocolNetworkException e1) {
222                 // Can retry
223                 lastException = e1;
224                 localChannelReference = null;
225                 try {
226                     Thread.sleep(Configuration.WAITFORNETOP);
227                 } catch (InterruptedException e) {
228                     break;
229                 }
230             } catch (OpenR66ProtocolRemoteShutdownException e1) {
231                 lastException = e1;
232                 localChannelReference = null;
233                 break;
234             } catch (OpenR66ProtocolNoConnectionException e1) {
235                 lastException = e1;
236                 localChannelReference = null;
237                 break;
238             }
239         }
240         if (localChannelReference == null) {
241             logger.debug("Cannot connect : {}", lastException.getMessage());
242         } else if (lastException != null) {
243             logger.debug("Connection retried since {}", lastException.getMessage());
244         }
245         return localChannelReference;
246     }
247     /**
248      * Create a connection to the specified socketAddress
249      * @param socketAddress
250      * @param isSSL
251      * @param futureRequest
252      * @return the LocalChannelReference
253      * @throws OpenR66ProtocolNetworkException
254      * @throws OpenR66ProtocolRemoteShutdownException
255      * @throws OpenR66ProtocolNoConnectionException
256      */
257     public LocalChannelReference createConnection(SocketAddress socketAddress, boolean isSSL,
258             R66Future futureRequest)
259             throws OpenR66ProtocolNetworkException,
260             OpenR66ProtocolRemoteShutdownException,
261             OpenR66ProtocolNoConnectionException {
262         NetworkChannel networkChannel = null;
263         LocalChannelReference localChannelReference = null;
264         boolean ok = false;
265         // check valid limit on server side only (could be the initiator but not a client)
266         if (!Configuration.configuration.HOST_AUTH.isClient()) {
267             boolean valid = false;
268             for (int i = 0; i < Configuration.RETRYNB*2; i++) {
269                 if (Configuration.configuration.constraintLimitHandler.checkConstraintsSleep(i)) {
270                     logger.debug("Constraints exceeded: "+i);
271                 } else {
272                     logger.debug("Constraints NOT exceeded");
273                     valid = true;
274                     break;
275                 }
276             }
277             if (!valid) {
278                 // Limit is locally exceeded
279                 logger.debug("Overloaded local system");
280                 throw new OpenR66ProtocolNetworkException(
281                         "Cannot connect to remote server due to local overload");
282             }
283         }
284         try {
285             networkChannel = createNewConnection(socketAddress, isSSL);
286             localChannelReference = createNewClient(networkChannel, futureRequest);
287             ok = true;
288         } finally {
289             if (!ok) {
290                 if (networkChannel != null) {
291                     removeNetworkChannel(networkChannel.channel, null, null);
292                 }
293             }
294         }
295         if (localChannelReference.getFutureValidateStartup().isDone() &&
296                 localChannelReference.getFutureValidateStartup().isSuccess()) {
297             sendValidationConnection(localChannelReference);
298         } else {
299             OpenR66ProtocolNetworkException exc =
300                 new OpenR66ProtocolNetworkException("Startup is invalid");
301             logger.debug("Startup is Invalid", exc);
302             R66Result finalValue = new R66Result(
303                     exc, null, true, ErrorCode.ConnectionImpossible, null);
304             localChannelReference.invalidateRequest(finalValue);
305             Channels.close(localChannelReference.getLocalChannel());
306             throw exc;
307         }
308         return localChannelReference;
309     }
310 
311     /**
312      *
313      * @param socketServerAddress
314      * @param isSSL
315      * @return the NetworkChannel
316      * @throws OpenR66ProtocolNetworkException
317      * @throws OpenR66ProtocolRemoteShutdownException
318      * @throws OpenR66ProtocolNoConnectionException
319      */
320     private NetworkChannel createNewConnection(SocketAddress socketServerAddress, boolean isSSL)
321             throws OpenR66ProtocolNetworkException,
322             OpenR66ProtocolRemoteShutdownException,
323             OpenR66ProtocolNoConnectionException {
324         ReentrantLock socketLock = getChannelLock(socketServerAddress);
325         socketLock.lock();
326         try {
327         if (!isAddressValid(socketServerAddress)) {
328             throw new OpenR66ProtocolRemoteShutdownException(
329                     "Cannot connect to remote server since it is shutting down");
330         }
331         NetworkChannel networkChannel;
332         try {
333             networkChannel = getRemoteChannel(socketServerAddress);
334         } catch (OpenR66ProtocolNoDataException e1) {
335             networkChannel = null;
336         }
337         if (networkChannel != null) {
338             networkChannel.count.incrementAndGet();
339             logger.info("Already Connected: {}", networkChannel);
340             return networkChannel;
341         }
342         ChannelFuture channelFuture = null;
343         for (int i = 0; i < Configuration.RETRYNB; i ++) {
344             if (isSSL) {
345                 if (Configuration.configuration.HOST_SSLID != null) {
346                     channelFuture = clientSslBootstrap.connect(socketServerAddress);
347                 } else {
348                     throw new OpenR66ProtocolNoConnectionException("No SSL support");
349                 }
350             } else {
351                 channelFuture = clientBootstrap.connect(socketServerAddress);
352             }
353             try {
354                 channelFuture.await();
355             } catch (InterruptedException e1) {
356             }
357             if (channelFuture.isSuccess()) {
358                 final Channel channel = channelFuture.getChannel();
359                 if (isSSL) {
360                     if (! NetworkSslServerHandler.isSslConnectedChannel(channel)) {
361                         logger.debug("KO CONNECT since SSL handshake is over");
362                         Channels.close(channel);
363                         throw new OpenR66ProtocolNoConnectionException(
364                                 "Cannot finish connect to remote server");
365                     }
366                 }
367                 networkChannelGroup.add(channel);
368                 return putRemoteChannel(channel);
369             } else {
370                 try {
371                     Thread.sleep(Configuration.WAITFORNETOP);
372                 } catch (InterruptedException e) {
373                 }
374                 if (! channelFuture.isDone()) {
375                     throw new OpenR66ProtocolNoConnectionException(
376                             "Cannot connect to remote server due to interruption");
377                 }
378                 if (channelFuture.getCause() instanceof ConnectException) {
379                     logger.debug("KO CONNECT:" +
380                             channelFuture.getCause().getMessage());
381                     throw new OpenR66ProtocolNoConnectionException(
382                             "Cannot connect to remote server", channelFuture
383                                     .getCause());
384                 } else {
385                     logger.debug("KO CONNECT but retry", channelFuture
386                             .getCause());
387                 }
388             }
389         }
390         throw new OpenR66ProtocolNetworkException(
391                 "Cannot connect to remote server", channelFuture.getCause());
392         } finally {
393             socketLock.unlock();
394         }
395     }
396    /**
397      *
398      * @param channel
399      * @param futureRequest
400      * @return the LocalChannelReference
401      * @throws OpenR66ProtocolNetworkException
402      * @throws OpenR66ProtocolRemoteShutdownException
403      */
404     private LocalChannelReference createNewClient(NetworkChannel networkChannel,
405             R66Future futureRequest)
406             throws OpenR66ProtocolNetworkException,
407             OpenR66ProtocolRemoteShutdownException {
408         if (!networkChannel.channel.isConnected()) {
409             throw new OpenR66ProtocolNetworkException(
410                     "Network channel no more connected");
411         }
412         LocalChannelReference localChannelReference = null;
413         try {
414             localChannelReference = Configuration.configuration
415                 .getLocalTransaction().createNewClient(networkChannel.channel,
416                 ChannelUtils.NOCHANNEL, futureRequest);
417         } catch (OpenR66ProtocolSystemException e) {
418             // check if the channel has other attached local channels
419             // NO since done in caller (createConnection)
420             /*
421             logger.info("Try to Close Network");
422             removeNetworkChannel(networkChannel.channel, null);
423             */
424             throw new OpenR66ProtocolNetworkException(
425                     "Cannot connect to local channel", e);
426         }
427         return localChannelReference;
428     }
429     /**
430      * Create the LocalChannelReference when a remote local channel starts its connection
431      * @param channel
432      * @param packet
433      * @return the LocalChannelReference
434      * @throws OpenR66ProtocolRemoteShutdownException
435      * @throws OpenR66ProtocolSystemException
436      */
437     public static LocalChannelReference createConnectionFromNetworkChannelStartup(Channel channel,
438             NetworkPacket packet)
439     throws OpenR66ProtocolRemoteShutdownException, OpenR66ProtocolSystemException {
440         NetworkTransaction.addNetworkChannel(channel);
441         LocalChannelReference localChannelReference = Configuration.configuration
442             .getLocalTransaction().createNewClient(channel,
443                 packet.getRemoteId(), null);
444         return localChannelReference;
445     }
446     /**
447      * Send a validation of connection with Authentication
448      * @param localChannelReference
449      * @throws OpenR66ProtocolNetworkException
450      * @throws OpenR66ProtocolRemoteShutdownException
451      */
452     private void sendValidationConnection(
453             LocalChannelReference localChannelReference)
454             throws OpenR66ProtocolNetworkException,
455             OpenR66ProtocolRemoteShutdownException {
456         AuthentPacket authent;
457         try {
458             authent = new AuthentPacket(
459                     Configuration.configuration.getHostId(
460                             localChannelReference.getNetworkServerHandler().isSsl()),
461                     FilesystemBasedDigest.passwdCrypt(
462                     Configuration.configuration.HOST_AUTH.getHostkey()),
463                     localChannelReference.getLocalId());
464         } catch (OpenR66ProtocolNoSslException e1) {
465             R66Result finalValue = new R66Result(
466                     new OpenR66ProtocolSystemException("No SSL support", e1),
467                     localChannelReference.getSession(), true, ErrorCode.ConnectionImpossible, null);
468             logger.error("Authent is Invalid due to no SSL: {}", e1.getMessage());
469             if (localChannelReference.getRemoteId() != ChannelUtils.NOCHANNEL) {
470                 ConnectionErrorPacket error = new ConnectionErrorPacket(
471                         "Cannot connect to localChannel since no SSL is supported", null);
472                 try {
473                     ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
474                 } catch (OpenR66ProtocolPacketException e) {
475                 }
476             }
477             localChannelReference.invalidateRequest(finalValue);
478             Channels.close(localChannelReference.getLocalChannel());
479             throw new OpenR66ProtocolNetworkException(e1);
480         }
481         logger.debug("Will send request of connection validation");
482         localChannelReference.sessionNewState(AUTHENTR);
483         try {
484             ChannelUtils.writeAbstractLocalPacket(localChannelReference, authent, true);
485         } catch (OpenR66ProtocolPacketException e) {
486             R66Result finalValue = new R66Result(
487                     new OpenR66ProtocolSystemException("Wrong Authent Protocol",e),
488                     localChannelReference.getSession(), true, ErrorCode.ConnectionImpossible, null);
489             logger.error("Authent is Invalid due to protocol: {}", e.getMessage());
490             localChannelReference.invalidateRequest(finalValue);
491             if (localChannelReference.getRemoteId() != ChannelUtils.NOCHANNEL) {
492                 ConnectionErrorPacket error = new ConnectionErrorPacket(
493                         "Cannot connect to localChannel since Authent Protocol is invalid", null);
494                 try {
495                     ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
496                 } catch (OpenR66ProtocolPacketException e1) {
497                 }
498             }
499             Channels.close(localChannelReference.getLocalChannel());
500             throw new OpenR66ProtocolNetworkException("Bad packet", e);
501         }
502         R66Future future = localChannelReference.getFutureValidateConnection();
503         if (future.isFailed()) {
504             logger.debug("Will close NETWORK channel since Future cancelled: {}",
505                     future);
506             R66Result finalValue = new R66Result(
507                     new OpenR66ProtocolSystemException("Out of time or Connection invalid during Authentication"),
508                     localChannelReference.getSession(), true, ErrorCode.ConnectionImpossible, null);
509             logger.warn("Authent is Invalid due to: {} {}", finalValue.exception.getMessage(),
510                     future.toString());
511             localChannelReference.invalidateRequest(finalValue);
512             if (localChannelReference.getRemoteId() != ChannelUtils.NOCHANNEL) {
513                 ConnectionErrorPacket error = new ConnectionErrorPacket(
514                         "Cannot connect to localChannel with Out of Time", null);
515                 try {
516                     ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
517                 } catch (OpenR66ProtocolPacketException e) {
518                 }
519             }
520             Channels.close(localChannelReference.getLocalChannel());
521             throw new OpenR66ProtocolNetworkException(
522                     "Cannot validate connection: " + future.getResult(), future
523                             .getCause());
524         }
525     }
526     public static ExecutorService getRetrieveExecutor() {
527         return retrieveExecutor;
528     }
529     public static ConcurrentHashMap<Integer, RetrieveRunner> getRetrieveRunnerConcurrentHashMap() {
530         return retrieveRunnerConcurrentHashMap;
531     }
532     /**
533      * Start retrieve operation
534      * @param session
535      * @param channel
536      */
537     public static void runRetrieve(R66Session session, Channel channel) {
538         RetrieveRunner retrieveRunner = new RetrieveRunner(session, channel);
539         retrieveRunnerConcurrentHashMap.put(session.getLocalChannelReference().getLocalId(),
540                 retrieveRunner);
541         retrieveRunner.setDaemon(true);
542         retrieveExecutor.execute(retrieveRunner);
543     }
544     /**
545      * Stop a retrieve operation
546      * @param localChannelReference
547      */
548     public static void stopRetrieve(LocalChannelReference localChannelReference) {
549         RetrieveRunner retrieveRunner =
550             retrieveRunnerConcurrentHashMap.remove(localChannelReference.getLocalId());
551         if (retrieveRunner != null) {
552             retrieveRunner.stopRunner();
553         }
554     }
555     /**
556      * Normal end of a Retrieve Operation
557      * @param localChannelReference
558      */
559     public static void normalEndRetrieve(LocalChannelReference localChannelReference) {
560         retrieveRunnerConcurrentHashMap.remove(localChannelReference.getLocalId());
561     }
562     /**
563      * Stop all Retrieve Executors
564      */
565     public static void closeRetrieveExecutors() {
566         retrieveExecutor.shutdownNow();
567     }
568     /**
569      * Close all Network Ttransaction
570      */
571     public void closeAll() {
572         logger.debug("close All Network Channels");
573         try {
574             Thread.sleep(Configuration.RETRYINMS*2);
575         } catch (InterruptedException e) {
576         }
577         if (!Configuration.configuration.isServer) {
578             OpenR66SignalHandler.launchFinalExit();
579         }
580         closeRetrieveExecutors();
581         networkChannelGroup.close().awaitUninterruptibly();
582         clientBootstrap.releaseExternalResources();
583         clientSslBootstrap.releaseExternalResources();
584         channelClientFactory.releaseExternalResources();
585         try {
586             Thread.sleep(Configuration.WAITFORNETOP);
587         } catch (InterruptedException e) {
588         }
589         DbAdmin.closeAllConnection();
590         Configuration.configuration.clientStop();
591         logger.debug("Last action before exit");
592         ChannelUtils.stopLogger();
593     }
594     /**
595      *
596      * @param channel
597      * @throws OpenR66ProtocolRemoteShutdownException
598      */
599     public static void addNetworkChannel(Channel channel)
600             throws OpenR66ProtocolRemoteShutdownException {
601         if (!isAddressValid(channel.getRemoteAddress())) {
602             throw new OpenR66ProtocolRemoteShutdownException(
603                     "Channel is already in shutdown");
604         }
605         try {
606             putRemoteChannel(channel);
607         } catch (OpenR66ProtocolNoConnectionException e) {
608             throw new OpenR66ProtocolRemoteShutdownException(
609                 "Channel is already in shutdown");
610         }
611     }
612     /**
613      * Add a LocalChannel to a NetworkChannel
614      * @param networkChannel
615      * @param localChannel
616      * @throws OpenR66ProtocolRemoteShutdownException
617      */
618     public static void addLocalChannelToNetworkChannel(Channel networkChannel,
619             Channel localChannel) throws OpenR66ProtocolRemoteShutdownException {
620         SocketAddress address = networkChannel.getRemoteAddress();
621         if (address != null) {
622             NetworkChannel networkChannelObject = networkChannelOnSocketAddressConcurrentHashMap
623                     .get(address.hashCode());
624             if (networkChannelObject != null) {
625                 networkChannelObject.add(localChannel);
626             }
627         }
628     }
629     /**
630      * Shutdown one Network Channel
631      * @param channel
632      */
633     public static void shuttingdownNetworkChannel(Channel channel) {
634         SocketAddress address = channel.getRemoteAddress();
635         shuttingdownNetworkChannel(address, channel);
636     }
637     /**
638      * Shutdown one Network Channel according to its SocketAddress
639      * @param address
640      * @param channel (can be null)
641      * @return True if the shutdown is starting, False if cannot be done (can be already done before)
642      */
643     public static boolean shuttingdownNetworkChannel(SocketAddress address, Channel channel) {
644         if (address != null) {
645             ReentrantLock socketLock = getChannelLock(address);
646             socketLock.lock();
647             try {
648                 NetworkChannel networkChannel =
649                     networkChannelShutdownOnSocketAddressConcurrentHashMap
650                         .get(address.hashCode());
651                 if (networkChannel != null) {
652                     // already done
653                     logger.debug("Already set as shutdown");
654                     return false;
655                 }
656                 networkChannel = networkChannelOnSocketAddressConcurrentHashMap
657                         .get(address.hashCode());
658                 if (networkChannel != null) {
659                     logger.debug("Set as shutdown");
660                 } else {
661                     if (channel != null) {
662                         logger.debug("Newly Set as shutdown");
663                         networkChannel = new NetworkChannel(channel);
664                     }
665                 }
666                 if (networkChannel != null) {
667                     networkChannelShutdownOnSocketAddressConcurrentHashMap.put(address
668                             .hashCode(), networkChannel);
669                     if (networkChannel.isShuttingDown) {
670                         return false;
671                     }
672                     networkChannel.shutdownAllLocalChannels();
673                     R66ShutdownNetworkChannelTimerTask timerTask = new R66ShutdownNetworkChannelTimerTask(address.hashCode());
674                     Configuration.configuration.getTimerClose().newTimeout(timerTask,
675                             Configuration.configuration.TIMEOUTCON * 3, TimeUnit.MILLISECONDS);
676                     return true;
677                 }
678             } finally {
679                 socketLock.unlock();
680             }
681         }
682         return false;
683     }
684     /**
685      *
686      * @param address
687      * @return True if this channel is currently in shutdown
688      */
689     public static boolean isShuttingdownNetworkChannel(SocketAddress address) {
690         ReentrantLock socketLock = getChannelLock(address);
691         socketLock.lock();
692         try {
693             return !isAddressValid(address);
694         } finally {
695             socketLock.unlock();
696         }
697     }
698     /**
699      * Remove of requester
700      * @param requester
701      * @param networkChannel
702      */
703     public static void removeClient(String requester, NetworkChannel networkChannel) {
704         if (networkChannel != null) {
705             lockClient.lock();
706             try {
707                 ClientNetworkChannels clientNetworkChannels = remoteClients.get(requester);
708                 if (clientNetworkChannels != null) {
709                     clientNetworkChannels.remove(networkChannel);
710                     if (clientNetworkChannels.isEmpty()) {
711                         remoteClients.remove(requester);
712                     }
713                 }
714             } finally {
715                 lockClient.unlock();
716             }
717         }
718     }
719     /**
720      * Get NetworkChannel as client
721      * @param requester
722      * @return NetworkChannel associated with this host as client (only 1 allow even if more are available)
723      */
724     public static boolean shuttingdownNetworkChannels(String requester) {
725         lockClient.lock();
726         try {
727             ClientNetworkChannels clientNetworkChannels = remoteClients.remove(requester);
728             if (clientNetworkChannels != null) {
729                 return clientNetworkChannels.shutdownAll();
730             }
731         } finally {
732             lockClient.unlock();
733         }
734         return false;
735     }
736     /**
737      * Add a requester channel (so call only by requested host)
738      * @param channel (network channel)
739      * @param requester
740      */
741     public static void addClient(Channel channel, String requester) {
742         SocketAddress address = channel.getRemoteAddress();
743         if (address != null) {
744             NetworkChannel networkChannel =
745                 networkChannelOnSocketAddressConcurrentHashMap.get(address.hashCode());
746             if (networkChannel != null) {
747                 lockClient.lock();
748                 try {
749                     ClientNetworkChannels clientNetworkChannels = remoteClients.get(requester);
750                     if (clientNetworkChannels == null) {
751                         clientNetworkChannels = new ClientNetworkChannels(requester);
752                         remoteClients.put(requester, clientNetworkChannels);
753                     }
754                     clientNetworkChannels.add(networkChannel);
755                 } finally {
756                     lockClient.unlock();
757                 }
758             }
759         }
760     }
761     /**
762      *
763      * @param requester
764      * @return The number of NetworkChannels associated with this requester
765      */
766     public static int getNumberClients(String requester) {
767         ClientNetworkChannels clientNetworkChannels = remoteClients.get(requester);
768         if (clientNetworkChannels != null) {
769             return clientNetworkChannels.size();
770         }
771         return 0;
772     }
773     /**
774      * Force remove of NetworkChannel when it is closed
775      * @param address
776      * @return the number of still connected Local Channels
777      */
778     public static int removeForceNetworkChannel(SocketAddress address) {
779         ReentrantLock socketLock = getChannelLock(address);
780         socketLock.lock();
781         try {
782             if (address != null) {
783                 NetworkChannel networkChannel = networkChannelOnSocketAddressConcurrentHashMap
784                     .get(address.hashCode());
785                 if (networkChannel != null) {
786                     if (networkChannel.isShuttingDown) {
787                         return networkChannel.count.get();
788                     }
789                     logger.debug("NC left: {}", networkChannel);
790                     int count = networkChannel.count.get();
791                     networkChannel.shutdownAllLocalChannels();
792                     networkChannelOnSocketAddressConcurrentHashMap
793                         .remove(address.hashCode());
794                     return count;
795                 } else {
796                     logger.debug("Network not registered");
797                 }
798             }
799             return 0;
800         } finally {
801             socketLock.unlock();
802             removeChannelLock(address);
803         }
804     }
805     
806     /**
807      * Class to close the Network Channel if after some delays it has really
808      * no Local Channel attached
809      * @author Frederic Bregier
810      *
811      */
812     static class CloseFutureChannel implements TimerTask {
813 
814         private static SortedSet<Integer> inCloseRunning =
815             Collections.synchronizedSortedSet(new TreeSet<Integer>());
816         private NetworkChannel networkChannel;
817         private String requester;
818         private SocketAddress address;
819         
820         /**
821          * @param networkChannel
822          * @param requester
823          * @throws OpenR66RunnerErrorException 
824          */
825         CloseFutureChannel(NetworkChannel networkChannel, SocketAddress address, String requester) 
826             throws OpenR66RunnerErrorException {
827             if (! inCloseRunning.add(networkChannel.channel.getId()))
828                 throw new OpenR66RunnerErrorException("Already scheduled");
829             this.networkChannel = networkChannel;
830             this.requester = requester;
831             this.address = address;
832         }
833 
834         /* (non-Javadoc)
835          * @see org.jboss.netty.util.TimerTask#run(org.jboss.netty.util.Timeout)
836          */
837         @Override
838         public void run(Timeout timeout) throws Exception {
839             ReentrantLock socketLock = getChannelLock(address);
840             socketLock.lock();
841             try {
842                 if (networkChannel.count.get() <= 0) {
843                     long time = networkChannel.lastTimeUsed + 
844                         Configuration.configuration.TIMEOUTCON*2 - 
845                         System.currentTimeMillis();
846                     if (time > Configuration.RETRYINMS) {
847                         // will re execute this request later on
848                         Configuration.configuration.getTimerClose().newTimeout(this, time, TimeUnit.MILLISECONDS);
849                         return;
850                     }
851                     if (requester != null)
852                         NetworkTransaction.removeClient(requester, networkChannel);
853                     networkChannelOnSocketAddressConcurrentHashMap
854                             .remove(address.hashCode());
855                     logger.info("Will close NETWORK channel");
856                     Channels.close(networkChannel.channel);
857                 }
858                 inCloseRunning.remove(networkChannel.channel.getId());
859             } finally {
860                 socketLock.unlock();
861             }
862         }
863         
864     }
865     /**
866     *
867     * @param channel networkChannel
868     * @param localChannel localChannel
869     * @param requester Requester since call from LocalChannel close (might be null)
870     * @return the number of local channel still connected to this channel
871     */
872    public static int removeNetworkChannel(Channel channel, Channel localChannel, 
873            String requester) {
874        SocketAddress address = channel.getRemoteAddress();
875        ReentrantLock socketLock = getChannelLock(address);
876        socketLock.lock();
877        try {
878            if (address != null) {
879                NetworkChannel networkChannel = networkChannelOnSocketAddressConcurrentHashMap
880                        .get(address.hashCode());
881                if (networkChannel != null) {
882                    int count = networkChannel.count.decrementAndGet();
883                    logger.info("Close con: "+networkChannel);
884                    if (localChannel != null) {
885                        networkChannel.remove(localChannel);
886                    }
887                    if (count <= 0) {
888                        CloseFutureChannel cfc;
889                        try {
890                            cfc = new CloseFutureChannel(networkChannel, address, requester);
891                            Configuration.configuration.getTimerClose().
892                                newTimeout(cfc, Configuration.configuration.TIMEOUTCON*2, TimeUnit.MILLISECONDS);
893                        } catch (OpenR66RunnerErrorException e) {
894                        }
895                    }
896                    logger.debug("NC left: {}", networkChannel);
897                    return count;
898                } else {
899                    if (channel.isConnected()) {
900                        logger.debug("Should not be here",
901                                new OpenR66ProtocolSystemException());
902                    }
903                }
904            }
905            return 0;
906        } finally {
907            socketLock.unlock();
908        }
909    }
910     /**
911      *
912      * @param address
913      * @param host
914      * @return True if a connection is still active on this socket or for this host
915      */
916     public static int existConnection(SocketAddress address, String host) {
917         return (networkChannelOnSocketAddressConcurrentHashMap.containsKey(address.hashCode())?1:0)
918             + getNumberClients(host);
919     }
920     /**
921      *
922      * @param channel
923      * @return the number of local channel associated with this channel
924      */
925     public static int getNbLocalChannel(Channel channel) {
926         SocketAddress address = channel.getRemoteAddress();
927         if (address != null) {
928             NetworkChannel networkChannel = networkChannelOnSocketAddressConcurrentHashMap
929                     .get(address.hashCode());
930             if (networkChannel != null) {
931                 return networkChannel.count.get();
932             }
933         }
934         return -1;
935     }
936 
937     /**
938      *
939      * @param address
940      * @return True if this socket Address is currently valid for connection
941      */
942     private static boolean isAddressValid(SocketAddress address) {
943         if (OpenR66SignalHandler.isInShutdown()) {
944             logger.debug("IS IN SHUTDOWN");
945             return false;
946         }
947         if (address == null) {
948             logger.debug("ADDRESS IS NULL");
949             return false;
950         }
951         try {
952             NetworkChannel networkChannel = getRemoteChannel(address);
953             logger.debug("IS IN SHUTDOWN: " + networkChannel.isShuttingDown);
954             return !networkChannel.isShuttingDown;
955         } catch (OpenR66ProtocolRemoteShutdownException e) {
956             logger.debug("ALREADY IN SHUTDOWN");
957             return false;
958         } catch (OpenR66ProtocolNoDataException e) {
959             logger.debug("NOT FOUND SO NO SHUTDOWN");
960             return true;
961         }
962     }
963     /**
964      *
965      * @param channel
966      * @return the associated NetworkChannel if any (Null if none)
967      */
968     public static NetworkChannel getNetworkChannel(Channel channel) {
969         SocketAddress address = channel.getRemoteAddress();
970         if (address != null) {
971             return networkChannelOnSocketAddressConcurrentHashMap.get(address
972                     .hashCode());
973         }
974         return null;
975     }
976     /**
977      *
978      * @param address
979      * @return NetworkChannel
980      * @throws OpenR66ProtocolRemoteShutdownException
981      * @throws OpenR66ProtocolNoDataException
982      */
983     private static NetworkChannel getRemoteChannel(SocketAddress address)
984             throws OpenR66ProtocolRemoteShutdownException,
985             OpenR66ProtocolNoDataException {
986         if (OpenR66SignalHandler.isInShutdown()) {
987             logger.debug("IS IN SHUTDOWN");
988             throw new OpenR66ProtocolRemoteShutdownException(
989                 "Local Host already in shutdown");
990         }
991         if (address == null) {
992             throw new OpenR66ProtocolRemoteShutdownException(
993                     "Remote Host already in shutdown");
994         }
995         int hashCode = address.hashCode();
996         NetworkChannel nc = networkChannelShutdownOnSocketAddressConcurrentHashMap
997                 .get(hashCode);
998         if (nc != null) {
999             throw new OpenR66ProtocolRemoteShutdownException(
1000                     "Remote Host already in shutdown");
1001         }
1002         nc = networkChannelOnSocketAddressConcurrentHashMap.get(hashCode);
1003         if (nc == null) {
1004             throw new OpenR66ProtocolNoDataException("Channel not found");
1005         }
1006         return nc;
1007     }
1008     /**
1009      *
1010      * @param channel
1011      * @return the associated NetworkChannel
1012      * @throws OpenR66ProtocolRemoteShutdownException
1013      * @throws OpenR66ProtocolNoDataException
1014      */
1015     private static NetworkChannel putRemoteChannel(Channel channel)
1016             throws OpenR66ProtocolRemoteShutdownException, OpenR66ProtocolNoConnectionException {
1017         SocketAddress address = channel.getRemoteAddress();
1018         if (address != null) {
1019             NetworkChannel networkChannel;
1020             try {
1021                 networkChannel = getRemoteChannel(address);
1022                 networkChannel.count.incrementAndGet();
1023                 logger.info("NC active: {}", networkChannel);
1024                 return networkChannel;
1025             } catch (OpenR66ProtocolRemoteShutdownException e) {
1026                 throw e;
1027             } catch (OpenR66ProtocolNoDataException e) {
1028                 networkChannel = new NetworkChannel(channel);
1029                 logger.debug("NC new active: {}", networkChannel);
1030                 networkChannelOnSocketAddressConcurrentHashMap.put(address
1031                         .hashCode(), networkChannel);
1032                 return networkChannel;
1033             }
1034         }
1035         throw new OpenR66ProtocolNoConnectionException("Address is not correct");
1036     }
1037 
1038     /**
1039      * Remover of Shutdown Remote Host
1040      *
1041      * @author Frederic Bregier
1042      *
1043      */
1044     private static class R66ShutdownNetworkChannelTimerTask implements TimerTask {
1045         /**
1046          * href to remove
1047          */
1048         private final int href;
1049 
1050         /**
1051          * Constructor from type
1052          *
1053          * @param href
1054          */
1055         public R66ShutdownNetworkChannelTimerTask(int href) {
1056             super();
1057             this.href = href;
1058         }
1059 
1060         /* (non-Javadoc)
1061          * @see org.jboss.netty.util.TimerTask#run(org.jboss.netty.util.Timeout)
1062          */
1063         @Override
1064         public void run(Timeout timeout) throws Exception {
1065             logger.debug("DEBUG: Will remove shutdown for : "+href);
1066             NetworkChannel networkChannel =
1067                 networkChannelShutdownOnSocketAddressConcurrentHashMap.remove(href);
1068             if (networkChannel != null && networkChannel.channel != null
1069                     && networkChannel.channel.isConnected()) {
1070                 Channels.close(networkChannel.channel);
1071             }
1072         }
1073     }
1074 }