1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.core.control;
22
23 import goldengate.common.command.ReplyCode;
24 import goldengate.common.command.exception.CommandAbstractException;
25 import goldengate.common.command.exception.Reply503Exception;
26 import goldengate.common.logging.GgInternalLogger;
27 import goldengate.common.logging.GgInternalLoggerFactory;
28 import goldengate.ftp.core.command.AbstractCommand;
29 import goldengate.ftp.core.command.FtpCommandCode;
30 import goldengate.ftp.core.command.internal.ConnectionCommand;
31 import goldengate.ftp.core.command.internal.IncorrectCommand;
32 import goldengate.ftp.core.config.FtpInternalConfiguration;
33 import goldengate.ftp.core.data.FtpTransferControl;
34 import goldengate.ftp.core.session.FtpSession;
35 import goldengate.ftp.core.utils.FtpChannelUtils;
36
37 import java.io.IOException;
38 import java.net.ConnectException;
39 import java.nio.channels.ClosedChannelException;
40
41 import org.jboss.netty.channel.Channel;
42 import org.jboss.netty.channel.ChannelException;
43 import org.jboss.netty.channel.ChannelFuture;
44 import org.jboss.netty.channel.ChannelFutureListener;
45 import org.jboss.netty.channel.ChannelHandlerContext;
46 import org.jboss.netty.channel.ChannelStateEvent;
47 import org.jboss.netty.channel.Channels;
48 import org.jboss.netty.channel.ExceptionEvent;
49 import org.jboss.netty.channel.MessageEvent;
50 import org.jboss.netty.channel.SimpleChannelHandler;
51
52
53
54
55
56
57
58
59 public class NetworkHandler extends SimpleChannelHandler {
60
61
62
63 private static final GgInternalLogger logger = GgInternalLoggerFactory
64 .getLogger(NetworkHandler.class);
65
66
67
68
69 private final BusinessHandler businessHandler;
70
71
72
73
74 private final FtpSession session;
75
76
77
78
79 private Channel controlChannel = null;
80
81
82
83
84
85
86 public NetworkHandler(FtpSession session) {
87 super();
88 this.session = session;
89 businessHandler = session.getBusinessHandler();
90 businessHandler.setNetworkHandler(this);
91 }
92
93
94
95
96 public BusinessHandler getBusinessHandler() {
97 return businessHandler;
98 }
99
100
101
102
103 public FtpSession getFtpSession() {
104 return session;
105 }
106
107
108
109
110
111 public Channel getControlChannel() {
112 return controlChannel;
113 }
114
115
116
117
118
119
120
121 @Override
122 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
123 throws Exception {
124 if (session == null || session.getDataConn() == null ||
125 session.getDataConn().getFtpTransferControl() == null) {
126 super.channelClosed(ctx, e);
127 return;
128 }
129
130
131 int limit = 100;
132 while (session.getDataConn().getFtpTransferControl()
133 .isFtpTransferExecuting()) {
134 Thread.sleep(10);
135 limit --;
136 if (limit <= 0) {
137 logger
138 .warn("Waiting for transfer finished but 1s is not enough");
139 break;
140 }
141 }
142 businessHandler.executeChannelClosed();
143
144 businessHandler.clear();
145 session.clear();
146 super.channelClosed(ctx, e);
147 }
148
149
150
151
152
153
154
155 @Override
156 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
157 Channel channel = e.getChannel();
158 controlChannel = channel;
159 session.setControlConnected();
160 FtpChannelUtils.addCommandChannel(channel, session.getConfiguration());
161 if (isStillAlive()) {
162
163 AbstractCommand command = new ConnectionCommand(getFtpSession());
164 session.setNextCommand(command);
165
166 businessHandler.executeChannelConnected(channel);
167
168 messageRunAnswer();
169 getFtpSession().setReady(true);
170 }
171 }
172
173
174
175
176
177
178
179
180 private boolean isStillAlive() {
181 if (session.getConfiguration().isShutdown) {
182 session.setExitErrorCode("Service is going down: disconnect");
183 writeFinalAnswer();
184 return false;
185 }
186 return true;
187 }
188
189
190
191
192
193
194
195
196 @Override
197 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
198 Throwable e1 = e.getCause();
199 Channel channel = e.getChannel();
200 if (session == null) {
201
202 logger.warn("NO SESSION", e1);
203 return;
204 }
205 if (e1 instanceof ConnectException) {
206 ConnectException e2 = (ConnectException) e1;
207 logger.warn("Connection impossible since {} with Channel {}", e2
208 .getMessage(), e.getChannel());
209 } else if (e1 instanceof ChannelException) {
210 ChannelException e2 = (ChannelException) e1;
211 logger
212 .warn(
213 "Connection (example: timeout) impossible since {} with Channel {}",
214 e2.getMessage(), e.getChannel());
215 } else if (e1 instanceof ClosedChannelException) {
216 logger.debug("Connection closed before end");
217 } else if (e1 instanceof CommandAbstractException) {
218
219 CommandAbstractException e2 = (CommandAbstractException) e1;
220 logger.warn("Command Error Reply {}", e2.getMessage());
221 session.setReplyCode(e2);
222 businessHandler.afterRunCommandKo(e2);
223 if (channel.isConnected()) {
224 writeFinalAnswer();
225 }
226 return;
227 } else if (e1 instanceof NullPointerException) {
228 NullPointerException e2 = (NullPointerException) e1;
229 logger.warn("Null pointer Exception", e2);
230 try {
231 if (session != null) {
232 session.setExitErrorCode("Internal error: disconnect");
233 if (businessHandler != null &&
234 session.getDataConn() != null) {
235 businessHandler.exceptionLocalCaught(e);
236 if (channel.isConnected()) {
237 writeFinalAnswer();
238 }
239 }
240 }
241 } catch (NullPointerException e3) {
242 }
243 return;
244 } else if (e1 instanceof IOException) {
245 IOException e2 = (IOException) e1;
246 logger.warn("Connection aborted since {} with Channel {}", e2
247 .getMessage(), e.getChannel());
248 } else {
249 logger.warn("Unexpected exception from downstream" +
250 " Ref Channel: {}" + e.getChannel().toString(), e1.getMessage());
251 }
252 session.setExitErrorCode("Internal error: disconnect");
253 businessHandler.exceptionLocalCaught(e);
254 if (channel.isConnected()) {
255 writeFinalAnswer();
256 }
257 }
258
259
260
261
262
263
264
265 @Override
266 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
267 if (isStillAlive()) {
268
269 while (!session.isReady()) {
270 try {
271 Thread.sleep(10);
272 } catch (InterruptedException e1) {
273 }
274 }
275 String message = (String) e.getMessage();
276 AbstractCommand command = FtpCommandCode.getFromLine(
277 getFtpSession(), message);
278 logger.debug("RECVMSG: {} CMD: {}", message, command.getCommand());
279
280 if (!FtpCommandCode.isSpecialCommand(command.getCode())) {
281
282
283
284 boolean notFinished = true;
285 FtpTransferControl control = session.getDataConn().getFtpTransferControl();
286 for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 100; i ++) {
287 if (control.isFtpTransferExecuting() ||
288 (!session.isCurrentCommandFinished())) {
289 try {
290 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
291 } catch (InterruptedException e1) {
292 break;
293 }
294 } else {
295 notFinished = false;
296 break;
297 }
298 }
299 if (notFinished) {
300 session.setReplyCode(
301 ReplyCode.REPLY_503_BAD_SEQUENCE_OF_COMMANDS,
302 "Previous transfer command is not finished yet");
303 businessHandler.afterRunCommandKo(
304 new Reply503Exception(session.getReplyCode().getMesg()));
305 writeIntermediateAnswer();
306 return;
307 }
308 }
309
310 session.setReplyCode(ReplyCode.REPLY_200_COMMAND_OKAY, null);
311 if (session.getCurrentCommand().isNextCommandValid(command)) {
312 session.setNextCommand(command);
313 messageRunAnswer();
314 } else {
315 command = new IncorrectCommand();
316 command.setArgs(getFtpSession(), message, null,
317 FtpCommandCode.IncorrectSequence);
318 session.setNextCommand(command);
319 messageRunAnswer();
320 }
321 }
322 }
323
324
325
326
327
328
329
330 private boolean writeFinalAnswer() {
331 if (session.getReplyCode() == ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION ||
332 session.getReplyCode() == ReplyCode.REPLY_221_CLOSING_CONTROL_CONNECTION) {
333 session.getDataConn().getFtpTransferControl().clear();
334 writeIntermediateAnswer().addListener(ChannelFutureListener.CLOSE);
335 return true;
336 }
337 writeIntermediateAnswer();
338 session.setCurrentCommandFinished();
339 return false;
340 }
341
342
343
344
345
346
347
348 public ChannelFuture writeIntermediateAnswer() {
349 return Channels.write(controlChannel, session.getAnswer());
350 }
351
352
353
354
355 private void messageRunAnswer() {
356 boolean error = false;
357 try {
358 businessHandler.beforeRunCommand();
359 AbstractCommand command = session.getCurrentCommand();
360 logger.debug("Run {}", command.getCommand());
361 command.exec();
362 businessHandler.afterRunCommandOk();
363 } catch (CommandAbstractException e) {
364 error = true;
365 session.setReplyCode(e);
366 businessHandler.afterRunCommandKo(e);
367 }
368 if (error || session.getCurrentCommand().getCode() != FtpCommandCode.INTERNALSHUTDOWN) {
369 writeFinalAnswer();
370 }
371 }
372 }