View Javadoc

1   /**
2    * This file is part of GoldenGate Project (named also GoldenGate or GG).
3    * 
4    * Copyright 2009, Frederic Bregier, and individual contributors by the @author
5    * tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    * 
8    * All GoldenGate Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   * 
13   * GoldenGate is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   * 
17   * You should have received a copy of the GNU General Public License along with
18   * GoldenGate. If not, see <http://www.gnu.org/licenses/>.
19   */
20  package openr66.protocol.localhandler;
21  
22  import goldengate.common.database.DbSession;
23  import goldengate.common.logging.GgInternalLogger;
24  import goldengate.common.logging.GgInternalLoggerFactory;
25  import openr66.client.RecvThroughHandler;
26  import openr66.commander.ClientRunner;
27  import openr66.context.ErrorCode;
28  import openr66.context.R66FiniteDualStates;
29  import openr66.context.R66Result;
30  import openr66.context.R66Session;
31  import openr66.context.task.exception.OpenR66RunnerErrorException;
32  import openr66.database.DbConstant;
33  import openr66.database.data.DbTaskRunner;
34  import openr66.protocol.configuration.Configuration;
35  import openr66.protocol.exception.OpenR66Exception;
36  import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
37  import openr66.protocol.networkhandler.NetworkChannel;
38  import openr66.protocol.networkhandler.NetworkServerHandler;
39  import openr66.protocol.networkhandler.NetworkServerPipelineFactory;
40  import openr66.protocol.networkhandler.NetworkTransaction;
41  import openr66.protocol.utils.R66Future;
42  
43  import org.jboss.netty.channel.Channel;
44  import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
45  
46  /**
47   * Reference of one object using Local Channel localId and containing local
48   * channel and network channel.
49   * 
50   * @author Frederic Bregier
51   */
52  public class LocalChannelReference {
53      /**
54       * Internal Logger
55       */
56      private static final GgInternalLogger logger = GgInternalLoggerFactory
57              .getLogger(LocalChannelReference.class);
58  
59      /**
60       * Local Channel
61       */
62      private final Channel localChannel;
63  
64      /**
65       * Network Channel
66       */
67      private final Channel networkChannel;
68      
69      /**
70       * Traffic handler associated if any
71       */
72      private ChannelTrafficShapingHandler cts;
73  
74      /**
75       * Associated NetworkChannel
76       */
77      private NetworkChannel networkChannelObject;
78  
79      /**
80       * Network Server Handler
81       */
82      private final NetworkServerHandler networkServerHandler;
83  
84      /**
85       * Local Id
86       */
87      private final Integer localId;
88  
89      /**
90       * Remote Id
91       */
92      private Integer remoteId;
93  
94      /**
95       * Future on Request
96       */
97      private final R66Future futureRequest;
98  
99      /**
100      * Future on Valid Starting Request
101      */
102     private R66Future futureValidRequest = new R66Future(true);
103 
104     /**
105      * Future on Transfer
106      */
107     private R66Future futureEndTransfer = new R66Future(true);
108 
109     /**
110      * Future on Connection
111      */
112     private final R66Future futureConnection = new R66Future(true);
113 
114     /**
115      * Future on Startup
116      */
117     private final R66Future futureStartup = new R66Future(true);
118 
119     /**
120      * Session
121      */
122     private R66Session session;
123 
124     /**
125      * Last error message
126      */
127     private String errorMessage = "NoError";
128 
129     /**
130      * Last error code
131      */
132     private ErrorCode code = ErrorCode.Unknown;
133 
134     /**
135      * RecvThroughHandler
136      */
137     private RecvThroughHandler recvThroughHandler;
138     
139     private boolean isSendThroughMode = false;
140     /**
141      * Thread for ClientRunner if any
142      */
143     private ClientRunner clientRunner = null;
144 
145     /**
146      * 
147      * @param localChannel
148      * @param networkChannel
149      * @param remoteId
150      * @param futureRequest
151      */
152     public LocalChannelReference(Channel localChannel, Channel networkChannel,
153             Integer remoteId, R66Future futureRequest) {
154         this.localChannel = localChannel;
155         this.networkChannel = networkChannel;
156         networkServerHandler = (NetworkServerHandler) this.networkChannel
157                 .getPipeline().getLast();
158         localId = this.localChannel.getId();
159         this.remoteId = remoteId;
160         if (futureRequest == null) {
161             this.futureRequest = new R66Future(true);
162         } else {
163             this.futureRequest = futureRequest;
164         }
165         cts = (ChannelTrafficShapingHandler) networkChannel.getPipeline().get(NetworkServerPipelineFactory.LIMITCHANNEL);
166     }
167 
168     /**
169      * Special empty LCR constructor
170      */
171     public LocalChannelReference() {
172         this.localChannel = null;
173         this.networkChannel = null;
174         networkServerHandler = null;
175         localId = 0;
176         this.futureRequest = new R66Future(true);
177     }
178 
179     /**
180      * @return the localChannel
181      */
182     public Channel getLocalChannel() {
183         return localChannel;
184     }
185 
186     /**
187      * @return the networkChannel
188      */
189     public Channel getNetworkChannel() {
190         return networkChannel;
191     }
192 
193     /**
194      * @return the id
195      */
196     public Integer getLocalId() {
197         return localId;
198     }
199 
200     /**
201      * @return the remoteId
202      */
203     public Integer getRemoteId() {
204         return remoteId;
205     }
206 
207     /**
208      * @return the ChannelTrafficShapingHandler
209      */
210     public ChannelTrafficShapingHandler getChannelTrafficShapingHandler() {
211         return cts;
212     }
213 
214     /**
215      * @return the networkChannelObject
216      */
217     public NetworkChannel getNetworkChannelObject() {
218         return networkChannelObject;
219     }
220 
221     /**
222      * @param networkChannelObject
223      *            the networkChannelObject to set
224      */
225     public void setNetworkChannelObject(NetworkChannel networkChannelObject) {
226         this.networkChannelObject = networkChannelObject;
227     }
228 
229     /**
230      * @return the networkServerHandler
231      */
232     public NetworkServerHandler getNetworkServerHandler() {
233         return networkServerHandler;
234     }
235 
236     /**
237      * 
238      * @return the actual dbSession
239      */
240     public DbSession getDbSession() {
241         if (networkServerHandler != null) {
242             return networkServerHandler.getDbSession();
243         }
244         return DbConstant.admin.session;
245     }
246 
247     /**
248      * @param remoteId
249      *            the remoteId to set
250      */
251     public void setRemoteId(Integer remoteId) {
252         this.remoteId = remoteId;
253     }
254 
255     /**
256      * @return the session
257      */
258     public R66Session getSession() {
259         return session;
260     }
261 
262     /**
263      * @param session
264      *            the session to set
265      */
266     public void setSession(R66Session session) {
267         this.session = session;
268     }
269 
270     /**
271      * @return the current errorMessage
272      */
273     public String getErrorMessage() {
274         return errorMessage;
275     }
276 
277     /**
278      * @param errorMessage
279      *            the errorMessage to set
280      */
281     public void setErrorMessage(String errorMessage, ErrorCode code) {
282         this.errorMessage = errorMessage;
283         this.code = code;
284     }
285 
286     /**
287      * @return the code
288      */
289     public ErrorCode getCurrentCode() {
290         return code;
291     }
292 
293     /**
294      * Validate or not the Startup (before connection)
295      * 
296      * @param validate
297      */
298     public void validateStartup(boolean validate) {
299         if (futureStartup.isDone()) {
300             return;
301         }
302         if (validate) {
303             futureStartup.setSuccess();
304         } else {
305             futureStartup.cancel();
306         }
307     }
308 
309     /**
310      * 
311      * @return the futureValidateStartup
312      */
313     public R66Future getFutureValidateStartup() {
314         try {
315             if (!futureStartup.await(Configuration.configuration.TIMEOUTCON)) {
316                 validateStartup(false);
317                 return futureStartup;
318             }
319         } catch (InterruptedException e) {
320             validateStartup(false);
321             return futureStartup;
322         }
323         return futureStartup;
324     }
325 
326     /**
327      * Validate or Invalidate the connection (authentication)
328      * 
329      * @param validate
330      */
331     public void validateConnection(boolean validate, R66Result result) {
332         if (futureConnection.isDone()) {
333             logger.debug("LocalChannelReference already validated: " +
334                     futureConnection.isSuccess());
335             return;
336         }
337         if (validate) {
338             futureConnection.setResult(result);
339             futureConnection.setSuccess();
340         } else {
341             futureConnection.setResult(result);
342             setErrorMessage(result.getMessage(), result.code);
343             futureConnection.cancel();
344         }
345     }
346 
347     /**
348      * 
349      * @return the futureValidateConnection
350      */
351     public R66Future getFutureValidateConnection() {
352         R66Result result;
353         try {
354             for (int i = 0; i < Configuration.RETRYNB; i++) {
355                 if (this.networkChannel.isConnected()) {
356                     if (!futureConnection.await(Configuration.configuration.TIMEOUTCON)) {
357                         if (futureConnection.isDone()) {
358                             return futureConnection;
359                         } else {
360                             if (this.networkChannel.isConnected()) {
361                                 continue;
362                             }
363                             result = new R66Result(
364                                     new OpenR66ProtocolNoConnectionException(
365                                             "Out of time"), session, false,
366                                     ErrorCode.ConnectionImpossible, null);
367                             validateConnection(false, result);
368                             return futureConnection;
369                         }
370                     } else {
371                         return futureConnection;
372                     }
373                 } else {
374                     break;
375                 }
376             }
377         } catch (InterruptedException e) {
378             result = new R66Result(
379                     new OpenR66ProtocolNoConnectionException(
380                             "Interrupted connection"), session, false,
381                     ErrorCode.ConnectionImpossible, null);
382             validateConnection(false, result);
383             return futureConnection;
384         }
385         logger.warn("Cannot get Connection due to out of Time: {}",this);
386         result = new R66Result(
387                 new OpenR66ProtocolNoConnectionException(
388                         "Out of time"), session, false,
389                 ErrorCode.ConnectionImpossible, null);
390         validateConnection(false, result);
391         return futureConnection;
392     }
393 
394     /**
395      * Validate the End of a Transfer
396      * 
397      * @param finalValue
398      */
399     public void validateEndTransfer(R66Result finalValue) {
400         if (!futureEndTransfer.isDone()) {
401             futureEndTransfer.setResult(finalValue);
402             futureEndTransfer.setSuccess();
403         } else {
404             logger.debug("Could not validate since Already validated: " +
405                     futureEndTransfer.isSuccess() + " " + finalValue);
406             if (!futureEndTransfer.getResult().isAnswered) {
407                 futureEndTransfer.getResult().isAnswered = finalValue.isAnswered;
408             }
409         }
410     }
411 
412     /**
413      * @return the futureEndTransfer
414      */
415     public R66Future getFutureEndTransfer() {
416         return futureEndTransfer;
417     }
418 
419     /**
420      * Special waiter for Send Through method. It reset the EndTransfer future.
421      * 
422      * @throws OpenR66Exception
423      */
424     public void waitReadyForSendThrough() throws OpenR66Exception {
425         logger.debug("Wait for End of Prepare Transfer");
426         try {
427             this.futureEndTransfer.await();
428         } catch (InterruptedException e) {
429             throw new OpenR66RunnerErrorException("Interrupted", e);
430         }
431         if (this.futureEndTransfer.isSuccess()) {
432             // reset since transfer will start now
433             this.futureEndTransfer = new R66Future(true);
434         } else {
435             throw this.futureEndTransfer.getResult().exception;
436         }
437     }
438 
439     /**
440      * @return the futureValidRequest
441      */
442     public R66Future getFutureValidRequest() {
443         return futureValidRequest;
444     }
445 
446     /**
447      * @return the futureRequest
448      */
449     public R66Future getFutureRequest() {
450         return futureRequest;
451     }
452 
453     /**
454      * Invalidate the current request
455      * 
456      * @param finalvalue
457      */
458     public void invalidateRequest(R66Result finalvalue) {
459         R66Result finalValue = finalvalue;
460         if (finalValue == null) {
461             finalValue = new R66Result(session, false, ErrorCode.Unknown, this.session.getRunner());
462         }
463         logger.debug("FET: " + futureEndTransfer.isDone() + ":" +
464                 futureEndTransfer.isSuccess() + " FVR: " +
465                 futureValidRequest.isDone() + ":" +
466                 futureValidRequest.isSuccess() + " FR: " +
467                 futureRequest.isDone() + ":" + futureRequest.isSuccess() + " " +
468                 finalValue.getMessage());
469         if (!futureEndTransfer.isDone()) {
470             futureEndTransfer.setResult(finalValue);
471             if (finalValue.exception != null) {
472                 futureEndTransfer.setFailure(finalValue.exception);
473             } else {
474                 futureEndTransfer.cancel();
475             }
476         }
477         if (!futureValidRequest.isDone()) {
478             futureValidRequest.setResult(finalValue);
479             if (finalValue.exception != null) {
480                 futureValidRequest.setFailure(finalValue.exception);
481             } else {
482                 futureValidRequest.cancel();
483             }
484         }
485         logger.debug("Invalidate Request", new Exception(
486                 "Trace for Invalidation"));
487         if (finalValue.code != ErrorCode.ServerOverloaded) {
488             if (!futureRequest.isDone()) {
489                 setErrorMessage(finalValue.getMessage(), finalValue.code);
490                 futureRequest.setResult(finalValue);
491                 if (finalValue.exception != null) {
492                     futureRequest.setFailure(finalValue.exception);
493                 } else {
494                     futureRequest.cancel();
495                 }
496             } else {
497                 logger.debug("Could not invalidate since Already finished: " +
498                         futureEndTransfer.getResult());
499             }
500         } else {
501             setErrorMessage(finalValue.getMessage(), finalValue.code);
502             logger.debug("Overloaded");
503         }
504         if (this.session != null) {
505             DbTaskRunner runner = this.session.getRunner();
506             if (runner != null) {
507                 if (runner.isSender()) {
508                     NetworkTransaction.stopRetrieve(this);
509                 }
510             }
511         }
512     }
513 
514     /**
515      * Validate the current Request
516      * 
517      * @param finalValue
518      */
519     public void validateRequest(R66Result finalValue) {
520         setErrorMessage("NoError", null);
521         if (!futureEndTransfer.isDone()) {
522             logger.debug("Will validate EndTransfer");
523             validateEndTransfer(finalValue);
524         }
525         if (!futureValidRequest.isDone()) {
526             futureValidRequest.setResult(finalValue);
527             futureValidRequest.setSuccess();
528         }
529         logger.debug("Validate Request");
530         if (!futureRequest.isDone()) {
531             if (finalValue.other == null && 
532                     session.getBusinessObject() != null && 
533                     session.getBusinessObject().getInfo() != null) {
534                 finalValue.other = session.getBusinessObject().getInfo();
535             }
536             futureRequest.setResult(finalValue);
537             futureRequest.setSuccess();
538         } else {
539             logger.info("Already validated: " + futureRequest.isSuccess() +
540                     " " + finalValue);
541             if (!futureRequest.getResult().isAnswered) {
542                 futureRequest.getResult().isAnswered = finalValue.isAnswered;
543             }
544         }
545     }
546 
547     @Override
548     public String toString() {
549         return "LCR: L: " + localId + " R: " + remoteId + "\nStartup["+
550         (futureStartup != null ? futureStartup : "noStartup")+ "]\nConn[" +
551         (futureConnection != null ? futureConnection : "noConn")+ "]\nValidRequestRequest[" +
552         (futureValidRequest != null ? futureValidRequest : "noValidRequest")+ "]\nEndTransfer[" +
553         (futureEndTransfer != null ? futureEndTransfer : "noEndTransfer")+"]\nRequest["+
554         (futureRequest != null ? futureRequest : "noRequest")+ "]";
555     }
556 
557     /**
558      * @return the recvThroughHandler
559      */
560     public RecvThroughHandler getRecvThroughHandler() {
561         return recvThroughHandler;
562     }
563     
564     /**
565      * 
566      * @return True if in RecvThrough Mode
567      */
568     public boolean isRecvThroughMode() {
569         return recvThroughHandler != null;
570     }
571 
572     /**
573      * @param recvThroughHandler
574      *            the recvThroughHandler to set
575      */
576     public void setRecvThroughHandler(RecvThroughHandler recvThroughHandler) {
577         this.recvThroughHandler = recvThroughHandler;
578     }
579 
580     /**
581      * @return True if in SendThrough Mode
582      */
583     public boolean isSendThroughMode() {
584         return isSendThroughMode;
585     }
586 
587     /**
588      * @param isSendThroughMode the isSendThroughMode to set
589      */
590     public void setSendThroughMode(boolean isSendThroughMode) {
591         this.isSendThroughMode = isSendThroughMode;
592     }
593 
594     /**
595      * @return the clientRunner
596      */
597     public ClientRunner getClientRunner() {
598         return clientRunner;
599     }
600 
601     /**
602      * @param clientRunner
603      *            the clientRunner to set
604      */
605     public void setClientRunner(ClientRunner clientRunner) {
606         this.clientRunner = clientRunner;
607     }
608 
609     /**
610      * Shortcut to set a new state in Session
611      * @param desiredState
612      */
613     public void sessionNewState(R66FiniteDualStates desiredState) {
614         if (session != null) {
615             session.newState(desiredState);
616         }
617     }
618 
619 }