View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.common.file.filesystembased;
22  
23  import goldengate.common.command.exception.CommandAbstractException;
24  import goldengate.common.command.exception.Reply502Exception;
25  import goldengate.common.command.exception.Reply530Exception;
26  import goldengate.common.command.exception.Reply550Exception;
27  import goldengate.common.exception.FileEndOfTransferException;
28  import goldengate.common.exception.FileTransferException;
29  import goldengate.common.exception.NoRestartException;
30  import goldengate.common.file.DataBlock;
31  import goldengate.common.file.DirInterface;
32  import goldengate.common.file.Restart;
33  import goldengate.common.file.SessionInterface;
34  import goldengate.common.logging.GgInternalLogger;
35  import goldengate.common.logging.GgInternalLoggerFactory;
36  
37  import java.io.File;
38  import java.io.FileInputStream;
39  import java.io.FileNotFoundException;
40  import java.io.FileOutputStream;
41  import java.io.IOException;
42  import java.io.RandomAccessFile;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.ClosedChannelException;
45  import java.nio.channels.FileChannel;
46  
47  import org.jboss.netty.buffer.ChannelBuffer;
48  import org.jboss.netty.buffer.ChannelBuffers;
49  
50  /**
51   * File implementation for Filesystem Based
52   *
53   * @author Frederic Bregier
54   *
55   */
56  public abstract class FilesystemBasedFileImpl implements
57          goldengate.common.file.FileInterface {
58      /**
59       * Internal Logger
60       */
61      private static final GgInternalLogger logger = GgInternalLoggerFactory
62              .getLogger(FilesystemBasedFileImpl.class);
63  
64      /**
65       * SessionInterface
66       */
67      protected final SessionInterface session;
68  
69      /**
70       * DirInterface associated with this file at creation. It is not necessary
71       * the directory that owns this file.
72       */
73      private final FilesystemBasedDirImpl dir;
74  
75      /**
76       * {@link FilesystemBasedAuthImpl}
77       */
78      private final FilesystemBasedAuthImpl auth;
79  
80      /**
81       * Current file if any
82       */
83      protected String currentFile = null;
84  
85      /**
86       * Is this Document ready to be accessed
87       */
88      protected boolean isReady = false;
89  
90      /**
91       * Is this file in append mode
92       */
93      protected boolean isAppend = false;
94  
95      /**
96       * @param session
97       * @param dir
98       *            It is not necessary the directory that owns this file.
99       * @param path
100      * @param append
101      * @throws CommandAbstractException
102      */
103     public FilesystemBasedFileImpl(SessionInterface session,
104             FilesystemBasedDirImpl dir, String path, boolean append)
105             throws CommandAbstractException {
106         this.session = session;
107         auth = (FilesystemBasedAuthImpl) session.getAuth();
108         this.dir = dir;
109         currentFile = path;
110         isAppend = append;
111         File file = getFileFromPath(path);
112         if (append) {
113             try {
114                 setPosition(file.length());
115             } catch (IOException e) {
116                 // not ready
117                 return;
118             }
119         } else {
120             try {
121                 setPosition(0);
122             } catch (IOException e) {
123             }
124         }
125         isReady = true;
126     }
127 
128     /**
129      * Special constructor for possibly external file
130      * @param session
131      * @param dir
132      *            It is not necessary the directory that owns this file.
133      * @param path
134      */
135     public FilesystemBasedFileImpl(SessionInterface session,
136             FilesystemBasedDirImpl dir, String path) {
137         this.session = session;
138         auth = (FilesystemBasedAuthImpl) session.getAuth();
139         this.dir = dir;
140         currentFile = path;
141         isReady = true;
142         isAppend = false;
143         position = 0;
144     }
145 
146     public void clear() throws CommandAbstractException {
147         closeFile();
148         isReady = false;
149         currentFile = null;
150         isAppend = false;
151     }
152 
153     public void checkIdentify() throws Reply530Exception {
154         if (!getSession().getAuth().isIdentified()) {
155             throw new Reply530Exception("User not authentified");
156         }
157     }
158 
159     public SessionInterface getSession() {
160         return session;
161     }
162 
163     public DirInterface getDir() {
164         return dir;
165     }
166 
167     /**
168      * Get the File from this path, checking first its validity
169      *
170      * @param path
171      * @return the FileInterface
172      * @throws CommandAbstractException
173      */
174     protected File getFileFromPath(String path) throws CommandAbstractException {
175         String newdir = getDir().validatePath(path);
176         String truedir = auth.getAbsolutePath(newdir);
177         return new File(truedir);
178     }
179 
180     /**
181      * Get the relative path (without mount point)
182      *
183      * @param file
184      * @return the relative path
185      */
186     protected String getRelativePath(File file) {
187         return auth.getRelativePath(FilesystemBasedDirImpl.normalizePath(file
188                 .getAbsolutePath()));
189     }
190 
191     public boolean isDirectory() throws CommandAbstractException {
192         checkIdentify();
193         File dir1 = getFileFromPath(currentFile);
194         return dir1.isDirectory();
195     }
196 
197     public boolean isFile() throws CommandAbstractException {
198         checkIdentify();
199         return getFileFromPath(currentFile).isFile();
200     }
201 
202     public String getFile() throws CommandAbstractException {
203         checkIdentify();
204         return currentFile;
205     }
206 
207     public boolean closeFile() throws CommandAbstractException {
208         if (bfileChannelIn != null) {
209             try {
210                 bfileChannelIn.close();
211             } catch (IOException e) {
212             }
213             bfileChannelIn = null;
214             bbyteBuffer = null;
215         }
216         if (fileOutputStream != null) {
217             /*try {
218                 rafOut.getFD().sync();
219             } catch (SyncFailedException e1) {
220             } catch (IOException e1) {
221             }*/
222             try {
223                 fileOutputStream.flush();
224                 fileOutputStream.close();
225             } catch (ClosedChannelException e) {
226                 // ignore
227             } catch (IOException e) {
228                 throw new Reply550Exception("Close in error");
229             }
230             fileOutputStream = null;
231         }
232         position = 0;
233         isReady = false;
234         // Do not clear the filename itself
235         return true;
236     }
237 
238     public boolean abortFile() throws CommandAbstractException {
239         if (isInWriting() &&
240                 ((FilesystemBasedFileParameterImpl) getSession()
241                         .getFileParameter()).deleteOnAbort) {
242             delete();
243         }
244         closeFile();
245         return true;
246     }
247 
248     public long length() throws CommandAbstractException {
249         checkIdentify();
250         if (!isReady) {
251             return -1;
252         }
253         if (!exists()) {
254             return -1;
255         }
256         return getFileFromPath(currentFile).length();
257     }
258 
259     public boolean isInReading() throws CommandAbstractException {
260         if (!isReady) {
261             return false;
262         }
263         return bfileChannelIn != null;
264     }
265 
266     public boolean isInWriting() throws CommandAbstractException {
267         if (!isReady) {
268             return false;
269         }
270         return fileOutputStream != null;
271     }
272 
273     public boolean canRead() throws CommandAbstractException {
274         checkIdentify();
275         if (!isReady) {
276             return false;
277         }
278         return getFileFromPath(currentFile).canRead();
279     }
280 
281     public boolean canWrite() throws CommandAbstractException {
282         checkIdentify();
283         if (!isReady) {
284             return false;
285         }
286         File file = getFileFromPath(currentFile);
287         if (file.exists()) {
288             return file.canWrite();
289         }
290         return file.getParentFile().canWrite();
291     }
292 
293     public boolean exists() throws CommandAbstractException {
294         checkIdentify();
295         if (!isReady) {
296             return false;
297         }
298         return getFileFromPath(currentFile).exists();
299     }
300 
301     public boolean delete() throws CommandAbstractException {
302         checkIdentify();
303         if (!isReady) {
304             return false;
305         }
306         if (!exists()) {
307             return true;
308         }
309         closeFile();
310         return getFileFromPath(currentFile).delete();
311     }
312 
313     public boolean renameTo(String path) throws CommandAbstractException {
314         checkIdentify();
315         if (!isReady) {
316             return false;
317         }
318         File file = getFileFromPath(currentFile);
319         if (file.canRead()) {
320             File newFile = getFileFromPath(path);
321             if (newFile.getParentFile().canWrite()) {
322                 if (!file.renameTo(newFile)) {
323                     FileOutputStream fileOutputStream;
324                     try {
325                         fileOutputStream = new FileOutputStream(newFile);
326                     } catch (FileNotFoundException e) {
327                         logger
328                                 .warn("Cannot find file: " + newFile.getName(),
329                                         e);
330                         return false;
331                     }
332                     FileChannel fileChannelOut = fileOutputStream.getChannel();
333                     if (get(fileChannelOut)) {
334                         delete();
335                     } else {
336                         logger.warn("Cannot write file: {}", newFile);
337                         return false;
338                     }
339                 }
340                 currentFile = getRelativePath(newFile);
341                 isReady = true;
342                 return true;
343             }
344         }
345         return false;
346     }
347 
348     public DataBlock getMarker() throws CommandAbstractException {
349         throw new Reply502Exception("No marker implemented");
350     }
351 
352     public boolean restartMarker(Restart restart)
353             throws CommandAbstractException {
354         try {
355             long newposition = ((FilesystemBasedRestartImpl) restart)
356                     .getPosition();
357             try {
358                 setPosition(newposition);
359             } catch (IOException e) {
360                 throw new Reply502Exception("Cannot set the marker position");
361             }
362             return true;
363         } catch (NoRestartException e) {
364         }
365         return false;
366     }
367 
368     public boolean retrieve() throws CommandAbstractException {
369         checkIdentify();
370         if (isReady) {
371             restartMarker(getSession().getRestart());
372             return canRead();
373         }
374         return false;
375     }
376 
377     public boolean store() throws CommandAbstractException {
378         checkIdentify();
379         if (isReady) {
380             restartMarker(getSession().getRestart());
381             return canWrite();
382         }
383         return false;
384     }
385 
386     public DataBlock readDataBlock() throws FileTransferException,
387             FileEndOfTransferException {
388         if (isReady) {
389             DataBlock dataBlock = new DataBlock();
390             ChannelBuffer buffer = null;
391             buffer = getBlock(getSession().getBlockSize());
392             if (buffer != null) {
393                 dataBlock.setBlock(buffer);
394                 if (dataBlock.getByteCount() < getSession().getBlockSize()) {
395                     dataBlock.setEOF(true);
396                 }
397                 return dataBlock;
398             }
399         }
400         throw new FileTransferException("No file is ready");
401     }
402 
403     public void writeDataBlock(DataBlock dataBlock)
404             throws FileTransferException {
405         if (isReady) {
406             if (dataBlock.isEOF()) {
407                 writeBlockEnd(dataBlock.getBlock());
408                 return;
409             }
410             writeBlock(dataBlock.getBlock());
411             return;
412         }
413         throw new FileTransferException("No file is ready");
414     }
415 
416     /**
417      * Valid Position of this file
418      */
419     private long position = 0;
420 
421     /**
422      * FileOutputStream Out
423      */
424     private FileOutputStream fileOutputStream = null;
425     /**
426      * FileChannel In
427      */
428     private FileChannel bfileChannelIn = null;
429 
430     /**
431      * Associated ByteBuffer
432      */
433     private ByteBuffer bbyteBuffer = null;
434 
435     /**
436      * Return the current position in the FileInterface. In write mode, it is
437      * the current file length.
438      *
439      * @return the position
440      */
441     public long getPosition() {
442         return position;
443     }
444 
445     /**
446      * Change the position in the file.
447      *
448      * @param position
449      *            the position to set
450      * @throws IOException
451      */
452     public void setPosition(long position) throws IOException {
453         this.position = position;
454         if (bfileChannelIn != null) {
455             bfileChannelIn = bfileChannelIn.position(position);
456         }
457         /*if (rafOut != null) {
458             rafOut.seek(position);
459         }*/
460         if (fileOutputStream != null) {
461             fileOutputStream.flush();
462             fileOutputStream.close();
463             fileOutputStream = getFileOutputStream(true);
464             if (fileOutputStream == null) {
465                 throw new IOException("File cannot changed of Position");
466             }
467         }
468     }
469     /**
470      * Write the current FileInterface with the given ChannelBuffer. The file is
471      * not limited to 2^32 bytes since this write operation is in add mode.
472      *
473      * In case of error, the current already written blocks are maintained and
474      * the position is not changed.
475      *
476      * @param buffer
477      *            added to the file
478      * @throws FileTransferException
479      */
480     private void writeBlock(ChannelBuffer buffer) throws FileTransferException {
481         if (!isReady) {
482             throw new FileTransferException("No file is ready");
483         }
484         // An empty buffer is allowed
485         if (buffer == null) {
486             return;// could do FileEndOfTransfer ?
487         }
488         if (fileOutputStream == null) {
489             //rafOut = getRandomFile();
490             fileOutputStream = getFileOutputStream(position > 0);
491         }
492         if (fileOutputStream == null) {
493             throw new FileTransferException("Internal error, file is not ready");
494         }
495         int bufferSize = buffer.readableBytes();
496         int start = 0;
497         byte []newbuf;
498         if (buffer.hasArray()) {
499             start = buffer.arrayOffset();
500             newbuf = buffer.array();
501             if (newbuf.length > start+bufferSize) {
502                 byte[] temp = new byte[bufferSize];
503                 System.arraycopy(newbuf, start, temp, 0, bufferSize);
504                 start = 0;
505                 newbuf = temp;
506                 buffer.readerIndex(start+bufferSize);
507             } else {
508                 buffer.readerIndex(start+bufferSize);
509             }
510         } else {
511             newbuf = new byte[bufferSize];
512             buffer.readBytes(newbuf);
513         }
514         try {
515             fileOutputStream.write(newbuf, start, bufferSize);
516         } catch (IOException e2) {
517             logger.error("Error during write:", e2);
518             try {
519                 closeFile();
520             } catch (CommandAbstractException e1) {
521             }
522             // NO this.realFile.delete(); NO DELETE SINCE BY BLOCK IT CAN BE
523             // REDO
524             throw new FileTransferException("Internal error, file is not ready");
525         }
526         position += bufferSize;
527     }
528 
529     /**
530      * End the Write of the current FileInterface with the given ChannelBuffer.
531      * The file is not limited to 2^32 bytes since this write operation is in
532      * add mode.
533      *
534      * @param buffer
535      *            added to the file
536      * @throws FileTransferException
537      */
538     private void writeBlockEnd(ChannelBuffer buffer)
539             throws FileTransferException {
540         writeBlock(buffer);
541         try {
542             closeFile();
543         } catch (CommandAbstractException e) {
544             throw new FileTransferException("Close in error", e);
545         }
546     }
547 
548     /**
549      * Get the current block ChannelBuffer of the current FileInterface. There
550      * is therefore no limitation of the file size to 2^32 bytes.
551      *
552      * The returned block is limited to sizeblock. If the returned block is less
553      * than sizeblock length, it is the last block to read.
554      *
555      * @param sizeblock
556      *            is the limit size for the block array
557      * @return the resulting block ChannelBuffer (even empty)
558      * @throws FileTransferException
559      * @throws FileEndOfTransferException
560      */
561     private ChannelBuffer getBlock(int sizeblock) throws FileTransferException,
562             FileEndOfTransferException {
563         if (!isReady) {
564             throw new FileTransferException("No file is ready");
565         }
566         if (bfileChannelIn == null) {
567             bfileChannelIn = getFileChannel();
568             if (bfileChannelIn != null) {
569                 if (bbyteBuffer != null) {
570                     if (bbyteBuffer.capacity() != sizeblock) {
571                         bbyteBuffer = null;
572                         bbyteBuffer = ByteBuffer.allocateDirect(sizeblock);
573                     }
574                 } else {
575                     bbyteBuffer = ByteBuffer.allocateDirect(sizeblock);
576                 }
577             }
578         }
579         if (bfileChannelIn == null) {
580             throw new FileTransferException("Internal error, file is not ready");
581         }
582         int sizeout = 0;
583         while (sizeout < sizeblock) {
584             try {
585                 int sizeread = bfileChannelIn.read(bbyteBuffer);
586                 if (sizeread <= 0) {
587                     break;
588                 }
589                 sizeout += sizeread;
590             } catch (IOException e) {
591                 logger.error("Error during get:", e);
592                 try {
593                     closeFile();
594                 } catch (CommandAbstractException e1) {
595                 }
596                 throw new FileTransferException("Internal error, file is not ready");
597             }
598         }
599         if (sizeout <= 0) {
600             try {
601                 closeFile();
602             } catch (CommandAbstractException e1) {
603             }
604             isReady = false;
605             throw new FileEndOfTransferException("End of file");
606         }
607         bbyteBuffer.flip();
608         position += sizeout;
609         ChannelBuffer buffer = ChannelBuffers.copiedBuffer(bbyteBuffer);
610         bbyteBuffer.clear();
611         if (sizeout < sizeblock) {// last block
612             try {
613                 closeFile();
614             } catch (CommandAbstractException e1) {
615             }
616             isReady = false;
617         }
618         return buffer;
619     }
620 
621     /**
622      * Write the FileInterface to the fileChannelOut, thus bypassing the
623      * limitation of the file size to 2^32 bytes.
624      *
625      * This call closes the fileChannelOut with fileChannelOut.close() if the
626      * operation is in success.
627      *
628      * @param fileChannelOut
629      * @return True if OK, False in error.
630      */
631     protected boolean get(FileChannel fileChannelOut) {
632         if (!isReady) {
633             return false;
634         }
635         FileChannel fileChannelIn = getFileChannel();
636         if (fileChannelIn == null) {
637             return false;
638         }
639         long size = 0;
640         long transfert = 0;
641         try {
642             size = fileChannelIn.size();
643             int chunkSize = this.session.getBlockSize();
644             while (transfert < size) {
645                 if (chunkSize < size - transfert) {
646                     chunkSize = (int) (size - transfert);
647                 }
648                 transfert += fileChannelOut.transferFrom(fileChannelIn, transfert, chunkSize);
649             }
650             fileChannelOut.force(true);
651             fileChannelIn.close();
652             fileChannelIn = null;
653             fileChannelOut.close();
654         } catch (IOException e) {
655             logger.error("Error during get:", e);
656             if (fileChannelIn != null) {
657                 try {
658                     fileChannelIn.close();
659                 } catch (IOException e1) {
660                 }
661             }
662             return false;
663         }
664         if (transfert == size) {
665             position += size;
666         }
667         return transfert == size;
668     }
669 
670     /**
671      * Returns the FileChannel in In mode associated with the current file.
672      *
673      * @return the FileChannel (IN mode)
674      */
675     protected FileChannel getFileChannel() {
676         if (!isReady) {
677             return null;
678         }
679         File trueFile;
680         try {
681             trueFile = getFileFromPath(currentFile);
682         } catch (CommandAbstractException e1) {
683             return null;
684         }
685         FileChannel fileChannel;
686         try {
687             FileInputStream fileInputStream = new FileInputStream(trueFile);
688             fileChannel = fileInputStream.getChannel();
689             if (position != 0) {
690                 fileChannel = fileChannel.position(position);
691             }
692         } catch (FileNotFoundException e) {
693             logger.error("File not found in getFileChannel:", e);
694             return null;
695         } catch (IOException e) {
696             logger.error("Change position in getFileChannel:", e);
697             return null;
698         }
699         return fileChannel;
700     }
701     /**
702      * Returns the RandomAccessFile in Out mode associated with the current file.
703      *
704      * @return the RandomAccessFile (OUT="rw")
705      */
706     protected RandomAccessFile getRandomFile() {
707         if (!isReady) {
708             return null;
709         }
710         File trueFile;
711         try {
712             trueFile = getFileFromPath(currentFile);
713         } catch (CommandAbstractException e1) {
714             return null;
715         }
716         RandomAccessFile raf = null;
717         try {
718             raf = new RandomAccessFile(trueFile, "rw");
719             raf.seek(position);
720         } catch (FileNotFoundException e) {
721             logger.error("File not found in getRandomFile:", e);
722             return null;
723         } catch (IOException e) {
724             logger.error("Change position in getRandomFile:", e);
725             return null;
726         }
727         return raf;
728     }
729     /**
730      * Returns the FileOutputStream in Out mode associated with the current file.
731      * @param append True if the FileOutputStream should be in append mode
732      * @return the FileOutputStream (OUT)
733      */
734     protected FileOutputStream getFileOutputStream(boolean append) {
735         if (!isReady) {
736             return null;
737         }
738         File trueFile;
739         try {
740             trueFile = getFileFromPath(currentFile);
741         } catch (CommandAbstractException e1) {
742             return null;
743         }
744         if (position > 0) {
745             if (trueFile.length() < position) {
746                 logger.error("Cannot Change position in getFileOutputStream: file is smaller than required position");
747                 return null;
748             }
749             RandomAccessFile raf = getRandomFile();
750             try {
751                 raf.setLength(position);
752                 raf.close();
753             } catch (IOException e) {
754                 logger.error("Change position in getFileOutputStream:", e);
755                 return null;
756             }
757             logger.debug("New size: "+trueFile.length()+" : "+position);
758         }
759         FileOutputStream fos = null;
760         try {
761             fos = new FileOutputStream(trueFile, append);
762         } catch (FileNotFoundException e) {
763             logger.error("File not found in getRandomFile:", e);
764             return null;
765         }
766         return fos;
767     }
768 }