1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.context.task;
22
23 import goldengate.commandexec.utils.LocalExecResult;
24 import goldengate.common.command.exception.CommandAbstractException;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27
28 import java.io.File;
29 import java.io.IOException;
30 import java.io.PipedInputStream;
31 import java.io.PipedOutputStream;
32
33 import openr66.context.ErrorCode;
34 import openr66.context.R66Result;
35 import openr66.context.R66Session;
36 import openr66.context.task.exception.OpenR66RunnerErrorException;
37 import openr66.context.task.localexec.LocalExecClient;
38 import openr66.protocol.configuration.Configuration;
39
40 import org.apache.commons.exec.CommandLine;
41 import org.apache.commons.exec.DefaultExecutor;
42 import org.apache.commons.exec.ExecuteException;
43 import org.apache.commons.exec.ExecuteWatchdog;
44 import org.apache.commons.exec.PumpStreamHandler;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class ExecOutputTask extends AbstractTask {
61
62
63
64 private static final GgInternalLogger logger = GgInternalLoggerFactory
65 .getLogger(ExecOutputTask.class);
66
67 public static final String DELIMITER = "NEWFINALNAME:";
68
69
70
71
72
73
74 public ExecOutputTask(String argRule, int delay, String argTransfer,
75 R66Session session) {
76 super(TaskType.EXECOUTPUT, delay, argRule, argTransfer, session);
77 }
78
79
80
81
82
83
84 @Override
85 public void run() {
86
87
88
89
90
91
92
93
94
95 logger.info("ExecOutput with " + argRule + ":" + argTransfer + " and {}",
96 session);
97 String finalname = argRule;
98 finalname = getReplacedValue(finalname, argTransfer.split(" "));
99
100 waitForValidation = true;
101 if (Configuration.configuration.useLocalExec && useLocalExec) {
102 LocalExecClient localExecClient = new LocalExecClient();
103 if (localExecClient.connect()) {
104 localExecClient.runOneCommand(finalname, delay, waitForValidation, futureCompletion);
105 LocalExecResult result = localExecClient.getLocalExecResult();
106 finalize(result.status, result.result, finalname);
107 localExecClient.disconnect();
108 return;
109 }
110 }
111 String[] args = finalname.split(" ");
112 File exec = new File(args[0]);
113 if (exec.isAbsolute()) {
114 if (! exec.canExecute()) {
115 logger.error("Exec command is not executable: " + finalname);
116 R66Result result = new R66Result(session, false,
117 ErrorCode.CommandNotFound, session.getRunner());
118 futureCompletion.setResult(result);
119 futureCompletion.cancel();
120 return;
121 }
122 }
123 CommandLine commandLine = new CommandLine(args[0]);
124 for (int i = 1; i < args.length; i ++) {
125 commandLine.addArgument(args[i]);
126 }
127 DefaultExecutor defaultExecutor = new DefaultExecutor();
128 PipedInputStream inputStream = new PipedInputStream();
129 PipedOutputStream outputStream = null;
130 try {
131 outputStream = new PipedOutputStream(inputStream);
132 } catch (IOException e1) {
133 try {
134 inputStream.close();
135 } catch (IOException e) {
136 }
137 logger.error("Exception: " + e1.getMessage() +
138 " Exec in error with " + commandLine.toString(), e1);
139 futureCompletion.setFailure(e1);
140 return;
141 }
142 PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(
143 outputStream, null);
144 defaultExecutor.setStreamHandler(pumpStreamHandler);
145 int[] correctValues = {
146 0, 1 };
147 defaultExecutor.setExitValues(correctValues);
148 ExecuteWatchdog watchdog = null;
149 if (delay > 0) {
150 watchdog = new ExecuteWatchdog(delay);
151 defaultExecutor.setWatchdog(watchdog);
152 }
153 AllLineReader allLineReader = new AllLineReader(inputStream);
154 Thread thread = new Thread(allLineReader, "ExecRename" + session.getRunner().getSpecialId());
155 thread.setDaemon(true);
156 Configuration.configuration.getExecutorService().execute(thread);
157 int status = -1;
158 try {
159 status = defaultExecutor.execute(commandLine);
160 } catch (ExecuteException e) {
161 if (e.getExitValue() == -559038737) {
162
163 try {
164 Thread.sleep(Configuration.RETRYINMS);
165 } catch (InterruptedException e1) {
166 }
167 try {
168 status = defaultExecutor.execute(commandLine);
169 } catch (ExecuteException e1) {
170 finalizeFromError(outputStream,
171 pumpStreamHandler,
172 inputStream,
173 allLineReader,
174 thread,
175 status,
176 commandLine);
177 return;
178 } catch (IOException e1) {
179 try {
180 outputStream.flush();
181 } catch (IOException e2) {
182 }
183 try {
184 outputStream.close();
185 } catch (IOException e2) {
186 }
187 thread.interrupt();
188 try {
189 inputStream.close();
190 } catch (IOException e2) {
191 }
192 pumpStreamHandler.stop();
193 logger.error("IOException: " + e.getMessage() +
194 " . Exec in error with " + commandLine.toString());
195 futureCompletion.setFailure(e);
196 return;
197 }
198 } else {
199 finalizeFromError(outputStream,
200 pumpStreamHandler,
201 inputStream,
202 allLineReader,
203 thread,
204 status,
205 commandLine);
206 return;
207 }
208 } catch (IOException e) {
209 try {
210 outputStream.close();
211 } catch (IOException e1) {
212 }
213 thread.interrupt();
214 try {
215 inputStream.close();
216 } catch (IOException e1) {
217 }
218 pumpStreamHandler.stop();
219 logger.error("IOException: " + e.getMessage() +
220 " . Exec in error with " + commandLine.toString());
221 futureCompletion.setFailure(e);
222 return;
223 }
224 try {
225 outputStream.flush();
226 } catch (IOException e) {
227 }
228 try {
229 outputStream.close();
230 } catch (IOException e) {
231 }
232 pumpStreamHandler.stop();
233 try {
234 if (delay > 0) {
235 thread.join(delay);
236 } else {
237 thread.join();
238 }
239 } catch (InterruptedException e) {
240 Thread.currentThread().interrupt();
241 }
242 try {
243 inputStream.close();
244 } catch (IOException e1) {
245 }
246 String newname = null;
247 if (defaultExecutor.isFailure(status) && watchdog != null &&
248 watchdog.killedProcess()) {
249
250 status = -1;
251 newname = "TimeOut";
252 } else {
253 newname = allLineReader.lastLine.toString();
254 }
255 finalize(status, newname, commandLine.toString());
256 }
257
258 private void finalize(int status, String newName, String commandLine) {
259 String newname = newName;
260 if (status == 0) {
261 futureCompletion.setSuccess();
262 logger.info("Exec OK with {} returns {}", commandLine,
263 newname);
264 } else if (status == 1) {
265 logger.warn("Exec in warning with " + commandLine+
266 " returns " + newname);
267 session.getRunner().setErrorExecutionStatus(ErrorCode.Warning);
268 futureCompletion.setSuccess();
269 } else {
270 int pos = newname.lastIndexOf(DELIMITER);
271 if (pos >= 0) {
272 String newfilename = newname.substring(pos+DELIMITER.length());
273 newname = newname.substring(0, pos);
274 if (newfilename.indexOf(' ') > 0) {
275 logger.warn("Exec returns a multiple string in final line: " +
276 newfilename);
277 String []args = newfilename.split(" ");
278 newfilename = args[args.length - 1];
279 }
280
281 File file = new File(newfilename);
282 if (! file.exists()) {
283 logger.warn("New file does not exist at the end of the exec: "+newfilename);
284 }
285
286 try {
287 session.getFile().replaceFilename(newfilename, true);
288 } catch (CommandAbstractException e) {
289 logger
290 .warn("Exec in warning with " + commandLine,
291 e);
292 }
293 session.getRunner().setFileMoved(newfilename, true);
294 }
295 logger.error("Status: " + status + " Exec in error with " +
296 commandLine + " returns " + newname);
297 OpenR66RunnerErrorException exc =
298 new OpenR66RunnerErrorException("<STATUS>" + status +"</STATUS><ERROR>" + newname+"</ERROR>");
299 futureCompletion.setFailure(exc);
300 }
301 }
302 private void finalizeFromError(PipedOutputStream outputStream,
303 PumpStreamHandler pumpStreamHandler,
304 PipedInputStream inputStream, AllLineReader allLineReader, Thread thread,
305 int status, CommandLine commandLine) {
306 try {
307 Thread.sleep(Configuration.RETRYINMS);
308 } catch (InterruptedException e) {
309 }
310 try {
311 outputStream.flush();
312 } catch (IOException e2) {
313 }
314 try {
315 Thread.sleep(Configuration.RETRYINMS);
316 } catch (InterruptedException e) {
317 }
318 try {
319 outputStream.close();
320 } catch (IOException e1) {
321 }
322 thread.interrupt();
323 try {
324 inputStream.close();
325 } catch (IOException e1) {
326 }
327 try {
328 Thread.sleep(Configuration.RETRYINMS);
329 } catch (InterruptedException e) {
330 }
331 pumpStreamHandler.stop();
332 try {
333 Thread.sleep(Configuration.RETRYINMS);
334 } catch (InterruptedException e) {
335 }
336 String result = allLineReader.lastLine.toString();
337 logger.error("Status: " + status + " Exec in error with " +
338 commandLine + " returns\n" + result);
339 OpenR66RunnerErrorException exc =
340 new OpenR66RunnerErrorException("<STATUS>" + status +"</STATUS><ERROR>" + result+"</ERROR>");
341 futureCompletion.setFailure(exc);
342 }
343 }