View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package goldengate.commandexec.server;
22  
23  import goldengate.commandexec.utils.LocalExecDefaultResult;
24  import goldengate.common.logging.GgInternalLogger;
25  import goldengate.common.logging.GgInternalLoggerFactory;
26  
27  import java.io.ByteArrayOutputStream;
28  import java.io.File;
29  import java.io.IOException;
30  import java.nio.channels.CancelledKeyException;
31  import java.nio.channels.ClosedChannelException;
32  import java.util.Map;
33  import java.util.Timer;
34  import java.util.TimerTask;
35  import java.util.concurrent.RejectedExecutionException;
36  
37  import org.apache.commons.exec.CommandLine;
38  import org.apache.commons.exec.DefaultExecutor;
39  import org.apache.commons.exec.ExecuteException;
40  import org.apache.commons.exec.ExecuteWatchdog;
41  import org.apache.commons.exec.PumpStreamHandler;
42  
43  import org.jboss.netty.channel.Channel;
44  import org.jboss.netty.channel.ChannelHandlerContext;
45  import org.jboss.netty.channel.ChannelStateEvent;
46  import org.jboss.netty.channel.Channels;
47  import org.jboss.netty.channel.ExceptionEvent;
48  import org.jboss.netty.channel.MessageEvent;
49  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
50  
51  /**
52   * Handles a server-side channel for LocalExec.
53   *
54   *
55   */
56  public class LocalExecServerHandler extends SimpleChannelUpstreamHandler {
57      // Fixed delay, but could change if necessary at construction
58      private long delay = LocalExecDefaultResult.MAXWAITPROCESS;
59      protected LocalExecServerPipelineFactory factory = null;
60      static protected boolean isShutdown = false;
61  
62      /**
63       * Internal Logger
64       */
65      private static final GgInternalLogger logger = GgInternalLoggerFactory
66              .getLogger(LocalExecServerHandler.class);
67  
68      protected boolean answered = false;
69  
70      /**
71       * Is the Local Exec Server going Shutdown
72       * @param channel associated channel
73       * @return True if in Shutdown
74       */
75      public static boolean isShutdown(Channel channel) {
76          if (isShutdown) {
77              channel.write(LocalExecDefaultResult.ConnectionRefused.result);
78              try {
79                  channel.write(LocalExecDefaultResult.ENDOFCOMMAND).await();
80              } catch (InterruptedException e) {
81              }
82              Channels.close(channel);
83              return true;
84          }
85          return false;
86      }
87      /**
88       * Print stack trace
89       * @param thread
90       * @param stacks
91       */
92      static private void printStackTrace(Thread thread, StackTraceElement[] stacks) {
93          System.err.print(thread.toString() + " : ");
94          for (int i = 0; i < stacks.length-1; i++) {
95              System.err.print(stacks[i].toString()+" ");
96          }
97          System.err.println(stacks[stacks.length-1].toString());
98      }
99      /**
100      * Shutdown thread
101      * @author Frederic Bregier
102      *
103      */
104     private static class GGLEThreadShutdown extends Thread {
105         long delay = 3000;
106         LocalExecServerPipelineFactory factory;
107         public GGLEThreadShutdown(LocalExecServerPipelineFactory factory) {
108             this.factory = factory;
109         }
110         /* (non-Javadoc)
111          * @see java.lang.Thread#run()
112          */
113         @Override
114         public void run() {
115             Timer timer = null;
116             timer = new Timer(true);
117             GGLETimerTask ggleTimerTask = new GGLETimerTask();
118             timer.schedule(ggleTimerTask, delay);
119             factory.releaseResources();
120             System.exit(0);
121         }
122 
123     }
124     /**
125      * TimerTask to terminate the server
126      * @author Frederic Bregier
127      *
128      */
129     private static class GGLETimerTask extends TimerTask {
130         /**
131          * Internal Logger
132          */
133         private static final GgInternalLogger logger = GgInternalLoggerFactory
134                 .getLogger(GGLETimerTask.class);
135         /*
136          * (non-Javadoc)
137          *
138          * @see java.util.TimerTask#run()
139          */
140         @Override
141         public void run() {
142             logger.error("System will force EXIT");
143             Map<Thread, StackTraceElement[]> map = Thread
144                     .getAllStackTraces();
145             for (Thread thread: map.keySet()) {
146                 printStackTrace(thread, map.get(thread));
147             }
148             System.exit(0);
149         }
150     }
151     /**
152      * Constructor with a specific delay
153      * @param newdelay
154      */
155     public LocalExecServerHandler(LocalExecServerPipelineFactory factory, long newdelay) {
156         this.factory = factory;
157         delay = newdelay;
158     }
159 
160     /* (non-Javadoc)
161      * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
162      */
163     @Override
164     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
165             throws Exception {
166         if (isShutdown(ctx.getChannel())) {
167             answered = true;
168             return;
169         }
170         answered = false;
171         factory.addChannel(ctx.getChannel());
172     }
173     /* (non-Javadoc)
174      * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
175      */
176     @Override
177     public void channelDisconnected(ChannelHandlerContext ctx,
178             ChannelStateEvent e) throws Exception {
179         this.factory.removeChannel(e.getChannel());
180     }
181     /**
182      * Change the delay to the specific value. Need to be called before any receive message.
183      * @param newdelay
184      */
185     public void setNewDelay(long newdelay) {
186         delay = newdelay;
187     }
188 
189     @Override
190     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
191         answered = false;
192         // Cast to a String first.
193         // We know it is a String because we put some codec in
194         // LocalExecPipelineFactory.
195         String request = (String) evt.getMessage();
196         
197         // Generate and write a response.
198         String response;
199         response = LocalExecDefaultResult.NoStatus.status+" "+
200             LocalExecDefaultResult.NoStatus.result;
201         boolean isLocallyShutdown = false;
202         ExecuteWatchdog watchdog = null;
203         try {
204             if (request.length() == 0) {
205                 // No command
206                 response = LocalExecDefaultResult.NoCommand.status+" "+
207                     LocalExecDefaultResult.NoCommand.result;
208             } else {
209                 String[] args = request.split(" ");
210                 int cpt = 0;
211                 long tempDelay;
212                 try {
213                     tempDelay = Long.parseLong(args[0]);
214                     cpt++;
215                 } catch (NumberFormatException e) {
216                     tempDelay = delay;
217                 }
218                 if (tempDelay < 0) {
219                     // Shutdown Order
220                     isShutdown = true;
221                     logger.warn("Shutdown order received");
222                     isLocallyShutdown = isShutdown(evt.getChannel());
223                     // Wait the specified time
224                     try {
225                         Thread.sleep((-tempDelay/10)*10);
226                     } catch (InterruptedException e) {
227                     }
228                     Thread thread = new GGLEThreadShutdown(factory);
229                     thread.start();
230                     return;
231                 }
232                 String binary = args[cpt++];
233                 File exec = new File(binary);
234                 if (exec.isAbsolute()) {
235                     // If true file, is it executable
236                     if (! exec.canExecute()) {
237                         logger.error("Exec command is not executable: " + request);
238                         response = LocalExecDefaultResult.NotExecutable.status+" "+
239                             LocalExecDefaultResult.NotExecutable.result;
240                         return;
241                     }
242                 }
243                 // Create command with parameters
244                 CommandLine commandLine = new CommandLine(binary);
245                 for (; cpt < args.length; cpt ++) {
246                     commandLine.addArgument(args[cpt]);
247                 }
248                 DefaultExecutor defaultExecutor = new DefaultExecutor();
249                 ByteArrayOutputStream outputStream;
250                 outputStream = new ByteArrayOutputStream();
251                 PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(outputStream);
252                 defaultExecutor.setStreamHandler(pumpStreamHandler);
253                 int[] correctValues = { 0, 1 };
254                 defaultExecutor.setExitValues(correctValues);
255                 if (tempDelay > 0) {
256                     // If delay (max time), then setup Watchdog
257                     watchdog = new ExecuteWatchdog(tempDelay);
258                     defaultExecutor.setWatchdog(watchdog);
259                 }
260                 int status = -1;
261                 try {
262                     // Execute the command
263                     status = defaultExecutor.execute(commandLine);
264                 } catch (ExecuteException e) {
265                     if (e.getExitValue() == -559038737) {
266                         // Cannot run immediately so retry once
267                         try {
268                             Thread.sleep(LocalExecDefaultResult.RETRYINMS);
269                         } catch (InterruptedException e1) {
270                         }
271                         try {
272                             status = defaultExecutor.execute(commandLine);
273                         } catch (ExecuteException e1) {
274                             pumpStreamHandler.stop();
275                             logger.error("Exception: " + e.getMessage() +
276                                     " Exec in error with " + commandLine.toString());
277                             response = LocalExecDefaultResult.BadExecution.status+" "+
278                                 LocalExecDefaultResult.BadExecution.result;
279                             try {
280                                 outputStream.close();
281                             } catch (IOException e2) {
282                             }
283                             return;
284                         } catch (IOException e1) {
285                             pumpStreamHandler.stop();
286                             logger.error("Exception: " + e.getMessage() +
287                                     " Exec in error with " + commandLine.toString());
288                             response = LocalExecDefaultResult.BadExecution.status+" "+
289                                 LocalExecDefaultResult.BadExecution.result;
290                             try {
291                                 outputStream.close();
292                             } catch (IOException e2) {
293                             }
294                             return;
295                         }
296                     } else {
297                         pumpStreamHandler.stop();
298                         logger.error("Exception: " + e.getMessage() +
299                                 " Exec in error with " + commandLine.toString());
300                         response = LocalExecDefaultResult.BadExecution.status+" "+
301                             LocalExecDefaultResult.BadExecution.result;
302                         try {
303                             outputStream.close();
304                         } catch (IOException e2) {
305                         }
306                         return;
307                     }
308                 } catch (IOException e) {
309                     pumpStreamHandler.stop();
310                     logger.error("Exception: " + e.getMessage() +
311                             " Exec in error with " + commandLine.toString());
312                     response = LocalExecDefaultResult.BadExecution.status+" "+
313                         LocalExecDefaultResult.BadExecution.result;
314                     try {
315                         outputStream.close();
316                     } catch (IOException e2) {
317                     }
318                     return;
319                 }
320                 pumpStreamHandler.stop();
321                 if (defaultExecutor.isFailure(status) && watchdog != null &&
322                         watchdog.killedProcess()) {
323                     // kill by the watchdoc (time out)
324                     logger.error("Exec is in Time Out");
325                     response = LocalExecDefaultResult.TimeOutExecution.status+" "+
326                         LocalExecDefaultResult.TimeOutExecution.result;
327                     try {
328                         outputStream.close();
329                     } catch (IOException e2) {
330                     }
331                 } else {
332                     response = status+" "+outputStream.toString();
333                     try {
334                         outputStream.close();
335                     } catch (IOException e2) {
336                     }
337                 }
338             }
339         } finally {
340             if (isLocallyShutdown) {
341                 return;
342             }
343             // We do not need to write a ChannelBuffer here.
344             // We know the encoder inserted at LocalExecPipelineFactory will do the
345             // conversion.
346             evt.getChannel().write(response+"\n");
347             answered = true;
348             if (watchdog != null) {
349                 watchdog.stop();
350             }
351             logger.info("End of Command: "+request+" : "+response);
352             evt.getChannel().write(LocalExecDefaultResult.ENDOFCOMMAND);
353         }
354     }
355 
356     @Override
357     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
358         if (answered) {
359             logger.debug("Exception while answered: ",e.getCause());
360         } else {
361             logger.error("Unexpected exception from downstream while not answered.", e
362                 .getCause());
363         }
364         Throwable e1 = e.getCause();
365         // Look if Nothing to do since execution will stop later on and
366         // an error will occur on client side
367         // since no message arrived before close (or partially)
368         if (e1 instanceof CancelledKeyException) {
369         } else if (e1 instanceof ClosedChannelException) {
370         } else if (e1 instanceof NullPointerException) {
371             if (e.getChannel().isConnected()) {
372                 e.getChannel().close();
373             }
374         } else if (e1 instanceof IOException) {
375             if (e.getChannel().isConnected()) {
376                 e.getChannel().close();
377             }
378         } else if (e1 instanceof RejectedExecutionException) {
379             if (e.getChannel().isConnected()) {
380                 e.getChannel().close();
381             }
382         }
383     }
384 }