View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.context.task;
22  
23  import goldengate.commandexec.utils.LocalExecResult;
24  import goldengate.common.command.exception.CommandAbstractException;
25  import goldengate.common.logging.GgInternalLogger;
26  import goldengate.common.logging.GgInternalLoggerFactory;
27  
28  import java.io.File;
29  import java.io.IOException;
30  import java.io.PipedInputStream;
31  import java.io.PipedOutputStream;
32  
33  import openr66.context.ErrorCode;
34  import openr66.context.R66Result;
35  import openr66.context.R66Session;
36  import openr66.context.task.localexec.LocalExecClient;
37  import openr66.protocol.configuration.Configuration;
38  
39  import org.apache.commons.exec.CommandLine;
40  import org.apache.commons.exec.DefaultExecutor;
41  import org.apache.commons.exec.ExecuteException;
42  import org.apache.commons.exec.ExecuteWatchdog;
43  import org.apache.commons.exec.PumpStreamHandler;
44  
45  /**
46   * Execute an external command and Rename the file (using the new name from the result).<br>
47   *
48   * The move of the file (if any) should be done by the external command itself.<br><br>
49   * 
50   * waitForValidation (#NOWAIT#) must not be set since it will prevent to have 
51   * the MOVE TASK to occur normally. So even if set, the #NOWAIT# will be ignored.
52   *
53   * @author Frederic Bregier
54   *
55   */
56  public class ExecMoveTask extends AbstractTask {
57      /**
58       * Internal Logger
59       */
60      private static final GgInternalLogger logger = GgInternalLoggerFactory
61              .getLogger(ExecMoveTask.class);
62  
63      /**
64       * @param argRule
65       * @param delay
66       * @param argTransfer
67       * @param session
68       */
69      public ExecMoveTask(String argRule, int delay, String argTransfer,
70              R66Session session) {
71          super(TaskType.EXECMOVE, delay, argRule, argTransfer, session);
72      }
73  
74      /*
75       * (non-Javadoc)
76       *
77       * @see openr66.context.task.AbstractTask#run()
78       */
79      @Override
80      public void run() {
81          /*
82           * First apply all replacements and format to argRule from context and
83           * argTransfer. Will call exec (from first element of resulting string)
84           * with arguments as the following value from the replacements. Return 0
85           * if OK, else 1 for a warning else as an error. The last line of stdout
86           * will be the new name given to the R66File in case of status 0. The
87           * previous file should be deleted by the script or will be deleted in
88           * case of status 0. If the status is 1, no change is made to the file.
89           */
90          logger.info("ExecMove with " + argRule + ":" + argTransfer + " and {}",
91                  session);
92          String finalname = argRule;
93          finalname = getReplacedValue(finalname, argTransfer.split(" "));
94          // Force the WaitForValidation
95          waitForValidation = true;
96          if (Configuration.configuration.useLocalExec && useLocalExec) {
97              LocalExecClient localExecClient = new LocalExecClient();
98              if (localExecClient.connect()) {
99                  localExecClient.runOneCommand(finalname, delay, waitForValidation, futureCompletion);
100                 LocalExecResult result = localExecClient.getLocalExecResult();
101                 move(result.status, result.result, finalname);
102                 localExecClient.disconnect();
103                 return;
104             } // else continue
105         }
106         String[] args = finalname.split(" ");
107         File exec = new File(args[0]);
108         if (exec.isAbsolute()) {
109             if (! exec.canExecute()) {
110                 logger.error("Exec command is not executable: " + finalname);
111                 R66Result result = new R66Result(session, false,
112                         ErrorCode.CommandNotFound, session.getRunner());
113                 futureCompletion.setResult(result);
114                 futureCompletion.cancel();
115                 return;
116             }
117         }
118         CommandLine commandLine = new CommandLine(args[0]);
119         for (int i = 1; i < args.length; i ++) {
120             commandLine.addArgument(args[i]);
121         }
122         DefaultExecutor defaultExecutor = new DefaultExecutor();
123         PipedInputStream inputStream = new PipedInputStream();
124         PipedOutputStream outputStream = null;
125         try {
126             outputStream = new PipedOutputStream(inputStream);
127         } catch (IOException e1) {
128             try {
129                 inputStream.close();
130             } catch (IOException e) {
131             }
132             logger.error("Exception: " + e1.getMessage() +
133                     " Exec in error with " + commandLine.toString(), e1);
134             futureCompletion.setFailure(e1);
135             return;
136         }
137         PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(
138                 outputStream, null);
139         defaultExecutor.setStreamHandler(pumpStreamHandler);
140         int[] correctValues = {
141                 0, 1 };
142         defaultExecutor.setExitValues(correctValues);
143         ExecuteWatchdog watchdog = null;
144         
145         if (delay > 0) {
146             watchdog = new ExecuteWatchdog(delay);
147             defaultExecutor.setWatchdog(watchdog);
148         }
149         LastLineReader lastLineReader = new LastLineReader(inputStream);
150         Thread thread = new Thread(lastLineReader, "ExecRename" + session.getRunner().getSpecialId());
151         thread.setDaemon(true);
152         Configuration.configuration.getExecutorService().execute(thread);
153         int status = -1;
154         try {
155             status = defaultExecutor.execute(commandLine);
156         } catch (ExecuteException e) {
157             if (e.getExitValue() == -559038737) {
158                 // Cannot run immediately so retry once
159                 try {
160                     Thread.sleep(Configuration.RETRYINMS);
161                 } catch (InterruptedException e1) {
162                 }
163                 try {
164                     status = defaultExecutor.execute(commandLine);
165                 } catch (ExecuteException e1) {
166                     try {
167                         outputStream.close();
168                     } catch (IOException e2) {
169                     }
170                     thread.interrupt();
171                     try {
172                         inputStream.close();
173                     } catch (IOException e2) {
174                     }
175                     pumpStreamHandler.stop();
176                     logger.error("ExecuteException: " + e.getMessage() +
177                             " . Exec in error with " + commandLine.toString());
178                     futureCompletion.setFailure(e);
179                     return;
180                 } catch (IOException e1) {
181                     try {
182                         outputStream.close();
183                     } catch (IOException e2) {
184                     }
185                     thread.interrupt();
186                     try {
187                         inputStream.close();
188                     } catch (IOException e2) {
189                     }
190                     pumpStreamHandler.stop();
191                     logger.error("IOException: " + e.getMessage() +
192                             " . Exec in error with " + commandLine.toString());
193                     futureCompletion.setFailure(e);
194                     return;
195                 }
196             } else {
197                 try {
198                     outputStream.close();
199                 } catch (IOException e1) {
200                 }
201                 thread.interrupt();
202                 try {
203                     inputStream.close();
204                 } catch (IOException e1) {
205                 }
206                 pumpStreamHandler.stop();
207                 logger.error("ExecuteException: " + e.getMessage() +
208                         " . Exec in error with " + commandLine.toString());
209                 futureCompletion.setFailure(e);
210                 return;
211             }
212         } catch (IOException e) {
213             try {
214                 outputStream.close();
215             } catch (IOException e1) {
216             }
217             thread.interrupt();
218             try {
219                 inputStream.close();
220             } catch (IOException e1) {
221             }
222             pumpStreamHandler.stop();
223             logger.error("IOException: " + e.getMessage() +
224                     " . Exec in error with " + commandLine.toString());
225             futureCompletion.setFailure(e);
226             return;
227         }
228         try {
229             outputStream.flush();
230         } catch (IOException e) {
231         }
232         try {
233             outputStream.close();
234         } catch (IOException e) {
235         }
236         pumpStreamHandler.stop();
237         try {
238             if (delay > 0) {
239                 thread.join(delay);
240             } else {
241                 thread.join();
242             }
243         } catch (InterruptedException e) {
244             Thread.currentThread().interrupt();
245         }
246         try {
247             inputStream.close();
248         } catch (IOException e1) {
249         }
250         String newname = null;
251         if (defaultExecutor.isFailure(status) && watchdog != null &&
252                 watchdog.killedProcess()) {
253             // kill by the watchdoc (time out)
254             status = -1;
255             newname = "TimeOut";
256         } else {
257             newname = lastLineReader.lastLine;
258             if (status == 0 && (newname == null || newname.length() == 0)) {
259                 status = 1;
260             }
261         }
262         move(status, newname, commandLine.toString());
263     }
264 
265     private void move(int status, String newName, String commandLine) {
266         String newname = newName;
267         if (status == 0) {
268             if (newname.indexOf(' ') > 0) {
269                 logger.warn("Exec returns a multiple string in final line: " +
270                         newname);
271                 String []args = newname.split(" ");
272                 newname = args[args.length - 1];
273             }
274             // now test if the previous file was deleted (should be)
275             File file = new File(newname);
276             if (! file.exists()) {
277                 logger.warn("New file does not exist at the end of the exec: "+newname);
278             }
279             // now replace the file with the new one
280             try {
281                 session.getFile().replaceFilename(newname, true);
282             } catch (CommandAbstractException e) {
283                 logger
284                         .warn("Exec in warning with " + commandLine,
285                                 e);
286             }
287             session.getRunner().setFileMoved(newname, true);
288             futureCompletion.setSuccess();
289             logger.info("Exec OK with {} returns {}", commandLine,
290                     newname);
291         } else if (status == 1) {
292             logger.warn("Exec in warning with " + commandLine+
293                     " returns " + newname);
294             session.getRunner().setErrorExecutionStatus(ErrorCode.Warning);
295             futureCompletion.setSuccess();
296         } else {
297             logger.error("Status: " + status + " Exec in error with " +
298                     commandLine + " returns " + newname);
299             futureCompletion.cancel();
300         }
301     }
302 }