1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.commander;
22
23 import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
24 import goldengate.common.database.exception.GoldenGateDatabaseException;
25 import goldengate.common.database.exception.GoldenGateDatabaseNoConnectionException;
26 import goldengate.common.database.exception.GoldenGateDatabaseSqlException;
27 import goldengate.common.logging.GgInternalLogger;
28 import goldengate.common.logging.GgInternalLoggerFactory;
29
30 import java.util.concurrent.ArrayBlockingQueue;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37
38 import openr66.database.DbConstant;
39 import openr66.database.data.DbTaskRunner;
40 import openr66.protocol.configuration.Configuration;
41 import openr66.protocol.networkhandler.NetworkTransaction;
42
43
44
45
46
47
48
49 public class InternalRunner {
50
51
52
53 private static final GgInternalLogger logger = GgInternalLoggerFactory
54 .getLogger(InternalRunner.class);
55
56 private final ScheduledExecutorService scheduledExecutorService;
57 private ScheduledFuture<?> scheduledFuture;
58 private CommanderInterface commander = null;
59 private volatile boolean isRunning = true;
60 private final ThreadPoolExecutor threadPoolExecutor;
61 private final BlockingQueue<Runnable> workQueue;
62 private final NetworkTransaction networkTransaction;
63
64
65
66
67
68
69 public InternalRunner() throws GoldenGateDatabaseNoConnectionException, GoldenGateDatabaseSqlException {
70 if (DbConstant.admin.isConnected) {
71 commander = new Commander(this, true);
72 } else {
73 commander = new CommanderNoDb(this, true);
74 }
75 scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
76 isRunning = true;
77 workQueue = new ArrayBlockingQueue<Runnable>(10);
78 threadPoolExecutor = new ThreadPoolExecutor(10, Configuration.configuration.RUNNER_THREAD,
79 1000, TimeUnit.MILLISECONDS, workQueue);
80 scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
81 Configuration.configuration.delayCommander,
82 Configuration.configuration.delayCommander, TimeUnit.MILLISECONDS);
83 networkTransaction = new NetworkTransaction();
84 }
85 public NetworkTransaction getNetworkTransaction() {
86 return networkTransaction;
87 }
88
89
90
91
92 public void submitTaskRunner(DbTaskRunner taskRunner) {
93 if (isRunning || !Configuration.configuration.isShutdown) {
94 if (threadPoolExecutor.getActiveCount()+5 > Configuration.configuration.RUNNER_THREAD) {
95
96 taskRunner.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
97 try {
98 taskRunner.update();
99 } catch (GoldenGateDatabaseException e) {
100 }
101 return;
102 }
103 logger.debug("Will run {}",taskRunner);
104 ClientRunner runner = new ClientRunner(networkTransaction, taskRunner, null);
105 if (taskRunner.isSendThrough() && (taskRunner.isRescheduledTransfer()
106 || taskRunner.isPreTaskStarting())) {
107 runner.setSendThroughMode();
108 taskRunner.checkThroughMode();
109 }
110 runner.setDaemon(true);
111
112 threadPoolExecutor.execute(runner);
113 runner = null;
114 }
115 }
116
117
118
119 public void prepareStopInternalRunner() {
120 isRunning = false;
121 scheduledFuture.cancel(false);
122 scheduledExecutorService.shutdown();
123 threadPoolExecutor.shutdown();
124 }
125
126
127
128
129 public void stopInternalRunner() {
130 isRunning = false;
131 logger.info("Stopping Commander and Runner Tasks");
132 scheduledFuture.cancel(false);
133 scheduledExecutorService.shutdownNow();
134 threadPoolExecutor.shutdownNow();
135 networkTransaction.closeAll();
136 }
137 public int nbInternalRunner() {
138 return threadPoolExecutor.getActiveCount();
139 }
140 public void reloadInternalRunner()
141 throws GoldenGateDatabaseNoConnectionException, GoldenGateDatabaseSqlException {
142 scheduledFuture.cancel(false);
143 if (commander != null) {
144 commander.finalize();
145 }
146 if (DbConstant.admin.isConnected) {
147 commander = new Commander(this);
148 } else {
149 commander = new CommanderNoDb(this);
150 }
151 scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
152 Configuration.configuration.delayCommander,
153 Configuration.configuration.delayCommander, TimeUnit.MILLISECONDS);
154 }
155 }