View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.ftp.core.data.handler;
22  
23  import goldengate.common.exception.FileTransferException;
24  import goldengate.common.exception.InvalidArgumentException;
25  import goldengate.common.file.DataBlock;
26  import goldengate.common.logging.GgInternalLogger;
27  import goldengate.common.logging.GgInternalLoggerFactory;
28  import goldengate.ftp.core.config.FtpConfiguration;
29  import goldengate.ftp.core.config.FtpInternalConfiguration;
30  import goldengate.ftp.core.control.NetworkHandler;
31  import goldengate.ftp.core.data.FtpTransferControl;
32  import goldengate.ftp.core.exception.FtpNoConnectionException;
33  import goldengate.ftp.core.exception.FtpNoFileException;
34  import goldengate.ftp.core.exception.FtpNoTransferException;
35  import goldengate.ftp.core.session.FtpSession;
36  import goldengate.ftp.core.utils.FtpChannelUtils;
37  
38  import java.io.IOException;
39  import java.net.BindException;
40  import java.net.ConnectException;
41  import java.nio.channels.CancelledKeyException;
42  import java.nio.channels.ClosedChannelException;
43  import java.nio.channels.NotYetConnectedException;
44  
45  import org.jboss.netty.buffer.ChannelBuffer;
46  import org.jboss.netty.buffer.ChannelBuffers;
47  import org.jboss.netty.channel.Channel;
48  import org.jboss.netty.channel.ChannelException;
49  import org.jboss.netty.channel.ChannelFuture;
50  import org.jboss.netty.channel.ChannelHandlerContext;
51  import org.jboss.netty.channel.ChannelPipeline;
52  import org.jboss.netty.channel.ChannelStateEvent;
53  import org.jboss.netty.channel.Channels;
54  import org.jboss.netty.channel.ExceptionEvent;
55  import org.jboss.netty.channel.MessageEvent;
56  import org.jboss.netty.channel.SimpleChannelHandler;
57  
58  /**
59   * Network handler for Data connections
60   *
61   * @author Frederic Bregier
62   *
63   */
64  public class DataNetworkHandler extends SimpleChannelHandler {
65      /**
66       * Internal Logger
67       */
68      private static final GgInternalLogger logger = GgInternalLoggerFactory
69              .getLogger(DataNetworkHandler.class);
70  
71      /**
72       * Business Data Handler
73       */
74      private DataBusinessHandler dataBusinessHandler = null;
75  
76      /**
77       * Configuration
78       */
79      private final FtpConfiguration configuration;
80  
81      /**
82       * Is this Data Connection an Active or Passive one
83       */
84      private final boolean isActive;
85  
86      /**
87       * Internal store for the SessionInterface
88       */
89      private FtpSession session = null;
90  
91      /**
92       * The associated Channel
93       */
94      private Channel dataChannel = null;
95  
96      /**
97       * Pipeline
98       */
99      private ChannelPipeline channelPipeline = null;
100 
101     /**
102      * True when the DataNetworkHandler is fully ready (to prevent action before
103      * ready)
104      */
105     private boolean isReady = false;
106 
107     /**
108      * Constructor from DataBusinessHandler
109      *
110      * @param configuration
111      * @param handler
112      * @param active
113      */
114     public DataNetworkHandler(FtpConfiguration configuration,
115             DataBusinessHandler handler, boolean active) {
116         super();
117         this.configuration = configuration;
118         dataBusinessHandler = handler;
119         dataBusinessHandler.setDataNetworkHandler(this);
120         isActive = active;
121     }
122 
123     /**
124      * @return the dataBusinessHandler
125      * @throws FtpNoConnectionException
126      */
127     public DataBusinessHandler getDataBusinessHandler()
128             throws FtpNoConnectionException {
129         if (dataBusinessHandler == null) {
130             throw new FtpNoConnectionException("No Data Connection active");
131         }
132         return dataBusinessHandler;
133     }
134 
135     /**
136      * @return the session
137      */
138     public FtpSession getFtpSession() {
139         return session;
140     }
141 
142     /**
143      *
144      * @return the NetworkHandler associated with the control connection
145      */
146     public NetworkHandler getNetworkHandler() {
147         return session.getBusinessHandler().getNetworkHandler();
148     }
149 
150     /**
151      * Run firstly executeChannelClosed.
152      *
153      * @throws Exception
154      * @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext,
155      *      org.jboss.netty.channel.ChannelStateEvent)
156      */
157     @Override
158     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
159             throws Exception {
160         if (session != null) {
161             session.getDataConn().getFtpTransferControl().setPreEndOfTransfer();
162             session.getDataConn().unbindPassive();
163             try {
164                 getDataBusinessHandler().executeChannelClosed();
165                 // release file and other permanent objects
166                 getDataBusinessHandler().clear();
167             } catch (FtpNoConnectionException e1) {
168             }
169             session.getDataConn().getFtpTransferControl()
170                     .setClosedDataChannel();
171             dataBusinessHandler = null;
172             channelPipeline = null;
173             dataChannel = null;
174         }
175         super.channelClosed(ctx, e);
176     }
177 
178     /**
179      * Initialize the Handler.
180      *
181      * @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext,
182      *      org.jboss.netty.channel.ChannelStateEvent)
183      */
184     @Override
185     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
186         Channel channel = e.getChannel();
187         // First get the ftpSession from inetaddresses
188         for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i ++) {
189             session = configuration.getFtpSession(channel, isActive);
190             if (session == null) {
191                 logger.warn("Session not found at try " + i);
192                 try {
193                     Thread.sleep(FtpInternalConfiguration.RETRYINMS);
194                 } catch (InterruptedException e1) {
195                     break;
196                 }
197             } else {
198                 break;
199             }
200         }
201         if (session == null) {
202             // Not found !!!
203             logger.error("Session not found!");
204             Channels.close(channel);
205             // Problem: control connection could not be directly informed!!!
206             // Only timeout will occur
207             return;
208         }
209         channelPipeline = ctx.getPipeline();
210         dataChannel = channel;
211         dataBusinessHandler.setFtpSession(getFtpSession());
212         FtpChannelUtils.addDataChannel(channel, session.getConfiguration());
213         if (isStillAlive()) {
214             setCorrectCodec();
215             session.getDataConn().getFtpTransferControl().setOpenedDataChannel(
216                     channel, this);
217         } else {
218             // Cannot continue
219             session.getDataConn().getFtpTransferControl().setOpenedDataChannel(
220                     null, this);
221             return;
222         }
223         isReady = true;
224     }
225 
226     /**
227      * Set the CODEC according to the mode. Must be called after each call of
228      * MODE, STRU or TYPE
229      */
230     public void setCorrectCodec() {
231         FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
232                 .get(FtpDataPipelineFactory.CODEC_MODE);
233         FtpDataTypeCodec typeCodec = (FtpDataTypeCodec) channelPipeline
234                 .get(FtpDataPipelineFactory.CODEC_TYPE);
235         FtpDataStructureCodec structureCodec = (FtpDataStructureCodec) channelPipeline
236                 .get(FtpDataPipelineFactory.CODEC_STRUCTURE);
237         modeCodec.setMode(session.getDataConn().getMode());
238         modeCodec.setStructure(session.getDataConn().getStructure());
239         typeCodec.setFullType(session.getDataConn().getType(), session
240                 .getDataConn().getSubType());
241         structureCodec.setStructure(session.getDataConn().getStructure());
242     }
243 
244     /**
245      * Unlock the Mode Codec from openConnection of {@link FtpTransferControl}
246      *
247      */
248     public void unlockModeCodec() {
249         FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
250                 .get("MODE");
251         modeCodec.setCodecReady();
252     }
253 
254     /**
255      * Default exception task: close the current connection after calling
256      * exceptionLocalCaught.
257      *
258      * @see org.jboss.netty.channel.SimpleChannelHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext,
259      *      org.jboss.netty.channel.ExceptionEvent)
260      */
261     @Override
262     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
263         if (session == null) {
264             logger.warn("Error without any session active {}", e.getCause());
265             return;
266         }
267         Throwable e1 = e.getCause();
268         if (e1 instanceof ConnectException) {
269             ConnectException e2 = (ConnectException) e1;
270             logger.warn("Connection impossible since {}", e2.getMessage());
271         } else if (e1 instanceof ChannelException) {
272             ChannelException e2 = (ChannelException) e1;
273             logger.warn("Connection (example: timeout) impossible since {}", e2
274                     .getMessage());
275         } else if (e1 instanceof ClosedChannelException) {
276             logger.debug("Connection closed before end");
277         } else if (e1 instanceof InvalidArgumentException) {
278             InvalidArgumentException e2 = (InvalidArgumentException) e1;
279             logger.warn("Bad configuration in Codec in {}", e2.getMessage());
280         } else if (e1 instanceof NullPointerException) {
281             NullPointerException e2 = (NullPointerException) e1;
282             logger.warn("Null pointer Exception", e2);
283             try {
284                 if (dataBusinessHandler != null) {
285                     dataBusinessHandler.exceptionLocalCaught(e);
286                     if (session.getDataConn() != null) {
287                         session.getDataConn().getFtpTransferControl()
288                                 .setTransferAbortedFromInternal(true);
289                     }
290                 }
291             } catch (NullPointerException e3) {
292             }
293             return;
294         } else if (e1 instanceof CancelledKeyException) {
295             CancelledKeyException e2 = (CancelledKeyException) e1;
296             logger.warn("Connection aborted since {}", e2.getMessage());
297             // XXX TODO FIXME is it really what we should do ?
298             // No action
299             return;
300         } else if (e1 instanceof IOException) {
301             IOException e2 = (IOException) e1;
302             logger.warn("Connection aborted since {}", e2.getMessage());
303         } else if (e1 instanceof NotYetConnectedException) {
304             NotYetConnectedException e2 = (NotYetConnectedException) e1;
305             logger.debug("Ignore this exception {}", e2.getMessage());
306             return;
307         } else if (e1 instanceof BindException) {
308             BindException e2 = (BindException) e1;
309             logger.warn("Address already in use {}", e2.getMessage());
310         } else if (e1 instanceof ConnectException) {
311             ConnectException e2 = (ConnectException) e1;
312             logger.warn("Timeout occurs {}", e2.getMessage());
313         } else {
314             logger.warn("Unexpected exception from downstream: {}", e1.getMessage());
315         }
316         if (dataBusinessHandler != null) {
317             dataBusinessHandler.exceptionLocalCaught(e);
318         }
319         session.getDataConn().getFtpTransferControl()
320                 .setTransferAbortedFromInternal(true);
321     }
322 
323     /**
324      * To enable continues of Retrieve operation (prevent OOM)
325      *
326      * @see org.jboss.netty.channel.SimpleChannelHandler#channelInterestChanged(org.jboss.netty.channel.ChannelHandlerContext,
327      *      org.jboss.netty.channel.ChannelStateEvent)
328      */
329     @Override
330     public void channelInterestChanged(ChannelHandlerContext arg0,
331             ChannelStateEvent arg1) {
332         int op = arg1.getChannel().getInterestOps();
333         if (op == Channel.OP_NONE || op == Channel.OP_READ) {
334             if (isReady) {
335                 session.getDataConn().getFtpTransferControl().runTrueRetrieve();
336             }
337         }
338     }
339 
340     /**
341      * Act as needed according to the receive DataBlock message
342      *
343      * @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
344      *      org.jboss.netty.channel.MessageEvent)
345      */
346     @Override
347     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
348         if (isStillAlive()) {
349             DataBlock dataBlock = (DataBlock) e.getMessage();
350             try {
351                 session.getDataConn().getFtpTransferControl()
352                         .getExecutingFtpTransfer().getFtpFile().writeDataBlock(
353                                 dataBlock);
354                 /*try {
355                     session.getDataConn().getFtpTransferControl()
356                         .getExecutingFtpTransfer().getFtpFile().flush();
357                 } catch (IOException e1) {
358                     session.getDataConn().getFtpTransferControl()
359                         .setTransferAbortedFromInternal(true);
360                     return;
361                 }*/
362             } catch (FtpNoFileException e1) {
363                 session.getDataConn().getFtpTransferControl()
364                         .setTransferAbortedFromInternal(true);
365                 return;
366             } catch (FtpNoTransferException e1) {
367                 session.getDataConn().getFtpTransferControl()
368                         .setTransferAbortedFromInternal(true);
369                 return;
370             } catch (FileTransferException e1) {
371                 session.getDataConn().getFtpTransferControl()
372                         .setTransferAbortedFromInternal(true);
373             }
374         } else {
375             // Shutdown
376             session.getDataConn().getFtpTransferControl()
377                     .setTransferAbortedFromInternal(true);
378         }
379     }
380 
381     /**
382      * Write a simple message (like LIST) and wait for it
383      *
384      * @param message
385      * @return True if the message is correctly written
386      */
387     public boolean writeMessage(String message) {
388         DataBlock dataBlock = new DataBlock();
389         dataBlock.setEOF(true);
390         ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message.getBytes());
391         dataBlock.setBlock(buffer);
392         ChannelFuture future;
393         try {
394             future = Channels.write(dataChannel, dataBlock).await();
395         } catch (InterruptedException e) {
396             return false;
397         }
398         return future.isSuccess();
399     }
400 
401     /**
402      * If the service is going to shutdown, it sends back a 421 message to the
403      * connection
404      *
405      * @return True if the service is alive, else False if the system is going
406      *         down
407      */
408     private boolean isStillAlive() {
409         if (session.getConfiguration().isShutdown) {
410             session.setExitErrorCode("Service is going down: disconnect");
411             return false;
412         }
413         return true;
414     }
415 }