1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.commander;
22
23 import java.io.File;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25
26 import goldengate.common.database.data.AbstractDbData;
27 import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
28 import goldengate.common.database.exception.GoldenGateDatabaseException;
29 import goldengate.common.logging.GgInternalLogger;
30 import goldengate.common.logging.GgInternalLoggerFactory;
31 import openr66.configuration.ExtensionFilter;
32 import openr66.database.data.DbConfiguration;
33 import openr66.database.data.DbHostAuth;
34 import openr66.database.data.DbRule;
35 import openr66.database.data.DbTaskRunner;
36 import openr66.protocol.configuration.Configuration;
37 import openr66.protocol.utils.FileUtils;
38 import openr66.protocol.utils.OpenR66SignalHandler;
39
40
41
42
43
44
45
46
47
48
49 public class CommanderNoDb implements CommanderInterface {
50
51
52
53 private static final GgInternalLogger logger = GgInternalLoggerFactory
54 .getLogger(CommanderNoDb.class);
55
56 private InternalRunner internalRunner = null;
57 public static final ConcurrentLinkedQueue<AbstractDbData> todoList = new ConcurrentLinkedQueue<AbstractDbData>();
58
59
60
61
62
63 public CommanderNoDb(InternalRunner runner) {
64 this.internalConstructor(runner);
65 }
66
67
68
69
70
71 public CommanderNoDb(InternalRunner runner, boolean fromStartup) {
72 this.internalConstructor(runner);
73 if (fromStartup) {
74 ClientRunner.activeRunners = new ConcurrentLinkedQueue<ClientRunner>();
75
76
77
78
79
80
81
82 File directory = new File(Configuration.configuration.baseDirectory+
83 Configuration.configuration.archivePath);
84 File[] files = FileUtils.getFiles(directory,
85 new ExtensionFilter(DbTaskRunner.XMLEXTENSION));
86 for (File file: files) {
87 String shortname = file.getName();
88 String []info = shortname.split("_");
89 if (info.length < 5) {
90 continue;
91 }
92 DbRule rule;
93 try {
94 rule = new DbRule(null, info[2]);
95 } catch (GoldenGateDatabaseException e) {
96 logger.warn("Cannot find the rule named: "+info[2]);
97 continue;
98 }
99 long id = Long.parseLong(info[3]);
100 try {
101 DbTaskRunner task = new DbTaskRunner(null, null, rule, id, info[0], info[1]);
102 UpdatedInfo status = task.getUpdatedInfo();
103 if (status == UpdatedInfo.RUNNING || status == UpdatedInfo.INTERRUPTED) {
104 task.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
105 task.update();
106 }
107 } catch (GoldenGateDatabaseException e) {
108 logger.warn("Cannot reload the task named: "+shortname);
109 continue;
110 }
111 }
112 }
113 }
114 private void internalConstructor(InternalRunner runner) {
115 internalRunner = runner;
116 }
117
118
119
120 public void finalize() {
121
122
123 }
124
125
126
127
128 @Override
129 public void run() {
130 Thread.currentThread().setName("OpenR66Commander");
131 while (! todoList.isEmpty()) {
132 try {
133 AbstractDbData data = todoList.poll();
134
135 if (data instanceof DbConfiguration) {
136
137 DbConfiguration configuration = (DbConfiguration) data;
138 if (configuration.isOwnConfiguration()) {
139 configuration.updateConfiguration();
140 }
141 configuration.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
142 configuration.update();
143 }
144
145 else if (data instanceof DbHostAuth) {
146 DbHostAuth hostAuth = (DbHostAuth) data;
147
148 hostAuth.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
149 hostAuth.update();
150 }
151
152 else if (data instanceof DbRule) {
153
154 DbRule rule = (DbRule) data;
155 rule.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
156 rule.update();
157 }
158
159 else if (data instanceof DbTaskRunner) {
160 DbTaskRunner taskRunner = (DbTaskRunner) data;
161 logger.debug("get a task: {}",taskRunner);
162
163 String key = taskRunner.getRequested()+" "+taskRunner.getRequester()+
164 " "+taskRunner.getSpecialId();
165 if (Configuration.configuration.getLocalTransaction().
166 getFromRequest(key) != null) {
167
168 continue;
169 }
170 if (taskRunner.isSelfRequested()) {
171
172 taskRunner.changeUpdatedInfo(UpdatedInfo.INTERRUPTED);
173 taskRunner.update();
174 continue;
175 }
176 taskRunner.changeUpdatedInfo(UpdatedInfo.RUNNING);
177 taskRunner.update();
178 internalRunner.submitTaskRunner(taskRunner);
179 try {
180 Thread.sleep(Configuration.RETRYINMS);
181 } catch (InterruptedException e) {
182 }
183 taskRunner = null;
184 }
185 if (OpenR66SignalHandler.isInShutdown()) {
186
187 return;
188 }
189 } catch (GoldenGateDatabaseException e) {
190
191 e.printStackTrace();
192 }
193 }
194 }
195
196 }