1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.context.task;
22
23 import goldengate.commandexec.utils.LocalExecResult;
24 import goldengate.common.command.exception.CommandAbstractException;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27
28 import java.io.File;
29 import java.io.IOException;
30 import java.io.PipedInputStream;
31 import java.io.PipedOutputStream;
32
33 import openr66.context.ErrorCode;
34 import openr66.context.R66Result;
35 import openr66.context.R66Session;
36 import openr66.context.task.localexec.LocalExecClient;
37 import openr66.protocol.configuration.Configuration;
38
39 import org.apache.commons.exec.CommandLine;
40 import org.apache.commons.exec.DefaultExecutor;
41 import org.apache.commons.exec.ExecuteException;
42 import org.apache.commons.exec.ExecuteWatchdog;
43 import org.apache.commons.exec.PumpStreamHandler;
44
45
46
47
48
49
50
51
52
53
54
55
56 public class ExecMoveTask extends AbstractTask {
57
58
59
60 private static final GgInternalLogger logger = GgInternalLoggerFactory
61 .getLogger(ExecMoveTask.class);
62
63
64
65
66
67
68
69 public ExecMoveTask(String argRule, int delay, String argTransfer,
70 R66Session session) {
71 super(TaskType.EXECMOVE, delay, argRule, argTransfer, session);
72 }
73
74
75
76
77
78
79 @Override
80 public void run() {
81
82
83
84
85
86
87
88
89
90 logger.info("ExecMove with " + argRule + ":" + argTransfer + " and {}",
91 session);
92 String finalname = argRule;
93 finalname = getReplacedValue(finalname, argTransfer.split(" "));
94
95 waitForValidation = true;
96 if (Configuration.configuration.useLocalExec && useLocalExec) {
97 LocalExecClient localExecClient = new LocalExecClient();
98 if (localExecClient.connect()) {
99 localExecClient.runOneCommand(finalname, delay, waitForValidation, futureCompletion);
100 LocalExecResult result = localExecClient.getLocalExecResult();
101 move(result.status, result.result, finalname);
102 localExecClient.disconnect();
103 return;
104 }
105 }
106 String[] args = finalname.split(" ");
107 File exec = new File(args[0]);
108 if (exec.isAbsolute()) {
109 if (! exec.canExecute()) {
110 logger.error("Exec command is not executable: " + finalname);
111 R66Result result = new R66Result(session, false,
112 ErrorCode.CommandNotFound, session.getRunner());
113 futureCompletion.setResult(result);
114 futureCompletion.cancel();
115 return;
116 }
117 }
118 CommandLine commandLine = new CommandLine(args[0]);
119 for (int i = 1; i < args.length; i ++) {
120 commandLine.addArgument(args[i]);
121 }
122 DefaultExecutor defaultExecutor = new DefaultExecutor();
123 PipedInputStream inputStream = new PipedInputStream();
124 PipedOutputStream outputStream = null;
125 try {
126 outputStream = new PipedOutputStream(inputStream);
127 } catch (IOException e1) {
128 try {
129 inputStream.close();
130 } catch (IOException e) {
131 }
132 logger.error("Exception: " + e1.getMessage() +
133 " Exec in error with " + commandLine.toString(), e1);
134 futureCompletion.setFailure(e1);
135 return;
136 }
137 PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(
138 outputStream, null);
139 defaultExecutor.setStreamHandler(pumpStreamHandler);
140 int[] correctValues = {
141 0, 1 };
142 defaultExecutor.setExitValues(correctValues);
143 ExecuteWatchdog watchdog = null;
144
145 if (delay > 0) {
146 watchdog = new ExecuteWatchdog(delay);
147 defaultExecutor.setWatchdog(watchdog);
148 }
149 LastLineReader lastLineReader = new LastLineReader(inputStream);
150 Thread thread = new Thread(lastLineReader, "ExecRename" + session.getRunner().getSpecialId());
151 thread.setDaemon(true);
152 Configuration.configuration.getExecutorService().execute(thread);
153 int status = -1;
154 try {
155 status = defaultExecutor.execute(commandLine);
156 } catch (ExecuteException e) {
157 if (e.getExitValue() == -559038737) {
158
159 try {
160 Thread.sleep(Configuration.RETRYINMS);
161 } catch (InterruptedException e1) {
162 }
163 try {
164 status = defaultExecutor.execute(commandLine);
165 } catch (ExecuteException e1) {
166 try {
167 outputStream.close();
168 } catch (IOException e2) {
169 }
170 thread.interrupt();
171 try {
172 inputStream.close();
173 } catch (IOException e2) {
174 }
175 pumpStreamHandler.stop();
176 logger.error("ExecuteException: " + e.getMessage() +
177 " . Exec in error with " + commandLine.toString());
178 futureCompletion.setFailure(e);
179 return;
180 } catch (IOException e1) {
181 try {
182 outputStream.close();
183 } catch (IOException e2) {
184 }
185 thread.interrupt();
186 try {
187 inputStream.close();
188 } catch (IOException e2) {
189 }
190 pumpStreamHandler.stop();
191 logger.error("IOException: " + e.getMessage() +
192 " . Exec in error with " + commandLine.toString());
193 futureCompletion.setFailure(e);
194 return;
195 }
196 } else {
197 try {
198 outputStream.close();
199 } catch (IOException e1) {
200 }
201 thread.interrupt();
202 try {
203 inputStream.close();
204 } catch (IOException e1) {
205 }
206 pumpStreamHandler.stop();
207 logger.error("ExecuteException: " + e.getMessage() +
208 " . Exec in error with " + commandLine.toString());
209 futureCompletion.setFailure(e);
210 return;
211 }
212 } catch (IOException e) {
213 try {
214 outputStream.close();
215 } catch (IOException e1) {
216 }
217 thread.interrupt();
218 try {
219 inputStream.close();
220 } catch (IOException e1) {
221 }
222 pumpStreamHandler.stop();
223 logger.error("IOException: " + e.getMessage() +
224 " . Exec in error with " + commandLine.toString());
225 futureCompletion.setFailure(e);
226 return;
227 }
228 try {
229 outputStream.flush();
230 } catch (IOException e) {
231 }
232 try {
233 outputStream.close();
234 } catch (IOException e) {
235 }
236 pumpStreamHandler.stop();
237 try {
238 if (delay > 0) {
239 thread.join(delay);
240 } else {
241 thread.join();
242 }
243 } catch (InterruptedException e) {
244 Thread.currentThread().interrupt();
245 }
246 try {
247 inputStream.close();
248 } catch (IOException e1) {
249 }
250 String newname = null;
251 if (defaultExecutor.isFailure(status) && watchdog != null &&
252 watchdog.killedProcess()) {
253
254 status = -1;
255 newname = "TimeOut";
256 } else {
257 newname = lastLineReader.lastLine;
258 if (status == 0 && (newname == null || newname.length() == 0)) {
259 status = 1;
260 }
261 }
262 move(status, newname, commandLine.toString());
263 }
264
265 private void move(int status, String newName, String commandLine) {
266 String newname = newName;
267 if (status == 0) {
268 if (newname.indexOf(' ') > 0) {
269 logger.warn("Exec returns a multiple string in final line: " +
270 newname);
271 String []args = newname.split(" ");
272 newname = args[args.length - 1];
273 }
274
275 File file = new File(newname);
276 if (! file.exists()) {
277 logger.warn("New file does not exist at the end of the exec: "+newname);
278 }
279
280 try {
281 session.getFile().replaceFilename(newname, true);
282 } catch (CommandAbstractException e) {
283 logger
284 .warn("Exec in warning with " + commandLine,
285 e);
286 }
287 session.getRunner().setFileMoved(newname, true);
288 futureCompletion.setSuccess();
289 logger.info("Exec OK with {} returns {}", commandLine,
290 newname);
291 } else if (status == 1) {
292 logger.warn("Exec in warning with " + commandLine+
293 " returns " + newname);
294 session.getRunner().setErrorExecutionStatus(ErrorCode.Warning);
295 futureCompletion.setSuccess();
296 } else {
297 logger.error("Status: " + status + " Exec in error with " +
298 commandLine + " returns " + newname);
299 futureCompletion.cancel();
300 }
301 }
302 }