View Javadoc
1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.ftp.core.data.handler;
22  
23  import goldengate.common.exception.InvalidArgumentException;
24  import goldengate.common.file.DataBlock;
25  import goldengate.common.future.GgFuture;
26  import goldengate.ftp.core.command.FtpArgumentCode.TransferMode;
27  import goldengate.ftp.core.command.FtpArgumentCode.TransferStructure;
28  import goldengate.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
29  
30  import org.jboss.netty.buffer.ChannelBuffer;
31  import org.jboss.netty.buffer.ChannelBuffers;
32  import org.jboss.netty.channel.Channel;
33  import org.jboss.netty.channel.ChannelDownstreamHandler;
34  import org.jboss.netty.channel.ChannelEvent;
35  import org.jboss.netty.channel.ChannelHandlerContext;
36  import org.jboss.netty.channel.Channels;
37  import org.jboss.netty.channel.MessageEvent;
38  import org.jboss.netty.handler.codec.frame.FrameDecoder;
39  
40  /**
41   * First CODEC :<br>
42   * - encode : takes a {@link DataBlock} and transforms it to a ChannelBuffer<br>
43   * - decode : takes a ChannelBuffer and transforms it to a {@link DataBlock}<br>
44   * STREAM and BLOCK mode are implemented. COMPRESSED mode is not implemented.
45   *
46   * @author Frederic Bregier
47   *
48   */
49  public class FtpDataModeCodec extends FrameDecoder implements
50          ChannelDownstreamHandler {
51      /*
52       *
53       * 3.4.1. STREAM MODE
54       *
55       * The data is transmitted as a stream of bytes. There is no restriction on
56       * the representation type used; record structures are allowed.
57       *
58       * In a record structured file EOR and EOF will each be indicated by a
59       * two-byte control code. The first byte of the control code will be all
60       * ones, the escape character. The second byte will have the low order bit
61       * on and zeros elsewhere for EOR and the second low order bit on for EOF;
62       * that is, the byte will have value 1 for EOR and value 2 for EOF. EOR and
63       * EOF may be indicated together on the last byte transmitted by turning
64       * both low order bits on (i.e., the value 3). If a byte of all ones was
65       * intended to be sent as data, it should be repeated in the second byte of
66       * the control code.
67       *
68       * If the structure is a file structure, the EOF is indicated by the sending
69       * host closing the data connection and all bytes are data bytes.
70       *
71       * 3.4.2. BLOCK MODE
72       *
73       * The file is transmitted as a series of data blocks preceded by one or
74       * more header bytes. The header bytes contain a count field, and descriptor
75       * code. The count field indicates the total length of the data block in
76       * bytes, thus marking the beginning of the next data block (there are no
77       * filler bits). The descriptor code defines: last block in the file (EOF)
78       * last block in the record (EOR), restart marker (see the Section on Error
79       * Recovery and Restart) or suspect data (i.e., the data being transferred
80       * is suspected of errors and is not reliable). This last code is NOT
81       * intended for error control within FTP. It is motivated by the desire of
82       * sites exchanging certain types of data (e.g., seismic or weather data) to
83       * send and receive all the data despite local errors (such as "magnetic
84       * tape read errors"), but to indicate in the transmission that certain
85       * portions are suspect). Record structures are allowed in this mode, and
86       * any representation type may be used.
87       *
88       * The header consists of the three bytes. Of the 24 bits of header
89       * information, the 16 low order bits shall represent byte count, and the 8
90       * high order bits shall represent descriptor codes as shown below.
91       *
92       *
93       * Block Header
94       *
     * +----------------+----------------+----------------+
     * | Descriptor     |    Byte Count                   |
     * |         8 bits |                      16 bits    |
     * +----------------+----------------+----------------+
98       *
99       *
100      * The descriptor codes are indicated by bit flags in the descriptor byte.
101      * Four codes have been assigned, where each code number is the decimal
102      * value of the corresponding bit in the byte.
103      *
     * Code     Meaning
     *
     *  128     End of data block is EOR
     *   64     End of data block is EOF
     *   32     Suspected errors in data block
     *   16     Data block is a restart marker
108      *
109      * With this encoding, more than one descriptor coded condition may exist
110      * for a particular block. As many bits as necessary may be flagged.
111      *
112      * The restart marker is embedded in the data stream as an integral number
113      * of 8-bit bytes representing printable characters in the language being
114      * used over the control connection (e.g., default--NVT-ASCII). <SP> (Space,
115      * in the appropriate language) must not be used WITHIN a restart marker.
116      *
117      * For example, to transmit a six-character marker, the following would be
118      * sent:
119      *
     * +--------+--------+--------+
     * |Descrptr|  Byte count     |
     * |code= 16|             = 6 |
     * +--------+--------+--------+
     *
     * +--------+--------+--------+
     * | Marker | Marker | Marker |
     * | 8 bits | 8 bits | 8 bits |
     * +--------+--------+--------+
     *
     * +--------+--------+--------+
     * | Marker | Marker | Marker |
     * | 8 bits | 8 bits | 8 bits |
     * +--------+--------+--------+
128      */
129     /**
130      * Transfer Mode
131      */
132     private TransferMode mode = null;
133 
134     /**
135      * Structure Mode
136      */
137     private TransferStructure structure = null;
138 
139     /**
140      * Ftp Data Block
141      */
142     private DataBlock dataBlock = null;
143 
144     /**
145      * Last byte for STREAM+RECORD
146      */
147     private int lastbyte = 0;
148 
149     /**
150      * Is the underlying DataNetworkHandler ready to receive block
151      */
152     private volatile boolean isReady = false;
153 
154     /**
155      * Blocking step between DataNetworkHandler and this Codec in order to wait
156      * that the DataNetworkHandler is ready
157      */
158     private final GgFuture codecLocked = new GgFuture();
159 
160     /**
161      * @param mode
162      * @param structure
163      */
164     public FtpDataModeCodec(TransferMode mode, TransferStructure structure) {
165         super();
166         this.mode = mode;
167         this.structure = structure;
168     }
169 
170     /**
171      * Inform the Codec that DataNetworkHandler is ready (called from
172      * DataNetworkHandler after setCorrectCodec).
173      *
174      */
175     public void setCodecReady() {
176         codecLocked.setSuccess();
177     }
178 
179     protected Object decodeRecordStandard(ChannelBuffer buf, int length) {
180         ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(length);
181         if (lastbyte == 0xFF) {
182             int nextbyte = buf.readByte();
183             if (nextbyte == 0xFF) {
184                 newbuf.writeByte((byte) (lastbyte & 0xFF));
185             } else {
186                 if (nextbyte == 1) {
187                     dataBlock.setEOR(true);
188                 } else if (nextbyte == 2) {
189                     dataBlock.setEOF(true);
190                 } else if (nextbyte == 3) {
191                     dataBlock.setEOR(true);
192                     dataBlock.setEOF(true);
193                 }
194                 lastbyte = 0;
195             }
196         }
197         try {
198             while (true) {
199                 lastbyte = buf.readByte();
200                 if (lastbyte == 0xFF) {
201                     int nextbyte = buf.readByte();
202                     if (nextbyte == 0xFF) {
203                         newbuf.writeByte((byte) (lastbyte & 0xFF));
204                     } else {
205                         if (nextbyte == 1) {
206                             dataBlock.setEOR(true);
207                         } else if (nextbyte == 2) {
208                             dataBlock.setEOF(true);
209                         } else if (nextbyte == 3) {
210                             dataBlock.setEOR(true);
211                             dataBlock.setEOF(true);
212                         }
213                     }
214                 } else {
215                     newbuf.writeByte((byte) (lastbyte & 0xFF));
216                 }
217                 lastbyte = 0;
218             }
219         } catch (IndexOutOfBoundsException e) {
220             // End of read
221         }
222         dataBlock.setBlock(newbuf);
223         return dataBlock;
224     }
225 
226     protected Object decodeRecord(ChannelBuffer buf, int length) {
227         FtpSeekAheadData sad = null;
228         try {
229             sad = new FtpSeekAheadData(buf);
230         } catch (SeekAheadNoBackArrayException e1) {
231             return decodeRecordStandard(buf, length);
232         }
233         ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(length);
234         if (lastbyte == 0xFF) {
235             int nextbyte = sad.bytes[sad.pos++];
236             if (nextbyte == 0xFF) {
237                 newbuf.writeByte((byte) (lastbyte & 0xFF));
238             } else {
239                 if (nextbyte == 1) {
240                     dataBlock.setEOR(true);
241                 } else if (nextbyte == 2) {
242                     dataBlock.setEOF(true);
243                 } else if (nextbyte == 3) {
244                     dataBlock.setEOR(true);
245                     dataBlock.setEOF(true);
246                 }
247                 lastbyte = 0;
248             }
249         }
250         try {
251             while (sad.pos < sad.limit) {
252                 lastbyte = sad.bytes[sad.pos++];
253                 if (lastbyte == 0xFF) {
254                     int nextbyte = sad.bytes[sad.pos++];
255                     if (nextbyte == 0xFF) {
256                         newbuf.writeByte((byte) (lastbyte & 0xFF));
257                     } else {
258                         if (nextbyte == 1) {
259                             dataBlock.setEOR(true);
260                         } else if (nextbyte == 2) {
261                             dataBlock.setEOF(true);
262                         } else if (nextbyte == 3) {
263                             dataBlock.setEOR(true);
264                             dataBlock.setEOF(true);
265                         }
266                     }
267                 } else {
268                     newbuf.writeByte((byte) (lastbyte & 0xFF));
269                 }
270                 lastbyte = 0;
271             }
272         } catch (IndexOutOfBoundsException e) {
273             // End of read
274         }
275         sad.setReadPosition(0);
276         dataBlock.setBlock(newbuf);
277         return dataBlock;        
278     }
279     
280     /*
281      * (non-Javadoc)
282      *
283      * @see
284      * org.jboss.netty.handler.codec.frame.FrameDecoder#decode(org.jboss.netty
285      * .channel.ChannelHandlerContext, org.jboss.netty.channel.Channel,
286      * org.jboss.netty.buffer.ChannelBuffer)
287      */
288     @Override
289     protected Object decode(ChannelHandlerContext ctx, Channel channel,
290             ChannelBuffer buf) throws Exception {
291         // First test if the connection is fully ready (block might be
292         // transfered
293         // by client before connection is ready)
294         if (!isReady) {
295             codecLocked.await();
296             isReady = true;
297         }
298         // If STREAM Mode, no task to do, just next filter
299         if (mode == TransferMode.STREAM) {
300             dataBlock = new DataBlock();
301             int length = buf.readableBytes();
302             // Except if RECORD Structure!
303             if (structure == TransferStructure.RECORD) {
304                 return decodeRecord(buf, length);
305             }
306 
307             dataBlock.setBlock(buf.readBytes(length));
308             return dataBlock;
309         } else if (mode == TransferMode.BLOCK) {
310             // Now we are in BLOCK Mode
311             // Make sure if the length field was received.
312             if (buf.readableBytes() < 3) {
313                 // The length field was not received yet - return null.
314                 // This method will be invoked again when more packets are
315                 // received and appended to the buffer.
316                 return null;
317             }
318 
319             // The length field is in the buffer.
320 
321             // Mark the current buffer position before reading the length field
322             // because the whole frame might not be in the buffer yet.
323             // We will reset the buffer position to the marked position if
324             // there's not enough bytes in the buffer.
325             buf.markReaderIndex();
326 
327             if (dataBlock == null) {
328                 dataBlock = new DataBlock();
329             }
330             // Read the descriptor
331             dataBlock.setDescriptor(buf.readByte());
332 
333             // Read the length field.
334             byte upper = buf.readByte();
335             byte lower = buf.readByte();
336             dataBlock.setByteCount(upper, lower);
337 
338             // Make sure if there's enough bytes in the buffer.
339             if (buf.readableBytes() < dataBlock.getByteCount()) {
340                 // The whole bytes were not received yet - return null.
341                 // This method will be invoked again when more packets are
342                 // received and appended to the buffer.
343 
344                 // Reset to the marked position to read the length field again
345                 // next time.
346                 buf.resetReaderIndex();
347 
348                 return null;
349             }
350             if (dataBlock.getByteCount() > 0) {
351                 // There's enough bytes in the buffer. Read it.
352                 dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
353             }
354             DataBlock returnDataBlock = dataBlock;
355             // Free the datablock for next frame
356             dataBlock = null;
357             // Successfully decoded a frame. Return the decoded frame.
358             return returnDataBlock;
359         }
360         // Type unimplemented
361         throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
362     }
363 
364     protected ChannelBuffer encodeRecordStandard(DataBlock msg, ChannelBuffer buffer) {
365         ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(msg
366                 .getByteCount());
367         int newbyte = 0;
368         try {
369             while (true) {
370                 newbyte = buffer.readByte();
371                 if (newbyte == 0xFF) {
372                     newbuf.writeByte((byte) (newbyte & 0xFF));
373                 }
374                 newbuf.writeByte((byte) (newbyte & 0xFF));
375             }
376         } catch (IndexOutOfBoundsException e) {
377             // end of read
378         }
379         int value = 0;
380         if (msg.isEOF()) {
381             value += 2;
382         }
383         if (msg.isEOR()) {
384             value += 1;
385         }
386         if (value > 0) {
387             newbuf.writeByte((byte) 0xFF);
388             newbuf.writeByte((byte) (value & 0xFF));
389         }
390         msg.clear();
391         return newbuf;
392     }
393 
394     protected ChannelBuffer encodeRecord(DataBlock msg, ChannelBuffer buffer) {
395         FtpSeekAheadData sad = null;
396         try {
397             sad = new FtpSeekAheadData(buffer);
398         } catch (SeekAheadNoBackArrayException e1) {
399             return encodeRecordStandard(msg, buffer);
400         }
401         ChannelBuffer newbuf = ChannelBuffers.dynamicBuffer(msg
402                 .getByteCount());
403         int newbyte = 0;
404         try {
405             while (sad.pos < sad.limit) {
406                 newbyte = sad.bytes[sad.pos++];
407                 if (newbyte == 0xFF) {
408                     newbuf.writeByte((byte) (newbyte & 0xFF));
409                 }
410                 newbuf.writeByte((byte) (newbyte & 0xFF));
411             }
412         } catch (IndexOutOfBoundsException e) {
413             // end of read
414         }
415         int value = 0;
416         if (msg.isEOF()) {
417             value += 2;
418         }
419         if (msg.isEOR()) {
420             value += 1;
421         }
422         if (value > 0) {
423             newbuf.writeByte((byte) 0xFF);
424             newbuf.writeByte((byte) (value & 0xFF));
425         }
426         msg.clear();
427         sad.setReadPosition(0);
428         return newbuf;        
429     }
430     /**
431      * Encode a DataBlock in the correct format for Mode
432      *
433      * @param msg
434      * @return the ChannelBuffer or null when the last block is already done
435      * @throws InvalidArgumentException
436      */
437     protected ChannelBuffer encode(DataBlock msg)
438             throws InvalidArgumentException {
439         if (msg.isCleared()) {
440             return null;
441         }
442         ChannelBuffer buffer = msg.getBlock();
443         if (mode == TransferMode.STREAM) {
444             // If record structure, special attention
445             if (structure == TransferStructure.RECORD) {
446                 return encodeRecord(msg, buffer);
447             }
448             msg.clear();
449             return buffer;
450         } else if (mode == TransferMode.BLOCK) {
451             int length = msg.getByteCount();
452             ChannelBuffer newbuf = ChannelBuffers
453                     .dynamicBuffer(length > 0xFFFF? 0xFFFF + 3 : length + 3);
454             byte[] header = new byte[3];
455             // Is there any data left
456             if (length == 0) {
457                 // It could be an empty block for EOR or EOF
458                 if (msg.isEOF() || msg.isEOR()) {
459                     header[0] = msg.getDescriptor();
460                     header[1] = 0;
461                     header[2] = 0;
462                     newbuf.writeBytes(header);
463                     // Next call will be the last one
464                     msg.clear();
465                     // return the last block
466                     return newbuf;
467                 }
468                 // This was the very last call
469                 msg.clear();
470                 // return the end of encode
471                 return null;
472             }
473             // Is this a Restart so only Markers
474             if (msg.isRESTART()) {
475                 header[0] = msg.getDescriptor();
476                 header[1] = msg.getByteCountUpper();
477                 header[2] = msg.getByteCountLower();
478                 newbuf.writeBytes(header);
479                 newbuf.writeBytes(msg.getByteMarkers());
480                 // Next call will be the last one
481                 msg.clear();
482                 // return the last block
483                 return newbuf;
484             }
485             // Work on sub block, ignoring descriptor since it is not the last
486             // one
487             if (length > 0xFFFF) {
488                 header[0] = 0;
489                 header[1] = (byte) 0xFF;
490                 header[2] = (byte) 0xFF;
491                 newbuf.writeBytes(header);
492                 // Now take the first 0xFFFF bytes from buffer
493                 newbuf.writeBytes(msg.getBlock(), 0xFFFF);
494                 length -= 0xFFFF;
495                 msg.setByteCount(length);
496                 // return the sub block
497                 return newbuf;
498             }
499             // Last final block, using the descriptor
500             header[0] = msg.getDescriptor();
501             header[1] = msg.getByteCountUpper();
502             header[2] = msg.getByteCountLower();
503             newbuf.writeBytes(header);
504             // real data
505             newbuf.writeBytes(buffer, length);
506             // Next call will be the last one
507             msg.clear();
508             // return the last block
509             return newbuf;
510         }
511         // Mode unimplemented
512         throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
513     }
514 
515     /**
516      * @return the mode
517      */
518     public TransferMode getMode() {
519         return mode;
520     }
521 
522     /**
523      * @param mode
524      *            the mode to set
525      */
526     public void setMode(TransferMode mode) {
527         this.mode = mode;
528     }
529 
530     /**
531      * @return the structure
532      */
533     public TransferStructure getStructure() {
534         return structure;
535     }
536 
537     /**
538      * @param structure
539      *            the structure to set
540      */
541     public void setStructure(TransferStructure structure) {
542         this.structure = structure;
543     }
544 
545     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
546             throws Exception {
547         if (e instanceof MessageEvent) {
548             writeRequested(ctx, (MessageEvent) e);
549         } else {
550             ctx.sendDownstream(e);
551         }
552     }
553 
554     /**
555      * Coder part, taking a DataBlock and converting it to ChannelBuffer
556      *
557      * @param ctx
558      * @param evt
559      * @throws Exception
560      */
561     private void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
562             throws Exception {
563         if (!(evt.getMessage() instanceof DataBlock)) {
564             throw new InvalidArgumentException("Incorrect write object: " +
565                     evt.getMessage().getClass().getName());
566         }
567         // First test if the connection is fully ready (block might be
568         // transfered
569         // by client before connection is ready)
570         if (!isReady) {
571             codecLocked.await();
572             isReady = true;
573         }
574         DataBlock newDataBlock = (DataBlock) evt.getMessage();
575         ChannelBuffer next = encode(newDataBlock);
576         // Could be splitten in several block
577         while (next != null) {
578             Channels.write(ctx, evt.getFuture(), next);
579             next = encode(newDataBlock);
580         }
581     }
582 }