View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.commander;
22  
23  import java.io.File;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  
26  import goldengate.common.database.data.AbstractDbData;
27  import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
28  import goldengate.common.database.exception.GoldenGateDatabaseException;
29  import goldengate.common.logging.GgInternalLogger;
30  import goldengate.common.logging.GgInternalLoggerFactory;
31  import openr66.configuration.ExtensionFilter;
32  import openr66.database.data.DbConfiguration;
33  import openr66.database.data.DbHostAuth;
34  import openr66.database.data.DbRule;
35  import openr66.database.data.DbTaskRunner;
36  import openr66.protocol.configuration.Configuration;
37  import openr66.protocol.utils.FileUtils;
38  import openr66.protocol.utils.OpenR66SignalHandler;
39  
40  /**
41   * Commander is responsible to read list of updated data from time to time in order to
42   * achieve new runner or new configuration updates.
43   *
44   * Based on no Database support
45   *
46   * @author Frederic Bregier
47   *
48   */
49  public class CommanderNoDb implements CommanderInterface {
50      /**
51       * Internal Logger
52       */
53      private static final GgInternalLogger logger = GgInternalLoggerFactory
54              .getLogger(CommanderNoDb.class);
55  
56      private InternalRunner internalRunner = null;
57      public static final ConcurrentLinkedQueue<AbstractDbData> todoList = new ConcurrentLinkedQueue<AbstractDbData>();
58  
59      /**
60       * Prepare requests that will be executed from time to time
61       * @param runner
62       */
63      public CommanderNoDb(InternalRunner runner) {
64          this.internalConstructor(runner);
65      }
66      /**
67       * Prepare requests that will be executed from time to time
68       * @param runner
69       * @param fromStartup True if call from startup of the server
70       */
71      public CommanderNoDb(InternalRunner runner, boolean fromStartup) {
72          this.internalConstructor(runner);
73          if (fromStartup) {
74              ClientRunner.activeRunners = new ConcurrentLinkedQueue<ClientRunner>();
75              // Change RUNNING or INTERRUPTED to TOSUBMIT since they should be ready
76              /*
77              Configuration.configuration.baseDirectory+
78              Configuration.configuration.archivePath+R66Dir.SEPARATOR+
79              this.requesterHostId+"_"+this.requestedHostId+"_"+this.ruleId+"_"+this.specialId
80              +XMLEXTENSION;
81               */
82              File directory = new File(Configuration.configuration.baseDirectory+
83                      Configuration.configuration.archivePath);
84              File[] files = FileUtils.getFiles(directory,
85                      new ExtensionFilter(DbTaskRunner.XMLEXTENSION));
86              for (File file: files) {
87                  String shortname = file.getName();
88                  String []info = shortname.split("_");
89                  if (info.length < 5) {
90                      continue;
91                  }
92                  DbRule rule;
93                  try {
94                      rule = new DbRule(null, info[2]);
95                  } catch (GoldenGateDatabaseException e) {
96                      logger.warn("Cannot find the rule named: "+info[2]);
97                      continue;
98                  }
99                  long id = Long.parseLong(info[3]);
100                 try {
101                     DbTaskRunner task = new DbTaskRunner(null, null, rule, id, info[0], info[1]);
102                     UpdatedInfo status = task.getUpdatedInfo();
103                     if (status == UpdatedInfo.RUNNING || status == UpdatedInfo.INTERRUPTED) {
104                         task.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
105                         task.update();
106                     }
107                 } catch (GoldenGateDatabaseException e) {
108                     logger.warn("Cannot reload the task named: "+shortname);
109                     continue;
110                 }
111             }
112         }
113     }
114     private void internalConstructor(InternalRunner runner) {
115         internalRunner = runner;
116     }
117     /**
118      * Finalize internal data
119      */
120     public void finalize() {
121         // no since it will be reloaded
122         //todoList.clear();
123     }
124 
125     /* (non-Javadoc)
126      * @see java.lang.Runnable#run()
127      */
128     @Override
129     public void run() {
130         Thread.currentThread().setName("OpenR66Commander");
131         while (! todoList.isEmpty()) {
132             try {
133                 AbstractDbData data = todoList.poll();
134                 // First check Configuration
135                 if (data instanceof DbConfiguration) {
136                  // should be only one...
137                     DbConfiguration configuration = (DbConfiguration) data;
138                     if (configuration.isOwnConfiguration()) {
139                         configuration.updateConfiguration();
140                     }
141                     configuration.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
142                         configuration.update();
143                 }
144                 // Check HostAuthent
145                 else if (data instanceof DbHostAuth) {
146                     DbHostAuth hostAuth = (DbHostAuth) data;
147                     // Nothing to do except validate
148                     hostAuth.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
149                     hostAuth.update();
150                 }
151                 // Check Rules
152                 else if (data instanceof DbRule) {
153                     // Nothing to do except validate
154                     DbRule rule = (DbRule) data;
155                     rule.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
156                     rule.update();
157                 }
158                 // Check TaskRunner
159                 else if (data instanceof DbTaskRunner) {
160                     DbTaskRunner taskRunner = (DbTaskRunner) data;
161                     logger.debug("get a task: {}",taskRunner);
162                     // Launch if possible this task
163                     String key = taskRunner.getRequested()+" "+taskRunner.getRequester()+
164                         " "+taskRunner.getSpecialId();
165                     if (Configuration.configuration.getLocalTransaction().
166                             getFromRequest(key) != null) {
167                         // already running
168                         continue;
169                     }
170                     if (taskRunner.isSelfRequested()) {
171                         // cannot schedule a request where the host is the requested host
172                         taskRunner.changeUpdatedInfo(UpdatedInfo.INTERRUPTED);
173                         taskRunner.update();
174                         continue;
175                     }
176                     taskRunner.changeUpdatedInfo(UpdatedInfo.RUNNING);
177                     taskRunner.update();
178                     internalRunner.submitTaskRunner(taskRunner);
179                     try {
180                         Thread.sleep(Configuration.RETRYINMS);
181                     } catch (InterruptedException e) {
182                     }
183                     taskRunner = null;
184                 }
185                 if (OpenR66SignalHandler.isInShutdown()) {
186                     // no more task to submit
187                     return;
188                 }
189             } catch (GoldenGateDatabaseException e) {
190                 // TODO Auto-generated catch block
191                 e.printStackTrace();
192             }
193         }
194     }
195 
196 }