1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.commandexec.server;
22
23 import goldengate.commandexec.utils.LocalExecDefaultResult;
24 import goldengate.common.logging.GgInternalLogger;
25 import goldengate.common.logging.GgInternalLoggerFactory;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.File;
29 import java.io.IOException;
30 import java.nio.channels.CancelledKeyException;
31 import java.nio.channels.ClosedChannelException;
32 import java.util.Map;
33 import java.util.Timer;
34 import java.util.TimerTask;
35 import java.util.concurrent.RejectedExecutionException;
36
37 import org.apache.commons.exec.CommandLine;
38 import org.apache.commons.exec.DefaultExecutor;
39 import org.apache.commons.exec.ExecuteException;
40 import org.apache.commons.exec.ExecuteWatchdog;
41 import org.apache.commons.exec.PumpStreamHandler;
42
43 import org.jboss.netty.channel.Channel;
44 import org.jboss.netty.channel.ChannelHandlerContext;
45 import org.jboss.netty.channel.ChannelStateEvent;
46 import org.jboss.netty.channel.Channels;
47 import org.jboss.netty.channel.ExceptionEvent;
48 import org.jboss.netty.channel.MessageEvent;
49 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
50
51
52
53
54
55
56 public class LocalExecServerHandler extends SimpleChannelUpstreamHandler {
57
58 private long delay = LocalExecDefaultResult.MAXWAITPROCESS;
59 protected LocalExecServerPipelineFactory factory = null;
60 static protected boolean isShutdown = false;
61
62
63
64
65 private static final GgInternalLogger logger = GgInternalLoggerFactory
66 .getLogger(LocalExecServerHandler.class);
67
68 protected boolean answered = false;
69
70
71
72
73
74
75 public static boolean isShutdown(Channel channel) {
76 if (isShutdown) {
77 channel.write(LocalExecDefaultResult.ConnectionRefused.result);
78 try {
79 channel.write(LocalExecDefaultResult.ENDOFCOMMAND).await();
80 } catch (InterruptedException e) {
81 }
82 Channels.close(channel);
83 return true;
84 }
85 return false;
86 }
87
88
89
90
91
92 static private void printStackTrace(Thread thread, StackTraceElement[] stacks) {
93 System.err.print(thread.toString() + " : ");
94 for (int i = 0; i < stacks.length-1; i++) {
95 System.err.print(stacks[i].toString()+" ");
96 }
97 System.err.println(stacks[stacks.length-1].toString());
98 }
99
100
101
102
103
104 private static class GGLEThreadShutdown extends Thread {
105 long delay = 3000;
106 LocalExecServerPipelineFactory factory;
107 public GGLEThreadShutdown(LocalExecServerPipelineFactory factory) {
108 this.factory = factory;
109 }
110
111
112
113 @Override
114 public void run() {
115 Timer timer = null;
116 timer = new Timer(true);
117 GGLETimerTask ggleTimerTask = new GGLETimerTask();
118 timer.schedule(ggleTimerTask, delay);
119 factory.releaseResources();
120 System.exit(0);
121 }
122
123 }
124
125
126
127
128
129 private static class GGLETimerTask extends TimerTask {
130
131
132
133 private static final GgInternalLogger logger = GgInternalLoggerFactory
134 .getLogger(GGLETimerTask.class);
135
136
137
138
139
140 @Override
141 public void run() {
142 logger.error("System will force EXIT");
143 Map<Thread, StackTraceElement[]> map = Thread
144 .getAllStackTraces();
145 for (Thread thread: map.keySet()) {
146 printStackTrace(thread, map.get(thread));
147 }
148 System.exit(0);
149 }
150 }
151
152
153
154
155 public LocalExecServerHandler(LocalExecServerPipelineFactory factory, long newdelay) {
156 this.factory = factory;
157 delay = newdelay;
158 }
159
160
161
162
163 @Override
164 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
165 throws Exception {
166 if (isShutdown(ctx.getChannel())) {
167 answered = true;
168 return;
169 }
170 answered = false;
171 factory.addChannel(ctx.getChannel());
172 }
173
174
175
176 @Override
177 public void channelDisconnected(ChannelHandlerContext ctx,
178 ChannelStateEvent e) throws Exception {
179 this.factory.removeChannel(e.getChannel());
180 }
181
182
183
184
185 public void setNewDelay(long newdelay) {
186 delay = newdelay;
187 }
188
189 @Override
190 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
191 answered = false;
192
193
194
195 String request = (String) evt.getMessage();
196
197
198 String response;
199 response = LocalExecDefaultResult.NoStatus.status+" "+
200 LocalExecDefaultResult.NoStatus.result;
201 boolean isLocallyShutdown = false;
202 ExecuteWatchdog watchdog = null;
203 try {
204 if (request.length() == 0) {
205
206 response = LocalExecDefaultResult.NoCommand.status+" "+
207 LocalExecDefaultResult.NoCommand.result;
208 } else {
209 String[] args = request.split(" ");
210 int cpt = 0;
211 long tempDelay;
212 try {
213 tempDelay = Long.parseLong(args[0]);
214 cpt++;
215 } catch (NumberFormatException e) {
216 tempDelay = delay;
217 }
218 if (tempDelay < 0) {
219
220 isShutdown = true;
221 logger.warn("Shutdown order received");
222 isLocallyShutdown = isShutdown(evt.getChannel());
223
224 try {
225 Thread.sleep((-tempDelay/10)*10);
226 } catch (InterruptedException e) {
227 }
228 Thread thread = new GGLEThreadShutdown(factory);
229 thread.start();
230 return;
231 }
232 String binary = args[cpt++];
233 File exec = new File(binary);
234 if (exec.isAbsolute()) {
235
236 if (! exec.canExecute()) {
237 logger.error("Exec command is not executable: " + request);
238 response = LocalExecDefaultResult.NotExecutable.status+" "+
239 LocalExecDefaultResult.NotExecutable.result;
240 return;
241 }
242 }
243
244 CommandLine commandLine = new CommandLine(binary);
245 for (; cpt < args.length; cpt ++) {
246 commandLine.addArgument(args[cpt]);
247 }
248 DefaultExecutor defaultExecutor = new DefaultExecutor();
249 ByteArrayOutputStream outputStream;
250 outputStream = new ByteArrayOutputStream();
251 PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(outputStream);
252 defaultExecutor.setStreamHandler(pumpStreamHandler);
253 int[] correctValues = { 0, 1 };
254 defaultExecutor.setExitValues(correctValues);
255 if (tempDelay > 0) {
256
257 watchdog = new ExecuteWatchdog(tempDelay);
258 defaultExecutor.setWatchdog(watchdog);
259 }
260 int status = -1;
261 try {
262
263 status = defaultExecutor.execute(commandLine);
264 } catch (ExecuteException e) {
265 if (e.getExitValue() == -559038737) {
266
267 try {
268 Thread.sleep(LocalExecDefaultResult.RETRYINMS);
269 } catch (InterruptedException e1) {
270 }
271 try {
272 status = defaultExecutor.execute(commandLine);
273 } catch (ExecuteException e1) {
274 pumpStreamHandler.stop();
275 logger.error("Exception: " + e.getMessage() +
276 " Exec in error with " + commandLine.toString());
277 response = LocalExecDefaultResult.BadExecution.status+" "+
278 LocalExecDefaultResult.BadExecution.result;
279 try {
280 outputStream.close();
281 } catch (IOException e2) {
282 }
283 return;
284 } catch (IOException e1) {
285 pumpStreamHandler.stop();
286 logger.error("Exception: " + e.getMessage() +
287 " Exec in error with " + commandLine.toString());
288 response = LocalExecDefaultResult.BadExecution.status+" "+
289 LocalExecDefaultResult.BadExecution.result;
290 try {
291 outputStream.close();
292 } catch (IOException e2) {
293 }
294 return;
295 }
296 } else {
297 pumpStreamHandler.stop();
298 logger.error("Exception: " + e.getMessage() +
299 " Exec in error with " + commandLine.toString());
300 response = LocalExecDefaultResult.BadExecution.status+" "+
301 LocalExecDefaultResult.BadExecution.result;
302 try {
303 outputStream.close();
304 } catch (IOException e2) {
305 }
306 return;
307 }
308 } catch (IOException e) {
309 pumpStreamHandler.stop();
310 logger.error("Exception: " + e.getMessage() +
311 " Exec in error with " + commandLine.toString());
312 response = LocalExecDefaultResult.BadExecution.status+" "+
313 LocalExecDefaultResult.BadExecution.result;
314 try {
315 outputStream.close();
316 } catch (IOException e2) {
317 }
318 return;
319 }
320 pumpStreamHandler.stop();
321 if (defaultExecutor.isFailure(status) && watchdog != null &&
322 watchdog.killedProcess()) {
323
324 logger.error("Exec is in Time Out");
325 response = LocalExecDefaultResult.TimeOutExecution.status+" "+
326 LocalExecDefaultResult.TimeOutExecution.result;
327 try {
328 outputStream.close();
329 } catch (IOException e2) {
330 }
331 } else {
332 response = status+" "+outputStream.toString();
333 try {
334 outputStream.close();
335 } catch (IOException e2) {
336 }
337 }
338 }
339 } finally {
340 if (isLocallyShutdown) {
341 return;
342 }
343
344
345
346 evt.getChannel().write(response+"\n");
347 answered = true;
348 if (watchdog != null) {
349 watchdog.stop();
350 }
351 logger.info("End of Command: "+request+" : "+response);
352 evt.getChannel().write(LocalExecDefaultResult.ENDOFCOMMAND);
353 }
354 }
355
356 @Override
357 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
358 if (answered) {
359 logger.debug("Exception while answered: ",e.getCause());
360 } else {
361 logger.error("Unexpected exception from downstream while not answered.", e
362 .getCause());
363 }
364 Throwable e1 = e.getCause();
365
366
367
368 if (e1 instanceof CancelledKeyException) {
369 } else if (e1 instanceof ClosedChannelException) {
370 } else if (e1 instanceof NullPointerException) {
371 if (e.getChannel().isConnected()) {
372 e.getChannel().close();
373 }
374 } else if (e1 instanceof IOException) {
375 if (e.getChannel().isConnected()) {
376 e.getChannel().close();
377 }
378 } else if (e1 instanceof RejectedExecutionException) {
379 if (e.getChannel().isConnected()) {
380 e.getChannel().close();
381 }
382 }
383 }
384 }