1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.core.config;
22
23 import goldengate.common.command.exception.Reply425Exception;
24 import goldengate.common.file.DataBlockSizeEstimator;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27 import goldengate.common.utility.GgThreadFactory;
28 import goldengate.ftp.core.control.FtpPipelineFactory;
29 import goldengate.ftp.core.data.handler.FtpDataPipelineFactory;
30 import goldengate.ftp.core.session.FtpSession;
31 import goldengate.ftp.core.session.FtpSessionReference;
32 import goldengate.ftp.core.utils.FtpChannelUtils;
33 import goldengate.ftp.core.utils.FtpSignalHandler;
34
35 import java.net.InetAddress;
36 import java.net.InetSocketAddress;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.TimeUnit;
41
42 import org.jboss.netty.bootstrap.ClientBootstrap;
43 import org.jboss.netty.bootstrap.ServerBootstrap;
44 import org.jboss.netty.channel.Channel;
45 import org.jboss.netty.channel.ChannelException;
46 import org.jboss.netty.channel.ChannelFactory;
47 import org.jboss.netty.channel.Channels;
48 import org.jboss.netty.channel.group.ChannelGroup;
49 import org.jboss.netty.channel.group.DefaultChannelGroup;
50 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
51 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
52 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
53 import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
54 import org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler;
55 import org.jboss.netty.logging.InternalLoggerFactory;
56 import org.jboss.netty.util.HashedWheelTimer;
57 import org.jboss.netty.util.ObjectSizeEstimator;
58 import org.jboss.netty.util.Timer;
59
60
61
62
63
64
65
66 public class FtpInternalConfiguration {
67
68
69
70
71 private static final GgInternalLogger logger = GgInternalLoggerFactory
72 .getLogger(FtpInternalConfiguration.class);
73
74
75
76
77
78 public static final long RETRYINMS = 10;
79
80
81
82
83 public static final int RETRYNB = 3;
84
85
86
87
88 public static final long WAITFORNETOP = 1000;
89
90
91
92 public static Boolean ISUNIX = null;
93
94
95
96
97 public static final int BUFFERSIZEDEFAULT = 0x10000;
98
99
100
101
102
103
104 private ChannelGroup commandChannelGroup = null;
105
106
107
108
109 private final ExecutorService execBoss = Executors.newCachedThreadPool();
110
111
112
113
114 private final ExecutorService execWorker = Executors.newCachedThreadPool();
115
116
117
118
119 private ChannelFactory commandChannelFactory = null;
120
121
122
123
124 private volatile OrderedMemoryAwareThreadPoolExecutor pipelineExecutor = null;
125
126
127
128
129 private ServerBootstrap serverBootstrap = null;
130
131
132
133
134
135 private ChannelGroup dataChannelGroup = null;
136
137
138
139
140 private final ExecutorService execPassiveDataBoss = Executors
141 .newCachedThreadPool();
142
143
144
145
146 private final ExecutorService execPassiveDataWorker = Executors
147 .newCachedThreadPool();
148
149
150
151
152 private ChannelFactory dataPassiveChannelFactory = null;
153
154
155
156
157 private final ExecutorService execActiveDataBoss = Executors
158 .newCachedThreadPool();
159
160
161
162
163 private final ExecutorService execActiveDataWorker = Executors
164 .newCachedThreadPool();
165
166
167
168
169 private ChannelFactory dataActiveChannelFactory = null;
170
171
172
173
174 private final FtpSessionReference ftpSessionReference = new FtpSessionReference();
175
176
177
178
179 private volatile OrderedMemoryAwareThreadPoolExecutor pipelineDataExecutor = null;
180
181
182
183
184 private ClientBootstrap activeBootstrap = null;
185
186
187
188
189 private ServerBootstrap passiveBootstrap = null;
190
191
192
193
194 private Timer timerTrafficCounter =
195 new HashedWheelTimer(new GgThreadFactory("TimerTrafficFtp"), 10, TimeUnit.MILLISECONDS, 1024);
196
197
198
199
200 private volatile GlobalTrafficShapingHandler globalTrafficShapingHandler = null;
201
202
203
204
205 private ObjectSizeEstimator objectSizeEstimator = null;
206
207
208
209
210
211
212 public class BindAddress {
213
214
215
216 public final Channel parent;
217
218
219
220
221 public int nbBind = 0;
222
223
224
225
226
227
228 public BindAddress(Channel channel) {
229 parent = channel;
230 nbBind = 0;
231 }
232 }
233
234
235
236
237 private final ConcurrentHashMap<InetSocketAddress, BindAddress> hashBindPassiveDataConn =
238 new ConcurrentHashMap<InetSocketAddress, BindAddress>();
239
240
241
242
243 private final FtpConfiguration configuration;
244
245
246
247
248
249
250 public FtpInternalConfiguration(FtpConfiguration configuration) {
251 this.configuration = configuration;
252 ISUNIX = !System.getProperty("os.name").toLowerCase().startsWith("win");
253 }
254
255
256
257
258
259 public void serverStartup() {
260 InternalLoggerFactory.setDefaultFactory(InternalLoggerFactory
261 .getDefaultFactory());
262
263 commandChannelGroup = new DefaultChannelGroup(configuration.fromClass
264 .getName());
265 commandChannelFactory = new NioServerSocketChannelFactory(execBoss,
266 execWorker, configuration.SERVER_THREAD);
267
268 dataChannelGroup = new DefaultChannelGroup(configuration.fromClass
269 .getName() +
270 ".data");
271 dataPassiveChannelFactory = new NioServerSocketChannelFactory(
272 execPassiveDataBoss, execPassiveDataWorker,
273 configuration.SERVER_THREAD);
274 dataActiveChannelFactory = new NioClientSocketChannelFactory(
275 execActiveDataBoss, execActiveDataWorker, configuration.CLIENT_THREAD);
276
277
278 passiveBootstrap = new ServerBootstrap(dataPassiveChannelFactory);
279 passiveBootstrap.setPipelineFactory(new FtpDataPipelineFactory(
280 configuration.dataBusinessHandler, configuration, false));
281 passiveBootstrap.setOption("connectTimeoutMillis",
282 configuration.TIMEOUTCON);
283 passiveBootstrap.setOption("reuseAddress", true);
284 passiveBootstrap.setOption("tcpNoDelay", true);
285 passiveBootstrap.setOption("child.connectTimeoutMillis",
286 configuration.TIMEOUTCON);
287 passiveBootstrap.setOption("child.tcpNoDelay", true);
288 passiveBootstrap.setOption("child.keepAlive", true);
289 passiveBootstrap.setOption("child.reuseAddress", true);
290
291 activeBootstrap = new ClientBootstrap(dataActiveChannelFactory);
292 activeBootstrap.setPipelineFactory(new FtpDataPipelineFactory(
293 configuration.dataBusinessHandler, configuration, true));
294 activeBootstrap.setOption("connectTimeoutMillis",
295 configuration.TIMEOUTCON);
296 activeBootstrap.setOption("reuseAddress", true);
297 activeBootstrap.setOption("tcpNoDelay", true);
298 activeBootstrap.setOption("child.connectTimeoutMillis",
299 configuration.TIMEOUTCON);
300 activeBootstrap.setOption("child.tcpNoDelay", true);
301 activeBootstrap.setOption("child.keepAlive", true);
302 activeBootstrap.setOption("child.reuseAddress", true);
303
304 serverBootstrap = new ServerBootstrap(getCommandChannelFactory());
305 serverBootstrap.setPipelineFactory(new FtpPipelineFactory(
306 configuration.businessHandler, configuration));
307 serverBootstrap.setOption("child.tcpNoDelay", true);
308 serverBootstrap.setOption("child.keepAlive", true);
309 serverBootstrap.setOption("child.reuseAddress", true);
310 serverBootstrap.setOption("child.connectTimeoutMillis",
311 configuration.TIMEOUTCON);
312 serverBootstrap.setOption("tcpNoDelay", true);
313 serverBootstrap.setOption("reuseAddress", true);
314 serverBootstrap.setOption("connectTimeoutMillis",
315 configuration.TIMEOUTCON);
316
317 FtpChannelUtils.addCommandChannel(serverBootstrap
318 .bind(new InetSocketAddress(configuration.getServerPort())),
319 configuration);
320
321
322 FtpSignalHandler.initSignalHandler(configuration);
323
324 objectSizeEstimator = new DataBlockSizeEstimator();
325 globalTrafficShapingHandler = new GlobalTrafficShapingHandler(
326 objectSizeEstimator, timerTrafficCounter, configuration
327 .getServerGlobalWriteLimit(), configuration
328 .getServerGlobalReadLimit(), configuration
329 .getDelayLimit());
330 pipelineExecutor = new OrderedMemoryAwareThreadPoolExecutor(
331 configuration.CLIENT_THREAD,
332 configuration.maxGlobalMemory / 40,
333 configuration.maxGlobalMemory / 4, 1000,
334 TimeUnit.MILLISECONDS, objectSizeEstimator,
335 new GgThreadFactory("CommandExecutor"));
336 pipelineDataExecutor = new OrderedMemoryAwareThreadPoolExecutor(
337 configuration.CLIENT_THREAD,
338 configuration.maxGlobalMemory / 10,
339 configuration.maxGlobalMemory, 1000,
340 TimeUnit.MILLISECONDS, objectSizeEstimator,
341 new GgThreadFactory("DataExecutor"));
342 }
343
344
345
346
347 public ExecutorService getWorker() {
348 return execWorker;
349 }
350
351
352
353
354
355
356
357 public void setNewFtpSession(InetAddress ipOnly, InetSocketAddress fullIp,
358 FtpSession session) {
359 ftpSessionReference.setNewFtpSession(ipOnly, fullIp, session);
360 }
361
362
363
364
365
366
367
368
369 public FtpSession getFtpSession(Channel channel, boolean active) {
370 if (active) {
371 return ftpSessionReference.getActiveFtpSession(channel);
372 } else {
373 return ftpSessionReference.getPassiveFtpSession(channel);
374 }
375 }
376
377
378
379
380
381
382
383 public void delFtpSession(InetAddress ipOnly, InetSocketAddress fullIp) {
384 ftpSessionReference.delFtpSession(ipOnly, fullIp);
385 }
386
387
388
389
390
391
392
393
394 public boolean hasFtpSession(InetAddress ipOnly, InetSocketAddress fullIp) {
395 return ftpSessionReference.contains(ipOnly, fullIp);
396 }
397
398
399
400
401 public int getNumberSessions() {
402 return ftpSessionReference.sessionsNumber();
403 }
404
405
406
407
408
409
410
411 public void bindPassive(InetSocketAddress address) throws Reply425Exception {
412 configuration.bindLock();
413 try {
414 BindAddress bindAddress = hashBindPassiveDataConn.get(address);
415 if (bindAddress == null) {
416 logger.debug("Bind really to {}", address);
417 Channel parentChannel = null;
418 try {
419 parentChannel = passiveBootstrap.bind(address);
420 } catch (ChannelException e) {
421 logger.warn("Cannot open passive connection {}", e
422 .getMessage());
423 throw new Reply425Exception(
424 "Cannot open a Passive Connection ");
425 }
426 bindAddress = new BindAddress(parentChannel);
427 FtpChannelUtils.addDataChannel(parentChannel, configuration);
428 hashBindPassiveDataConn.put(address, bindAddress);
429 }
430 bindAddress.nbBind++;
431 logger.debug("Bind number to {} is {}", address, bindAddress.nbBind);
432 } finally {
433 configuration.bindUnlock();
434 }
435 }
436
437
438
439
440
441
442
443
444
445 public void unbindPassive(InetSocketAddress address) {
446 configuration.bindLock();
447 try {
448 BindAddress bindAddress = hashBindPassiveDataConn.get(address);
449 if (bindAddress != null) {
450 bindAddress.nbBind--;
451 logger.debug("Bind number to {} left is {}", address, bindAddress.nbBind);
452 if (bindAddress.nbBind == 0) {
453 Channels.close(bindAddress.parent);
454 hashBindPassiveDataConn.remove(address);
455 }
456 } else {
457 logger.warn("No Bind to {}", address);
458 }
459 } finally {
460 configuration.bindUnlock();
461 }
462 }
463
464
465
466
467
468 public int getNbBindedPassive() {
469 return hashBindPassiveDataConn.size();
470 }
471
472
473
474
475
476
477 public OrderedMemoryAwareThreadPoolExecutor getPipelineExecutor() {
478 return pipelineExecutor;
479 }
480
481
482
483
484
485
486 public OrderedMemoryAwareThreadPoolExecutor getDataPipelineExecutor() {
487 return pipelineDataExecutor;
488 }
489
490
491
492
493
494 public ClientBootstrap getActiveBootstrap() {
495 return activeBootstrap;
496 }
497
498
499
500
501 public ChannelFactory getCommandChannelFactory() {
502 return commandChannelFactory;
503 }
504
505
506
507
508 public ChannelGroup getCommandChannelGroup() {
509 return commandChannelGroup;
510 }
511
512
513
514
515 public ChannelFactory getDataPassiveChannelFactory() {
516 return dataPassiveChannelFactory;
517 }
518
519
520
521
522 public ChannelFactory getDataActiveChannelFactory() {
523 return dataActiveChannelFactory;
524 }
525
526
527
528
529 public ChannelGroup getDataChannelGroup() {
530 return dataChannelGroup;
531 }
532
533
534
535
536 public ObjectSizeEstimator getObjectSizeEstimator() {
537 return objectSizeEstimator;
538 }
539
540
541
542
543
544 public GlobalTrafficShapingHandler getGlobalTrafficShapingHandler() {
545 return globalTrafficShapingHandler;
546 }
547
548
549
550
551
552 public ChannelTrafficShapingHandler newChannelTrafficShapingHandler() {
553 if (configuration.getServerChannelWriteLimit() == 0 &&
554 configuration.getServerChannelReadLimit() == 0) {
555 return null;
556 }
557 return new ChannelTrafficShapingHandler(objectSizeEstimator,
558 timerTrafficCounter, configuration.getServerChannelWriteLimit(),
559 configuration.getServerChannelReadLimit(), configuration
560 .getDelayLimit());
561 }
562
563 public void releaseResources() {
564 execBoss.shutdown();
565 execWorker.shutdown();
566 execPassiveDataBoss.shutdown();
567 execPassiveDataWorker.shutdown();
568 execActiveDataBoss.shutdown();
569 execActiveDataWorker.shutdown();
570 timerTrafficCounter.stop();
571 activeBootstrap.releaseExternalResources();
572 passiveBootstrap.releaseExternalResources();
573 serverBootstrap.releaseExternalResources();
574 }
575 }