1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.common.file.filesystembased;
22
23 import goldengate.common.command.exception.CommandAbstractException;
24 import goldengate.common.command.exception.Reply502Exception;
25 import goldengate.common.command.exception.Reply530Exception;
26 import goldengate.common.command.exception.Reply550Exception;
27 import goldengate.common.exception.FileEndOfTransferException;
28 import goldengate.common.exception.FileTransferException;
29 import goldengate.common.exception.NoRestartException;
30 import goldengate.common.file.DataBlock;
31 import goldengate.common.file.DirInterface;
32 import goldengate.common.file.Restart;
33 import goldengate.common.file.SessionInterface;
34 import goldengate.common.logging.GgInternalLogger;
35 import goldengate.common.logging.GgInternalLoggerFactory;
36
37 import java.io.File;
38 import java.io.FileInputStream;
39 import java.io.FileNotFoundException;
40 import java.io.FileOutputStream;
41 import java.io.IOException;
42 import java.io.RandomAccessFile;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.ClosedChannelException;
45 import java.nio.channels.FileChannel;
46
47 import org.jboss.netty.buffer.ChannelBuffer;
48 import org.jboss.netty.buffer.ChannelBuffers;
49
50
51
52
53
54
55
56 public abstract class FilesystemBasedFileImpl implements
57 goldengate.common.file.FileInterface {
58
59
60
61 private static final GgInternalLogger logger = GgInternalLoggerFactory
62 .getLogger(FilesystemBasedFileImpl.class);
63
64
65
66
67 protected final SessionInterface session;
68
69
70
71
72
73 private final FilesystemBasedDirImpl dir;
74
75
76
77
78 private final FilesystemBasedAuthImpl auth;
79
80
81
82
83 protected String currentFile = null;
84
85
86
87
88 protected boolean isReady = false;
89
90
91
92
93 protected boolean isAppend = false;
94
95
96
97
98
99
100
101
102
103 public FilesystemBasedFileImpl(SessionInterface session,
104 FilesystemBasedDirImpl dir, String path, boolean append)
105 throws CommandAbstractException {
106 this.session = session;
107 auth = (FilesystemBasedAuthImpl) session.getAuth();
108 this.dir = dir;
109 currentFile = path;
110 isAppend = append;
111 File file = getFileFromPath(path);
112 if (append) {
113 try {
114 setPosition(file.length());
115 } catch (IOException e) {
116
117 return;
118 }
119 } else {
120 try {
121 setPosition(0);
122 } catch (IOException e) {
123 }
124 }
125 isReady = true;
126 }
127
128
129
130
131
132
133
134
/**
 * Creates a ready, non-append file object positioned at 0, without
 * resolving or checking the path.
 *
 * @param session owning session (its auth object is used for path mapping)
 * @param dir directory object used to validate paths
 * @param path path of the file
 */
public FilesystemBasedFileImpl(SessionInterface session,
        FilesystemBasedDirImpl dir, String path) {
    this.session = session;
    this.dir = dir;
    this.auth = (FilesystemBasedAuthImpl) session.getAuth();
    this.currentFile = path;
    this.position = 0;
    this.isAppend = false;
    this.isReady = true;
}
145
146 public void clear() throws CommandAbstractException {
147 closeFile();
148 isReady = false;
149 currentFile = null;
150 isAppend = false;
151 }
152
153 public void checkIdentify() throws Reply530Exception {
154 if (!getSession().getAuth().isIdentified()) {
155 throw new Reply530Exception("User not authentified");
156 }
157 }
158
159 public SessionInterface getSession() {
160 return session;
161 }
162
163 public DirInterface getDir() {
164 return dir;
165 }
166
167
168
169
170
171
172
173
174 protected File getFileFromPath(String path) throws CommandAbstractException {
175 String newdir = getDir().validatePath(path);
176 String truedir = auth.getAbsolutePath(newdir);
177 return new File(truedir);
178 }
179
180
181
182
183
184
185
186 protected String getRelativePath(File file) {
187 return auth.getRelativePath(FilesystemBasedDirImpl.normalizePath(file
188 .getAbsolutePath()));
189 }
190
191 public boolean isDirectory() throws CommandAbstractException {
192 checkIdentify();
193 File dir1 = getFileFromPath(currentFile);
194 return dir1.isDirectory();
195 }
196
197 public boolean isFile() throws CommandAbstractException {
198 checkIdentify();
199 return getFileFromPath(currentFile).isFile();
200 }
201
202 public String getFile() throws CommandAbstractException {
203 checkIdentify();
204 return currentFile;
205 }
206
207 public boolean closeFile() throws CommandAbstractException {
208 if (bfileChannelIn != null) {
209 try {
210 bfileChannelIn.close();
211 } catch (IOException e) {
212 }
213 bfileChannelIn = null;
214 bbyteBuffer = null;
215 }
216 if (fileOutputStream != null) {
217
218
219
220
221
222 try {
223 fileOutputStream.flush();
224 fileOutputStream.close();
225 } catch (ClosedChannelException e) {
226
227 } catch (IOException e) {
228 throw new Reply550Exception("Close in error");
229 }
230 fileOutputStream = null;
231 }
232 position = 0;
233 isReady = false;
234
235 return true;
236 }
237
238 public boolean abortFile() throws CommandAbstractException {
239 if (isInWriting() &&
240 ((FilesystemBasedFileParameterImpl) getSession()
241 .getFileParameter()).deleteOnAbort) {
242 delete();
243 }
244 closeFile();
245 return true;
246 }
247
248 public long length() throws CommandAbstractException {
249 checkIdentify();
250 if (!isReady) {
251 return -1;
252 }
253 if (!exists()) {
254 return -1;
255 }
256 return getFileFromPath(currentFile).length();
257 }
258
259 public boolean isInReading() throws CommandAbstractException {
260 if (!isReady) {
261 return false;
262 }
263 return bfileChannelIn != null;
264 }
265
266 public boolean isInWriting() throws CommandAbstractException {
267 if (!isReady) {
268 return false;
269 }
270 return fileOutputStream != null;
271 }
272
273 public boolean canRead() throws CommandAbstractException {
274 checkIdentify();
275 if (!isReady) {
276 return false;
277 }
278 return getFileFromPath(currentFile).canRead();
279 }
280
281 public boolean canWrite() throws CommandAbstractException {
282 checkIdentify();
283 if (!isReady) {
284 return false;
285 }
286 File file = getFileFromPath(currentFile);
287 if (file.exists()) {
288 return file.canWrite();
289 }
290 return file.getParentFile().canWrite();
291 }
292
293 public boolean exists() throws CommandAbstractException {
294 checkIdentify();
295 if (!isReady) {
296 return false;
297 }
298 return getFileFromPath(currentFile).exists();
299 }
300
301 public boolean delete() throws CommandAbstractException {
302 checkIdentify();
303 if (!isReady) {
304 return false;
305 }
306 if (!exists()) {
307 return true;
308 }
309 closeFile();
310 return getFileFromPath(currentFile).delete();
311 }
312
313 public boolean renameTo(String path) throws CommandAbstractException {
314 checkIdentify();
315 if (!isReady) {
316 return false;
317 }
318 File file = getFileFromPath(currentFile);
319 if (file.canRead()) {
320 File newFile = getFileFromPath(path);
321 if (newFile.getParentFile().canWrite()) {
322 if (!file.renameTo(newFile)) {
323 FileOutputStream fileOutputStream;
324 try {
325 fileOutputStream = new FileOutputStream(newFile);
326 } catch (FileNotFoundException e) {
327 logger
328 .warn("Cannot find file: " + newFile.getName(),
329 e);
330 return false;
331 }
332 FileChannel fileChannelOut = fileOutputStream.getChannel();
333 if (get(fileChannelOut)) {
334 delete();
335 } else {
336 logger.warn("Cannot write file: {}", newFile);
337 return false;
338 }
339 }
340 currentFile = getRelativePath(newFile);
341 isReady = true;
342 return true;
343 }
344 }
345 return false;
346 }
347
348 public DataBlock getMarker() throws CommandAbstractException {
349 throw new Reply502Exception("No marker implemented");
350 }
351
352 public boolean restartMarker(Restart restart)
353 throws CommandAbstractException {
354 try {
355 long newposition = ((FilesystemBasedRestartImpl) restart)
356 .getPosition();
357 try {
358 setPosition(newposition);
359 } catch (IOException e) {
360 throw new Reply502Exception("Cannot set the marker position");
361 }
362 return true;
363 } catch (NoRestartException e) {
364 }
365 return false;
366 }
367
368 public boolean retrieve() throws CommandAbstractException {
369 checkIdentify();
370 if (isReady) {
371 restartMarker(getSession().getRestart());
372 return canRead();
373 }
374 return false;
375 }
376
377 public boolean store() throws CommandAbstractException {
378 checkIdentify();
379 if (isReady) {
380 restartMarker(getSession().getRestart());
381 return canWrite();
382 }
383 return false;
384 }
385
386 public DataBlock readDataBlock() throws FileTransferException,
387 FileEndOfTransferException {
388 if (isReady) {
389 DataBlock dataBlock = new DataBlock();
390 ChannelBuffer buffer = null;
391 buffer = getBlock(getSession().getBlockSize());
392 if (buffer != null) {
393 dataBlock.setBlock(buffer);
394 if (dataBlock.getByteCount() < getSession().getBlockSize()) {
395 dataBlock.setEOF(true);
396 }
397 return dataBlock;
398 }
399 }
400 throw new FileTransferException("No file is ready");
401 }
402
403 public void writeDataBlock(DataBlock dataBlock)
404 throws FileTransferException {
405 if (isReady) {
406 if (dataBlock.isEOF()) {
407 writeBlockEnd(dataBlock.getBlock());
408 return;
409 }
410 writeBlock(dataBlock.getBlock());
411 return;
412 }
413 throw new FileTransferException("No file is ready");
414 }
415
416
417
418
419 private long position = 0;
420
421
422
423
424 private FileOutputStream fileOutputStream = null;
425
426
427
428 private FileChannel bfileChannelIn = null;
429
430
431
432
433 private ByteBuffer bbyteBuffer = null;
434
435
436
437
438
439
440
441 public long getPosition() {
442 return position;
443 }
444
445
446
447
448
449
450
451
452 public void setPosition(long position) throws IOException {
453 this.position = position;
454 if (bfileChannelIn != null) {
455 bfileChannelIn = bfileChannelIn.position(position);
456 }
457
458
459
460 if (fileOutputStream != null) {
461 fileOutputStream.flush();
462 fileOutputStream.close();
463 fileOutputStream = getFileOutputStream(true);
464 if (fileOutputStream == null) {
465 throw new IOException("File cannot changed of Position");
466 }
467 }
468 }
469
470
471
472
473
474
475
476
477
478
479
480 private void writeBlock(ChannelBuffer buffer) throws FileTransferException {
481 if (!isReady) {
482 throw new FileTransferException("No file is ready");
483 }
484
485 if (buffer == null) {
486 return;
487 }
488 if (fileOutputStream == null) {
489
490 fileOutputStream = getFileOutputStream(position > 0);
491 }
492 if (fileOutputStream == null) {
493 throw new FileTransferException("Internal error, file is not ready");
494 }
495 int bufferSize = buffer.readableBytes();
496 int start = 0;
497 byte []newbuf;
498 if (buffer.hasArray()) {
499 start = buffer.arrayOffset();
500 newbuf = buffer.array();
501 if (newbuf.length > start+bufferSize) {
502 byte[] temp = new byte[bufferSize];
503 System.arraycopy(newbuf, start, temp, 0, bufferSize);
504 start = 0;
505 newbuf = temp;
506 buffer.readerIndex(start+bufferSize);
507 } else {
508 buffer.readerIndex(start+bufferSize);
509 }
510 } else {
511 newbuf = new byte[bufferSize];
512 buffer.readBytes(newbuf);
513 }
514 try {
515 fileOutputStream.write(newbuf, start, bufferSize);
516 } catch (IOException e2) {
517 logger.error("Error during write:", e2);
518 try {
519 closeFile();
520 } catch (CommandAbstractException e1) {
521 }
522
523
524 throw new FileTransferException("Internal error, file is not ready");
525 }
526 position += bufferSize;
527 }
528
529
530
531
532
533
534
535
536
537
538 private void writeBlockEnd(ChannelBuffer buffer)
539 throws FileTransferException {
540 writeBlock(buffer);
541 try {
542 closeFile();
543 } catch (CommandAbstractException e) {
544 throw new FileTransferException("Close in error", e);
545 }
546 }
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561 private ChannelBuffer getBlock(int sizeblock) throws FileTransferException,
562 FileEndOfTransferException {
563 if (!isReady) {
564 throw new FileTransferException("No file is ready");
565 }
566 if (bfileChannelIn == null) {
567 bfileChannelIn = getFileChannel();
568 if (bfileChannelIn != null) {
569 if (bbyteBuffer != null) {
570 if (bbyteBuffer.capacity() != sizeblock) {
571 bbyteBuffer = null;
572 bbyteBuffer = ByteBuffer.allocateDirect(sizeblock);
573 }
574 } else {
575 bbyteBuffer = ByteBuffer.allocateDirect(sizeblock);
576 }
577 }
578 }
579 if (bfileChannelIn == null) {
580 throw new FileTransferException("Internal error, file is not ready");
581 }
582 int sizeout = 0;
583 while (sizeout < sizeblock) {
584 try {
585 int sizeread = bfileChannelIn.read(bbyteBuffer);
586 if (sizeread <= 0) {
587 break;
588 }
589 sizeout += sizeread;
590 } catch (IOException e) {
591 logger.error("Error during get:", e);
592 try {
593 closeFile();
594 } catch (CommandAbstractException e1) {
595 }
596 throw new FileTransferException("Internal error, file is not ready");
597 }
598 }
599 if (sizeout <= 0) {
600 try {
601 closeFile();
602 } catch (CommandAbstractException e1) {
603 }
604 isReady = false;
605 throw new FileEndOfTransferException("End of file");
606 }
607 bbyteBuffer.flip();
608 position += sizeout;
609 ChannelBuffer buffer = ChannelBuffers.copiedBuffer(bbyteBuffer);
610 bbyteBuffer.clear();
611 if (sizeout < sizeblock) {
612 try {
613 closeFile();
614 } catch (CommandAbstractException e1) {
615 }
616 isReady = false;
617 }
618 return buffer;
619 }
620
621
622
623
624
625
626
627
628
629
630
631 protected boolean get(FileChannel fileChannelOut) {
632 if (!isReady) {
633 return false;
634 }
635 FileChannel fileChannelIn = getFileChannel();
636 if (fileChannelIn == null) {
637 return false;
638 }
639 long size = 0;
640 long transfert = 0;
641 try {
642 size = fileChannelIn.size();
643 int chunkSize = this.session.getBlockSize();
644 while (transfert < size) {
645 if (chunkSize < size - transfert) {
646 chunkSize = (int) (size - transfert);
647 }
648 transfert += fileChannelOut.transferFrom(fileChannelIn, transfert, chunkSize);
649 }
650 fileChannelOut.force(true);
651 fileChannelIn.close();
652 fileChannelIn = null;
653 fileChannelOut.close();
654 } catch (IOException e) {
655 logger.error("Error during get:", e);
656 if (fileChannelIn != null) {
657 try {
658 fileChannelIn.close();
659 } catch (IOException e1) {
660 }
661 }
662 return false;
663 }
664 if (transfert == size) {
665 position += size;
666 }
667 return transfert == size;
668 }
669
670
671
672
673
674
675 protected FileChannel getFileChannel() {
676 if (!isReady) {
677 return null;
678 }
679 File trueFile;
680 try {
681 trueFile = getFileFromPath(currentFile);
682 } catch (CommandAbstractException e1) {
683 return null;
684 }
685 FileChannel fileChannel;
686 try {
687 FileInputStream fileInputStream = new FileInputStream(trueFile);
688 fileChannel = fileInputStream.getChannel();
689 if (position != 0) {
690 fileChannel = fileChannel.position(position);
691 }
692 } catch (FileNotFoundException e) {
693 logger.error("File not found in getFileChannel:", e);
694 return null;
695 } catch (IOException e) {
696 logger.error("Change position in getFileChannel:", e);
697 return null;
698 }
699 return fileChannel;
700 }
701
702
703
704
705
706 protected RandomAccessFile getRandomFile() {
707 if (!isReady) {
708 return null;
709 }
710 File trueFile;
711 try {
712 trueFile = getFileFromPath(currentFile);
713 } catch (CommandAbstractException e1) {
714 return null;
715 }
716 RandomAccessFile raf = null;
717 try {
718 raf = new RandomAccessFile(trueFile, "rw");
719 raf.seek(position);
720 } catch (FileNotFoundException e) {
721 logger.error("File not found in getRandomFile:", e);
722 return null;
723 } catch (IOException e) {
724 logger.error("Change position in getRandomFile:", e);
725 return null;
726 }
727 return raf;
728 }
729
730
731
732
733
734 protected FileOutputStream getFileOutputStream(boolean append) {
735 if (!isReady) {
736 return null;
737 }
738 File trueFile;
739 try {
740 trueFile = getFileFromPath(currentFile);
741 } catch (CommandAbstractException e1) {
742 return null;
743 }
744 if (position > 0) {
745 if (trueFile.length() < position) {
746 logger.error("Cannot Change position in getFileOutputStream: file is smaller than required position");
747 return null;
748 }
749 RandomAccessFile raf = getRandomFile();
750 try {
751 raf.setLength(position);
752 raf.close();
753 } catch (IOException e) {
754 logger.error("Change position in getFileOutputStream:", e);
755 return null;
756 }
757 logger.debug("New size: "+trueFile.length()+" : "+position);
758 }
759 FileOutputStream fos = null;
760 try {
761 fos = new FileOutputStream(trueFile, append);
762 } catch (FileNotFoundException e) {
763 logger.error("File not found in getRandomFile:", e);
764 return null;
765 }
766 return fos;
767 }
768 }