View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.context.task;
22  
import goldengate.commandexec.utils.LocalExecResult;
import goldengate.common.command.exception.CommandAbstractException;
import goldengate.common.logging.GgInternalLogger;
import goldengate.common.logging.GgInternalLoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import openr66.context.ErrorCode;
import openr66.context.R66Result;
import openr66.context.R66Session;
import openr66.context.task.exception.OpenR66RunnerErrorException;
import openr66.context.task.localexec.LocalExecClient;
import openr66.protocol.configuration.Configuration;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
45  
46  /**
47   * Execute an external command and Use the output if an error occurs.<br>
48   * 
49   * The output is ignored if the command has a correct status.<br>
50   * If the output finishes with <tt>NEWFINALNAME:xxx</tt> then this part is removed from the
51   * output and the xxx is used as the last valid name for the file (meaning the 
52   * file was moved or renamed)<br><br>
53   * 
54   * waitForValidation (#NOWAIT#) must not be set since it will prevent to have 
55   * the feedback in case of error. So it is ignored.
56   *
57   * @author Frederic Bregier
58   *
59   */
60  public class ExecOutputTask extends AbstractTask {
61      /**
62       * Internal Logger
63       */
64      private static final GgInternalLogger logger = GgInternalLoggerFactory
65              .getLogger(ExecOutputTask.class);
66  
67      public static final String DELIMITER = "NEWFINALNAME:";
68      /**
69       * @param argRule
70       * @param delay
71       * @param argTransfer
72       * @param session
73       */
74      public ExecOutputTask(String argRule, int delay, String argTransfer,
75              R66Session session) {
76          super(TaskType.EXECOUTPUT, delay, argRule, argTransfer, session);
77      }
78  
79      /*
80       * (non-Javadoc)
81       *
82       * @see openr66.context.task.AbstractTask#run()
83       */
84      @Override
85      public void run() {
86          /*
87           * First apply all replacements and format to argRule from context and
88           * argTransfer. Will call exec (from first element of resulting string)
89           * with arguments as the following value from the replacements. Return 0
90           * if OK, else 1 for a warning else as an error. 
91           * In case of an error (> 0), all the line from output will be
92           * send back to the partner with the Error code.
93           * No change is made to the file.
94           */
95          logger.info("ExecOutput with " + argRule + ":" + argTransfer + " and {}",
96                  session);
97          String finalname = argRule;
98          finalname = getReplacedValue(finalname, argTransfer.split(" "));
99          // Force the WaitForValidation
100         waitForValidation = true;
101         if (Configuration.configuration.useLocalExec && useLocalExec) {
102             LocalExecClient localExecClient = new LocalExecClient();
103             if (localExecClient.connect()) {
104                 localExecClient.runOneCommand(finalname, delay, waitForValidation, futureCompletion);
105                 LocalExecResult result = localExecClient.getLocalExecResult();
106                 finalize(result.status, result.result, finalname);
107                 localExecClient.disconnect();
108                 return;
109             } // else continue
110         }
111         String[] args = finalname.split(" ");
112         File exec = new File(args[0]);
113         if (exec.isAbsolute()) {
114             if (! exec.canExecute()) {
115                 logger.error("Exec command is not executable: " + finalname);
116                 R66Result result = new R66Result(session, false,
117                         ErrorCode.CommandNotFound, session.getRunner());
118                 futureCompletion.setResult(result);
119                 futureCompletion.cancel();
120                 return;
121             }
122         }
123         CommandLine commandLine = new CommandLine(args[0]);
124         for (int i = 1; i < args.length; i ++) {
125             commandLine.addArgument(args[i]);
126         }
127         DefaultExecutor defaultExecutor = new DefaultExecutor();
128         PipedInputStream inputStream = new PipedInputStream();
129         PipedOutputStream outputStream = null;
130         try {
131             outputStream = new PipedOutputStream(inputStream);
132         } catch (IOException e1) {
133             try {
134                 inputStream.close();
135             } catch (IOException e) {
136             }
137             logger.error("Exception: " + e1.getMessage() +
138                     " Exec in error with " + commandLine.toString(), e1);
139             futureCompletion.setFailure(e1);
140             return;
141         }
142         PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(
143                 outputStream, null);
144         defaultExecutor.setStreamHandler(pumpStreamHandler);
145         int[] correctValues = {
146                 0, 1 };
147         defaultExecutor.setExitValues(correctValues);
148         ExecuteWatchdog watchdog = null;
149         if (delay > 0) {
150             watchdog = new ExecuteWatchdog(delay);
151             defaultExecutor.setWatchdog(watchdog);
152         }
153         AllLineReader allLineReader = new AllLineReader(inputStream);
154         Thread thread = new Thread(allLineReader, "ExecRename" + session.getRunner().getSpecialId());
155         thread.setDaemon(true);
156         Configuration.configuration.getExecutorService().execute(thread);
157         int status = -1;
158         try {
159             status = defaultExecutor.execute(commandLine);
160         } catch (ExecuteException e) {
161             if (e.getExitValue() == -559038737) {
162                 // Cannot run immediately so retry once
163                 try {
164                     Thread.sleep(Configuration.RETRYINMS);
165                 } catch (InterruptedException e1) {
166                 }
167                 try {
168                     status = defaultExecutor.execute(commandLine);
169                 } catch (ExecuteException e1) {
170                     finalizeFromError(outputStream, 
171                             pumpStreamHandler, 
172                             inputStream, 
173                             allLineReader, 
174                             thread, 
175                             status, 
176                             commandLine);
177                     return;
178                 } catch (IOException e1) {
179                     try {
180                         outputStream.flush();
181                     } catch (IOException e2) {
182                     }
183                     try {
184                         outputStream.close();
185                     } catch (IOException e2) {
186                     }
187                     thread.interrupt();
188                     try {
189                         inputStream.close();
190                     } catch (IOException e2) {
191                     }
192                     pumpStreamHandler.stop();
193                     logger.error("IOException: " + e.getMessage() +
194                             " . Exec in error with " + commandLine.toString());
195                     futureCompletion.setFailure(e);
196                     return;
197                 }
198             } else {
199                 finalizeFromError(outputStream, 
200                         pumpStreamHandler, 
201                         inputStream, 
202                         allLineReader, 
203                         thread, 
204                         status, 
205                         commandLine);
206                 return;
207             }
208         } catch (IOException e) {
209             try {
210                 outputStream.close();
211             } catch (IOException e1) {
212             }
213             thread.interrupt();
214             try {
215                 inputStream.close();
216             } catch (IOException e1) {
217             }
218             pumpStreamHandler.stop();
219             logger.error("IOException: " + e.getMessage() +
220                     " . Exec in error with " + commandLine.toString());
221             futureCompletion.setFailure(e);
222             return;
223         }
224         try {
225             outputStream.flush();
226         } catch (IOException e) {
227         }
228         try {
229             outputStream.close();
230         } catch (IOException e) {
231         }
232         pumpStreamHandler.stop();
233         try {
234             if (delay > 0) {
235                 thread.join(delay);
236             } else {
237                 thread.join();
238             }
239         } catch (InterruptedException e) {
240             Thread.currentThread().interrupt();
241         }
242         try {
243             inputStream.close();
244         } catch (IOException e1) {
245         }
246         String newname = null;
247         if (defaultExecutor.isFailure(status) && watchdog != null &&
248                 watchdog.killedProcess()) {
249             // kill by the watchdoc (time out)
250             status = -1;
251             newname = "TimeOut";
252         } else {
253             newname = allLineReader.lastLine.toString();
254         }
255         finalize(status, newname, commandLine.toString());
256     }
257 
258     private void finalize(int status, String newName, String commandLine) {
259         String newname = newName;
260         if (status == 0) {
261             futureCompletion.setSuccess();
262             logger.info("Exec OK with {} returns {}", commandLine,
263                     newname);
264         } else if (status == 1) {
265             logger.warn("Exec in warning with " + commandLine+
266                     " returns " + newname);
267             session.getRunner().setErrorExecutionStatus(ErrorCode.Warning);
268             futureCompletion.setSuccess();
269         } else {
270             int pos = newname.lastIndexOf(DELIMITER);
271             if (pos >= 0) {
272                 String newfilename = newname.substring(pos+DELIMITER.length());
273                 newname = newname.substring(0, pos);
274                 if (newfilename.indexOf(' ') > 0) {
275                     logger.warn("Exec returns a multiple string in final line: " +
276                             newfilename);
277                     String []args = newfilename.split(" ");
278                     newfilename = args[args.length - 1];
279                 }
280                 // now test if the previous file was deleted (should be)
281                 File file = new File(newfilename);
282                 if (! file.exists()) {
283                     logger.warn("New file does not exist at the end of the exec: "+newfilename);
284                 }
285                 // now replace the file with the new one
286                 try {
287                     session.getFile().replaceFilename(newfilename, true);
288                 } catch (CommandAbstractException e) {
289                     logger
290                             .warn("Exec in warning with " + commandLine,
291                                     e);
292                 }
293                 session.getRunner().setFileMoved(newfilename, true);
294             }
295             logger.error("Status: " + status + " Exec in error with " +
296                     commandLine + " returns " + newname);
297             OpenR66RunnerErrorException exc = 
298                 new OpenR66RunnerErrorException("<STATUS>" + status +"</STATUS><ERROR>" + newname+"</ERROR>");
299             futureCompletion.setFailure(exc);
300         }
301     }
302     private void finalizeFromError(PipedOutputStream outputStream, 
303             PumpStreamHandler pumpStreamHandler,
304             PipedInputStream inputStream, AllLineReader allLineReader, Thread thread,
305             int status, CommandLine commandLine) {
306         try {
307             Thread.sleep(Configuration.RETRYINMS);
308         } catch (InterruptedException e) {
309         }
310         try {
311             outputStream.flush();
312         } catch (IOException e2) {
313         }
314         try {
315             Thread.sleep(Configuration.RETRYINMS);
316         } catch (InterruptedException e) {
317         }
318         try {
319             outputStream.close();
320         } catch (IOException e1) {
321         }
322         thread.interrupt();
323         try {
324             inputStream.close();
325         } catch (IOException e1) {
326         }
327         try {
328             Thread.sleep(Configuration.RETRYINMS);
329         } catch (InterruptedException e) {
330         }
331         pumpStreamHandler.stop();
332         try {
333             Thread.sleep(Configuration.RETRYINMS);
334         } catch (InterruptedException e) {
335         }
336         String result = allLineReader.lastLine.toString();
337         logger.error("Status: " + status + " Exec in error with " +
338                 commandLine + " returns\n" + result);
339         OpenR66RunnerErrorException exc = 
340             new OpenR66RunnerErrorException("<STATUS>" + status +"</STATUS><ERROR>" + result+"</ERROR>");
341         futureCompletion.setFailure(exc);
342     }
343 }