View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.ftp.core.utils;
22  
23  import goldengate.common.logging.GgInternalLogger;
24  import goldengate.common.logging.GgInternalLoggerFactory;
25  import goldengate.common.logging.GgSlf4JLoggerFactory;
26  import goldengate.ftp.core.config.FtpConfiguration;
27  
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.UnknownHostException;
31  import java.util.Iterator;
32  import java.util.Timer;
33  
34  import org.jboss.netty.channel.Channel;
35  import org.jboss.netty.channel.ChannelFactory;
36  import org.jboss.netty.channel.Channels;
37  import org.jboss.netty.channel.group.ChannelGroupFuture;
38  import org.jboss.netty.channel.group.ChannelGroupFutureListener;
39  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
40  import org.slf4j.LoggerFactory;
41  
42  import ch.qos.logback.classic.LoggerContext;
43  
44  /**
45   * Some useful functions related to Channel of Netty
46   *
47   * @author Frederic Bregier
48   *
49   */
50  public class FtpChannelUtils implements Runnable {
51      /**
52       * Internal Logger
53       */
54      private static final GgInternalLogger logger = GgInternalLoggerFactory
55              .getLogger(FtpChannelUtils.class);
56  
57      /**
58       * Get the Remote InetAddress
59       *
60       * @param channel
61       * @return the remote InetAddress
62       */
63      public static InetAddress getRemoteInetAddress(Channel channel) {
64          InetSocketAddress socketAddress = (InetSocketAddress) channel
65                  .getRemoteAddress();
66          if (socketAddress == null) {
67              socketAddress = new InetSocketAddress(20);
68          }
69          return socketAddress.getAddress();
70      }
71  
72      /**
73       * Get the Local InetAddress
74       *
75       * @param channel
76       * @return the local InetAddress
77       */
78      public static InetAddress getLocalInetAddress(Channel channel) {
79          InetSocketAddress socketAddress = (InetSocketAddress) channel
80                  .getLocalAddress();
81          return socketAddress.getAddress();
82      }
83  
84      /**
85       * Get the Remote InetSocketAddress
86       *
87       * @param channel
88       * @return the remote InetSocketAddress
89       */
90      public static InetSocketAddress getRemoteInetSocketAddress(Channel channel) {
91          return (InetSocketAddress) channel.getRemoteAddress();
92      }
93  
94      /**
95       * Get the Local InetSocketAddress
96       *
97       * @param channel
98       * @return the local InetSocketAddress
99       */
100     public static InetSocketAddress getLocalInetSocketAddress(Channel channel) {
101         return (InetSocketAddress) channel.getLocalAddress();
102     }
103 
104     /**
105      * Get the InetSocketAddress corresponding to the FTP format of address
106      *
107      * @param arg
108      * @return the InetSocketAddress or null if an error occurs
109      */
110     public static InetSocketAddress getInetSocketAddress(String arg) {
111         String[] elements = arg.split(",");
112         if (elements.length != 6) {
113             return null;
114         }
115         byte[] address = new byte[4];
116         int[] iElements = new int[6];
117         for (int i = 0; i < 6; i ++) {
118             try {
119                 iElements[i] = Integer.parseInt(elements[i]);
120             } catch (NumberFormatException e) {
121                 return null;
122             }
123             if (iElements[i] < 0 || iElements[i] > 255) {
124                 return null;
125             }
126         }
127         for (int i = 0; i < 4; i ++) {
128             address[i] = (byte) iElements[i];
129         }
130         int port = iElements[4] << 8 | iElements[5];
131         InetAddress inetAddress;
132         try {
133             inetAddress = InetAddress.getByAddress(address);
134         } catch (UnknownHostException e) {
135             return null;
136         }
137         return new InetSocketAddress(inetAddress, port);
138     }
139 
140     /**
141      * Return the Address in the format compatible with FTP argument
142      *
143      * @param address
144      * @return the String representation of the address
145      */
146     public static String getAddress(InetSocketAddress address) {
147         InetAddress servAddr = address.getAddress();
148         int servPort = address.getPort();
149         return servAddr.getHostAddress().replace('.', ',') + ',' +
150                 (servPort >> 8) + ',' + (servPort & 0xFF);
151     }
152 
153     /**
154      * Get the (RFC2428) InetSocketAddress corresponding to the FTP format of
155      * address (RFC2428)
156      *
157      * @param arg
158      * @return the InetSocketAddress or null if an error occurs
159      */
160     public static InetSocketAddress get2428InetSocketAddress(String arg) {
161         // Format: #a#net-addr#tcp-port# where a = 1 IPV4 or 2 IPV6, other will
162         // not be supported
163         if (arg == null || arg.length() == 0) {
164             // bad args
165             return null;
166         }
167         String delim = arg.substring(0, 1);
168         String[] infos = arg.split(delim);
169         if (infos.length != 3) {
170             // bad format
171             return null;
172         }
173         boolean isIPV4 = true;
174         if (infos[0].equals("1")) {
175             isIPV4 = true;
176         } else if (infos[0].equals("2")) {
177             isIPV4 = false;
178         } else {
179             // not supported
180             return null;
181         }
182         byte[] address = null;
183         if (isIPV4) {
184             // IPV4
185             address = new byte[4];
186             String[] elements = infos[1].split(".");
187             if (elements.length != 4) {
188                 return null;
189             }
190             for (int i = 0; i < 4; i ++) {
191                 try {
192                     address[i] = (byte) Integer.parseInt(elements[i]);
193                 } catch (NumberFormatException e) {
194                     return null;
195                 }
196             }
197         } else {
198             // IPV6
199             address = new byte[16];
200             int[] value = new int[8];
201             String[] elements = infos[1].split(":");
202             if (elements.length != 8) {
203                 return null;
204             }
205             for (int i = 0, j = 0; i < 8; i ++) {
206                 if (elements[i] == null || elements[i].length() == 0) {
207                     value[i] = 0;
208                 } else {
209                     try {
210                         value[i] = Integer.parseInt(elements[i]);
211                     } catch (NumberFormatException e) {
212                         return null;
213                     }
214                 }
215                 address[j ++] = (byte) (value[i] >> 8);
216                 address[j ++] = (byte) (value[i] & 0xFF);
217             }
218         }
219         int port = 0;
220         try {
221             port = Integer.parseInt(infos[2]);
222         } catch (NumberFormatException e) {
223             return null;
224         }
225         InetAddress inetAddress;
226         try {
227             inetAddress = InetAddress.getByAddress(address);
228         } catch (UnknownHostException e) {
229             return null;
230         }
231         return new InetSocketAddress(inetAddress, port);
232     }
233 
234     /**
235      * Return the (RFC2428) Address in the format compatible with FTP (RFC2428)
236      *
237      * @param address
238      * @return the String representation of the address
239      */
240     public static String get2428Address(InetSocketAddress address) {
241         InetAddress servAddr = address.getAddress();
242         int servPort = address.getPort();
243         StringBuilder builder = new StringBuilder();
244         String hostaddress = servAddr.getHostAddress();
245         builder.append('|');
246         if (hostaddress.contains(":")) {
247             builder.append('2'); // IPV6
248         } else {
249             builder.append('1'); // IPV4
250         }
251         builder.append('|');
252         builder.append(hostaddress);
253         builder.append('|');
254         builder.append(servPort);
255         builder.append('|');
256         return builder.toString();
257     }
258 
259     /**
260      * Finalize resources attached to Control or Data handlers
261      *
262      * @author Frederic Bregier
263      *
264      */
265     private static class FtpChannelGroupFutureListener implements
266             ChannelGroupFutureListener {
267         OrderedMemoryAwareThreadPoolExecutor pool;
268 
269         ChannelFactory channelFactory;
270 
271         ChannelFactory channelFactory2;
272 
273         public FtpChannelGroupFutureListener(
274                 OrderedMemoryAwareThreadPoolExecutor pool,
275                 ChannelFactory channelFactory, ChannelFactory channelFactory2) {
276             this.pool = pool;
277             this.channelFactory = channelFactory;
278             this.channelFactory2 = channelFactory2;
279         }
280 
281         public void operationComplete(ChannelGroupFuture future)
282                 throws Exception {
283             pool.shutdownNow();
284             channelFactory.releaseExternalResources();
285             if (channelFactory2 != null) {
286                 channelFactory2.releaseExternalResources();
287             }
288         }
289     }
290 
291     /**
292      * Terminate all registered command channels
293      *
294      * @param configuration
295      * @return the number of previously registered command channels
296      */
297     static int terminateCommandChannels(FtpConfiguration configuration) {
298         int result = configuration.getFtpInternalConfiguration()
299                 .getCommandChannelGroup().size();
300         configuration.getFtpInternalConfiguration().getCommandChannelGroup()
301                 .close().addListener(
302                         new FtpChannelGroupFutureListener(configuration
303                                 .getFtpInternalConfiguration()
304                                 .getPipelineExecutor(), configuration
305                                 .getFtpInternalConfiguration()
306                                 .getCommandChannelFactory(), null));
307         return result;
308     }
309 
310     /**
311      * Terminate all registered data channels
312      *
313      * @param configuration
314      * @return the number of previously registered data channels
315      */
316     private static int terminateDataChannels(FtpConfiguration configuration) {
317         int result = configuration.getFtpInternalConfiguration()
318                 .getDataChannelGroup().size();
319         configuration.getFtpInternalConfiguration().getDataChannelGroup()
320                 .close().addListener(
321                         new FtpChannelGroupFutureListener(configuration
322                                 .getFtpInternalConfiguration()
323                                 .getDataPipelineExecutor(), configuration
324                                 .getFtpInternalConfiguration()
325                                 .getDataPassiveChannelFactory(), configuration
326                                 .getFtpInternalConfiguration()
327                                 .getDataActiveChannelFactory()));
328         return result;
329     }
330 
331     /**
332      * Return the current number of command connections
333      *
334      * @param configuration
335      * @return the current number of command connections
336      */
337     public static int nbCommandChannels(FtpConfiguration configuration) {
338         return configuration.getFtpInternalConfiguration()
339                 .getCommandChannelGroup().size();
340     }
341 
342     /**
343      * Return the current number of data connections
344      *
345      * @param configuration
346      * @return the current number of data connections
347      */
348     public static int nbDataChannels(FtpConfiguration configuration) {
349         return configuration.getFtpInternalConfiguration()
350                 .getDataChannelGroup().size();
351     }
352 
353     /**
354      * Return the number of still positive command connections
355      *
356      * @param configuration
357      * @return the number of positive command connections
358      */
359     public static int validCommandChannels(FtpConfiguration configuration) {
360         int result = 0;
361         Channel channel = null;
362         Iterator<Channel> iterator = configuration
363                 .getFtpInternalConfiguration().getCommandChannelGroup()
364                 .iterator();
365         while (iterator.hasNext()) {
366             channel = iterator.next();
367             if (channel.getParent() != null) {
368                 // Child Channel
369                 if (channel.isConnected()) {
370                     // Normal channel
371                     result ++;
372                 } else {
373                     Channels.close(channel);
374                 }
375             } else {
376                 // Parent channel
377                 result ++;
378             }
379         }
380         return result;
381     }
382 
383     /**
384      * Exit global ChannelFactory
385      *
386      * @param configuration
387      */
388     private static void exit(FtpConfiguration configuration) {
389         configuration.isShutdown = true;
390         long delay = configuration.TIMEOUTCON;
391         logger.warn("Exit: Give a delay of " + delay + " ms");
392         configuration.inShutdownProcess();
393         try {
394             Thread.sleep(delay);
395         } catch (InterruptedException e) {
396         }
397         configuration.getFtpInternalConfiguration()
398                 .getGlobalTrafficShapingHandler().releaseExternalResources();
399         Timer timer = new Timer(true);
400         FtpTimerTask timerTask = new FtpTimerTask(FtpTimerTask.TIMER_CONTROL);
401         timerTask.configuration = configuration;
402         timer.schedule(timerTask, configuration.TIMEOUTCON/2);
403         configuration.releaseResources();
404         logger.info("Exit Shutdown Data");
405         terminateDataChannels(configuration);
406         logger.warn("Exit end of Data Shutdown");
407         stopLogger();
408     }
409 
410     /**
411      * This function is the top function to be called when the server is to be
412      * shutdown.
413      *
414      * @param configuration
415      */
416     public static void teminateServer(FtpConfiguration configuration) {
417         FtpSignalHandler.terminate(true, configuration);
418     }
419 
420     /**
421      * Add a command channel into the list
422      *
423      * @param channel
424      * @param configuration
425      */
426     public static void addCommandChannel(Channel channel,
427             FtpConfiguration configuration) {
428         // logger.debug("Add Command Channel {}", channel);
429         configuration.getFtpInternalConfiguration().getCommandChannelGroup()
430                 .add(channel);
431     }
432 
433     /**
434      * Add a data channel into the list
435      *
436      * @param channel
437      * @param configuration
438      */
439     public static void addDataChannel(Channel channel,
440             FtpConfiguration configuration) {
441         // logger.debug("Add Data Channel {}", channel);
442         configuration.getFtpInternalConfiguration().getDataChannelGroup().add(
443                 channel);
444     }
445 
446     /**
447      * Used to run Exit command
448      */
449     private FtpConfiguration configuration;
450 
451     public FtpChannelUtils(FtpConfiguration configuration) {
452         this.configuration = configuration;
453     }
454 
455     @Override
456     public void run() {
457         exit(configuration);
458     }
459     public static void stopLogger() {
460         if (GgInternalLoggerFactory.getDefaultFactory() instanceof GgSlf4JLoggerFactory) {
461             LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
462             lc.stop();
463         }
464     }
465 
466 }