1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.core.data.handler;
22
23 import goldengate.common.exception.FileTransferException;
24 import goldengate.common.exception.InvalidArgumentException;
25 import goldengate.common.file.DataBlock;
26 import goldengate.common.logging.GgInternalLogger;
27 import goldengate.common.logging.GgInternalLoggerFactory;
28 import goldengate.ftp.core.config.FtpConfiguration;
29 import goldengate.ftp.core.config.FtpInternalConfiguration;
30 import goldengate.ftp.core.control.NetworkHandler;
31 import goldengate.ftp.core.data.FtpTransferControl;
32 import goldengate.ftp.core.exception.FtpNoConnectionException;
33 import goldengate.ftp.core.exception.FtpNoFileException;
34 import goldengate.ftp.core.exception.FtpNoTransferException;
35 import goldengate.ftp.core.session.FtpSession;
36 import goldengate.ftp.core.utils.FtpChannelUtils;
37
38 import java.io.IOException;
39 import java.net.BindException;
40 import java.net.ConnectException;
41 import java.nio.channels.CancelledKeyException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.NotYetConnectedException;
44
45 import org.jboss.netty.buffer.ChannelBuffer;
46 import org.jboss.netty.buffer.ChannelBuffers;
47 import org.jboss.netty.channel.Channel;
48 import org.jboss.netty.channel.ChannelException;
49 import org.jboss.netty.channel.ChannelFuture;
50 import org.jboss.netty.channel.ChannelHandlerContext;
51 import org.jboss.netty.channel.ChannelPipeline;
52 import org.jboss.netty.channel.ChannelStateEvent;
53 import org.jboss.netty.channel.Channels;
54 import org.jboss.netty.channel.ExceptionEvent;
55 import org.jboss.netty.channel.MessageEvent;
56 import org.jboss.netty.channel.SimpleChannelHandler;
57
58
59
60
61
62
63
64 public class DataNetworkHandler extends SimpleChannelHandler {
65
66
67
68 private static final GgInternalLogger logger = GgInternalLoggerFactory
69 .getLogger(DataNetworkHandler.class);
70
71
72
73
74 private DataBusinessHandler dataBusinessHandler = null;
75
76
77
78
79 private final FtpConfiguration configuration;
80
81
82
83
84 private final boolean isActive;
85
86
87
88
89 private FtpSession session = null;
90
91
92
93
94 private Channel dataChannel = null;
95
96
97
98
99 private ChannelPipeline channelPipeline = null;
100
101
102
103
104
105 private boolean isReady = false;
106
107
108
109
110
111
112
113
114 public DataNetworkHandler(FtpConfiguration configuration,
115 DataBusinessHandler handler, boolean active) {
116 super();
117 this.configuration = configuration;
118 dataBusinessHandler = handler;
119 dataBusinessHandler.setDataNetworkHandler(this);
120 isActive = active;
121 }
122
123
124
125
126
127 public DataBusinessHandler getDataBusinessHandler()
128 throws FtpNoConnectionException {
129 if (dataBusinessHandler == null) {
130 throw new FtpNoConnectionException("No Data Connection active");
131 }
132 return dataBusinessHandler;
133 }
134
135
136
137
138 public FtpSession getFtpSession() {
139 return session;
140 }
141
142
143
144
145
146 public NetworkHandler getNetworkHandler() {
147 return session.getBusinessHandler().getNetworkHandler();
148 }
149
150
151
152
153
154
155
156
157 @Override
158 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
159 throws Exception {
160 if (session != null) {
161 session.getDataConn().getFtpTransferControl().setPreEndOfTransfer();
162 session.getDataConn().unbindPassive();
163 try {
164 getDataBusinessHandler().executeChannelClosed();
165
166 getDataBusinessHandler().clear();
167 } catch (FtpNoConnectionException e1) {
168 }
169 session.getDataConn().getFtpTransferControl()
170 .setClosedDataChannel();
171 dataBusinessHandler = null;
172 channelPipeline = null;
173 dataChannel = null;
174 }
175 super.channelClosed(ctx, e);
176 }
177
178
179
180
181
182
183
184 @Override
185 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
186 Channel channel = e.getChannel();
187
188 for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i ++) {
189 session = configuration.getFtpSession(channel, isActive);
190 if (session == null) {
191 logger.warn("Session not found at try " + i);
192 try {
193 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
194 } catch (InterruptedException e1) {
195 break;
196 }
197 } else {
198 break;
199 }
200 }
201 if (session == null) {
202
203 logger.error("Session not found!");
204 Channels.close(channel);
205
206
207 return;
208 }
209 channelPipeline = ctx.getPipeline();
210 dataChannel = channel;
211 dataBusinessHandler.setFtpSession(getFtpSession());
212 FtpChannelUtils.addDataChannel(channel, session.getConfiguration());
213 if (isStillAlive()) {
214 setCorrectCodec();
215 session.getDataConn().getFtpTransferControl().setOpenedDataChannel(
216 channel, this);
217 } else {
218
219 session.getDataConn().getFtpTransferControl().setOpenedDataChannel(
220 null, this);
221 return;
222 }
223 isReady = true;
224 }
225
226
227
228
229
230 public void setCorrectCodec() {
231 FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
232 .get(FtpDataPipelineFactory.CODEC_MODE);
233 FtpDataTypeCodec typeCodec = (FtpDataTypeCodec) channelPipeline
234 .get(FtpDataPipelineFactory.CODEC_TYPE);
235 FtpDataStructureCodec structureCodec = (FtpDataStructureCodec) channelPipeline
236 .get(FtpDataPipelineFactory.CODEC_STRUCTURE);
237 modeCodec.setMode(session.getDataConn().getMode());
238 modeCodec.setStructure(session.getDataConn().getStructure());
239 typeCodec.setFullType(session.getDataConn().getType(), session
240 .getDataConn().getSubType());
241 structureCodec.setStructure(session.getDataConn().getStructure());
242 }
243
244
245
246
247
248 public void unlockModeCodec() {
249 FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
250 .get("MODE");
251 modeCodec.setCodecReady();
252 }
253
254
255
256
257
258
259
260
261 @Override
262 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
263 if (session == null) {
264 logger.warn("Error without any session active {}", e.getCause());
265 return;
266 }
267 Throwable e1 = e.getCause();
268 if (e1 instanceof ConnectException) {
269 ConnectException e2 = (ConnectException) e1;
270 logger.warn("Connection impossible since {}", e2.getMessage());
271 } else if (e1 instanceof ChannelException) {
272 ChannelException e2 = (ChannelException) e1;
273 logger.warn("Connection (example: timeout) impossible since {}", e2
274 .getMessage());
275 } else if (e1 instanceof ClosedChannelException) {
276 logger.debug("Connection closed before end");
277 } else if (e1 instanceof InvalidArgumentException) {
278 InvalidArgumentException e2 = (InvalidArgumentException) e1;
279 logger.warn("Bad configuration in Codec in {}", e2.getMessage());
280 } else if (e1 instanceof NullPointerException) {
281 NullPointerException e2 = (NullPointerException) e1;
282 logger.warn("Null pointer Exception", e2);
283 try {
284 if (dataBusinessHandler != null) {
285 dataBusinessHandler.exceptionLocalCaught(e);
286 if (session.getDataConn() != null) {
287 session.getDataConn().getFtpTransferControl()
288 .setTransferAbortedFromInternal(true);
289 }
290 }
291 } catch (NullPointerException e3) {
292 }
293 return;
294 } else if (e1 instanceof CancelledKeyException) {
295 CancelledKeyException e2 = (CancelledKeyException) e1;
296 logger.warn("Connection aborted since {}", e2.getMessage());
297
298
299 return;
300 } else if (e1 instanceof IOException) {
301 IOException e2 = (IOException) e1;
302 logger.warn("Connection aborted since {}", e2.getMessage());
303 } else if (e1 instanceof NotYetConnectedException) {
304 NotYetConnectedException e2 = (NotYetConnectedException) e1;
305 logger.debug("Ignore this exception {}", e2.getMessage());
306 return;
307 } else if (e1 instanceof BindException) {
308 BindException e2 = (BindException) e1;
309 logger.warn("Address already in use {}", e2.getMessage());
310 } else if (e1 instanceof ConnectException) {
311 ConnectException e2 = (ConnectException) e1;
312 logger.warn("Timeout occurs {}", e2.getMessage());
313 } else {
314 logger.warn("Unexpected exception from downstream: {}", e1.getMessage());
315 }
316 if (dataBusinessHandler != null) {
317 dataBusinessHandler.exceptionLocalCaught(e);
318 }
319 session.getDataConn().getFtpTransferControl()
320 .setTransferAbortedFromInternal(true);
321 }
322
323
324
325
326
327
328
329 @Override
330 public void channelInterestChanged(ChannelHandlerContext arg0,
331 ChannelStateEvent arg1) {
332 int op = arg1.getChannel().getInterestOps();
333 if (op == Channel.OP_NONE || op == Channel.OP_READ) {
334 if (isReady) {
335 session.getDataConn().getFtpTransferControl().runTrueRetrieve();
336 }
337 }
338 }
339
340
341
342
343
344
345
346 @Override
347 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
348 if (isStillAlive()) {
349 DataBlock dataBlock = (DataBlock) e.getMessage();
350 try {
351 session.getDataConn().getFtpTransferControl()
352 .getExecutingFtpTransfer().getFtpFile().writeDataBlock(
353 dataBlock);
354
355
356
357
358
359
360
361
362 } catch (FtpNoFileException e1) {
363 session.getDataConn().getFtpTransferControl()
364 .setTransferAbortedFromInternal(true);
365 return;
366 } catch (FtpNoTransferException e1) {
367 session.getDataConn().getFtpTransferControl()
368 .setTransferAbortedFromInternal(true);
369 return;
370 } catch (FileTransferException e1) {
371 session.getDataConn().getFtpTransferControl()
372 .setTransferAbortedFromInternal(true);
373 }
374 } else {
375
376 session.getDataConn().getFtpTransferControl()
377 .setTransferAbortedFromInternal(true);
378 }
379 }
380
381
382
383
384
385
386
387 public boolean writeMessage(String message) {
388 DataBlock dataBlock = new DataBlock();
389 dataBlock.setEOF(true);
390 ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message.getBytes());
391 dataBlock.setBlock(buffer);
392 ChannelFuture future;
393 try {
394 future = Channels.write(dataChannel, dataBlock).await();
395 } catch (InterruptedException e) {
396 return false;
397 }
398 return future.isSuccess();
399 }
400
401
402
403
404
405
406
407
408 private boolean isStillAlive() {
409 if (session.getConfiguration().isShutdown) {
410 session.setExitErrorCode("Service is going down: disconnect");
411 return false;
412 }
413 return true;
414 }
415 }