1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.core.data;
22
23 import goldengate.common.command.ReplyCode;
24 import goldengate.common.command.exception.CommandAbstractException;
25 import goldengate.common.command.exception.Reply425Exception;
26 import goldengate.common.future.GgChannelFuture;
27 import goldengate.common.future.GgFuture;
28 import goldengate.common.logging.GgInternalLogger;
29 import goldengate.common.logging.GgInternalLoggerFactory;
30 import goldengate.ftp.core.command.FtpCommandCode;
31 import goldengate.ftp.core.command.service.ABOR;
32 import goldengate.ftp.core.config.FtpInternalConfiguration;
33 import goldengate.ftp.core.control.NetworkHandler;
34 import goldengate.ftp.core.data.handler.DataNetworkHandler;
35 import goldengate.ftp.core.exception.FtpNoConnectionException;
36 import goldengate.ftp.core.exception.FtpNoFileException;
37 import goldengate.ftp.core.exception.FtpNoTransferException;
38 import goldengate.ftp.core.file.FtpFile;
39 import goldengate.ftp.core.session.FtpSession;
40
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.util.List;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.TimeUnit;
47
48 import org.jboss.netty.bootstrap.ClientBootstrap;
49 import org.jboss.netty.channel.Channel;
50 import org.jboss.netty.channel.ChannelFuture;
51 import org.jboss.netty.channel.Channels;
52
53
54
55
56
57
58
59 public class FtpTransferControl {
60
61
62
63 private static final GgInternalLogger logger = GgInternalLoggerFactory
64 .getLogger(FtpTransferControl.class);
65
66
67
68
69 private final FtpSession session;
70
71
72
73
74 private volatile boolean isDataNetworkHandlerReady = false;
75
76
77
78
79 private volatile Channel dataChannel = null;
80
81
82
83
84 private volatile GgChannelFuture waitForOpenedDataChannel = new GgChannelFuture(
85 true);
86
87
88
89
90 private volatile GgFuture closedDataChannel = null;
91
92
93
94
95 private volatile boolean isExecutingCommandFinished = true;
96
97
98
99 private volatile GgFuture commandFinishing = null;
100
101
102
103
104 private FtpTransfer executingCommand = null;
105
106
107
108
109 private ExecutorService executorService = null;
110
111
112
113
114
115 private volatile GgFuture endOfCommand = null;
116
117
118
119
120 private volatile boolean isCheckAlreadyCalled = false;
121
122
123
124
125
126 public FtpTransferControl(FtpSession session) {
127 this.session = session;
128 endOfCommand = null;
129 }
130
131
132
133
134
135
136 private void setDataNetworkHandlerReady() {
137 isCheckAlreadyCalled = false;
138 if (isDataNetworkHandlerReady) {
139 return;
140 }
141 isDataNetworkHandlerReady = true;
142 }
143
144
145
146
147
148
149
150
151 public void waitForDataNetworkHandlerReady() throws InterruptedException {
152 if (!isDataNetworkHandlerReady) {
153
154 throw new InterruptedException("Bad initialization");
155 }
156 }
157
158
159
160
161
162
163
164
165 public void setOpenedDataChannel(Channel channel,
166 DataNetworkHandler dataNetworkHandler) {
167 if (channel != null) {
168 session.getDataConn().setDataNetworkHandler(dataNetworkHandler);
169 waitForOpenedDataChannel.setChannel(channel);
170 waitForOpenedDataChannel.setSuccess();
171 } else {
172 waitForOpenedDataChannel.cancel();
173 }
174 }
175
176
177
178
179
180
181
182
183 public Channel waitForOpenedDataChannel() throws InterruptedException {
184 Channel channel = null;
185 if (waitForOpenedDataChannel.await(
186 session.getConfiguration().TIMEOUTCON + 1000,
187 TimeUnit.MILLISECONDS)) {
188 if (waitForOpenedDataChannel.isSuccess()) {
189 channel = waitForOpenedDataChannel.getChannel();
190 } else {
191 logger.warn("data connection is in error");
192 }
193 } else {
194 logger.warn("Timeout occurs during data connection");
195 }
196 waitForOpenedDataChannel = new GgChannelFuture(true);
197 return channel;
198 }
199
200
201
202
203 public void setClosedDataChannel() {
204 if (closedDataChannel != null) {
205 closedDataChannel.setSuccess();
206 }
207 }
208
209
210
211
212
213
214
215
216 public boolean openDataConnection() throws Reply425Exception {
217
218
219
220 closedDataChannel = new GgFuture(true);
221 FtpDataAsyncConn dataAsyncConn = session.getDataConn();
222 if (!dataAsyncConn.isStreamFile()) {
223
224 if (dataAsyncConn.isConnected()) {
225
226
227 session.setReplyCode(
228 ReplyCode.REPLY_125_DATA_CONNECTION_ALREADY_OPEN,
229 dataAsyncConn.getType().name() +
230 " mode data connection already open");
231 return true;
232 }
233 } else {
234
235 if (dataAsyncConn.isConnected()) {
236 logger
237 .error("Connection already open but should not since in Stream mode");
238 setTransferAbortedFromInternal(false);
239 throw new Reply425Exception("Connection already open but should not since in Stream mode");
240 }
241 }
242
243 session.setReplyCode(ReplyCode.REPLY_150_FILE_STATUS_OKAY, "Opening " +
244 dataAsyncConn.getType().name() + " mode data connection");
245 if (dataAsyncConn.isPassiveMode()) {
246 if (!dataAsyncConn.isBind()) {
247
248 throw new Reply425Exception(
249 "No passive data connection prepared");
250 }
251
252
253 try {
254 dataChannel = waitForOpenedDataChannel();
255 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
256 } catch (InterruptedException e) {
257 logger.warn("Connection abort in passive mode", e);
258
259 throw new Reply425Exception(
260 "Cannot open passive data connection");
261 }
262
263 } else {
264
265 InetAddress inetAddress = dataAsyncConn.getLocalAddress()
266 .getAddress();
267 InetSocketAddress inetSocketAddress = dataAsyncConn
268 .getRemoteAddress();
269 if (session.getConfiguration().getFtpInternalConfiguration()
270 .hasFtpSession(inetAddress, inetSocketAddress)) {
271 throw new Reply425Exception(
272 "Cannot open active data connection since remote address is already in use: " +
273 inetSocketAddress);
274 }
275
276 ClientBootstrap clientBootstrap = session.getConfiguration()
277 .getFtpInternalConfiguration().getActiveBootstrap();
278 session.getConfiguration().setNewFtpSession(inetAddress,
279 inetSocketAddress, session);
280
281 String mylog = session.toString();
282 logger.debug("DataConn for: "+session.getCurrentCommand().getCommand()+" to "+inetSocketAddress.toString());
283 ChannelFuture future = clientBootstrap.connect(inetSocketAddress,
284 dataAsyncConn.getLocalAddress());
285 try {
286 future.await();
287 } catch (InterruptedException e1) {
288 }
289 if (!future.isSuccess()) {
290 logger.warn("Connection abort in active mode from future while session: "+
291 session.toString()+
292 "\nTrying connect to: "+inetSocketAddress.toString()+
293 "\nWas: "+mylog,
294 future.getCause());
295
296 session.getConfiguration().delFtpSession(inetAddress,
297 inetSocketAddress);
298 throw new Reply425Exception(
299 "Cannot open active data connection");
300 }
301 try {
302 dataChannel = waitForOpenedDataChannel();
303 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
304 } catch (InterruptedException e) {
305 logger.warn("Connection abort in active mode", e);
306
307 session.getConfiguration().delFtpSession(inetAddress,
308 inetSocketAddress);
309 throw new Reply425Exception(
310 "Cannot open active data connection");
311 }
312
313 }
314 if (dataChannel == null) {
315
316 if (!dataAsyncConn.isPassiveMode()) {
317 session.getConfiguration().getFtpInternalConfiguration()
318 .delFtpSession(
319 dataAsyncConn.getLocalAddress().getAddress(),
320 dataAsyncConn.getRemoteAddress());
321 }
322 throw new Reply425Exception(
323 "Cannot open data connection, shuting down");
324 }
325 return true;
326 }
327
328
329
330
331
332 private void runExecutor() {
333
334 try {
335 session.getDataConn().getDataNetworkHandler().unlockModeCodec();
336 } catch (FtpNoConnectionException e) {
337 setTransferAbortedFromInternal(false);
338 return;
339 }
340
341 if (executorService == null) {
342 executorService = Executors.newSingleThreadExecutor();
343 }
344 endOfCommand = new GgFuture(true);
345 executorService.execute(new FtpTransferExecutor(session,
346 executingCommand));
347 try {
348 commandFinishing.await();
349 } catch (InterruptedException e) {
350 }
351 }
352
353
354
355
356
357
358
359
360
361 public void setNewFtpTransfer(FtpCommandCode command, FtpFile file) {
362 isExecutingCommandFinished = false;
363 commandFinishing = new GgFuture(true);
364
365 setDataNetworkHandlerReady();
366 executingCommand = new FtpTransfer(command, file);
367 runExecutor();
368 commandFinishing = null;
369 }
370
371
372
373
374
375
376
377
378
379
380
381 public void setNewFtpTransfer(FtpCommandCode command, List<String> list,
382 String path) {
383 isExecutingCommandFinished = false;
384 commandFinishing = new GgFuture(true);
385
386 setDataNetworkHandlerReady();
387 executingCommand = new FtpTransfer(command, list, path);
388 runExecutor();
389 commandFinishing = null;
390 }
391
392
393
394
395
396
397
398
399 public boolean isFtpTransferExecuting() {
400 return !isExecutingCommandFinished;
401 }
402
403
404
405
406
407
408 public FtpTransfer getExecutingFtpTransfer() throws FtpNoTransferException {
409 if (executingCommand != null) {
410 return executingCommand;
411 }
412 throw new FtpNoTransferException("No Command currently running");
413 }
414
415
416
417
418
419
420
421
422 private boolean isExecutingRetrLikeTransfer()
423 throws FtpNoTransferException, CommandAbstractException,
424 FtpNoFileException {
425 return FtpCommandCode.isRetrLikeCommand(getExecutingFtpTransfer()
426 .getCommand()) &&
427 getExecutingFtpTransfer().getFtpFile().isInReading();
428 }
429
430
431
432
433
434 public void runTrueRetrieve() {
435 try {
436 if (isExecutingRetrLikeTransfer()) {
437 getExecutingFtpTransfer().getFtpFile().trueRetrieve();
438 }
439 } catch (CommandAbstractException e) {
440 } catch (FtpNoTransferException e) {
441 } catch (FtpNoFileException e) {
442 }
443 }
444
445
446
447
448
449
450
451 private boolean checkFtpTransferStatus() throws FtpNoTransferException {
452 if (isCheckAlreadyCalled) {
453 logger.warn("Check: ALREADY CALLED");
454 return true;
455 }
456 if (isExecutingCommandFinished) {
457
458 logger.warn("Check: already Finished");
459 if (commandFinishing != null) {
460 commandFinishing.cancel();
461 }
462 throw new FtpNoTransferException("No transfer running");
463 }
464 if (!isDataNetworkHandlerReady) {
465
466 logger.warn("Check: already DNH not ready");
467 throw new FtpNoTransferException("No connection");
468 }
469 isCheckAlreadyCalled = true;
470 FtpTransfer executedTransfer = getExecutingFtpTransfer();
471
472
473 if (FtpCommandCode.isListLikeCommand(executedTransfer.getCommand())) {
474 if (executedTransfer.getStatus()) {
475
476
477 closeTransfer();
478 return false;
479 }
480
481 abortTransfer();
482 return false;
483 } else if (FtpCommandCode.isRetrLikeCommand(executedTransfer
484 .getCommand())) {
485 FtpFile file = null;
486 try {
487 file = executedTransfer.getFtpFile();
488 } catch (FtpNoFileException e) {
489
490 abortTransfer();
491 return false;
492 }
493 try {
494 if (file.isInReading()) {
495 logger
496 .debug("Check: Retr FtpFile still in reading KO");
497 abortTransfer();
498 } else {
499 logger
500 .debug("Check: Retr FtpFile no more in reading OK");
501 closeTransfer();
502 }
503 } catch (CommandAbstractException e) {
504 logger.warn("Retr Test is in Reading problem", e);
505 closeTransfer();
506 }
507 return false;
508 } else if (FtpCommandCode.isStoreLikeCommand(executedTransfer
509 .getCommand())) {
510
511 closeTransfer();
512 return false;
513 } else {
514 logger.warn("Check: Unknown command");
515 abortTransfer();
516 }
517 return false;
518 }
519
520
521
522
523 private void abortTransfer() {
524
525 FtpFile file = null;
526 FtpTransfer current = null;
527 try {
528 current = getExecutingFtpTransfer();
529 file = current.getFtpFile();
530 file.abortFile();
531 } catch (FtpNoTransferException e) {
532 logger.warn("Abort problem", e);
533 } catch (FtpNoFileException e) {
534 } catch (CommandAbstractException e) {
535 logger.warn("Abort problem", e);
536 }
537 if (current != null) {
538 current.setStatus(false);
539 }
540 endDataConnection();
541 session.setReplyCode(
542 ReplyCode.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
543 "Transfer aborted for " +
544 (current == null? "Unknown command" : current
545 .toString()));
546 if (current != null) {
547 if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
548 try {
549 session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
550 } catch (CommandAbstractException e) {
551 session.setReplyCode(e);
552 }
553 }
554 }
555 finalizeExecution();
556 }
557
558
559
560
561
562 private void closeTransfer() {
563
564 FtpFile file = null;
565 FtpTransfer current = null;
566 try {
567 current = getExecutingFtpTransfer();
568 file = current.getFtpFile();
569 file.closeFile();
570 } catch (FtpNoTransferException e) {
571 logger.warn("Close problem", e);
572 } catch (FtpNoFileException e) {
573 } catch (CommandAbstractException e) {
574 logger.warn("Close problem", e);
575 }
576 if (current != null) {
577 current.setStatus(true);
578 }
579 if (session.getDataConn().isStreamFile()) {
580 endDataConnection();
581 }
582 session.setReplyCode(ReplyCode.REPLY_250_REQUESTED_FILE_ACTION_OKAY,
583 "Transfer correctly finished for " +
584 (current == null? "Unknown command" : current
585 .toString()));
586 if (current != null) {
587 if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
588 try {
589 session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
590 } catch (CommandAbstractException e) {
591 session.setReplyCode(e);
592 }
593 } else {
594
595 try {
596 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
597 } catch (InterruptedException e) {
598 }
599 }
600 }
601 finalizeExecution();
602 }
603
604
605
606
607
608
609 public void setEndOfTransfer() {
610 try {
611 checkFtpTransferStatus();
612 } catch (FtpNoTransferException e) {
613 return;
614 }
615 }
616
617
618
619
620
621
622
623
624 public void setTransferAbortedFromInternal(boolean write) {
625
626 abortTransfer();
627 if (write) {
628 session.getNetworkHandler().writeIntermediateAnswer();
629 }
630 if (endOfCommand != null) {
631 endOfCommand.cancel();
632 }
633 }
634
635
636
637
638
639
640 public void setPreEndOfTransfer() {
641 if (endOfCommand != null) {
642 endOfCommand.setSuccess();
643 }
644 }
645
646
647
648
649
650
651
652 public void waitForEndOfTransfer() throws InterruptedException {
653 if (endOfCommand != null) {
654 endOfCommand.await();
655 if (endOfCommand.isFailed()) {
656 throw new InterruptedException("Transfer aborted");
657 }
658 }
659
660 }
661
662
663
664
665
666
667 private void finalizeExecution() {
668
669 isExecutingCommandFinished = true;
670 if (commandFinishing != null) {
671 commandFinishing.setSuccess();
672 }
673 executingCommand = null;
674 }
675
676
677
678
679
680 private void endDataConnection() {
681
682 if (isDataNetworkHandlerReady) {
683 isDataNetworkHandlerReady = false;
684 Channels.close(dataChannel);
685 if (closedDataChannel != null) {
686 try {
687 closedDataChannel.await(session.getConfiguration().TIMEOUTCON,
688 TimeUnit.MILLISECONDS);
689 } catch (InterruptedException e) {
690 }
691 }
692
693 dataChannel = null;
694 }
695 }
696
697
698
699
700
701
702
703 public void clear() {
704
705 endDataConnection();
706 finalizeExecution();
707 if (closedDataChannel != null) {
708 closedDataChannel.cancel();
709 }
710 if (endOfCommand != null) {
711 endOfCommand.cancel();
712 }
713 if (waitForOpenedDataChannel != null) {
714 waitForOpenedDataChannel.cancel();
715 }
716 if (executorService != null) {
717 executorService.shutdownNow();
718 executorService = null;
719 }
720 }
721 }