1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.context;
22
23 import goldengate.common.command.exception.CommandAbstractException;
24 import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
25 import goldengate.common.database.exception.GoldenGateDatabaseException;
26 import goldengate.common.exception.IllegalFiniteStateException;
27 import goldengate.common.exception.NoRestartException;
28 import goldengate.common.file.SessionInterface;
29 import goldengate.common.file.filesystembased.FilesystemBasedFileParameterImpl;
30 import goldengate.common.logging.GgInternalLogger;
31 import goldengate.common.logging.GgInternalLoggerFactory;
32 import goldengate.common.state.MachineState;
33
34 import java.net.InetSocketAddress;
35 import java.net.SocketAddress;
36
37 import openr66.context.authentication.R66Auth;
38 import openr66.context.filesystem.R66Dir;
39 import openr66.context.filesystem.R66File;
40 import openr66.context.filesystem.R66Restart;
41 import openr66.context.task.exception.OpenR66RunnerErrorException;
42 import openr66.database.data.DbTaskRunner;
43 import openr66.database.data.DbTaskRunner.TASKSTEP;
44 import openr66.protocol.configuration.Configuration;
45 import openr66.protocol.exception.OpenR66ProtocolSystemException;
46 import openr66.protocol.localhandler.LocalChannelReference;
47
48
49
50
51
52
53
54 public class R66Session implements SessionInterface {
55
56
57
58 private static final GgInternalLogger logger = GgInternalLoggerFactory
59 .getLogger(R66Session.class);
60
61
62
63
64 private int blockSize = Configuration.configuration.BLOCKSIZE;
65
66
67
68 private LocalChannelReference localChannelReference;
69
70
71
72 private final R66Auth auth;
73
74
75
76 private SocketAddress raddress;
77
78
79
80 private SocketAddress laddress;
81
82
83
84
85 private final R66Dir dir;
86
87
88
89 private R66File file;
90
91
92
93 private volatile boolean isReady = false;
94
95
96
97
98 private final R66Restart restart;
99
100
101
102
103 private DbTaskRunner runner = null;
104
105 private String status = "NoStatus";
106
107
108
109
110 private final MachineState<R66FiniteDualStates> state;
111
112
113
114 private R66BusinessInterface businessObject = null;
115
116
117
118 private boolean extendedProtocol = Configuration.configuration.extendedProtocol;
119
120
121
122
/**
 * Creates a session that is not ready, with its own authentication,
 * directory and restart objects and a new session state machine.
 */
public R66Session() {
    this.isReady = false;
    this.auth = new R66Auth(this);
    this.dir = new R66Dir(this);
    this.restart = new R66Restart(this);
    this.state = R66FiniteDualStates.newSessionMachineState();
}
130
131
132
133
134 public boolean getExtendedProtocol() {
135 return extendedProtocol;
136 }
137
138
139
140
141 public R66BusinessInterface getBusinessObject() {
142 return businessObject;
143 }
144
145
146
147
148 public void setBusinessObject(R66BusinessInterface businessObject) {
149 this.businessObject = businessObject;
150 }
151
152
153
154
155
156
157 public void newState(R66FiniteDualStates desiredstate) {
158 try {
159 state.setCurrent(desiredstate);
160 } catch (IllegalFiniteStateException e) {
161 logger.warn("Should not changed of State: {} {}", this, e.getMessage());
162 state.setDryCurrent(desiredstate);
163 }
164 }
165
166
167
168
169 public void setStatus(int stat) {
170 StackTraceElement elt = Thread.currentThread().getStackTrace()[2];
171 this.status = "("+elt.getFileName()+":"+elt.getLineNumber()+"):"+stat;
172 }
173
174
175
176
177
178 @Override
179 public void clear() {
180
181 if (runner != null && (!runner.isFinished()) && (!runner.continueTransfer())) {
182 if (localChannelReference != null) {
183 if (!localChannelReference.getFutureRequest().isDone()) {
184 R66Result result = new R66Result(new OpenR66RunnerErrorException(
185 "Close before ending"), this, true,
186 ErrorCode.Disconnection, runner);
187 result.runner = runner;
188 try {
189 setFinalizeTransfer(false, result);
190 } catch (OpenR66RunnerErrorException e) {
191 } catch (OpenR66ProtocolSystemException e) {
192 }
193 }
194 }
195 }
196 if (dir != null) {
197 dir.clear();
198 }
199 if (auth != null) {
200 auth.clear();
201 }
202 if (runner != null) {
203 runner.clear();
204 }
205 if (state != null) {
206 try {
207 state.setCurrent(R66FiniteDualStates.CLOSEDCHANNEL);
208 } catch (IllegalFiniteStateException e) {
209 }
210
211 }
212
213 isReady = false;
214 if (businessObject != null) {
215 businessObject.releaseResources();
216 businessObject = null;
217 }
218 }
219
220
221
222
223
224
225 @Override
226 public R66Auth getAuth() {
227 return auth;
228 }
229
230
231
232
233
234
235 @Override
236 public int getBlockSize() {
237 return blockSize;
238 }
239
240
241
242
243
244 public void setBlockSize(int blocksize) {
245 blockSize = blocksize;
246 }
247
248
249
250
251
252
253 @Override
254 public R66Dir getDir() {
255 return dir;
256 }
257
258
259
260
261
262
263 @Override
264 public FilesystemBasedFileParameterImpl getFileParameter() {
265 return Configuration.getFileParameter();
266 }
267
268
269
270
271
272
273 @Override
274 public R66Restart getRestart() {
275 return restart;
276 }
277
278
279
280
281
282 public boolean isAuthenticated() {
283 if (auth == null) {
284 return false;
285 }
286 return auth.isIdentified();
287 }
288
289
290
291
292 public boolean isReady() {
293 return isReady;
294 }
295
296
297
298
299
300 public void setReady(boolean isReady) {
301 this.isReady = isReady;
302 }
303
304
305
306
307 public DbTaskRunner getRunner() {
308 return runner;
309 }
310
311
312
313
314
315 public void setLocalChannelReference(
316 LocalChannelReference localChannelReference) {
317 this.localChannelReference = localChannelReference;
318 this.localChannelReference.setSession(this);
319 if (this.localChannelReference.getNetworkChannel() != null) {
320 this.raddress = this.localChannelReference.getNetworkChannel().getRemoteAddress();
321 this.laddress = this.localChannelReference.getNetworkChannel().getLocalAddress();
322 } else {
323 this.raddress = this.laddress = new InetSocketAddress(0);
324 }
325 }
326
327
328
329
330
331 public SocketAddress getRemoteAddress() {
332 return this.raddress;
333 }
334
335
336
337
338 public SocketAddress getLocalAddress() {
339 return this.laddress;
340 }
341
342
343
344 public LocalChannelReference getLocalChannelReference() {
345 return localChannelReference;
346 }
347
348
349
350
351
352
353 public void setNoSessionRunner(DbTaskRunner runner, LocalChannelReference localChannelReference) {
354 this.runner = runner;
355
356 try {
357 file = (R66File) dir.setFile(this.runner.getFilename(),
358 false);
359 } catch (CommandAbstractException e1) {
360 }
361 this.auth.specialNoSessionAuth(false, Configuration.configuration.HOST_ID);
362 this.localChannelReference = localChannelReference;
363 if (this.localChannelReference == null) {
364 if (this.runner.getLocalChannelReference() != null) {
365 this.localChannelReference = this.runner.getLocalChannelReference();
366 } else {
367 this.localChannelReference = new LocalChannelReference();
368 }
369 this.localChannelReference.setErrorMessage(this.runner.getErrorInfo().mesg,
370 this.runner.getErrorInfo());
371 }
372 runner.setLocalChannelReference(this.localChannelReference);
373 this.localChannelReference.setSession(this);
374 }
375
376
377
378
379 public void setFileBeforePreRunner() throws OpenR66RunnerErrorException {
380
381 String filename;
382 if (this.runner.isPreTaskStarting()) {
383 filename = R66Dir.normalizePath(this.runner.getOriginalFilename());
384 this.runner.setOriginalFilename(filename);
385 } else {
386 filename = this.runner.getFilename();
387 }
388 if (this.runner.isSender()) {
389 try {
390 if (file == null) {
391 try {
392 file = (R66File) dir.setFile(filename, false);
393 } catch (CommandAbstractException e) {
394
395
396 file = dir.setFileNoCheck(filename);
397 }
398 }
399 if (runner.isSendThrough()) {
400
401 logger.debug("File is in through mode: {}", file);
402 } else if (!file.canRead()) {
403
404
405 file = new R66File(this, dir, filename);
406 if (!file.canRead()) {
407 this.runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
408 throw new OpenR66RunnerErrorException("File cannot be read: "+
409 file.getTrueFile().getAbsolutePath());
410 }
411 }
412 } catch (CommandAbstractException e) {
413 throw new OpenR66RunnerErrorException(e);
414 }
415 } else {
416
417 file = new R66File(this, dir, filename);
418 }
419 }
420
421
422
423
424
425
426
427 public void setFileAfterPreRunner(boolean createFile) throws OpenR66RunnerErrorException {
428 if (this.businessObject != null) {
429 this.businessObject.checkAtChangeFilename(this);
430 }
431
432 if (this.runner.isSender()) {
433 try {
434 if (file == null) {
435 try {
436 file = (R66File) dir.setFile(this.runner.getFilename(),
437 false);
438 } catch (CommandAbstractException e) {
439
440
441 file = dir.setFileNoCheck(this.runner.getFilename());
442
443 }
444 }
445 if (runner.isSendThrough()) {
446
447 logger.debug("File is in through mode: {}", file);
448 } else if (!file.canRead()) {
449
450
451 file = new R66File(this, dir, this.runner.getFilename());
452 if (!file.canRead()) {
453 this.runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
454 throw new OpenR66RunnerErrorException("File cannot be read: "+
455 file.getTrueFile().getAbsolutePath());
456 }
457 }
458 } catch (CommandAbstractException e) {
459 throw new OpenR66RunnerErrorException(e);
460 }
461 } else {
462
463 if (runner.getRank() > 0) {
464
465 try {
466 file = (R66File) dir.setFile(this.runner
467 .getFilename(), true);
468 if (runner.isRecvThrough()) {
469
470 logger.debug("File is in through mode: {}", file);
471 } else if (!file.canWrite()) {
472 throw new OpenR66RunnerErrorException(
473 "File cannot be write");
474 }
475 } catch (CommandAbstractException e) {
476 throw new OpenR66RunnerErrorException(e);
477 }
478 } else {
479
480 if (createFile) {
481 file = null;
482 String newfilename = this.runner.getOriginalFilename();
483 if (newfilename.charAt(1) == ':') {
484
485 newfilename = newfilename.substring(2);
486 }
487 this.runner.setFilename(R66File.getBasename(newfilename));
488 try {
489 file = dir.setUniqueFile(this.runner.getSpecialId(),
490 this.runner.getFilename());
491 if (runner.isRecvThrough()) {
492
493 logger.debug("File is in through mode: {}", file);
494 this.runner.deleteTempFile();
495 } else if (!file.canWrite()) {
496 this.runner.deleteTempFile();
497 throw new OpenR66RunnerErrorException(
498 "File cannot be write");
499 }
500 } catch (CommandAbstractException e) {
501 this.runner.deleteTempFile();
502 throw new OpenR66RunnerErrorException(e);
503 }
504 } else {
505 throw new OpenR66RunnerErrorException("No file created");
506 }
507 }
508 }
509
510 try {
511 if (this.runner.isFileMoved()) {
512 this.runner.setFileMoved(file.getFile(), true);
513 } else {
514 this.runner.setFilename(file.getFile());
515 }
516 } catch (CommandAbstractException e) {
517 this.runner.deleteTempFile();
518 throw new OpenR66RunnerErrorException(e);
519 }
520 }
521
522
523
524
525
526 public void setBadRunner(DbTaskRunner runner, ErrorCode code) {
527 this.runner = runner;
528 if (code == ErrorCode.QueryAlreadyFinished) {
529 if (this.runner.isSender()) {
530
531 try {
532 dir.changeDirectory(this.runner.getRule().sendPath);
533 } catch (CommandAbstractException e) {
534 }
535 } else {
536
537 try {
538 dir.changeDirectory(this.runner.getRule().workPath);
539 } catch (CommandAbstractException e) {
540 }
541 }
542 if (this.businessObject != null) {
543 try {
544 this.businessObject.checkAtError(this);
545 } catch (OpenR66RunnerErrorException e) {
546 }
547 }
548 this.runner.setPostTask();
549 try {
550 setFileAfterPreRunner(false);
551 } catch (OpenR66RunnerErrorException e) {
552 }
553 }
554 }
555
556
557
558
559
560
561
562 public void setRunner(DbTaskRunner runner)
563 throws OpenR66RunnerErrorException {
564 this.runner = runner;
565 setBusinessObject(Configuration.configuration.r66BusinessFactory.getBusinessInterface(this));
566 this.runner.checkThroughMode();
567 if (this.businessObject != null) {
568 this.businessObject.checkAtStartup(this);
569 }
570 if (this.runner.isSender()) {
571 if (runner.isSendThrough()) {
572
573
574 try {
575 dir.changeDirectory(this.runner.getRule().sendPath);
576 } catch (CommandAbstractException e) {
577
578 }
579 } else {
580
581 try {
582 dir.changeDirectory(this.runner.getRule().sendPath);
583 } catch (CommandAbstractException e) {
584 throw new OpenR66RunnerErrorException(e);
585 }
586 }
587 } else {
588 if (runner.isRecvThrough()) {
589
590
591 try {
592 dir.changeDirectory(this.runner.getRule().workPath);
593 } catch (CommandAbstractException e) {
594 }
595 } else {
596
597 try {
598 dir.changeDirectory(this.runner.getRule().workPath);
599 } catch (CommandAbstractException e) {
600 throw new OpenR66RunnerErrorException(e);
601 }
602 }
603 }
604 if (runner.getRank() > 0) {
605 logger.debug("restart at "+runner.getRank()+ " {}",runner);
606 runner.setTransferTask(runner.getRank());
607 restart.restartMarker(runner.getBlocksize() * runner.getRank());
608 } else {
609 restart.restartMarker(0);
610 }
611 if (runner.getGloballaststep() == TASKSTEP.NOTASK.ordinal() ||
612 runner.getGloballaststep() == TASKSTEP.PRETASK.ordinal()) {
613 setFileBeforePreRunner();
614 this.runner.setPreTask();
615 runner.saveStatus();
616 this.runner.run();
617 runner.saveStatus();
618 runner.setTransferTask(runner.getRank());
619 } else {
620 runner.reset();
621 runner.changeUpdatedInfo(UpdatedInfo.RUNNING);
622 runner.saveStatus();
623 }
624
625 setFileAfterPreRunner(true);
626 if (runner.getGloballaststep() == TASKSTEP.TRANSFERTASK.ordinal()) {
627 if (this.businessObject != null) {
628 this.businessObject.checkAfterPreCommand(this);
629 }
630 if (!this.runner.isSender()) {
631
632 if (runner.isRecvThrough()) {
633
634 } else {
635 try {
636 long length = file.length();
637 long oldPosition = restart.getPosition();
638 restart.setSet(true);
639 if (oldPosition > length) {
640 int newRank = ((int) (length / this.runner.getBlocksize()))
641 - Configuration.RANKRESTART;
642 if (newRank <= 0) {
643 newRank = 1;
644 }
645 logger.warn("Decreased Rank Restart for {} at "+newRank, runner);
646 runner.setTransferTask(newRank);
647 restart.restartMarker(this.runner.getBlocksize() * this.runner.getRank());
648 }
649 try {
650 file.restartMarker(restart);
651 } catch (CommandAbstractException e) {
652 this.runner.deleteTempFile();
653 throw new OpenR66RunnerErrorException(e);
654 }
655 } catch (CommandAbstractException e1) {
656
657 throw new OpenR66RunnerErrorException("File length is wrong", e1);
658 } catch (NoRestartException e) {
659
660 }
661 }
662 } else {
663 try {
664 this.localChannelReference.getFutureRequest().filesize = file.length();
665 } catch (CommandAbstractException e1) {
666 }
667 try {
668 file.restartMarker(restart);
669 } catch (CommandAbstractException e) {
670 this.runner.deleteTempFile();
671 throw new OpenR66RunnerErrorException(e);
672 }
673 }
674 }
675 this.runner.saveStatus();
676 logger.info("Final init: {}", this.runner);
677 }
678
679
680
681
682
683
684 public void renameReceiverFile(String newFilename) throws OpenR66RunnerErrorException {
685
686 if (runner.getRank() > 0) {
687 logger.error("Renaming file is not correct since transfer does not start from first block");
688
689 throw new OpenR66RunnerErrorException("Renaming file not correct since transfer already started");
690 }
691 if (!runner.isRecvThrough()) {
692 this.runner.deleteTempFile();
693 }
694
695 this.runner.setOriginalFilename(newFilename);
696 this.setFileAfterPreRunner(true);
697 this.runner.saveStatus();
698 }
699
700
701
702
703
704
705
706
707
708 public void setFinalizeTransfer(boolean status, R66Result finalValue)
709 throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
710 logger.debug(status+":"+finalValue+":"+runner);
711 if (runner == null) {
712 if (localChannelReference != null) {
713 if (status) {
714 localChannelReference.validateRequest(finalValue);
715 } else {
716 localChannelReference.invalidateRequest(finalValue);
717 }
718 }
719 if (this.businessObject != null) {
720 if (status) {
721 this.businessObject.checkAfterTransfer(this);
722 } else {
723 this.businessObject.checkAtError(this);
724 }
725 }
726 return;
727 }
728 if (this.businessObject != null) {
729 if (this.businessObject != null) {
730 if (status) {
731 this.businessObject.checkAfterTransfer(this);
732 } else {
733 this.businessObject.checkAtError(this);
734 }
735 }
736 }
737 if (runner.isAllDone()) {
738 logger.debug("Transfer already done but " + status + " on " + file+runner.toShortString(),
739 new OpenR66RunnerErrorException(finalValue.toString()));
740
741
742
743 return;
744 }
745 if (localChannelReference.getFutureRequest().isDone()) {
746 logger.debug("Request already done but " + status + " on " + file+runner.toShortString(),
747 new OpenR66RunnerErrorException(finalValue.toString()));
748
749 return;
750 }
751 if (! status) {
752 this.runner.deleteTempFile();
753 runner.setErrorExecutionStatus(finalValue.code);
754 }
755 if (status) {
756 runner.finishTransferTask(ErrorCode.TransferOk);
757 } else {
758 runner.finishTransferTask(finalValue.code);
759 }
760 runner.saveStatus();
761 logger.debug("Transfer " + status + " on {} and {}", file, runner);
762 if (!runner.ready()) {
763
764 OpenR66RunnerErrorException runnerErrorException;
765 if (!status && finalValue.exception != null) {
766 runnerErrorException = new OpenR66RunnerErrorException(
767 "Pre task in error (or even before)",
768 finalValue.exception);
769 } else {
770 runnerErrorException = new OpenR66RunnerErrorException(
771 "Pre task in error (or even before)");
772 }
773 finalValue.exception = runnerErrorException;
774 logger.error("Pre task in error (or even before) : "+
775 runnerErrorException.getMessage());
776 localChannelReference.invalidateRequest(finalValue);
777 throw runnerErrorException;
778 }
779 try {
780 if (file != null) {
781 file.closeFile();
782 }
783 } catch (CommandAbstractException e1) {
784 R66Result result = finalValue;
785 if (status) {
786 result = new R66Result(new OpenR66RunnerErrorException(e1),
787 this, false, ErrorCode.Internal, runner);
788 }
789 localChannelReference.invalidateRequest(result);
790 throw (OpenR66RunnerErrorException) result.exception;
791 }
792 runner.finalizeTransfer(localChannelReference, file, finalValue, status);
793 if (this.businessObject != null) {
794 this.businessObject.checkAfterPost(this);
795 }
796 }
797
798
799
800
801
802
803
804 public void tryFinalizeRequest(R66Result errorValue)
805 throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
806 if (this.getLocalChannelReference() == null) {
807 return;
808 }
809 if (this.getLocalChannelReference().getFutureRequest().isDone()) {
810 return;
811 }
812
813 if (runner == null) {
814 localChannelReference.invalidateRequest(errorValue);
815 return;
816 }
817
818 if (runner.getStatus() == ErrorCode.CompleteOk) {
819
820 runner.setAllDone();
821 try {
822 runner.update();
823 } catch (GoldenGateDatabaseException e) {
824 }
825 localChannelReference.validateRequest(
826 new R66Result(this, true, ErrorCode.CompleteOk, runner));
827 } else if (runner.getStatus() == ErrorCode.TransferOk &&
828 ((!runner.isSender()) || errorValue.code == ErrorCode.QueryAlreadyFinished)) {
829
830
831 try {
832 this.setFinalizeTransfer(true,
833 new R66Result(this, true, ErrorCode.CompleteOk, runner));
834 localChannelReference.validateRequest(
835 localChannelReference.getFutureEndTransfer().getResult());
836 } catch (OpenR66ProtocolSystemException e) {
837 logger.error("Cannot validate runner:\n {}",runner.toShortString());
838 runner.changeUpdatedInfo(UpdatedInfo.INERROR);
839 runner.setErrorExecutionStatus(errorValue.code);
840 try {
841 runner.update();
842 } catch (GoldenGateDatabaseException e1) {
843 }
844 this.setFinalizeTransfer(false, errorValue);
845 } catch (OpenR66RunnerErrorException e) {
846 logger.error("Cannot validate runner:\n {}",runner.toShortString());
847 runner.changeUpdatedInfo(UpdatedInfo.INERROR);
848 runner.setErrorExecutionStatus(errorValue.code);
849 try {
850 runner.update();
851 } catch (GoldenGateDatabaseException e1) {
852 }
853 this.setFinalizeTransfer(false, errorValue);
854 }
855 } else {
856
857 this.setFinalizeTransfer(false, errorValue);
858 }
859 }
860
861
862
863 public R66File getFile() {
864 return file;
865 }
866
867 @Override
868 public String toString() {
869 return "Session: FS[" + state.getCurrent()+"] "+status+"\n "+
870 (auth != null? auth.toString() : "no Auth") + "\n " +
871 (dir != null? dir.toString() : "no Dir") + "\n " +
872 (file != null? file.toString() : "no File") + "\n " +
873 (runner != null? runner.toShortString() : "no Runner");
874 }
875
876
877
878
879
880
881 @Override
882 public String getUniqueExtension() {
883 return Configuration.EXT_R66;
884 }
885 }