1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.core.data.handler;
22
23 import goldengate.common.exception.InvalidArgumentException;
24 import goldengate.common.file.DataBlock;
25 import goldengate.common.future.GgFuture;
26 import goldengate.ftp.core.command.FtpArgumentCode.TransferMode;
27 import goldengate.ftp.core.command.FtpArgumentCode.TransferStructure;
28 import goldengate.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
29
30 import org.jboss.netty.buffer.ChannelBuffer;
31 import org.jboss.netty.buffer.ChannelBuffers;
32 import org.jboss.netty.channel.Channel;
33 import org.jboss.netty.channel.ChannelDownstreamHandler;
34 import org.jboss.netty.channel.ChannelEvent;
35 import org.jboss.netty.channel.ChannelHandlerContext;
36 import org.jboss.netty.channel.Channels;
37 import org.jboss.netty.channel.MessageEvent;
38 import org.jboss.netty.handler.codec.frame.FrameDecoder;
39
40
41
42
43
44
45
46
47
48
49 public class FtpDataModeCodec extends FrameDecoder implements
50 ChannelDownstreamHandler {
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 private TransferMode mode = null;
133
134
135
136
137 private TransferStructure structure = null;
138
139
140
141
142 private DataBlock dataBlock = null;
143
144
145
146
147 private int lastbyte = 0;
148
149
150
151
152 private volatile boolean isReady = false;
153
154
155
156
157
158 private final GgFuture codecLocked = new GgFuture();
159
160
161
162
163
164 public FtpDataModeCodec(TransferMode mode, TransferStructure structure) {
165 super();
166 this.mode = mode;
167 this.structure = structure;
168 }
169
170
171
172
173
174
175 public void setCodecReady() {
176 codecLocked.setSuccess();
177 }
178
179 protected Object decodeRecordStandard(ChannelBuffer buf, int length) {
180 ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(length);
181 if (lastbyte == 0xFF) {
182 int nextbyte = buf.readByte();
183 if (nextbyte == 0xFF) {
184 newbuf.writeByte((byte) (lastbyte & 0xFF));
185 } else {
186 if (nextbyte == 1) {
187 dataBlock.setEOR(true);
188 } else if (nextbyte == 2) {
189 dataBlock.setEOF(true);
190 } else if (nextbyte == 3) {
191 dataBlock.setEOR(true);
192 dataBlock.setEOF(true);
193 }
194 lastbyte = 0;
195 }
196 }
197 try {
198 while (true) {
199 lastbyte = buf.readByte();
200 if (lastbyte == 0xFF) {
201 int nextbyte = buf.readByte();
202 if (nextbyte == 0xFF) {
203 newbuf.writeByte((byte) (lastbyte & 0xFF));
204 } else {
205 if (nextbyte == 1) {
206 dataBlock.setEOR(true);
207 } else if (nextbyte == 2) {
208 dataBlock.setEOF(true);
209 } else if (nextbyte == 3) {
210 dataBlock.setEOR(true);
211 dataBlock.setEOF(true);
212 }
213 }
214 } else {
215 newbuf.writeByte((byte) (lastbyte & 0xFF));
216 }
217 lastbyte = 0;
218 }
219 } catch (IndexOutOfBoundsException e) {
220
221 }
222 dataBlock.setBlock(newbuf);
223 return dataBlock;
224 }
225
226 protected Object decodeRecord(ChannelBuffer buf, int length) {
227 FtpSeekAheadData sad = null;
228 try {
229 sad = new FtpSeekAheadData(buf);
230 } catch (SeekAheadNoBackArrayException e1) {
231 return decodeRecordStandard(buf, length);
232 }
233 ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(length);
234 if (lastbyte == 0xFF) {
235 int nextbyte = sad.bytes[sad.pos++];
236 if (nextbyte == 0xFF) {
237 newbuf.writeByte((byte) (lastbyte & 0xFF));
238 } else {
239 if (nextbyte == 1) {
240 dataBlock.setEOR(true);
241 } else if (nextbyte == 2) {
242 dataBlock.setEOF(true);
243 } else if (nextbyte == 3) {
244 dataBlock.setEOR(true);
245 dataBlock.setEOF(true);
246 }
247 lastbyte = 0;
248 }
249 }
250 try {
251 while (sad.pos < sad.limit) {
252 lastbyte = sad.bytes[sad.pos++];
253 if (lastbyte == 0xFF) {
254 int nextbyte = sad.bytes[sad.pos++];
255 if (nextbyte == 0xFF) {
256 newbuf.writeByte((byte) (lastbyte & 0xFF));
257 } else {
258 if (nextbyte == 1) {
259 dataBlock.setEOR(true);
260 } else if (nextbyte == 2) {
261 dataBlock.setEOF(true);
262 } else if (nextbyte == 3) {
263 dataBlock.setEOR(true);
264 dataBlock.setEOF(true);
265 }
266 }
267 } else {
268 newbuf.writeByte((byte) (lastbyte & 0xFF));
269 }
270 lastbyte = 0;
271 }
272 } catch (IndexOutOfBoundsException e) {
273
274 }
275 sad.setReadPosition(0);
276 dataBlock.setBlock(newbuf);
277 return dataBlock;
278 }
279
280
281
282
283
284
285
286
287
288 @Override
289 protected Object decode(ChannelHandlerContext ctx, Channel channel,
290 ChannelBuffer buf) throws Exception {
291
292
293
294 if (!isReady) {
295 codecLocked.await();
296 isReady = true;
297 }
298
299 if (mode == TransferMode.STREAM) {
300 dataBlock = new DataBlock();
301 int length = buf.readableBytes();
302
303 if (structure == TransferStructure.RECORD) {
304 return decodeRecord(buf, length);
305 }
306
307 dataBlock.setBlock(buf.readBytes(length));
308 return dataBlock;
309 } else if (mode == TransferMode.BLOCK) {
310
311
312 if (buf.readableBytes() < 3) {
313
314
315
316 return null;
317 }
318
319
320
321
322
323
324
325 buf.markReaderIndex();
326
327 if (dataBlock == null) {
328 dataBlock = new DataBlock();
329 }
330
331 dataBlock.setDescriptor(buf.readByte());
332
333
334 byte upper = buf.readByte();
335 byte lower = buf.readByte();
336 dataBlock.setByteCount(upper, lower);
337
338
339 if (buf.readableBytes() < dataBlock.getByteCount()) {
340
341
342
343
344
345
346 buf.resetReaderIndex();
347
348 return null;
349 }
350 if (dataBlock.getByteCount() > 0) {
351
352 dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
353 }
354 DataBlock returnDataBlock = dataBlock;
355
356 dataBlock = null;
357
358 return returnDataBlock;
359 }
360
361 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
362 }
363
364 protected ChannelBuffer encodeRecordStandard(DataBlock msg, ChannelBuffer buffer) {
365 ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(msg
366 .getByteCount());
367 int newbyte = 0;
368 try {
369 while (true) {
370 newbyte = buffer.readByte();
371 if (newbyte == 0xFF) {
372 newbuf.writeByte((byte) (newbyte & 0xFF));
373 }
374 newbuf.writeByte((byte) (newbyte & 0xFF));
375 }
376 } catch (IndexOutOfBoundsException e) {
377
378 }
379 int value = 0;
380 if (msg.isEOF()) {
381 value += 2;
382 }
383 if (msg.isEOR()) {
384 value += 1;
385 }
386 if (value > 0) {
387 newbuf.writeByte((byte) 0xFF);
388 newbuf.writeByte((byte) (value & 0xFF));
389 }
390 msg.clear();
391 return newbuf;
392 }
393
394 protected ChannelBuffer encodeRecord(DataBlock msg, ChannelBuffer buffer) {
395 FtpSeekAheadData sad = null;
396 try {
397 sad = new FtpSeekAheadData(buffer);
398 } catch (SeekAheadNoBackArrayException e1) {
399 return encodeRecordStandard(msg, buffer);
400 }
401 ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(msg
402 .getByteCount());
403 int newbyte = 0;
404 try {
405 while (sad.pos < sad.limit) {
406 newbyte = sad.bytes[sad.pos++];
407 if (newbyte == 0xFF) {
408 newbuf.writeByte((byte) (newbyte & 0xFF));
409 }
410 newbuf.writeByte((byte) (newbyte & 0xFF));
411 }
412 } catch (IndexOutOfBoundsException e) {
413
414 }
415 int value = 0;
416 if (msg.isEOF()) {
417 value += 2;
418 }
419 if (msg.isEOR()) {
420 value += 1;
421 }
422 if (value > 0) {
423 newbuf.writeByte((byte) 0xFF);
424 newbuf.writeByte((byte) (value & 0xFF));
425 }
426 msg.clear();
427 sad.setReadPosition(0);
428 return newbuf;
429 }
430
431
432
433
434
435
436
437 protected ChannelBuffer encode(DataBlock msg)
438 throws InvalidArgumentException {
439 if (msg.isCleared()) {
440 return null;
441 }
442 ChannelBuffer buffer = msg.getBlock();
443 if (mode == TransferMode.STREAM) {
444
445 if (structure == TransferStructure.RECORD) {
446 return encodeRecord(msg, buffer);
447 }
448 msg.clear();
449 return buffer;
450 } else if (mode == TransferMode.BLOCK) {
451 int length = msg.getByteCount();
452 ChannelBuffer newbuf = ChannelBuffers
453 .dynamicBuffer(length > 0xFFFF? 0xFFFF + 3 : length + 3);
454 byte[] header = new byte[3];
455
456 if (length == 0) {
457
458 if (msg.isEOF() || msg.isEOR()) {
459 header[0] = msg.getDescriptor();
460 header[1] = 0;
461 header[2] = 0;
462 newbuf.writeBytes(header);
463
464 msg.clear();
465
466 return newbuf;
467 }
468
469 msg.clear();
470
471 return null;
472 }
473
474 if (msg.isRESTART()) {
475 header[0] = msg.getDescriptor();
476 header[1] = msg.getByteCountUpper();
477 header[2] = msg.getByteCountLower();
478 newbuf.writeBytes(header);
479 newbuf.writeBytes(msg.getByteMarkers());
480
481 msg.clear();
482
483 return newbuf;
484 }
485
486
487 if (length > 0xFFFF) {
488 header[0] = 0;
489 header[1] = (byte) 0xFF;
490 header[2] = (byte) 0xFF;
491 newbuf.writeBytes(header);
492
493 newbuf.writeBytes(msg.getBlock(), 0xFFFF);
494 length -= 0xFFFF;
495 msg.setByteCount(length);
496
497 return newbuf;
498 }
499
500 header[0] = msg.getDescriptor();
501 header[1] = msg.getByteCountUpper();
502 header[2] = msg.getByteCountLower();
503 newbuf.writeBytes(header);
504
505 newbuf.writeBytes(buffer, length);
506
507 msg.clear();
508
509 return newbuf;
510 }
511
512 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
513 }
514
515
516
517
518 public TransferMode getMode() {
519 return mode;
520 }
521
522
523
524
525
526 public void setMode(TransferMode mode) {
527 this.mode = mode;
528 }
529
530
531
532
533 public TransferStructure getStructure() {
534 return structure;
535 }
536
537
538
539
540
541 public void setStructure(TransferStructure structure) {
542 this.structure = structure;
543 }
544
545 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
546 throws Exception {
547 if (e instanceof MessageEvent) {
548 writeRequested(ctx, (MessageEvent) e);
549 } else {
550 ctx.sendDownstream(e);
551 }
552 }
553
554
555
556
557
558
559
560
561 private void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
562 throws Exception {
563 if (!(evt.getMessage() instanceof DataBlock)) {
564 throw new InvalidArgumentException("Incorrect write object: " +
565 evt.getMessage().getClass().getName());
566 }
567
568
569
570 if (!isReady) {
571 codecLocked.await();
572 isReady = true;
573 }
574 DataBlock newDataBlock = (DataBlock) evt.getMessage();
575 ChannelBuffer next = encode(newDataBlock);
576
577 while (next != null) {
578 Channels.write(ctx, evt.getFuture(), next);
579 next = encode(newDataBlock);
580 }
581 }
582 }