View Javadoc

1   /**
2    * This file is part of GoldenGate Project (named also GoldenGate or GG).
3    * 
4    * Copyright 2009, Frederic Bregier, and individual contributors by the @author
5    * tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    * 
8    * All GoldenGate Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   * 
13   * GoldenGate is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   * 
17   * You should have received a copy of the GNU General Public License along with
18   * GoldenGate . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package openr66.commander;
21  
22  import goldengate.common.database.data.AbstractDbData;
23  import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
24  import goldengate.common.database.exception.GoldenGateDatabaseException;
25  import goldengate.common.logging.GgInternalLogger;
26  import goldengate.common.logging.GgInternalLoggerFactory;
27  
28  import java.net.SocketAddress;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  
32  import openr66.client.RecvThroughHandler;
33  import openr66.context.ErrorCode;
34  import openr66.context.R66FiniteDualStates;
35  import openr66.context.R66Result;
36  import openr66.context.authentication.R66Auth;
37  import openr66.context.task.exception.OpenR66RunnerErrorException;
38  import openr66.database.DbConstant;
39  import openr66.database.data.DbHostAuth;
40  import openr66.database.data.DbTaskRunner;
41  import openr66.database.data.DbTaskRunner.TASKSTEP;
42  import openr66.protocol.configuration.Configuration;
43  import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
44  import openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
45  import openr66.protocol.exception.OpenR66ProtocolPacketException;
46  import openr66.protocol.localhandler.LocalChannelReference;
47  import openr66.protocol.localhandler.packet.RequestPacket;
48  import openr66.protocol.networkhandler.NetworkTransaction;
49  import openr66.protocol.utils.ChannelUtils;
50  import openr66.protocol.utils.R66Future;
51  import openr66.protocol.utils.TransferUtils;
52  
53  import org.jboss.netty.channel.Channels;
54  
55  /**
56   * Client Runner from a TaskRunner
57   * 
58   * @author Frederic Bregier
59   * 
60   */
61  public class ClientRunner extends Thread {
62      /**
63       * Internal Logger
64       */
65      private static final GgInternalLogger logger = GgInternalLoggerFactory
66              .getLogger(ClientRunner.class);
67  
68      private static final ConcurrentHashMap<String, Integer> taskRunnerRetryHashMap = new ConcurrentHashMap<String, Integer>();
69  
70      public static ConcurrentLinkedQueue<ClientRunner> activeRunners = null;
71  
72      private final NetworkTransaction networkTransaction;
73  
74      private final DbTaskRunner taskRunner;
75  
76      private final R66Future futureRequest;
77  
78      private RecvThroughHandler handler = null;
79      
80      private boolean isSendThroughMode = false;
81  
82      private LocalChannelReference localChannelReference = null;
83  
84      public ClientRunner(NetworkTransaction networkTransaction,
85              DbTaskRunner taskRunner, R66Future futureRequest) {
86          this.networkTransaction = networkTransaction;
87          this.taskRunner = taskRunner;
88          this.futureRequest = futureRequest;
89      }
90  
91      /**
92       * @return the networkTransaction
93       */
94      public NetworkTransaction getNetworkTransaction() {
95          return networkTransaction;
96      }
97  
98      /**
99       * @return the taskRunner
100      */
101     public DbTaskRunner getTaskRunner() {
102         return taskRunner;
103     }
104 
105     /**
106      * @return the localChannelReference
107      */
108     public LocalChannelReference getLocalChannelReference() {
109         return localChannelReference;
110     }
111 
112     /*
113      * (non-Javadoc)
114      * 
115      * @see java.lang.Runnable#run()
116      */
117     @Override
118     public void run() {
119         if (Configuration.configuration.isShutdown) {
120             taskRunner.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
121             try {
122                 taskRunner.update();
123             } catch (GoldenGateDatabaseException e) {
124             }
125             return;
126         }
127         try {
128             if (activeRunners != null) {
129                 activeRunners.add(this);
130             }
131             R66Future transfer;
132             try {
133                 transfer = this.runTransfer();
134             } catch (OpenR66RunnerErrorException e) {
135                 logger.error("Runner Error: {} {}", e.getMessage(),
136                         taskRunner.toShortString());
137                 return;
138             } catch (OpenR66ProtocolNoConnectionException e) {
139                 logger.error("No connection Error {}", e.getMessage());
140                 if (localChannelReference != null) {
141                     localChannelReference.setErrorMessage(
142                             ErrorCode.ConnectionImpossible.mesg,
143                             ErrorCode.ConnectionImpossible);
144                 }
145                 taskRunner.setErrorTask(localChannelReference);
146                 try {
147                     taskRunner.saveStatus();
148                     taskRunner.run();
149                 } catch (OpenR66RunnerErrorException e1) {
150                     this.changeUpdatedInfo(UpdatedInfo.INERROR,
151                             ErrorCode.ConnectionImpossible);
152                 }
153                 return;
154             } catch (OpenR66ProtocolPacketException e) {
155                 logger.error("Protocol Error", e);
156                 return;
157             } catch (OpenR66ProtocolNotYetConnectionException e) {
158                 logger.warn("No connection warning {}", e.getMessage());
159                 return;
160             }
161             R66Result result = transfer.getResult();
162             if (result != null) {
163                 if (result.code == ErrorCode.QueryAlreadyFinished) {
164                     logger.warn("TRANSFER RESULT:\n    " +
165                             (transfer.isSuccess()? "SUCCESS" : "FAILURE") +
166                             "\n    " + ErrorCode.QueryAlreadyFinished.mesg +
167                             ":" +
168                             (result != null? result.toString() : "no result"));
169                 } else {
170                     if (transfer.isSuccess()) {
171                         logger.info("TRANSFER RESULT:\n    SUCCESS\n    " +
172                                 (result != null? result.toString()
173                                         : "no result"));
174                     } else {
175                         logger.error("TRANSFER RESULT:\n    FAILURE\n    " +
176                                 (result != null? result.toString()
177                                         : "no result"));
178                     }
179                 }
180             } else {
181                 if (transfer.isSuccess()) {
182                     logger.warn("TRANSFER REQUESTED RESULT:\n    SUCCESS\n    no result");
183                 } else {
184                     logger.error("TRANSFER REQUESTED RESULT:\n    FAILURE\n    no result");
185                 }
186             }
187             transfer = null;
188             Thread.currentThread().setName(
189                     "Finished_" + Thread.currentThread().getName());
190         } finally {
191             if (activeRunners != null) {
192                 activeRunners.remove(this);
193             }
194         }
195     }
196 
197     /**
198      * 
199      * @param runner
200      * @param limit
201      * @return True if the task was run less than limit, else False
202      */
203     public boolean incrementTaskRunerTry(DbTaskRunner runner, int limit) {
204         String key = runner.getKey();
205         Integer tries = taskRunnerRetryHashMap.get(key);
206         logger.debug("try to find integer: " + tries);
207         if (tries == null) {
208             tries = new Integer(1);
209         } else {
210             tries = tries + 1;
211         }
212         if (limit <= tries) {
213             taskRunnerRetryHashMap.remove(key);
214             return false;
215         } else {
216             taskRunnerRetryHashMap.put(key, tries);
217             return true;
218         }
219     }
220 
221     /**
222      * True transfer run (can be called directly to enable exception outside any
223      * executors)
224      * 
225      * @return The R66Future of the transfer operation
226      * @throws OpenR66RunnerErrorException
227      * @throws OpenR66ProtocolNoConnectionException
228      * @throws OpenR66ProtocolPacketException
229      * @throws OpenR66ProtocolNotYetConnectionException
230      */
231     public R66Future runTransfer() throws OpenR66RunnerErrorException,
232             OpenR66ProtocolNoConnectionException,
233             OpenR66ProtocolPacketException,
234             OpenR66ProtocolNotYetConnectionException {
235         logger.debug("Start attempt Transfer");
236         localChannelReference = initRequest();
237         try {
238             localChannelReference.getFutureValidRequest().await();
239         } catch (InterruptedException e) {
240         }
241         if (localChannelReference.getFutureValidRequest().isSuccess()) {
242             return finishTransfer(true, localChannelReference);
243         } else if (localChannelReference.getFutureValidRequest().getResult().code == ErrorCode.ServerOverloaded) {
244             return tryAgainTransferOnOverloaded(true, localChannelReference);
245         } else
246             return finishTransfer(true, localChannelReference);
247     }
248 
249     /**
250      * In case an overloaded signal is returned by the requested
251      * 
252      * @param retry
253      *            if True, it will retry in case of overloaded remote server,
254      *            else it just stops
255      * @param localChannelReference
256      * @return The R66Future of the transfer operation
257      * @throws OpenR66RunnerErrorException
258      * @throws OpenR66ProtocolNoConnectionException
259      * @throws OpenR66ProtocolPacketException
260      * @throws OpenR66ProtocolNotYetConnectionException
261      */
262     public R66Future tryAgainTransferOnOverloaded(boolean retry,
263             LocalChannelReference localChannelReference)
264             throws OpenR66RunnerErrorException,
265             OpenR66ProtocolNoConnectionException,
266             OpenR66ProtocolPacketException,
267             OpenR66ProtocolNotYetConnectionException {
268         if (this.localChannelReference == null) {
269             this.localChannelReference = localChannelReference;
270         }
271         boolean incRetry = incrementTaskRunerTry(taskRunner,
272                 Configuration.RETRYNB);
273         logger.debug("tryAgainTransferOnOverloaded: " + retry + ":" + incRetry);
274         switch (taskRunner.getUpdatedInfo()) {
275             case DONE:
276             case INERROR:
277             case INTERRUPTED:
278                 break;
279             default:
280                 this.changeUpdatedInfo(UpdatedInfo.INERROR,
281                         ErrorCode.ServerOverloaded);
282         }
283         // redo if possible
284         if (retry && incRetry) {
285             try {
286                 Thread.sleep(Configuration.configuration.constraintLimitHandler
287                         .getSleepTime());
288             } catch (InterruptedException e) {
289             }
290             return runTransfer();
291         } else {
292             if (localChannelReference == null) {
293                 taskRunner
294                         .setLocalChannelReference(new LocalChannelReference());
295             }
296             taskRunner.getLocalChannelReference().setErrorMessage(
297                     ErrorCode.ConnectionImpossible.mesg,
298                     ErrorCode.ConnectionImpossible);
299             this.taskRunner.setErrorTask(localChannelReference);
300             this.taskRunner.run();
301             throw new OpenR66ProtocolNoConnectionException(
302                     "End of retry on ServerOverloaded");
303         }
304     }
305 
306     /**
307      * Finish the transfer (called at the end of runTransfer)
308      * 
309      * @param retry
310      *            if True, it will retry in case of overloaded remote server,
311      *            else it just stops
312      * @param localChannelReference
313      * @return The R66Future of the transfer operation
314      * @throws OpenR66ProtocolNotYetConnectionException
315      * @throws OpenR66ProtocolPacketException
316      * @throws OpenR66ProtocolNoConnectionException
317      * @throws OpenR66RunnerErrorException
318      */
319     public R66Future finishTransfer(boolean retry,
320             LocalChannelReference localChannelReference)
321             throws OpenR66RunnerErrorException {
322         if (this.localChannelReference == null) {
323             this.localChannelReference = localChannelReference;
324         }
325         R66Future transfer = localChannelReference.getFutureRequest();
326         try {
327             transfer.await();
328         } catch (InterruptedException e1) {
329         }
330         taskRunnerRetryHashMap.remove(taskRunner.getKey());
331         logger.info("Request done with {}", (transfer.isSuccess()? "success"
332                 : "error"));
333         Channels.close(localChannelReference.getLocalChannel());
334         // now reload TaskRunner if it still exists (light client can forget it)
335         if (transfer.isSuccess()) {
336             try {
337                 taskRunner.select();
338             } catch (GoldenGateDatabaseException e) {
339                 logger.debug("Not a problem but cannot find at the end the task");
340                 taskRunner.setFrom(transfer.runner);
341             }
342             this.changeUpdatedInfo(UpdatedInfo.DONE, ErrorCode.CompleteOk);
343         } else {
344             try {
345                 taskRunner.select();
346             } catch (GoldenGateDatabaseException e) {
347                 logger.debug("Not a problem but cannot find at the end the task");
348                 taskRunner.setFrom(transfer.runner);
349             }
350             // Case when we were interrupted
351             if (transfer.getResult() == null) {
352                 switch (taskRunner.getUpdatedInfo()) {
353                     case DONE:
354                         R66Result ok = new R66Result(null, true,
355                                 ErrorCode.CompleteOk, taskRunner);
356                         transfer.setResult(ok);
357                         transfer.setSuccess();
358                         this.changeUpdatedInfo(UpdatedInfo.DONE,
359                                 ErrorCode.CompleteOk);
360                         break;
361                     case INERROR:
362                     case INTERRUPTED:
363                     default:
364                         R66Result error = new R66Result(null, true,
365                                 ErrorCode.Internal, taskRunner);
366                         transfer.setResult(error);
367                         transfer.cancel();
368                         this.changeUpdatedInfo(UpdatedInfo.INERROR,
369                                 ErrorCode.Internal);
370                 }
371                 return transfer;
372             }
373             if (transfer.getResult().code == ErrorCode.QueryAlreadyFinished) {
374                 // check if post task to execute
375                 logger.warn("WARN QueryAlreadyFinished:\n    " +
376                         transfer.toString() + "\n    " +
377                         taskRunner.toShortString());
378                 try {
379                     TransferUtils.finalizeTaskWithNoSession(taskRunner,
380                             localChannelReference);
381                 } catch (OpenR66RunnerErrorException e) {
382                     this.taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
383                     try {
384                         this.taskRunner.update();
385                     } catch (GoldenGateDatabaseException e1) {
386                     }
387                 }
388             } else {
389                 switch (taskRunner.getUpdatedInfo()) {
390                     case DONE:
391                     case INERROR:
392                     case INTERRUPTED:
393                         break;
394                     default:
395                         this.changeUpdatedInfo(UpdatedInfo.INERROR,
396                                 transfer.getResult().code);
397                 }
398             }
399         }
400         return transfer;
401     }
402 
403     /**
404      * Initialize the request
405      * 
406      * @return the localChannelReference holding the transfer request
407      * @throws OpenR66ProtocolNoConnectionException
408      * @throws OpenR66RunnerErrorException
409      * @throws OpenR66ProtocolPacketException
410      * @throws OpenR66ProtocolNotYetConnectionException
411      */
412     public LocalChannelReference initRequest()
413             throws OpenR66ProtocolNoConnectionException,
414             OpenR66RunnerErrorException, OpenR66ProtocolPacketException,
415             OpenR66ProtocolNotYetConnectionException {
416         this.changeUpdatedInfo(UpdatedInfo.RUNNING, ErrorCode.Running);
417         long id = taskRunner.getSpecialId();
418         String tid;
419         if (id == DbConstant.ILLEGALVALUE) {
420             tid = taskRunner.getRuleId() + "_" + taskRunner.getMode() +
421                     "_NEWTRANSFER";
422         } else {
423             tid = taskRunner.getRuleId() + "_" + taskRunner.getMode() + "_" +
424                     id;
425         }
426         Thread.currentThread().setName(tid);
427         logger.debug("Will run {}", this.taskRunner);
428         boolean restartPost = false;
429         if (taskRunner.getGloballaststep() == TASKSTEP.POSTTASK.ordinal()) {
430             // Send a validation to requested
431             if (!taskRunner.isSelfRequested()) {
432                 // restart
433                 restartPost = true;
434             }
435         }
436         if (taskRunner.isSelfRequested()) {
437             // Don't have to restart a task for itself (or should use requester)
438             logger.warn("Requested host cannot initiate itself the request");
439             this.changeUpdatedInfo(UpdatedInfo.INERROR,
440                     ErrorCode.LoopSelfRequestedHost);
441             throw new OpenR66ProtocolNoConnectionException(
442                     "Requested host cannot initiate itself the request");
443         }
444         DbHostAuth host = R66Auth.getServerAuth(DbConstant.admin.session,
445                 taskRunner.getRequested());
446         if (host == null) {
447             logger.error("Requested host cannot be found: " +
448                     taskRunner.getRequested());
449             this.changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.NotKnownHost);
450             throw new OpenR66ProtocolNoConnectionException(
451                     "Requested host cannot be found " +
452                             taskRunner.getRequested());
453         }
454         if (host.isClient()) {
455             logger.debug("Cannot initiate a connection with a client: {}", host);
456             this.changeUpdatedInfo(UpdatedInfo.INERROR,
457                     ErrorCode.ConnectionImpossible);
458             throw new OpenR66ProtocolNoConnectionException(
459                     "Cannot connect to client " + host.toString());
460         }
461         SocketAddress socketAddress = host.getSocketAddress();
462         boolean isSSL = host.isSsl();
463 
464         LocalChannelReference localChannelReference = networkTransaction
465                 .createConnectionWithRetry(socketAddress, isSSL, futureRequest);
466         taskRunner.setLocalChannelReference(localChannelReference);
467         socketAddress = null;
468         if (localChannelReference == null) {
469             // propose to redo
470             // See if reprogramming is ok (not too many tries)
471             String retry;
472             if (incrementTaskRunerTry(taskRunner, Configuration.RETRYNB)) {
473                 logger.debug("Will retry since Cannot connect to {}", host);
474                 retry = " but will retry";
475                 // now wait
476                 try {
477                     Thread.sleep(Configuration.configuration.delayRetry);
478                 } catch (InterruptedException e) {
479                 }
480                 this.changeUpdatedInfo(UpdatedInfo.TOSUBMIT,
481                         ErrorCode.ConnectionImpossible);
482                 throw new OpenR66ProtocolNotYetConnectionException(
483                         "Cannot connect to server " + host.toString() + retry);
484             } else {
485                 logger.debug(
486                         "Will not retry since limit of connection attemtps is reached for {}",
487                         host);
488                 retry = " and retries limit is reached so stop here";
489                 this.changeUpdatedInfo(UpdatedInfo.INERROR,
490                         ErrorCode.ConnectionImpossible);
491                 taskRunner
492                         .setLocalChannelReference(new LocalChannelReference());
493                 throw new OpenR66ProtocolNoConnectionException(
494                         "Cannot connect to server " + host.toString() + retry);
495             }
496         }
497         if (handler != null) {
498             localChannelReference.setRecvThroughHandler(handler);
499         }
500         localChannelReference.setSendThroughMode(isSendThroughMode);
501         if (restartPost) {
502             RequestPacket request = taskRunner.getRequest();
503             logger.debug("Will send request {} ", request);
504             localChannelReference.setClientRunner(this);
505             localChannelReference.sessionNewState(R66FiniteDualStates.REQUESTR);
506             try {
507                 ChannelUtils.writeAbstractLocalPacket(localChannelReference,
508                         request, false);
509             } catch (OpenR66ProtocolPacketException e) {
510                 // propose to redo
511                 logger.warn("Cannot transfer request to " + host.toString());
512                 this.changeUpdatedInfo(UpdatedInfo.INTERRUPTED,
513                         ErrorCode.Internal);
514                 Channels.close(localChannelReference.getLocalChannel());
515                 localChannelReference = null;
516                 host = null;
517                 request = null;
518                 throw e;
519             }
520             logger.debug("Wait for request to {}", host);
521             request = null;
522             host = null;
523             return localChannelReference;
524         }
525         // If Requester is NOT Sender, and if TransferTask then decrease now if
526         // possible the rank
527         if (!taskRunner.isSender() &&
528                 (taskRunner.getGloballaststep() == TASKSTEP.TRANSFERTASK
529                         .ordinal())) {
530             logger.debug(
531                     "Requester is not Sender so decrease if possible the rank {}",
532                     taskRunner);
533             taskRunner.restartRank();
534             taskRunner.saveStatus();
535             logger.debug(
536                     "Requester is not Sender so new rank is " +
537                             taskRunner.getRank() + " {}", taskRunner);
538         }
539         RequestPacket request = taskRunner.getRequest();
540         logger.debug("Will send request {} {}", request, localChannelReference);
541         localChannelReference.setClientRunner(this);
542         localChannelReference.sessionNewState(R66FiniteDualStates.REQUESTR);
543         try {
544             ChannelUtils.writeAbstractLocalPacket(localChannelReference,
545                     request, false);
546         } catch (OpenR66ProtocolPacketException e) {
547             // propose to redo
548             logger.warn("Cannot transfer request to " + host.toString());
549             this.changeUpdatedInfo(UpdatedInfo.INTERRUPTED, ErrorCode.Internal);
550             Channels.close(localChannelReference.getLocalChannel());
551             localChannelReference = null;
552             host = null;
553             request = null;
554             throw e;
555         }
556         logger.debug("Wait for request to {}", host);
557         request = null;
558         host = null;
559         return localChannelReference;
560     }
561 
562     /**
563      * Change the UpdatedInfo of the current runner
564      * 
565      * @param info
566      */
567     public void changeUpdatedInfo(AbstractDbData.UpdatedInfo info,
568             ErrorCode code) {
569         this.taskRunner.changeUpdatedInfo(info);
570         this.taskRunner.setErrorExecutionStatus(code);
571         try {
572             this.taskRunner.update();
573         } catch (GoldenGateDatabaseException e) {
574         }
575     }
576 
577     /**
578      * @param handler
579      *            the handler to set
580      */
581     public void setRecvThroughHandler(RecvThroughHandler handler) {
582         this.handler = handler;
583     }
584     public void setSendThroughMode() {
585         isSendThroughMode = true;
586     }
587     public boolean getSendThroughMode() {
588         return isSendThroughMode;
589     }
590 }