View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.ftp.core.config;
22  
23  import goldengate.common.command.exception.Reply425Exception;
24  import goldengate.common.file.DataBlockSizeEstimator;
25  import goldengate.common.logging.GgInternalLogger;
26  import goldengate.common.logging.GgInternalLoggerFactory;
27  import goldengate.common.utility.GgThreadFactory;
28  import goldengate.ftp.core.control.FtpPipelineFactory;
29  import goldengate.ftp.core.data.handler.FtpDataPipelineFactory;
30  import goldengate.ftp.core.session.FtpSession;
31  import goldengate.ftp.core.session.FtpSessionReference;
32  import goldengate.ftp.core.utils.FtpChannelUtils;
33  import goldengate.ftp.core.utils.FtpSignalHandler;
34  
35  import java.net.InetAddress;
36  import java.net.InetSocketAddress;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.TimeUnit;
41  
42  import org.jboss.netty.bootstrap.ClientBootstrap;
43  import org.jboss.netty.bootstrap.ServerBootstrap;
44  import org.jboss.netty.channel.Channel;
45  import org.jboss.netty.channel.ChannelException;
46  import org.jboss.netty.channel.ChannelFactory;
47  import org.jboss.netty.channel.Channels;
48  import org.jboss.netty.channel.group.ChannelGroup;
49  import org.jboss.netty.channel.group.DefaultChannelGroup;
50  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
51  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
52  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
53  import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
54  import org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler;
55  import org.jboss.netty.logging.InternalLoggerFactory;
56  import org.jboss.netty.util.HashedWheelTimer;
57  import org.jboss.netty.util.ObjectSizeEstimator;
58  import org.jboss.netty.util.Timer;
59  
60  /**
61   * Internal configuration of the FTP server, related to Netty
62   *
63   * @author Frederic Bregier
64   *
65   */
66  public class FtpInternalConfiguration {
67      // Static values
68      /**
69       * Internal Logger
70       */
71      private static final GgInternalLogger logger = GgInternalLoggerFactory
72              .getLogger(FtpInternalConfiguration.class);
73  
74      // Network Internals
75      /**
76       * Time elapse for retry in ms
77       */
78      public static final long RETRYINMS = 10;
79  
80      /**
81       * Number of retry before error
82       */
83      public static final int RETRYNB = 3;
84  
85      /**
86       * Time elapse for WRITE OR CLOSE WAIT elaps in ms
87       */
88      public static final long WAITFORNETOP = 1000;
89      /**
90       * Hack to say Windows or Unix (USR1 not OK on Windows)
91       */
92      public static Boolean ISUNIX = null;
93  
94      /**
95       * Default size for buffers (NIO)
96       */
97      public static final int BUFFERSIZEDEFAULT = 0x10000; // 64K
98  
99      // Dynamic values
100     /**
101      * List of all Command Channels to enable the close call on them using Netty
102      * ChannelGroup
103      */
104     private ChannelGroup commandChannelGroup = null;
105 
106     /**
107      * ExecutorService Boss
108      */
109     private final ExecutorService execBoss = Executors.newCachedThreadPool();
110 
111     /**
112      * ExecutorService Worker
113      */
114     private final ExecutorService execWorker = Executors.newCachedThreadPool();
115 
116     /**
117      * ChannelFactory for Command part
118      */
119     private ChannelFactory commandChannelFactory = null;
120 
121     /**
122      * ThreadPoolExecutor for command
123      */
124     private volatile OrderedMemoryAwareThreadPoolExecutor pipelineExecutor = null;
125 
126     /**
127      * Bootstrap for Command server
128      */
129     private ServerBootstrap serverBootstrap = null;
130 
131     /**
132      * List of all Data Channels to enable the close call on them using Netty
133      * ChannelGroup
134      */
135     private ChannelGroup dataChannelGroup = null;
136 
137     /**
138      * ExecutorService Data Passive Boss
139      */
140     private final ExecutorService execPassiveDataBoss = Executors
141             .newCachedThreadPool();
142 
143     /**
144      * ExecutorService Data Passive Worker
145      */
146     private final ExecutorService execPassiveDataWorker = Executors
147             .newCachedThreadPool();
148 
149     /**
150      * ChannelFactory for Data Passive part
151      */
152     private ChannelFactory dataPassiveChannelFactory = null;
153 
154     /**
155      * ExecutorService Data Active Boss
156      */
157     private final ExecutorService execActiveDataBoss = Executors
158             .newCachedThreadPool();
159 
160     /**
161      * ExecutorService Data Active Worker
162      */
163     private final ExecutorService execActiveDataWorker = Executors
164             .newCachedThreadPool();
165 
166     /**
167      * ChannelFactory for Data Active part
168      */
169     private ChannelFactory dataActiveChannelFactory = null;
170 
171     /**
172      * FtpSession references used by Data Connection process
173      */
174     private final FtpSessionReference ftpSessionReference = new FtpSessionReference();
175 
176     /**
177      * ThreadPoolExecutor for data
178      */
179     private volatile OrderedMemoryAwareThreadPoolExecutor pipelineDataExecutor = null;
180 
181     /**
182      * ServerBootStrap for Active connections
183      */
184     private ClientBootstrap activeBootstrap = null;
185 
186     /**
187      * ClientBootStrap for Passive connections
188      */
189     private ServerBootstrap passiveBootstrap = null;
190 
191     /**
192      * Timer for TrafficCounter
193      */
194     private Timer timerTrafficCounter = 
195         new HashedWheelTimer(new GgThreadFactory("TimerTrafficFtp"), 10, TimeUnit.MILLISECONDS, 1024);
196     
197     /**
198      * Global TrafficCounter (set from global configuration)
199      */
200     private volatile GlobalTrafficShapingHandler globalTrafficShapingHandler = null;
201 
202     /**
203      * ObjectSizeEstimator
204      */
205     private ObjectSizeEstimator objectSizeEstimator = null;
206 
207     /**
208      *
209      * @author Frederic Bregier goldengate.ftp.core.config BindAddress
210      *
211      */
212     public class BindAddress {
213         /**
214          * Parent passive channel
215          */
216         public final Channel parent;
217 
218         /**
219          * Number of binded Data connections
220          */
221         public int nbBind = 0;
222 
223         /**
224          * Constructor
225          *
226          * @param channel
227          */
228         public BindAddress(Channel channel) {
229             parent = channel;
230             nbBind = 0;
231         }
232     }
233 
234     /**
235      * List of already bind local addresses for Passive connections
236      */
237     private final ConcurrentHashMap<InetSocketAddress, BindAddress> hashBindPassiveDataConn =
238         new ConcurrentHashMap<InetSocketAddress, BindAddress>();
239 
240     /**
241      * Global Configuration
242      */
243     private final FtpConfiguration configuration;
244 
245     /**
246      * Constructor
247      *
248      * @param configuration
249      */
250     public FtpInternalConfiguration(FtpConfiguration configuration) {
251         this.configuration = configuration;
252         ISUNIX = !System.getProperty("os.name").toLowerCase().startsWith("win");
253     }
254 
255     /**
256      * Startup the server
257      *
258      */
259     public void serverStartup() {
260         InternalLoggerFactory.setDefaultFactory(InternalLoggerFactory
261                 .getDefaultFactory());
262         // Command
263         commandChannelGroup = new DefaultChannelGroup(configuration.fromClass
264                 .getName());
265         commandChannelFactory = new NioServerSocketChannelFactory(execBoss,
266                 execWorker, configuration.SERVER_THREAD);
267         // Data
268         dataChannelGroup = new DefaultChannelGroup(configuration.fromClass
269                 .getName() +
270                 ".data");
271         dataPassiveChannelFactory = new NioServerSocketChannelFactory(
272                 execPassiveDataBoss, execPassiveDataWorker,
273                 configuration.SERVER_THREAD);
274         dataActiveChannelFactory = new NioClientSocketChannelFactory(
275                 execActiveDataBoss, execActiveDataWorker, configuration.CLIENT_THREAD);
276 
277         // Passive Data Connections
278         passiveBootstrap = new ServerBootstrap(dataPassiveChannelFactory);
279         passiveBootstrap.setPipelineFactory(new FtpDataPipelineFactory(
280                 configuration.dataBusinessHandler, configuration, false));
281         passiveBootstrap.setOption("connectTimeoutMillis",
282                 configuration.TIMEOUTCON);
283         passiveBootstrap.setOption("reuseAddress", true);
284         passiveBootstrap.setOption("tcpNoDelay", true);
285         passiveBootstrap.setOption("child.connectTimeoutMillis",
286                 configuration.TIMEOUTCON);
287         passiveBootstrap.setOption("child.tcpNoDelay", true);
288         passiveBootstrap.setOption("child.keepAlive", true);
289         passiveBootstrap.setOption("child.reuseAddress", true);
290         // Active Data Connections
291         activeBootstrap = new ClientBootstrap(dataActiveChannelFactory);
292         activeBootstrap.setPipelineFactory(new FtpDataPipelineFactory(
293                 configuration.dataBusinessHandler, configuration, true));
294         activeBootstrap.setOption("connectTimeoutMillis",
295                 configuration.TIMEOUTCON);
296         activeBootstrap.setOption("reuseAddress", true);
297         activeBootstrap.setOption("tcpNoDelay", true);
298         activeBootstrap.setOption("child.connectTimeoutMillis",
299                 configuration.TIMEOUTCON);
300         activeBootstrap.setOption("child.tcpNoDelay", true);
301         activeBootstrap.setOption("child.keepAlive", true);
302         activeBootstrap.setOption("child.reuseAddress", true);
303         // Main Command server
304         serverBootstrap = new ServerBootstrap(getCommandChannelFactory());
305         serverBootstrap.setPipelineFactory(new FtpPipelineFactory(
306                 configuration.businessHandler, configuration));
307         serverBootstrap.setOption("child.tcpNoDelay", true);
308         serverBootstrap.setOption("child.keepAlive", true);
309         serverBootstrap.setOption("child.reuseAddress", true);
310         serverBootstrap.setOption("child.connectTimeoutMillis",
311                 configuration.TIMEOUTCON);
312         serverBootstrap.setOption("tcpNoDelay", true);
313         serverBootstrap.setOption("reuseAddress", true);
314         serverBootstrap.setOption("connectTimeoutMillis",
315                 configuration.TIMEOUTCON);
316 
317         FtpChannelUtils.addCommandChannel(serverBootstrap
318                 .bind(new InetSocketAddress(configuration.getServerPort())),
319                 configuration);
320 
321         // Init signal handler
322         FtpSignalHandler.initSignalHandler(configuration);
323         // Factory for TrafficShapingHandler
324         objectSizeEstimator = new DataBlockSizeEstimator();
325         globalTrafficShapingHandler = new GlobalTrafficShapingHandler(
326                 objectSizeEstimator, timerTrafficCounter, configuration
327                         .getServerGlobalWriteLimit(), configuration
328                         .getServerGlobalReadLimit(), configuration
329                         .getDelayLimit());
330         pipelineExecutor = new OrderedMemoryAwareThreadPoolExecutor(
331                 configuration.CLIENT_THREAD,
332                 configuration.maxGlobalMemory / 40,
333                 configuration.maxGlobalMemory / 4, 1000,
334                 TimeUnit.MILLISECONDS, objectSizeEstimator,
335                 new GgThreadFactory("CommandExecutor"));
336         pipelineDataExecutor = new OrderedMemoryAwareThreadPoolExecutor(
337                 configuration.CLIENT_THREAD,
338                 configuration.maxGlobalMemory / 10,
339                 configuration.maxGlobalMemory, 1000,
340                 TimeUnit.MILLISECONDS, objectSizeEstimator,
341                 new GgThreadFactory("DataExecutor"));
342     }
343     /**
344      * 
345      * @return an ExecutorService
346      */
347     public ExecutorService getWorker() {
348         return execWorker;
349     }
350     /**
351      * Add a session from a couple of addresses
352      *
353      * @param ipOnly
354      * @param fullIp
355      * @param session
356      */
357     public void setNewFtpSession(InetAddress ipOnly, InetSocketAddress fullIp,
358             FtpSession session) {
359         ftpSessionReference.setNewFtpSession(ipOnly, fullIp, session);
360     }
361 
362     /**
363      * Return and remove the FtpSession
364      *
365      * @param channel
366      * @param active
367      * @return the FtpSession if it exists associated to this channel
368      */
369     public FtpSession getFtpSession(Channel channel, boolean active) {
370         if (active) {
371             return ftpSessionReference.getActiveFtpSession(channel);
372         } else {
373             return ftpSessionReference.getPassiveFtpSession(channel);
374         }
375     }
376 
377     /**
378      * Remove the FtpSession
379      *
380      * @param ipOnly
381      * @param fullIp
382      */
383     public void delFtpSession(InetAddress ipOnly, InetSocketAddress fullIp) {
384         ftpSessionReference.delFtpSession(ipOnly, fullIp);
385     }
386 
387     /**
388      * Test if the couple of addresses is already in the context
389      *
390      * @param ipOnly
391      * @param fullIp
392      * @return True if the couple is present
393      */
394     public boolean hasFtpSession(InetAddress ipOnly, InetSocketAddress fullIp) {
395         return ftpSessionReference.contains(ipOnly, fullIp);
396     }
397     /**
398      * 
399      * @return the number of Active Sessions
400      */
401     public int getNumberSessions() {
402         return ftpSessionReference.sessionsNumber();
403     }
404     /**
405      * Try to add a Passive Channel listening to the specified local address
406      *
407      * @param address
408      * @throws Reply425Exception
409      *             in case the channel cannot be opened
410      */
411     public void bindPassive(InetSocketAddress address) throws Reply425Exception {
412         configuration.bindLock();
413         try {
414             BindAddress bindAddress = hashBindPassiveDataConn.get(address);
415             if (bindAddress == null) {
416                 logger.debug("Bind really to {}", address);
417                 Channel parentChannel = null;
418                 try {
419                     parentChannel = passiveBootstrap.bind(address);
420                 } catch (ChannelException e) {
421                     logger.warn("Cannot open passive connection {}", e
422                             .getMessage());
423                     throw new Reply425Exception(
424                             "Cannot open a Passive Connection ");
425                 }
426                 bindAddress = new BindAddress(parentChannel);
427                 FtpChannelUtils.addDataChannel(parentChannel, configuration);
428                 hashBindPassiveDataConn.put(address, bindAddress);
429             }
430             bindAddress.nbBind++;
431             logger.debug("Bind number to {} is {}", address, bindAddress.nbBind);
432         } finally {
433             configuration.bindUnlock();
434         }
435     }
436 
437     /**
438      * Try to unbind (closing the parent channel) the Passive Channel listening
439      * to the specified local address if the last one. It returns only when the
440      * underlying parent channel is closed if this was the last session that
441      * wants to open on this local address.
442      *
443      * @param address
444      */
445     public void unbindPassive(InetSocketAddress address) {
446         configuration.bindLock();
447         try {
448             BindAddress bindAddress = hashBindPassiveDataConn.get(address);
449             if (bindAddress != null) {
450                 bindAddress.nbBind--;
451                 logger.debug("Bind number to {} left is {}", address, bindAddress.nbBind);
452                 if (bindAddress.nbBind == 0) {
453                     Channels.close(bindAddress.parent);
454                     hashBindPassiveDataConn.remove(address);
455                 }
456             } else {
457                 logger.warn("No Bind to {}", address);
458             }
459         } finally {
460             configuration.bindUnlock();
461         }
462     }
463 
464     /**
465      *
466      * @return the number of Binded Passive Connections
467      */
468     public int getNbBindedPassive() {
469         return hashBindPassiveDataConn.size();
470     }
471 
472     /**
473      * Return the associated PipelineExecutor for Command Pipeline
474      *
475      * @return the Command Pipeline Executor
476      */
477     public OrderedMemoryAwareThreadPoolExecutor getPipelineExecutor() {
478         return pipelineExecutor;
479     }
480 
481     /**
482      * Return the associated PipelineExecutor for Data Pipeline
483      *
484      * @return the Data Pipeline Executor
485      */
486     public OrderedMemoryAwareThreadPoolExecutor getDataPipelineExecutor() {
487         return pipelineDataExecutor;
488     }
489 
490     /**
491      *
492      * @return the ActiveBootstrap
493      */
494     public ClientBootstrap getActiveBootstrap() {
495         return activeBootstrap;
496     }
497 
498     /**
499      * @return the commandChannelFactory
500      */
501     public ChannelFactory getCommandChannelFactory() {
502         return commandChannelFactory;
503     }
504 
505     /**
506      * @return the commandChannelGroup
507      */
508     public ChannelGroup getCommandChannelGroup() {
509         return commandChannelGroup;
510     }
511 
512     /**
513      * @return the dataPassiveChannelFactory
514      */
515     public ChannelFactory getDataPassiveChannelFactory() {
516         return dataPassiveChannelFactory;
517     }
518 
519     /**
520      * @return the dataActiveChannelFactory
521      */
522     public ChannelFactory getDataActiveChannelFactory() {
523         return dataActiveChannelFactory;
524     }
525 
526     /**
527      * @return the dataChannelGroup
528      */
529     public ChannelGroup getDataChannelGroup() {
530         return dataChannelGroup;
531     }
532 
533     /**
534      * @return the objectSizeEstimator
535      */
536     public ObjectSizeEstimator getObjectSizeEstimator() {
537         return objectSizeEstimator;
538     }
539 
540     /**
541      *
542      * @return The TrafficCounterFactory
543      */
544     public GlobalTrafficShapingHandler getGlobalTrafficShapingHandler() {
545         return globalTrafficShapingHandler;
546     }
547 
548     /**
549      *
550      * @return a new ChannelTrafficShapingHandler
551      */
552     public ChannelTrafficShapingHandler newChannelTrafficShapingHandler() {
553         if (configuration.getServerChannelWriteLimit() == 0 &&
554                 configuration.getServerChannelReadLimit() == 0) {
555             return null;
556         }
557         return new ChannelTrafficShapingHandler(objectSizeEstimator,
558                 timerTrafficCounter, configuration.getServerChannelWriteLimit(),
559                 configuration.getServerChannelReadLimit(), configuration
560                         .getDelayLimit());
561     }
562     
563     public void releaseResources() {
564         execBoss.shutdown();
565         execWorker.shutdown();
566         execPassiveDataBoss.shutdown();
567         execPassiveDataWorker.shutdown();
568         execActiveDataBoss.shutdown();
569         execActiveDataWorker.shutdown();
570         timerTrafficCounter.stop();
571         activeBootstrap.releaseExternalResources();
572         passiveBootstrap.releaseExternalResources();
573         serverBootstrap.releaseExternalResources();
574     }
575 }