1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package openr66.protocol.localhandler;
21
22 import goldengate.common.database.DbSession;
23 import goldengate.common.logging.GgInternalLogger;
24 import goldengate.common.logging.GgInternalLoggerFactory;
25 import openr66.client.RecvThroughHandler;
26 import openr66.commander.ClientRunner;
27 import openr66.context.ErrorCode;
28 import openr66.context.R66FiniteDualStates;
29 import openr66.context.R66Result;
30 import openr66.context.R66Session;
31 import openr66.context.task.exception.OpenR66RunnerErrorException;
32 import openr66.database.DbConstant;
33 import openr66.database.data.DbTaskRunner;
34 import openr66.protocol.configuration.Configuration;
35 import openr66.protocol.exception.OpenR66Exception;
36 import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
37 import openr66.protocol.networkhandler.NetworkChannel;
38 import openr66.protocol.networkhandler.NetworkServerHandler;
39 import openr66.protocol.networkhandler.NetworkServerPipelineFactory;
40 import openr66.protocol.networkhandler.NetworkTransaction;
41 import openr66.protocol.utils.R66Future;
42
43 import org.jboss.netty.channel.Channel;
44 import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
45
46
47
48
49
50
51
52 public class LocalChannelReference {
53
54
55
56 private static final GgInternalLogger logger = GgInternalLoggerFactory
57 .getLogger(LocalChannelReference.class);
58
59
60
61
62 private final Channel localChannel;
63
64
65
66
67 private final Channel networkChannel;
68
69
70
71
72 private ChannelTrafficShapingHandler cts;
73
74
75
76
77 private NetworkChannel networkChannelObject;
78
79
80
81
82 private final NetworkServerHandler networkServerHandler;
83
84
85
86
87 private final Integer localId;
88
89
90
91
92 private Integer remoteId;
93
94
95
96
97 private final R66Future futureRequest;
98
99
100
101
102 private R66Future futureValidRequest = new R66Future(true);
103
104
105
106
107 private R66Future futureEndTransfer = new R66Future(true);
108
109
110
111
112 private final R66Future futureConnection = new R66Future(true);
113
114
115
116
117 private final R66Future futureStartup = new R66Future(true);
118
119
120
121
122 private R66Session session;
123
124
125
126
127 private String errorMessage = "NoError";
128
129
130
131
132 private ErrorCode code = ErrorCode.Unknown;
133
134
135
136
137 private RecvThroughHandler recvThroughHandler;
138
139 private boolean isSendThroughMode = false;
140
141
142
143 private ClientRunner clientRunner = null;
144
145
146
147
148
149
150
151
/**
 * Builds a reference binding a local channel to its network channel.
 *
 * @param localChannel the local channel; its id becomes the local id
 * @param networkChannel the network channel; the last handler of its pipeline is
 *        kept as the network server handler, and the handler registered under
 *        {@code NetworkServerPipelineFactory.LIMITCHANNEL} as the traffic shaping handler
 * @param remoteId the initial remote id
 * @param futureRequest the future of the request, or {@code null} to have a new one created
 */
public LocalChannelReference(Channel localChannel, Channel networkChannel,
        Integer remoteId, R66Future futureRequest) {
    this.localChannel = localChannel;
    this.networkChannel = networkChannel;
    this.networkServerHandler =
            (NetworkServerHandler) this.networkChannel.getPipeline().getLast();
    this.localId = this.localChannel.getId();
    this.remoteId = remoteId;
    // Fall back to a private future when the caller does not provide one.
    this.futureRequest = futureRequest != null ? futureRequest : new R66Future(true);
    this.cts = (ChannelTrafficShapingHandler) networkChannel.getPipeline()
            .get(NetworkServerPipelineFactory.LIMITCHANNEL);
}
167
168
169
170
/**
 * Builds a reference that is not attached to any channel: both channels and the
 * network server handler are {@code null}, the local id is 0 and a new request
 * future is created.
 */
public LocalChannelReference() {
    this.futureRequest = new R66Future(true);
    this.localId = Integer.valueOf(0);
    this.networkServerHandler = null;
    this.networkChannel = null;
    this.localChannel = null;
}
178
179
180
181
182 public Channel getLocalChannel() {
183 return localChannel;
184 }
185
186
187
188
189 public Channel getNetworkChannel() {
190 return networkChannel;
191 }
192
193
194
195
196 public Integer getLocalId() {
197 return localId;
198 }
199
200
201
202
203 public Integer getRemoteId() {
204 return remoteId;
205 }
206
207
208
209
210 public ChannelTrafficShapingHandler getChannelTrafficShapingHandler() {
211 return cts;
212 }
213
214
215
216
217 public NetworkChannel getNetworkChannelObject() {
218 return networkChannelObject;
219 }
220
221
222
223
224
225 public void setNetworkChannelObject(NetworkChannel networkChannelObject) {
226 this.networkChannelObject = networkChannelObject;
227 }
228
229
230
231
232 public NetworkServerHandler getNetworkServerHandler() {
233 return networkServerHandler;
234 }
235
236
237
238
239
240 public DbSession getDbSession() {
241 if (networkServerHandler != null) {
242 return networkServerHandler.getDbSession();
243 }
244 return DbConstant.admin.session;
245 }
246
247
248
249
250
251 public void setRemoteId(Integer remoteId) {
252 this.remoteId = remoteId;
253 }
254
255
256
257
258 public R66Session getSession() {
259 return session;
260 }
261
262
263
264
265
266 public void setSession(R66Session session) {
267 this.session = session;
268 }
269
270
271
272
273 public String getErrorMessage() {
274 return errorMessage;
275 }
276
277
278
279
280
281 public void setErrorMessage(String errorMessage, ErrorCode code) {
282 this.errorMessage = errorMessage;
283 this.code = code;
284 }
285
286
287
288
289 public ErrorCode getCurrentCode() {
290 return code;
291 }
292
293
294
295
296
297
298 public void validateStartup(boolean validate) {
299 if (futureStartup.isDone()) {
300 return;
301 }
302 if (validate) {
303 futureStartup.setSuccess();
304 } else {
305 futureStartup.cancel();
306 }
307 }
308
309
310
311
312
313 public R66Future getFutureValidateStartup() {
314 try {
315 if (!futureStartup.await(Configuration.configuration.TIMEOUTCON)) {
316 validateStartup(false);
317 return futureStartup;
318 }
319 } catch (InterruptedException e) {
320 validateStartup(false);
321 return futureStartup;
322 }
323 return futureStartup;
324 }
325
326
327
328
329
330
331 public void validateConnection(boolean validate, R66Result result) {
332 if (futureConnection.isDone()) {
333 logger.debug("LocalChannelReference already validated: " +
334 futureConnection.isSuccess());
335 return;
336 }
337 if (validate) {
338 futureConnection.setResult(result);
339 futureConnection.setSuccess();
340 } else {
341 futureConnection.setResult(result);
342 setErrorMessage(result.getMessage(), result.code);
343 futureConnection.cancel();
344 }
345 }
346
347
348
349
350
351 public R66Future getFutureValidateConnection() {
352 R66Result result;
353 try {
354 for (int i = 0; i < Configuration.RETRYNB; i++) {
355 if (this.networkChannel.isConnected()) {
356 if (!futureConnection.await(Configuration.configuration.TIMEOUTCON)) {
357 if (futureConnection.isDone()) {
358 return futureConnection;
359 } else {
360 if (this.networkChannel.isConnected()) {
361 continue;
362 }
363 result = new R66Result(
364 new OpenR66ProtocolNoConnectionException(
365 "Out of time"), session, false,
366 ErrorCode.ConnectionImpossible, null);
367 validateConnection(false, result);
368 return futureConnection;
369 }
370 } else {
371 return futureConnection;
372 }
373 } else {
374 break;
375 }
376 }
377 } catch (InterruptedException e) {
378 result = new R66Result(
379 new OpenR66ProtocolNoConnectionException(
380 "Interrupted connection"), session, false,
381 ErrorCode.ConnectionImpossible, null);
382 validateConnection(false, result);
383 return futureConnection;
384 }
385 logger.warn("Cannot get Connection due to out of Time: {}",this);
386 result = new R66Result(
387 new OpenR66ProtocolNoConnectionException(
388 "Out of time"), session, false,
389 ErrorCode.ConnectionImpossible, null);
390 validateConnection(false, result);
391 return futureConnection;
392 }
393
394
395
396
397
398
399 public void validateEndTransfer(R66Result finalValue) {
400 if (!futureEndTransfer.isDone()) {
401 futureEndTransfer.setResult(finalValue);
402 futureEndTransfer.setSuccess();
403 } else {
404 logger.debug("Could not validate since Already validated: " +
405 futureEndTransfer.isSuccess() + " " + finalValue);
406 if (!futureEndTransfer.getResult().isAnswered) {
407 futureEndTransfer.getResult().isAnswered = finalValue.isAnswered;
408 }
409 }
410 }
411
412
413
414
415 public R66Future getFutureEndTransfer() {
416 return futureEndTransfer;
417 }
418
419
420
421
422
423
424 public void waitReadyForSendThrough() throws OpenR66Exception {
425 logger.debug("Wait for End of Prepare Transfer");
426 try {
427 this.futureEndTransfer.await();
428 } catch (InterruptedException e) {
429 throw new OpenR66RunnerErrorException("Interrupted", e);
430 }
431 if (this.futureEndTransfer.isSuccess()) {
432
433 this.futureEndTransfer = new R66Future(true);
434 } else {
435 throw this.futureEndTransfer.getResult().exception;
436 }
437 }
438
439
440
441
442 public R66Future getFutureValidRequest() {
443 return futureValidRequest;
444 }
445
446
447
448
449 public R66Future getFutureRequest() {
450 return futureRequest;
451 }
452
453
454
455
456
457
458 public void invalidateRequest(R66Result finalvalue) {
459 R66Result finalValue = finalvalue;
460 if (finalValue == null) {
461 finalValue = new R66Result(session, false, ErrorCode.Unknown, this.session.getRunner());
462 }
463 logger.debug("FET: " + futureEndTransfer.isDone() + ":" +
464 futureEndTransfer.isSuccess() + " FVR: " +
465 futureValidRequest.isDone() + ":" +
466 futureValidRequest.isSuccess() + " FR: " +
467 futureRequest.isDone() + ":" + futureRequest.isSuccess() + " " +
468 finalValue.getMessage());
469 if (!futureEndTransfer.isDone()) {
470 futureEndTransfer.setResult(finalValue);
471 if (finalValue.exception != null) {
472 futureEndTransfer.setFailure(finalValue.exception);
473 } else {
474 futureEndTransfer.cancel();
475 }
476 }
477 if (!futureValidRequest.isDone()) {
478 futureValidRequest.setResult(finalValue);
479 if (finalValue.exception != null) {
480 futureValidRequest.setFailure(finalValue.exception);
481 } else {
482 futureValidRequest.cancel();
483 }
484 }
485 logger.debug("Invalidate Request", new Exception(
486 "Trace for Invalidation"));
487 if (finalValue.code != ErrorCode.ServerOverloaded) {
488 if (!futureRequest.isDone()) {
489 setErrorMessage(finalValue.getMessage(), finalValue.code);
490 futureRequest.setResult(finalValue);
491 if (finalValue.exception != null) {
492 futureRequest.setFailure(finalValue.exception);
493 } else {
494 futureRequest.cancel();
495 }
496 } else {
497 logger.debug("Could not invalidate since Already finished: " +
498 futureEndTransfer.getResult());
499 }
500 } else {
501 setErrorMessage(finalValue.getMessage(), finalValue.code);
502 logger.debug("Overloaded");
503 }
504 if (this.session != null) {
505 DbTaskRunner runner = this.session.getRunner();
506 if (runner != null) {
507 if (runner.isSender()) {
508 NetworkTransaction.stopRetrieve(this);
509 }
510 }
511 }
512 }
513
514
515
516
517
518
519 public void validateRequest(R66Result finalValue) {
520 setErrorMessage("NoError", null);
521 if (!futureEndTransfer.isDone()) {
522 logger.debug("Will validate EndTransfer");
523 validateEndTransfer(finalValue);
524 }
525 if (!futureValidRequest.isDone()) {
526 futureValidRequest.setResult(finalValue);
527 futureValidRequest.setSuccess();
528 }
529 logger.debug("Validate Request");
530 if (!futureRequest.isDone()) {
531 if (finalValue.other == null &&
532 session.getBusinessObject() != null &&
533 session.getBusinessObject().getInfo() != null) {
534 finalValue.other = session.getBusinessObject().getInfo();
535 }
536 futureRequest.setResult(finalValue);
537 futureRequest.setSuccess();
538 } else {
539 logger.info("Already validated: " + futureRequest.isSuccess() +
540 " " + finalValue);
541 if (!futureRequest.getResult().isAnswered) {
542 futureRequest.getResult().isAnswered = finalValue.isAnswered;
543 }
544 }
545 }
546
547 @Override
548 public String toString() {
549 return "LCR: L: " + localId + " R: " + remoteId + "\nStartup["+
550 (futureStartup != null ? futureStartup : "noStartup")+ "]\nConn[" +
551 (futureConnection != null ? futureConnection : "noConn")+ "]\nValidRequestRequest[" +
552 (futureValidRequest != null ? futureValidRequest : "noValidRequest")+ "]\nEndTransfer[" +
553 (futureEndTransfer != null ? futureEndTransfer : "noEndTransfer")+"]\nRequest["+
554 (futureRequest != null ? futureRequest : "noRequest")+ "]";
555 }
556
557
558
559
560 public RecvThroughHandler getRecvThroughHandler() {
561 return recvThroughHandler;
562 }
563
564
565
566
567
568 public boolean isRecvThroughMode() {
569 return recvThroughHandler != null;
570 }
571
572
573
574
575
576 public void setRecvThroughHandler(RecvThroughHandler recvThroughHandler) {
577 this.recvThroughHandler = recvThroughHandler;
578 }
579
580
581
582
583 public boolean isSendThroughMode() {
584 return isSendThroughMode;
585 }
586
587
588
589
590 public void setSendThroughMode(boolean isSendThroughMode) {
591 this.isSendThroughMode = isSendThroughMode;
592 }
593
594
595
596
597 public ClientRunner getClientRunner() {
598 return clientRunner;
599 }
600
601
602
603
604
605 public void setClientRunner(ClientRunner clientRunner) {
606 this.clientRunner = clientRunner;
607 }
608
609
610
611
612
613 public void sessionNewState(R66FiniteDualStates desiredState) {
614 if (session != null) {
615 session.newState(desiredState);
616 }
617 }
618
619 }