View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.ftp.core.control;
22  
23  import goldengate.common.command.ReplyCode;
24  import goldengate.common.command.exception.CommandAbstractException;
25  import goldengate.common.command.exception.Reply503Exception;
26  import goldengate.common.logging.GgInternalLogger;
27  import goldengate.common.logging.GgInternalLoggerFactory;
28  import goldengate.ftp.core.command.AbstractCommand;
29  import goldengate.ftp.core.command.FtpCommandCode;
30  import goldengate.ftp.core.command.internal.ConnectionCommand;
31  import goldengate.ftp.core.command.internal.IncorrectCommand;
32  import goldengate.ftp.core.config.FtpInternalConfiguration;
33  import goldengate.ftp.core.data.FtpTransferControl;
34  import goldengate.ftp.core.session.FtpSession;
35  import goldengate.ftp.core.utils.FtpChannelUtils;
36  
37  import java.io.IOException;
38  import java.net.ConnectException;
39  import java.nio.channels.ClosedChannelException;
40  
41  import org.jboss.netty.channel.Channel;
42  import org.jboss.netty.channel.ChannelException;
43  import org.jboss.netty.channel.ChannelFuture;
44  import org.jboss.netty.channel.ChannelFutureListener;
45  import org.jboss.netty.channel.ChannelHandlerContext;
46  import org.jboss.netty.channel.ChannelStateEvent;
47  import org.jboss.netty.channel.Channels;
48  import org.jboss.netty.channel.ExceptionEvent;
49  import org.jboss.netty.channel.MessageEvent;
50  import org.jboss.netty.channel.SimpleChannelHandler;
51  
52  /**
53   * Main Network Handler (Control part) implementing RFC 959, 775, 2389, 2428,
54   * 3659 and supports XCRC and XMD5 commands.
55   *
56   * @author Frederic Bregier
57   *
58   */
59  public class NetworkHandler extends SimpleChannelHandler {
60      /**
61       * Internal Logger
62       */
63      private static final GgInternalLogger logger = GgInternalLoggerFactory
64              .getLogger(NetworkHandler.class);
65  
66      /**
67       * Business Handler
68       */
69      private final BusinessHandler businessHandler;
70  
71      /**
72       * Internal store for the SessionInterface
73       */
74      private final FtpSession session;
75  
76      /**
77       * The associated Channel
78       */
79      private Channel controlChannel = null;
80  
81      /**
82       * Constructor from session
83       *
84       * @param session
85       */
86      public NetworkHandler(FtpSession session) {
87          super();
88          this.session = session;
89          businessHandler = session.getBusinessHandler();
90          businessHandler.setNetworkHandler(this);
91      }
92  
93      /**
94       * @return the businessHandler
95       */
96      public BusinessHandler getBusinessHandler() {
97          return businessHandler;
98      }
99  
100     /**
101      * @return the session
102      */
103     public FtpSession getFtpSession() {
104         return session;
105     }
106 
107     /**
108      *
109      * @return the Control Channel
110      */
111     public Channel getControlChannel() {
112         return controlChannel;
113     }
114 
115     /**
116      * Run firstly executeChannelClosed.
117      *
118      * @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext,
119      *      org.jboss.netty.channel.ChannelStateEvent)
120      */
121     @Override
122     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
123             throws Exception {
124         if (session == null || session.getDataConn() == null ||
125                 session.getDataConn().getFtpTransferControl() == null) {
126             super.channelClosed(ctx, e);
127             return;
128         }
129         // Wait for any command running before closing (bad client sometimes
130         // don't wait for answer)
131         int limit = 100;
132         while (session.getDataConn().getFtpTransferControl()
133                 .isFtpTransferExecuting()) {
134             Thread.sleep(10);
135             limit --;
136             if (limit <= 0) {
137                 logger
138                         .warn("Waiting for transfer finished but 1s is not enough");
139                 break; // wait at most 1s
140             }
141         }
142         businessHandler.executeChannelClosed();
143         // release file and other permanent objects
144         businessHandler.clear();
145         session.clear();
146         super.channelClosed(ctx, e);
147     }
148 
149     /**
150      * Initialiaze the Handler.
151      *
152      * @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext,
153      *      org.jboss.netty.channel.ChannelStateEvent)
154      */
155     @Override
156     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
157         Channel channel = e.getChannel();
158         controlChannel = channel;
159         session.setControlConnected();
160         FtpChannelUtils.addCommandChannel(channel, session.getConfiguration());
161         if (isStillAlive()) {
162             // Make the first execution ready
163             AbstractCommand command = new ConnectionCommand(getFtpSession());
164             session.setNextCommand(command);
165             // This command can change the next Command
166             businessHandler.executeChannelConnected(channel);
167             // Answer ready to continue from first command = Connection
168             messageRunAnswer();
169             getFtpSession().setReady(true);
170         }
171     }
172 
173     /**
174      * If the service is going to shutdown, it sends back a 421 message to the
175      * connection
176      *
177      * @return True if the service is alive, else False if the system is going
178      *         down
179      */
180     private boolean isStillAlive() {
181         if (session.getConfiguration().isShutdown) {
182             session.setExitErrorCode("Service is going down: disconnect");
183             writeFinalAnswer();
184             return false;
185         }
186         return true;
187     }
188 
189     /**
190      * Default exception task: close the current connection after calling
191      * exceptionLocalCaught and writing if possible the current replyCode.
192      *
193      * @see org.jboss.netty.channel.SimpleChannelHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext,
194      *      org.jboss.netty.channel.ExceptionEvent)
195      */
196     @Override
197     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
198         Throwable e1 = e.getCause();
199         Channel channel = e.getChannel();
200         if (session == null) {
201             // should not be
202             logger.warn("NO SESSION", e1);
203             return;
204         }
205         if (e1 instanceof ConnectException) {
206             ConnectException e2 = (ConnectException) e1;
207             logger.warn("Connection impossible since {} with Channel {}", e2
208                     .getMessage(), e.getChannel());
209         } else if (e1 instanceof ChannelException) {
210             ChannelException e2 = (ChannelException) e1;
211             logger
212                     .warn(
213                             "Connection (example: timeout) impossible since {} with Channel {}",
214                             e2.getMessage(), e.getChannel());
215         } else if (e1 instanceof ClosedChannelException) {
216             logger.debug("Connection closed before end");
217         } else if (e1 instanceof CommandAbstractException) {
218             // FTP Exception: not close if not necessary
219             CommandAbstractException e2 = (CommandAbstractException) e1;
220             logger.warn("Command Error Reply {}", e2.getMessage());
221             session.setReplyCode(e2);
222             businessHandler.afterRunCommandKo(e2);
223             if (channel.isConnected()) {
224                 writeFinalAnswer();
225             }
226             return;
227         } else if (e1 instanceof NullPointerException) {
228             NullPointerException e2 = (NullPointerException) e1;
229             logger.warn("Null pointer Exception", e2);
230             try {
231                 if (session != null) {
232                     session.setExitErrorCode("Internal error: disconnect");
233                     if (businessHandler != null &&
234                             session.getDataConn() != null) {
235                         businessHandler.exceptionLocalCaught(e);
236                         if (channel.isConnected()) {
237                             writeFinalAnswer();
238                         }
239                     }
240                 }
241             } catch (NullPointerException e3) {
242             }
243             return;
244         } else if (e1 instanceof IOException) {
245             IOException e2 = (IOException) e1;
246             logger.warn("Connection aborted since {} with Channel {}", e2
247                     .getMessage(), e.getChannel());
248         } else {
249             logger.warn("Unexpected exception from downstream" +
250                     " Ref Channel: {}" + e.getChannel().toString(), e1.getMessage());
251         }
252         session.setExitErrorCode("Internal error: disconnect");
253         businessHandler.exceptionLocalCaught(e);
254         if (channel.isConnected()) {
255             writeFinalAnswer();
256         }
257     }
258 
259     /**
260      * Simply call messageRun with the received message
261      *
262      * @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
263      *      org.jboss.netty.channel.MessageEvent)
264      */
265     @Override
266     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
267         if (isStillAlive()) {
268             // First wait for the initialization to be fully done
269             while (!session.isReady()) {
270                 try {
271                     Thread.sleep(10);
272                 } catch (InterruptedException e1) {
273                 }
274             }
275             String message = (String) e.getMessage();
276             AbstractCommand command = FtpCommandCode.getFromLine(
277                     getFtpSession(), message);
278             logger.debug("RECVMSG: {} CMD: {}", message, command.getCommand());
279             // First check if the command is an ABORT, QUIT or STAT
280             if (!FtpCommandCode.isSpecialCommand(command.getCode())) {
281                 // Now check if a transfer is on its way: illegal to have at
282                 // same time two commands (except ABORT). Wait is at most 100
283                 // RETRYINMS=1s
284                 boolean notFinished = true;
285                 FtpTransferControl control = session.getDataConn().getFtpTransferControl();
286                 for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 100; i ++) {
287                     if (control.isFtpTransferExecuting() ||
288                             (!session.isCurrentCommandFinished())) {
289                         try {
290                             Thread.sleep(FtpInternalConfiguration.RETRYINMS);
291                         } catch (InterruptedException e1) {
292                             break;
293                         }
294                     } else {
295                         notFinished = false;
296                         break;
297                     }
298                 }
299                 if (notFinished) {
300                     session.setReplyCode(
301                             ReplyCode.REPLY_503_BAD_SEQUENCE_OF_COMMANDS,
302                             "Previous transfer command is not finished yet");
303                     businessHandler.afterRunCommandKo(
304                             new Reply503Exception(session.getReplyCode().getMesg()));
305                     writeIntermediateAnswer();
306                     return;
307                 }
308             }
309             // Default message
310             session.setReplyCode(ReplyCode.REPLY_200_COMMAND_OKAY, null);
311             if (session.getCurrentCommand().isNextCommandValid(command)) {
312                 session.setNextCommand(command);
313                 messageRunAnswer();
314             } else {
315                 command = new IncorrectCommand();
316                 command.setArgs(getFtpSession(), message, null,
317                         FtpCommandCode.IncorrectSequence);
318                 session.setNextCommand(command);
319                 messageRunAnswer();
320             }
321         }
322     }
323 
324     /**
325      * Write the current answer and eventually close channel if necessary (421
326      * or 221)
327      *
328      * @return True if the channel is closed due to the code
329      */
330     private boolean writeFinalAnswer() {
331         if (session.getReplyCode() == ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION ||
332                 session.getReplyCode() == ReplyCode.REPLY_221_CLOSING_CONTROL_CONNECTION) {
333             session.getDataConn().getFtpTransferControl().clear();
334             writeIntermediateAnswer().addListener(ChannelFutureListener.CLOSE);
335             return true;
336         }
337         writeIntermediateAnswer();
338         session.setCurrentCommandFinished();
339         return false;
340     }
341 
342     /**
343      * Write an intermediate Answer from Business before last answer also set by
344      * the Business
345      *
346      * @return the ChannelFuture associated with the write
347      */
348     public ChannelFuture writeIntermediateAnswer() {
349         return Channels.write(controlChannel, session.getAnswer());
350     }
351 
352     /**
353      * Execute one command and write the following answer
354      */
355     private void messageRunAnswer() {
356         boolean error = false;
357         try {
358             businessHandler.beforeRunCommand();
359             AbstractCommand command = session.getCurrentCommand();
360             logger.debug("Run {}", command.getCommand());
361             command.exec();
362             businessHandler.afterRunCommandOk();
363         } catch (CommandAbstractException e) {
364             error = true;
365             session.setReplyCode(e);
366             businessHandler.afterRunCommandKo(e);
367         }
368         if (error || session.getCurrentCommand().getCode() != FtpCommandCode.INTERNALSHUTDOWN) {
369             writeFinalAnswer();
370         }
371     }
372 }