1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.context.filesystem;
22
23 import goldengate.common.command.exception.CommandAbstractException;
24 import goldengate.common.exception.FileEndOfTransferException;
25 import goldengate.common.exception.FileTransferException;
26 import goldengate.common.file.DataBlock;
27 import goldengate.common.file.filesystembased.FilesystemBasedDirImpl;
28 import goldengate.common.file.filesystembased.FilesystemBasedFileImpl;
29 import goldengate.common.logging.GgInternalLogger;
30 import goldengate.common.logging.GgInternalLoggerFactory;
31
32 import java.io.File;
33 import java.io.FileInputStream;
34 import java.io.FileNotFoundException;
35 import java.io.FileOutputStream;
36 import java.io.IOException;
37 import java.io.RandomAccessFile;
38 import java.nio.channels.FileChannel;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import openr66.context.ErrorCode;
42 import openr66.context.R66Result;
43 import openr66.context.R66Session;
44 import openr66.context.task.exception.OpenR66RunnerErrorException;
45 import openr66.protocol.exception.OpenR66ProtocolPacketException;
46 import openr66.protocol.exception.OpenR66ProtocolSystemException;
47 import openr66.protocol.localhandler.LocalChannelReference;
48 import openr66.protocol.localhandler.RetrieveRunner;
49 import openr66.protocol.utils.ChannelUtils;
50
51 import org.jboss.netty.channel.ChannelFuture;
52
53
54
55
56
57
58
59 public class R66File extends FilesystemBasedFileImpl {
60
61
62
63 private static final GgInternalLogger logger = GgInternalLoggerFactory
64 .getLogger(R66File.class);
65
66
67
68
69 private boolean isExternal = false;
70
71
72
73
74
75
76
77
/**
 * Builds a file object by delegating every argument to the parent constructor.
 * The {@code isExternal} flag keeps its default value ({@code false}).
 *
 * @param session the R66 session this file belongs to
 * @param dir     the directory object handed to the parent constructor
 * @param path    the path handed to the parent constructor
 * @param append  the append flag handed to the parent constructor
 * @throws CommandAbstractException if the parent constructor raises it
 */
public R66File(final R66Session session, final R66Dir dir, final String path,
        final boolean append) throws CommandAbstractException {
    super(session, dir, path, append);
}
82
83
84
85
86
87
88
89
/**
 * Builds a file object through the three-argument parent constructor and flags it
 * as external ({@code isExternal} is set to {@code true}).
 *
 * @param session the R66 session this file belongs to
 * @param dir     the directory object handed to the parent constructor
 * @param path    the path handed to the parent constructor
 */
public R66File(final R66Session session, final R66Dir dir, final String path) {
    super(session, dir, path);
    this.isExternal = true;
}
94
95
96
97
98
99
100
101
102 public void retrieveBlocking(AtomicBoolean running) throws OpenR66RunnerErrorException,
103 OpenR66ProtocolSystemException {
104 boolean retrieveDone = false;
105 LocalChannelReference localChannelReference = getSession()
106 .getLocalChannelReference();
107 try {
108 if (!isReady) {
109 return;
110 }
111 DataBlock block = null;
112 try {
113 block = readDataBlock();
114 } catch (FileEndOfTransferException e) {
115
116 retrieveDone = true;
117 return;
118 }
119 if (block == null) {
120
121 retrieveDone = true;
122 return;
123 }
124 ChannelFuture future1 = null, future2 = null;
125 if ((block != null && (running.get()))) {
126 future1 = RetrieveRunner.writeWhenPossible(
127 block, localChannelReference);
128 }
129
130 while (block != null && (!block.isEOF()) && (running.get())) {
131 try {
132 block = readDataBlock();
133 } catch (FileEndOfTransferException e) {
134
135 try {
136 future1.await();
137 } catch (InterruptedException e1) {
138 }
139 if (future1.isSuccess()) {
140 retrieveDone = true;
141 }
142 return;
143 }
144 future2 = RetrieveRunner.writeWhenPossible(
145 block, localChannelReference);
146 try {
147 future1.await();
148 } catch (InterruptedException e) {
149 }
150 if (! future1.isSuccess()) {
151 return;
152 }
153 future1 = future2;
154 }
155 if (!running.get()) {
156
157 return;
158 }
159
160 if (future1 != null) {
161 try {
162 future1.await();
163 } catch (InterruptedException e) {
164 }
165 if (! future1.isSuccess()) {
166 return;
167 }
168 }
169 retrieveDone = true;
170 return;
171 } catch (FileTransferException e) {
172
173 getSession().setFinalizeTransfer(
174 false,
175 new R66Result(new OpenR66ProtocolSystemException(e),
176 getSession(), false, ErrorCode.TransferError, getSession().getRunner()));
177 } catch (OpenR66ProtocolPacketException e) {
178
179 getSession()
180 .setFinalizeTransfer(
181 false,
182 new R66Result(e, getSession(), false,
183 ErrorCode.Internal, getSession().getRunner()));
184 } finally {
185 if (retrieveDone) {
186 try {
187 ChannelUtils.writeEndTransfer(localChannelReference);
188 } catch (OpenR66ProtocolPacketException e) {
189
190 getSession().setFinalizeTransfer(
191 false,
192 new R66Result(e, getSession(), false,
193 ErrorCode.Internal, getSession().getRunner()));
194 }
195 } else {
196
197 getSession().setFinalizeTransfer(
198 false,
199 new R66Result(new OpenR66ProtocolSystemException("Transfer in error"),
200 getSession(), false, ErrorCode.TransferError, getSession().getRunner()));
201 }
202 }
203 }
204
205
206
207
208
209
210
211 public File getTrueFile() {
212 if (isExternal) {
213 return new File(currentFile);
214 }
215 try {
216 return getFileFromPath(getFile());
217 } catch (CommandAbstractException e) {
218 logger.info("Exception while getting file", e);
219 return null;
220 }
221 }
222
223
224
225
226 public String getBasename() {
227 return getBasename(currentFile);
228 }
229
230
231
232
233
234 public static String getBasename(String path) {
235 File file = new File(path);
236 return file.getName();
237 }
238
239 @Override
240 public R66Session getSession() {
241 return (R66Session) session;
242 }
243
244
245
246
247
248
249
250 @Override
251 public boolean canRead() throws CommandAbstractException {
252 if (isExternal) {
253 File file = new File(currentFile);
254 return file.canRead();
255 }
256 return super.canRead();
257 }
258
259
260
261
262
263
264
265 @Override
266 public boolean canWrite() throws CommandAbstractException {
267 if (isExternal) {
268 File file = new File(currentFile);
269 if (file.exists()) {
270 return file.canWrite();
271 }
272 return file.getParentFile().canWrite();
273 }
274 return super.canWrite();
275 }
276
277
278
279
280
281
282
283 @Override
284 public boolean delete() throws CommandAbstractException {
285 if (isExternal) {
286 File file = new File(currentFile);
287 checkIdentify();
288 if (!isReady) {
289 return false;
290 }
291 if (!file.exists()) {
292 return true;
293 }
294 closeFile();
295 return file.delete();
296 }
297 return super.delete();
298 }
299
300
301
302
303
304
305
306 @Override
307 public boolean exists() throws CommandAbstractException {
308 if (isExternal) {
309 File file = new File(currentFile);
310 return file.exists();
311 }
312 return super.exists();
313 }
314
315
316
317
318
319
320
321
322 @Override
323 protected FileChannel getFileChannel() {
324 if (!isExternal) {
325 return super.getFileChannel();
326 }
327 if (!isReady) {
328 return null;
329 }
330 File trueFile = getTrueFile();
331 FileChannel fileChannel;
332 try {
333 FileInputStream fileInputStream = new FileInputStream(trueFile);
334 fileChannel = fileInputStream.getChannel();
335 if (getPosition() > 0) {
336 fileChannel = fileChannel.position(getPosition());
337 }
338 } catch (FileNotFoundException e) {
339 logger.error("FileInterface not found in getFileChannel:", e);
340 return null;
341 } catch (IOException e) {
342 logger.error("Change position in getFileChannel:", e);
343 return null;
344 }
345 return fileChannel;
346 }
347
348 @Override
349 protected RandomAccessFile getRandomFile() {
350 if (!isExternal) {
351 return super.getRandomFile();
352 }
353 if (!isReady) {
354 return null;
355 }
356 File trueFile = getTrueFile();
357 RandomAccessFile raf = null;
358 try {
359 raf = new RandomAccessFile(trueFile, "rw");
360 raf.seek(getPosition());
361 } catch (FileNotFoundException e) {
362 logger.error("File not found in getRandomFile:", e);
363 return null;
364 } catch (IOException e) {
365 logger.error("Change position in getRandomFile:", e);
366 return null;
367 }
368 return raf;
369 }
370
371
372
373
374
375 protected FileOutputStream getFileOutputStream(boolean append) {
376 if (!isExternal) {
377 return super.getFileOutputStream(append);
378 }
379 if (!isReady) {
380 return null;
381 }
382 File trueFile = getTrueFile();
383 if (getPosition() > 0) {
384 if (trueFile.length() < getPosition()) {
385 logger.error("Cannot Change position in getFileOutputStream: file is smaller than required position");
386 return null;
387 }
388 RandomAccessFile raf = getRandomFile();
389 try {
390 raf.setLength(getPosition());
391 raf.close();
392 } catch (IOException e) {
393 logger.error("Change position in getFileOutputStream:", e);
394 return null;
395 }
396 }
397 FileOutputStream fos = null;
398 try {
399 fos = new FileOutputStream(trueFile, append);
400 } catch (FileNotFoundException e) {
401 logger.error("File not found in getRandomFile:", e);
402 return null;
403 }
404 return fos;
405 }
406
407
408
409
410
411
412
413 @Override
414 public boolean isDirectory() throws CommandAbstractException {
415 if (isExternal) {
416 File dir = new File(currentFile);
417 return dir.isDirectory();
418 }
419 return super.isDirectory();
420 }
421
422
423
424
425
426
427
428 @Override
429 public boolean isFile() throws CommandAbstractException {
430 if (isExternal) {
431 File file = new File(currentFile);
432 return file.isFile();
433 }
434 return super.isFile();
435 }
436
437
438
439
440
441
442
443 @Override
444 public long length() throws CommandAbstractException {
445 if (isExternal) {
446 File file = new File(currentFile);
447 return file.length();
448 }
449 return super.length();
450 }
451
452
453
454
455
456
457
458
459 @Override
460 public boolean renameTo(String path) throws CommandAbstractException {
461 if (!isExternal) {
462 return super.renameTo(path);
463 }
464 checkIdentify();
465 if (!isReady) {
466 return false;
467 }
468 File file = getTrueFile();
469 if (file.canRead()) {
470 File newFile = getFileFromPath(path);
471 if (newFile.getParentFile().canWrite()) {
472 if (!file.renameTo(newFile)) {
473 FileOutputStream fileOutputStream;
474 try {
475 fileOutputStream = new FileOutputStream(newFile);
476 } catch (FileNotFoundException e) {
477 logger
478 .warn("Cannot find file: " + newFile.getName(),
479 e);
480 return false;
481 }
482 FileChannel fileChannelOut = fileOutputStream.getChannel();
483 if (get(fileChannelOut)) {
484 delete();
485 } else {
486 try {
487 fileChannelOut.close();
488 } catch (IOException e) {
489 }
490 logger.error("Cannot write file: {}", newFile);
491 return false;
492 }
493 }
494 currentFile = getRelativePath(newFile);
495 isExternal = false;
496 isReady = true;
497 return true;
498 }
499 }
500 return false;
501 }
502
503
504
505
506
507
508
509
510
511
512 public boolean renameTo(String path, boolean external)
513 throws CommandAbstractException {
514 if (!external) {
515 return renameTo(path);
516 }
517 checkIdentify();
518 if (!isReady) {
519 return false;
520 }
521 File file = getTrueFile();
522 if (file.canRead()) {
523 File newFile = new File(path);
524 if (newFile.getParentFile().canWrite()) {
525 if (!file.renameTo(newFile)) {
526 FileOutputStream fileOutputStream;
527 try {
528 fileOutputStream = new FileOutputStream(newFile);
529 } catch (FileNotFoundException e) {
530 logger
531 .warn("Cannot find file: " + newFile.getName(),
532 e);
533 return false;
534 }
535 FileChannel fileChannelOut = fileOutputStream.getChannel();
536 if (get(fileChannelOut)) {
537 delete();
538 } else {
539 try {
540 fileChannelOut.close();
541 } catch (IOException e) {
542 }
543 logger.error("Cannot write file: {}", newFile);
544 return false;
545 }
546 }
547 currentFile = FilesystemBasedDirImpl.normalizePath(newFile
548 .getAbsolutePath());
549 isExternal = true;
550 isReady = true;
551 return true;
552 }
553 }
554 return false;
555 }
556
557
558
559
560
561
562
563
564
565 public void replaceFilename(String filename, boolean isExternal)
566 throws CommandAbstractException {
567 closeFile();
568 currentFile = filename;
569 this.isExternal = isExternal;
570 isReady = true;
571 }
572
573
574
575
576
577
578
579
580 @Override
581 public boolean closeFile() throws CommandAbstractException {
582 boolean status = super.closeFile();
583
584 isReady = true;
585 return status;
586 }
587
588
589
590
591
592 public boolean isExternal() {
593 return isExternal;
594 }
595 @Override
596 public String toString() {
597 return "File: " + currentFile + " Ready " + isReady + " " +
598 getPosition();
599 }
600 }