View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.commander;
22  
23  import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
24  import goldengate.common.database.exception.GoldenGateDatabaseException;
25  import goldengate.common.database.exception.GoldenGateDatabaseNoConnectionException;
26  import goldengate.common.database.exception.GoldenGateDatabaseSqlException;
27  import goldengate.common.logging.GgInternalLogger;
28  import goldengate.common.logging.GgInternalLoggerFactory;
29  
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.ScheduledExecutorService;
34  import java.util.concurrent.ScheduledFuture;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  
38  import openr66.database.DbConstant;
39  import openr66.database.data.DbTaskRunner;
40  import openr66.protocol.configuration.Configuration;
41  import openr66.protocol.networkhandler.NetworkTransaction;
42  
43  /**
44   * This class launch and control the Commander and enable TaskRunner job submissions
45   *
46   * @author Frederic Bregier
47   *
48   */
49  public class InternalRunner {
50      /**
51       * Internal Logger
52       */
53      private static final GgInternalLogger logger = GgInternalLoggerFactory
54              .getLogger(InternalRunner.class);
55  
56      private final ScheduledExecutorService scheduledExecutorService;
57      private ScheduledFuture<?> scheduledFuture;
58      private CommanderInterface commander = null;
59      private volatile boolean isRunning = true;
60      private final ThreadPoolExecutor threadPoolExecutor;
61      private final BlockingQueue<Runnable> workQueue;
62      private final NetworkTransaction networkTransaction;
63  
64      /**
65       * Create the structure to enable submission by database
66       * @throws GoldenGateDatabaseNoConnectionException
67       * @throws GoldenGateDatabaseSqlException
68       */
69      public InternalRunner() throws GoldenGateDatabaseNoConnectionException, GoldenGateDatabaseSqlException {
70          if (DbConstant.admin.isConnected) {
71              commander = new Commander(this, true);
72          } else {
73              commander = new CommanderNoDb(this, true);
74          }
75          scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
76          isRunning = true;
77          workQueue = new ArrayBlockingQueue<Runnable>(10);
78          threadPoolExecutor = new ThreadPoolExecutor(10, Configuration.configuration.RUNNER_THREAD,
79                  1000, TimeUnit.MILLISECONDS, workQueue);
80          scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
81                  Configuration.configuration.delayCommander,
82                  Configuration.configuration.delayCommander, TimeUnit.MILLISECONDS);
83          networkTransaction = new NetworkTransaction();
84      }
85      public NetworkTransaction getNetworkTransaction() {
86          return networkTransaction;
87      }
88      /**
89       * Submit a task
90       * @param taskRunner
91       */
92      public void submitTaskRunner(DbTaskRunner taskRunner) {
93          if (isRunning || !Configuration.configuration.isShutdown) {
94              if (threadPoolExecutor.getActiveCount()+5 > Configuration.configuration.RUNNER_THREAD) {
95                  // too many current active threads
96                  taskRunner.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
97                  try {
98                      taskRunner.update();
99                  } catch (GoldenGateDatabaseException e) {
100                 }
101                 return;
102             }
103             logger.debug("Will run {}",taskRunner);
104             ClientRunner runner = new ClientRunner(networkTransaction, taskRunner, null);
105             if (taskRunner.isSendThrough() && (taskRunner.isRescheduledTransfer()
106                     || taskRunner.isPreTaskStarting())) {
107                 runner.setSendThroughMode();
108                 taskRunner.checkThroughMode();
109             }
110             runner.setDaemon(true);
111             // create the client, connect and run
112             threadPoolExecutor.execute(runner);
113             runner = null;
114         }
115     }
116     /**
117      * First step while shutting down the service
118      */
119     public void prepareStopInternalRunner() {
120         isRunning = false;
121         scheduledFuture.cancel(false);
122         scheduledExecutorService.shutdown();
123         threadPoolExecutor.shutdown();
124     }
125     /**
126      * This should be called when the server is shutting down, after stopping active requests
127      * if possible.
128      */
129     public void stopInternalRunner() {
130         isRunning = false;
131         logger.info("Stopping Commander and Runner Tasks");
132         scheduledFuture.cancel(false);
133         scheduledExecutorService.shutdownNow();
134         threadPoolExecutor.shutdownNow();
135         networkTransaction.closeAll();
136     }
137     public int nbInternalRunner() {
138         return threadPoolExecutor.getActiveCount();
139     }
140     public void reloadInternalRunner()
141     throws GoldenGateDatabaseNoConnectionException, GoldenGateDatabaseSqlException {
142         scheduledFuture.cancel(false);
143         if (commander != null) {
144             commander.finalize();
145         }
146         if (DbConstant.admin.isConnected) {
147             commander = new Commander(this);
148         } else {
149             commander = new CommanderNoDb(this);
150         }
151         scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
152                 Configuration.configuration.delayCommander,
153                 Configuration.configuration.delayCommander, TimeUnit.MILLISECONDS);
154     }
155 }