View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.ftp.core.data;
22  
23  import goldengate.common.command.ReplyCode;
24  import goldengate.common.command.exception.CommandAbstractException;
25  import goldengate.common.command.exception.Reply425Exception;
26  import goldengate.common.future.GgChannelFuture;
27  import goldengate.common.future.GgFuture;
28  import goldengate.common.logging.GgInternalLogger;
29  import goldengate.common.logging.GgInternalLoggerFactory;
30  import goldengate.ftp.core.command.FtpCommandCode;
31  import goldengate.ftp.core.command.service.ABOR;
32  import goldengate.ftp.core.config.FtpInternalConfiguration;
33  import goldengate.ftp.core.control.NetworkHandler;
34  import goldengate.ftp.core.data.handler.DataNetworkHandler;
35  import goldengate.ftp.core.exception.FtpNoConnectionException;
36  import goldengate.ftp.core.exception.FtpNoFileException;
37  import goldengate.ftp.core.exception.FtpNoTransferException;
38  import goldengate.ftp.core.file.FtpFile;
39  import goldengate.ftp.core.session.FtpSession;
40  
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.util.List;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.Executors;
46  import java.util.concurrent.TimeUnit;
47  
48  import org.jboss.netty.bootstrap.ClientBootstrap;
49  import org.jboss.netty.channel.Channel;
50  import org.jboss.netty.channel.ChannelFuture;
51  import org.jboss.netty.channel.Channels;
52  
53  /**
54   * Main class that handles transfers and their execution
55   *
56   * @author Frederic Bregier
57   *
58   */
59  public class FtpTransferControl {
60      /**
61       * Internal Logger
62       */
63      private static final GgInternalLogger logger = GgInternalLoggerFactory
64              .getLogger(FtpTransferControl.class);
65  
66      /**
67       * SessionInterface
68       */
69      private final FtpSession session;
70  
71      /**
72       * Step in order to wait that the DataNetworkHandler is ready
73       */
74      private volatile boolean isDataNetworkHandlerReady = false;
75  
76      /**
77       * The associated DataChannel
78       */
79      private volatile Channel dataChannel = null;
80  
81      /**
82       * Waiter for the dataChannel to be opened
83       */
84      private volatile GgChannelFuture waitForOpenedDataChannel = new GgChannelFuture(
85              true);
86  
87      /**
88       * Waiter for the dataChannel to be closed
89       */
90      private volatile GgFuture closedDataChannel = null;
91  
92      /**
93       * Is the current Command Finished (or previously current command)
94       */
95      private volatile boolean isExecutingCommandFinished = true;
96      /**
97       * Waiter for the Command finishing
98       */
99      private volatile GgFuture commandFinishing = null;
100 
101     /**
102      * Current command executed
103      */
104     private FtpTransfer executingCommand = null;
105 
106     /**
107      * Thread pool for execution of transfer command
108      */
109     private ExecutorService executorService = null;
110 
111     /**
112      * Blocking step for the Executor in order to wait for the end of the
113      * command (internal wait, not to be used outside).
114      */
115     private volatile GgFuture endOfCommand = null;
116 
117     /**
118      * A boolean to know if Check was called once
119      */
120     private volatile boolean isCheckAlreadyCalled = false;
121 
122     /**
123      *
124      * @param session
125      */
126     public FtpTransferControl(FtpSession session) {
127         this.session = session;
128         endOfCommand = null;
129     }
130 
131     // XXX DataNetworkHandler functions
132     /**
133      * The DataNetworkHandler is ready (from setNewFtpExecuteTransfer)
134      *
135      */
136     private void setDataNetworkHandlerReady() {
137         isCheckAlreadyCalled = false;
138         if (isDataNetworkHandlerReady) {
139             return;
140         }
141         isDataNetworkHandlerReady = true;
142     }
143 
144     /**
145      * Wait for the DataNetworkHandler to be ready (from trueRetrieve of
146      * {@link FtpFile})
147      *
148      * @throws InterruptedException
149      *
150      */
151     public void waitForDataNetworkHandlerReady() throws InterruptedException {
152         if (!isDataNetworkHandlerReady) {
153             // logger.debug("Wait for DataNetwork Ready over {}");
154             throw new InterruptedException("Bad initialization");
155         }
156     }
157 
158     /**
159      * Set the new opened Channel (from channelConnected of
160      * {@link DataNetworkHandler})
161      *
162      * @param channel
163      * @param dataNetworkHandler
164      */
165     public void setOpenedDataChannel(Channel channel,
166             DataNetworkHandler dataNetworkHandler) {
167         if (channel != null) {
168             session.getDataConn().setDataNetworkHandler(dataNetworkHandler);
169             waitForOpenedDataChannel.setChannel(channel);
170             waitForOpenedDataChannel.setSuccess();
171         } else {
172             waitForOpenedDataChannel.cancel();
173         }
174     }
175 
176     /**
177      * Wait that the new opened connection is ready (same method in
178      * {@link FtpDataAsyncConn} from openConnection)
179      *
180      * @return the new opened Channel
181      * @throws InterruptedException
182      */
183     public Channel waitForOpenedDataChannel() throws InterruptedException {
184         Channel channel = null;
185         if (waitForOpenedDataChannel.await(
186                 session.getConfiguration().TIMEOUTCON + 1000,
187                 TimeUnit.MILLISECONDS)) {
188             if (waitForOpenedDataChannel.isSuccess()) {
189                 channel = waitForOpenedDataChannel.getChannel();
190             } else {
191                 logger.warn("data connection is in error");
192             }
193         } else {
194             logger.warn("Timeout occurs during data connection");
195         }
196         waitForOpenedDataChannel = new GgChannelFuture(true);
197         return channel;
198     }
199 
200     /**
201      * Set the closed Channel (from channelClosed of {@link DataNetworkHandler})
202      */
203     public void setClosedDataChannel() {
204         if (closedDataChannel != null) {
205             closedDataChannel.setSuccess();
206         }
207     }
208 
209     /**
210      * Wait for the client to be connected (Passive) or Wait for the server to
211      * be connected to the client (Active)
212      *
213      * @return True if the connection is OK
214      * @throws Reply425Exception
215      */
216     public boolean openDataConnection() throws Reply425Exception {
217         // Prepare this Data channel to be closed ;-)
218         // In fact, prepare the future close op which should occur since it is
219         // now opened
220         closedDataChannel = new GgFuture(true);
221         FtpDataAsyncConn dataAsyncConn = session.getDataConn();
222         if (!dataAsyncConn.isStreamFile()) {
223             // FIXME isConnected or isDNHReady ?
224             if (dataAsyncConn.isConnected()) {
225                 // Already connected
226                 // logger.debug("Connection already open");
227                 session.setReplyCode(
228                         ReplyCode.REPLY_125_DATA_CONNECTION_ALREADY_OPEN,
229                         dataAsyncConn.getType().name() +
230                                 " mode data connection already open");
231                 return true;
232             }
233         } else {
234             // Stream, Data Connection should not be opened
235             if (dataAsyncConn.isConnected()) {
236                 logger
237                         .error("Connection already open but should not since in Stream mode");
238                 setTransferAbortedFromInternal(false);
239                 throw new Reply425Exception("Connection already open but should not since in Stream mode");
240             }
241         }
242         // Need to open connection
243         session.setReplyCode(ReplyCode.REPLY_150_FILE_STATUS_OKAY, "Opening " +
244                 dataAsyncConn.getType().name() + " mode data connection");
245         if (dataAsyncConn.isPassiveMode()) {
246             if (!dataAsyncConn.isBind()) {
247                 // No passive connection prepared
248                 throw new Reply425Exception(
249                         "No passive data connection prepared");
250             }
251             // Wait for the connection to be done by the client
252             // logger.debug("Passive mode standby");
253             try {
254                 dataChannel = waitForOpenedDataChannel();
255                 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
256             } catch (InterruptedException e) {
257                 logger.warn("Connection abort in passive mode", e);
258                 // Cannot open connection
259                 throw new Reply425Exception(
260                         "Cannot open passive data connection");
261             }
262             // logger.debug("Passive mode connected");
263         } else {
264             // Wait for the server to be connected to the client
265             InetAddress inetAddress = dataAsyncConn.getLocalAddress()
266                     .getAddress();
267             InetSocketAddress inetSocketAddress = dataAsyncConn
268                     .getRemoteAddress();
269             if (session.getConfiguration().getFtpInternalConfiguration()
270                     .hasFtpSession(inetAddress, inetSocketAddress)) {
271                 throw new Reply425Exception(
272                         "Cannot open active data connection since remote address is already in use: " +
273                                 inetSocketAddress);
274             }
275             // logger.debug("Active mode standby");
276             ClientBootstrap clientBootstrap = session.getConfiguration()
277                     .getFtpInternalConfiguration().getActiveBootstrap();
278             session.getConfiguration().setNewFtpSession(inetAddress,
279                     inetSocketAddress, session);
280             // Set the session for the future dataChannel
281             String mylog = session.toString();
282             logger.debug("DataConn for: "+session.getCurrentCommand().getCommand()+" to "+inetSocketAddress.toString());
283             ChannelFuture future = clientBootstrap.connect(inetSocketAddress, 
284                     dataAsyncConn.getLocalAddress());
285             try {
286                 future.await();
287             } catch (InterruptedException e1) {
288             }
289             if (!future.isSuccess()) {
290                 logger.warn("Connection abort in active mode from future while session: "+
291                         session.toString()+
292                         "\nTrying connect to: "+inetSocketAddress.toString()+
293                         "\nWas: "+mylog,
294                         future.getCause());
295                 // Cannot open connection
296                 session.getConfiguration().delFtpSession(inetAddress,
297                         inetSocketAddress);
298                 throw new Reply425Exception(
299                         "Cannot open active data connection");
300             }
301             try {
302                 dataChannel = waitForOpenedDataChannel();
303                 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
304             } catch (InterruptedException e) {
305                 logger.warn("Connection abort in active mode", e);
306                 // Cannot open connection
307                 session.getConfiguration().delFtpSession(inetAddress,
308                         inetSocketAddress);
309                 throw new Reply425Exception(
310                         "Cannot open active data connection");
311             }
312             // logger.debug("Active mode connected");
313         }
314         if (dataChannel == null) {
315             // Cannot have a new Data connection since shutdown
316             if (!dataAsyncConn.isPassiveMode()) {
317                 session.getConfiguration().getFtpInternalConfiguration()
318                         .delFtpSession(
319                                 dataAsyncConn.getLocalAddress().getAddress(),
320                                 dataAsyncConn.getRemoteAddress());
321             }
322             throw new Reply425Exception(
323                     "Cannot open data connection, shuting down");
324         }
325         return true;
326     }
327 
328     // XXX FtpTransfer functions
329     /**
330      * Run the command from an executor
331      */
332     private void runExecutor() {
333         // Unlock Mode Codec
334         try {
335             session.getDataConn().getDataNetworkHandler().unlockModeCodec();
336         } catch (FtpNoConnectionException e) {
337             setTransferAbortedFromInternal(false);
338             return;
339         }
340         // Run the command
341         if (executorService == null) {
342             executorService = Executors.newSingleThreadExecutor();
343         }
344         endOfCommand = new GgFuture(true);
345         executorService.execute(new FtpTransferExecutor(session,
346                 executingCommand));
347         try {
348             commandFinishing.await();
349         } catch (InterruptedException e) {
350         }
351     }
352 
353     /**
354      * Add a new transfer to be executed. This is to be called from Command
355      * after connection is opened and before answering to the client that
356      * command is ready to be executed (for Store or Retrieve like operations).
357      *
358      * @param command
359      * @param file
360      */
361     public void setNewFtpTransfer(FtpCommandCode command, FtpFile file) {
362         isExecutingCommandFinished = false;
363         commandFinishing = new GgFuture(true);
364         // logger.debug("setNewCommand: {}", command);
365         setDataNetworkHandlerReady();
366         executingCommand = new FtpTransfer(command, file);
367         runExecutor();
368         commandFinishing = null;
369     }
370 
371     /**
372      * Add a new transfer to be executed. This is to be called from Command
373      * after connection is opened and before answering to the client that
374      * command is ready to be executed (for List like operations).
375      *
376      * @param command
377      * @param list
378      * @param path
379      *            as Original Path
380      */
381     public void setNewFtpTransfer(FtpCommandCode command, List<String> list,
382             String path) {
383         isExecutingCommandFinished = false;
384         commandFinishing = new GgFuture(true);
385         // logger.debug("setNewCommand: {}", command);
386         setDataNetworkHandlerReady();
387         executingCommand = new FtpTransfer(command, list, path);
388         runExecutor();
389         commandFinishing = null;
390     }
391 
392     /**
393      * Is a command currently executing (called from {@link NetworkHandler} when
394      * a message is received to see if another transfer command is already in
395      * execution, which is not allowed)
396      *
397      * @return True if a command is currently executing
398      */
399     public boolean isFtpTransferExecuting() {
400         return !isExecutingCommandFinished;
401     }
402 
403     /**
404      *
405      * @return the current executing FtpTransfer
406      * @throws FtpNoTransferException
407      */
408     public FtpTransfer getExecutingFtpTransfer() throws FtpNoTransferException {
409         if (executingCommand != null) {
410             return executingCommand;
411         }
412         throw new FtpNoTransferException("No Command currently running");
413     }
414 
415     /**
416      *
417      * @return True if the current FtpTransfer is a Retrieve like transfer
418      * @throws FtpNoTransferException
419      * @throws CommandAbstractException
420      * @throws FtpNoFileException
421      */
422     private boolean isExecutingRetrLikeTransfer()
423             throws FtpNoTransferException, CommandAbstractException,
424             FtpNoFileException {
425         return FtpCommandCode.isRetrLikeCommand(getExecutingFtpTransfer()
426                 .getCommand()) &&
427                 getExecutingFtpTransfer().getFtpFile().isInReading();
428     }
429 
430     /**
431      * Run the retrieve operation if necessary (called from
432      * channelInterestChanged in {@link DataNetworkHandler})
433      */
434     public void runTrueRetrieve() {
435         try {
436             if (isExecutingRetrLikeTransfer()) {
437                 getExecutingFtpTransfer().getFtpFile().trueRetrieve();
438             }
439         } catch (CommandAbstractException e) {
440         } catch (FtpNoTransferException e) {
441         } catch (FtpNoFileException e) {
442         }
443     }
444 
445     /**
446      * Called when a transfer is finished from setEndOfTransfer
447      *
448      * @return True if it was already called before
449      * @throws FtpNoTransferException
450      */
451     private boolean checkFtpTransferStatus() throws FtpNoTransferException {
452         if (isCheckAlreadyCalled) {
453             logger.warn("Check: ALREADY CALLED");
454             return true;
455         }
456         if (isExecutingCommandFinished) {
457             // already done
458             logger.warn("Check: already Finished");
459             if (commandFinishing != null) {
460                 commandFinishing.cancel();
461             }
462             throw new FtpNoTransferException("No transfer running");
463         }
464         if (!isDataNetworkHandlerReady) {
465             // already done
466             logger.warn("Check: already DNH not ready");
467             throw new FtpNoTransferException("No connection");
468         }
469         isCheckAlreadyCalled = true;
470         FtpTransfer executedTransfer = getExecutingFtpTransfer();
471         // logger.debug("Check: command {}", executedTransfer.getCommand());
472         // DNH is ready and Transfer is running
473         if (FtpCommandCode.isListLikeCommand(executedTransfer.getCommand())) {
474             if (executedTransfer.getStatus()) {
475                 // Special status for List Like command
476                 // logger.debug("Check: List OK");
477                 closeTransfer();
478                 return false;
479             }
480             // logger.debug("Check: List Ko");
481             abortTransfer();
482             return false;
483         } else if (FtpCommandCode.isRetrLikeCommand(executedTransfer
484                 .getCommand())) {
485             FtpFile file = null;
486             try {
487                 file = executedTransfer.getFtpFile();
488             } catch (FtpNoFileException e) {
489                 // logger.debug("Check: Retr no FtpFile for Retr");
490                 abortTransfer();
491                 return false;
492             }
493             try {
494                 if (file.isInReading()) {
495                     logger
496                             .debug("Check: Retr FtpFile still in reading KO");
497                     abortTransfer();
498                 } else {
499                     logger
500                             .debug("Check: Retr FtpFile no more in reading OK");
501                     closeTransfer();
502                 }
503             } catch (CommandAbstractException e) {
504                 logger.warn("Retr Test is in Reading problem", e);
505                 closeTransfer();
506             }
507             return false;
508         } else if (FtpCommandCode.isStoreLikeCommand(executedTransfer
509                 .getCommand())) {
510             // logger.debug("Check: Store OK");
511             closeTransfer();
512             return false;
513         } else {
514             logger.warn("Check: Unknown command");
515             abortTransfer();
516         }
517         return false;
518     }
519 
520     /**
521      * Abort the current transfer
522      */
523     private void abortTransfer() {
524         // logger.debug("Will abort transfer and write: ", write);
525         FtpFile file = null;
526         FtpTransfer current = null;
527         try {
528             current = getExecutingFtpTransfer();
529             file = current.getFtpFile();
530             file.abortFile();
531         } catch (FtpNoTransferException e) {
532             logger.warn("Abort problem", e);
533         } catch (FtpNoFileException e) {
534         } catch (CommandAbstractException e) {
535             logger.warn("Abort problem", e);
536         }
537         if (current != null) {
538             current.setStatus(false);
539         }
540         endDataConnection();
541         session.setReplyCode(
542                 ReplyCode.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
543                 "Transfer aborted for " +
544                         (current == null? "Unknown command" : current
545                                 .toString()));
546         if (current != null) {
547             if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
548                 try {
549                     session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
550                 } catch (CommandAbstractException e) {
551                     session.setReplyCode(e);
552                 }
553             }
554         }
555         finalizeExecution();
556     }
557 
558     /**
559      * Finish correctly a transfer
560      *
561      */
562     private void closeTransfer() {
563         // logger.debug("Will close transfer and write: {}", write);
564         FtpFile file = null;
565         FtpTransfer current = null;
566         try {
567             current = getExecutingFtpTransfer();
568             file = current.getFtpFile();
569             file.closeFile();
570         } catch (FtpNoTransferException e) {
571             logger.warn("Close problem", e);
572         } catch (FtpNoFileException e) {
573         } catch (CommandAbstractException e) {
574             logger.warn("Close problem", e);
575         }
576         if (current != null) {
577             current.setStatus(true);
578         }
579         if (session.getDataConn().isStreamFile()) {
580             endDataConnection();
581         }
582         session.setReplyCode(ReplyCode.REPLY_250_REQUESTED_FILE_ACTION_OKAY,
583                 "Transfer correctly finished for " +
584                         (current == null? "Unknown command" : current
585                                 .toString()));
586         if (current != null) {
587             if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
588                 try {
589                     session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
590                 } catch (CommandAbstractException e) {
591                     session.setReplyCode(e);
592                 }
593             } else {
594                 // Special wait to prevent fast LIST following by STOR or RETR command
595                 try {
596                     Thread.sleep(FtpInternalConfiguration.RETRYINMS);
597                 } catch (InterruptedException e) {
598                 }
599             }
600         }
601         finalizeExecution();
602     }
603 
604     /**
605      * Set the current transfer as finished. Called from
606      * {@link FtpTransferExecutor} when a transfer is over.
607      *
608      */
609     public void setEndOfTransfer() {
610         try {
611             checkFtpTransferStatus();
612         } catch (FtpNoTransferException e) {
613             return;
614         }
615     }
616 
617     /**
618      * To enable abort from internal error
619      *
620      * @param write
621      *            True means the message is write back to the control command,
622      *            false it is only prepared
623      */
624     public void setTransferAbortedFromInternal(boolean write) {
625         // logger.debug("Set transfer aborted internal {}", write);
626         abortTransfer();
627         if (write) {
628             session.getNetworkHandler().writeIntermediateAnswer();
629         }
630         if (endOfCommand != null) {
631             endOfCommand.cancel();
632         }
633     }
634 
635     /**
636      * Called by messageReceived, channelClosed (from {@link DataNetworkHandler}
637      * ) and trueRetrieve (from {@link FtpFile}) when the transfer is over
638      * or by channelClosed
639      */
640     public void setPreEndOfTransfer() {
641         if (endOfCommand != null) {
642             endOfCommand.setSuccess();
643         }
644     }
645 
646     /**
647      * Wait for the current transfer to finish, called from
648      * {@link FtpTransferExecutor}
649      *
650      * @throws InterruptedException
651      */
652     public void waitForEndOfTransfer() throws InterruptedException {
653         if (endOfCommand != null) {
654             endOfCommand.await();
655             if (endOfCommand.isFailed()) {
656                 throw new InterruptedException("Transfer aborted");
657             }
658         }
659         // logger.debug("waitEndOfCommand over");
660     }
661 
662     // XXX ExecutorHandler functions
663     /**
664      * Finalize execution
665      *
666      */
667     private void finalizeExecution() {
668         // logger.debug("Finalize execution");
669         isExecutingCommandFinished = true;
670         if (commandFinishing != null) {
671             commandFinishing.setSuccess();
672         }
673         executingCommand = null;
674     }
675 
676     // XXX Finalize of Transfer
677     /**
678      * End the data connection if any
679      */
680     private void endDataConnection() {
681         // logger.debug("End Data connection");
682         if (isDataNetworkHandlerReady) {
683             isDataNetworkHandlerReady = false;
684             Channels.close(dataChannel);
685             if (closedDataChannel != null) {
686                  try {
687                     closedDataChannel.await(session.getConfiguration().TIMEOUTCON,
688                             TimeUnit.MILLISECONDS);
689                 } catch (InterruptedException e) {
690                 }
691             }
692             // logger.debug("waitForClosedDataChannel over");
693             dataChannel = null;
694         }
695     }
696 
697     /**
698      * Clear the FtpTransferControl (called when the data connection must be
699      * over like from clear of {@link FtpDataAsyncConn}, abort from {@link ABOR}
700      * or ending control connection from {@link NetworkHandler}.
701      *
702      */
703     public void clear() {
704         // logger.debug("Clear Ftp Transfer Control");
705         endDataConnection();
706         finalizeExecution();
707         if (closedDataChannel != null) {
708             closedDataChannel.cancel();
709         }
710         if (endOfCommand != null) {
711             endOfCommand.cancel();
712         }
713         if (waitForOpenedDataChannel != null) {
714             waitForOpenedDataChannel.cancel();
715         }
716         if (executorService != null) {
717             executorService.shutdownNow();
718             executorService = null;
719         }
720     }
721 }