View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.client;
22  
23  import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
24  import goldengate.common.database.exception.GoldenGateDatabaseException;
25  import goldengate.common.logging.GgInternalLogger;
26  import goldengate.common.logging.GgInternalLoggerFactory;
27  import goldengate.common.logging.GgSlf4JLoggerFactory;
28  
29  import java.net.SocketAddress;
30  
31  import openr66.configuration.FileBasedConfiguration;
32  import openr66.context.ErrorCode;
33  import openr66.context.R66FiniteDualStates;
34  import openr66.context.R66Result;
35  import openr66.context.authentication.R66Auth;
36  import openr66.context.task.exception.OpenR66RunnerErrorException;
37  import openr66.database.DbConstant;
38  import openr66.database.data.DbHostAuth;
39  import openr66.database.data.DbTaskRunner;
40  import openr66.protocol.configuration.Configuration;
41  import openr66.protocol.exception.OpenR66DatabaseGlobalException;
42  import openr66.protocol.exception.OpenR66Exception;
43  import openr66.protocol.exception.OpenR66ProtocolPacketException;
44  import openr66.protocol.localhandler.LocalChannelReference;
45  import openr66.protocol.localhandler.packet.LocalPacketFactory;
46  import openr66.protocol.localhandler.packet.ValidPacket;
47  import openr66.protocol.networkhandler.NetworkTransaction;
48  import openr66.protocol.utils.ChannelUtils;
49  import openr66.protocol.utils.R66Future;
50  
51  import org.jboss.netty.channel.Channels;
52  import org.jboss.netty.logging.InternalLoggerFactory;
53  
54  /**
55   * Class to request information or request cancellation or restart
56   *
57   * @author Frederic Bregier
58   *
59   */
60  public class RequestTransfer implements Runnable {
61      /**
62       * Internal Logger
63       */
64      static volatile GgInternalLogger logger;
65  
66      protected final NetworkTransaction networkTransaction;
67      final R66Future future;
68      final long specialId;
69      String requested = null;
70      String requester = null;
71      boolean cancel = false;
72      boolean stop = false;
73      boolean restart = false;
74  
75      static long sspecialId;
76      static String srequested = null;
77      static String srequester = null;
78      static boolean scancel = false;
79      static boolean sstop = false;
80      static boolean srestart = false;
81  
82      /**
83       * Parse the parameter and set current values
84       * @param args
85       * @return True if all parameters were found and correct
86       */
87      protected static boolean getParams(String []args) {
88          if (args.length < 5) {
89              logger
90                      .error("Needs at least 5 arguments:\n" +
91                              "  the XML client configuration file,\n" +
92                              "  '-id' the transfer Id,\n" +
93                              "  '-to' the requested host Id or '-from' the requester host Id " +
94                              "(localhost will be the opposite),\n" +
95                              "Other options (only one):\n" +
96                              "  '-cancel' to cancel completely the transfer,\n" +
97                              "  '-stop' to stop the transfer (maybe restarted),\n" +
98                              "  '-restart' to restart if possible a transfer");
99              return false;
100         }
101         if (! FileBasedConfiguration
102                 .setClientConfigurationFromXml(Configuration.configuration, args[0])) {
103             logger
104                     .error("Needs a correct configuration file as first argument");
105             return false;
106         }
107         for (int i = 1; i < args.length; i++) {
108             if (args[i].equalsIgnoreCase("-id")) {
109                 i++;
110                 sspecialId = Long.parseLong(args[i]);
111             } else if (args[i].equalsIgnoreCase("-to")) {
112                 i++;
113                 srequested = args[i];
114                 try {
115                     srequester = Configuration.configuration.getHostId(DbConstant.admin.session,
116                             srequested);
117                 } catch (GoldenGateDatabaseException e) {
118                     logger.error("Cannot get Host Id: "+srequester,e);
119                     return false;
120                 }
121             } else if (args[i].equalsIgnoreCase("-from")) {
122                 i++;
123                 srequester = args[i];
124                 try {
125                     srequested = Configuration.configuration.getHostId(DbConstant.admin.session,
126                             srequester);
127                 } catch (GoldenGateDatabaseException e) {
128                     logger.error("Cannot get Host Id: "+srequested,e);
129                     return false;
130                 }
131             } else if (args[i].equalsIgnoreCase("-cancel")) {
132                 scancel = true;
133             } else if (args[i].equalsIgnoreCase("-stop")) {
134                 sstop = true;
135             } else if (args[i].equalsIgnoreCase("-restart")) {
136                 srestart = true;
137             }
138         }
139         if ((scancel && srestart) || (scancel && sstop) || (srestart && sstop)) {
140             logger.error("Cannot cancel or restart or stop at the same time");
141             return false;
142         }
143         if (sspecialId == DbConstant.ILLEGALVALUE || srequested == null) {
144             logger.error("TransferId and Requested/Requester HostId must be set");
145             return false;
146         }
147 
148         return true;
149     }
150 
151 
152     /**
153      * @param future
154      * @param specialId
155      * @param requested
156      * @param requester
157      * @param cancel
158      * @param stop
159      * @param restart
160      * @param networkTransaction
161      */
162     public RequestTransfer(R66Future future, long specialId, String requested, String requester,
163             boolean cancel, boolean stop, boolean restart,
164             NetworkTransaction networkTransaction) {
165         this.future = future;
166         this.specialId = specialId;
167         this.requested = requested;
168         this.requester = requester;
169         this.cancel = cancel;
170         this.stop = stop;
171         this.restart = restart;
172         this.networkTransaction = networkTransaction;
173     }
174 
175 
176     public void run() {
177         if (logger == null) {
178             logger = GgInternalLoggerFactory.getLogger(RequestTransfer.class);
179         }
180         DbTaskRunner runner = null;
181         try {
182             runner = new DbTaskRunner(DbConstant.admin.session,null,null,
183                     specialId,requester,requested);
184         } catch (GoldenGateDatabaseException e) {
185             logger.error("Cannot find the transfer");
186             future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
187                     ErrorCode.Internal, null));
188             future.setFailure(e);
189             return;
190         }
191         if (cancel || stop || restart) {
192             if (cancel) {
193                 // Cancel the task and delete any file if in retrieve
194                 if (runner.isAllDone()) {
195                     // nothing to do since already finished
196                     setDone(runner);
197                     logger.info("Transfer already finished: "+runner.toString());
198                     future.setResult(new R66Result(null,true,ErrorCode.TransferOk, runner));
199                     future.getResult().runner = runner;
200                     future.setSuccess();
201                     return;
202                 } else {
203                     // Send a request of cancel
204                     ErrorCode code = sendValid(LocalPacketFactory.CANCELPACKET);
205                     switch (code) {
206                         case CompleteOk:
207                             logger.info("Transfer cancel requested and done: {}",
208                                 runner);
209                             break;
210                         case TransferOk:
211                             logger.info("Transfer cancel requested but already finished: {}",
212                                 runner);
213                             break;
214                         default:
215                             logger.info("Transfer cancel requested but internal error: {}",
216                                 runner);
217                             break;
218                     }
219                 }
220             } else if (stop) {
221                 // Just stop the task
222                 // Send a request
223                 ErrorCode code = sendValid(LocalPacketFactory.STOPPACKET);
224                 switch (code) {
225                     case CompleteOk:
226                         logger.info("Transfer stop requested and done: {}",runner);
227                         break;
228                     case TransferOk:
229                         logger.info("Transfer stop requested but already finished: {}",
230                                 runner);
231                         break;
232                     default:
233                         logger.info("Transfer stop requested but internal error: {}",
234                                 runner);
235                         break;
236                 }
237             } else if (restart) {
238                 // Restart if already stopped and not finished
239                 ErrorCode code = sendValid(LocalPacketFactory.VALIDPACKET);
240                 switch (code) {
241                     case QueryStillRunning:
242                         logger.info("Transfer restart requested but already active and running: {}",
243                                 runner);
244                         break;
245                     case Running:
246                         logger.info("Transfer restart requested but already running: {}",
247                                 runner);
248                         break;
249                     case PreProcessingOk:
250                         logger.info("Transfer restart requested and restarted: {}",
251                                 runner);
252                         break;
253                     case CompleteOk:
254                         logger.info("Transfer restart requested but already finished: {}",
255                                 runner);
256                         break;
257                     case RemoteError:
258                         logger.info("Transfer restart requested but remote error: {}",
259                                 runner);
260                         break;
261                     case PassThroughMode:
262                         logger.info("Transfer not restarted since it is in PassThrough mode: {}",
263                                 runner);
264                         break;
265                     default:
266                         logger.info("Transfer restart requested but internal error: {}",
267                                 runner);
268                         break;
269                 }
270             }
271         } else {
272             // Only request
273             logger.info("Transfer information:\n    "+runner.toShortString());
274             future.setResult(new R66Result(null,true,runner.getErrorInfo(),runner));
275             future.setSuccess();
276         }
277     }
278     /**
279      * Set the runner to DONE
280      * @param runner
281      */
282     private void setDone(DbTaskRunner runner) {
283         if (runner.getUpdatedInfo() != UpdatedInfo.DONE) {
284             runner.changeUpdatedInfo(UpdatedInfo.DONE);
285             try {
286                 runner.saveStatus();
287             } catch (OpenR66RunnerErrorException e) {
288             }
289         }
290     }
291     private ErrorCode sendValid(byte code) {
292         DbHostAuth host;
293         host = R66Auth.getServerAuth(DbConstant.admin.session,
294                     this.requester);
295         if (host == null) {
296             logger.error("Requested host cannot be found: "+this.requester);
297             OpenR66Exception e =
298                 new OpenR66RunnerErrorException("Requested host cannot be found");
299             future.setResult(new R66Result(
300                     e,
301                     null, true,
302                     ErrorCode.TransferError, null));
303             future.setFailure(e);
304             return ErrorCode.Internal;
305         }
306         SocketAddress socketAddress = host.getSocketAddress();
307         boolean isSSL = host.isSsl();
308 
309         LocalChannelReference localChannelReference = networkTransaction
310             .createConnectionWithRetry(socketAddress,isSSL,future);
311         socketAddress = null;
312         if (localChannelReference == null) {
313             logger.debug("Cannot connect to "+host.toString());
314             host = null;
315             future.setResult(new R66Result(null, true,
316                     ErrorCode.ConnectionImpossible, null));
317             future.cancel();
318             return ErrorCode.Internal;
319        }
320         ValidPacket packet = new ValidPacket("Request on Transfer",
321                 this.requested+" "+this.requester+" "+this.specialId,
322                 code);
323         localChannelReference.sessionNewState(R66FiniteDualStates.VALIDOTHER);
324         try {
325             ChannelUtils.writeAbstractLocalPacket(localChannelReference, packet, false);
326         } catch (OpenR66ProtocolPacketException e) {
327             logger.error("Cannot transfer request to "+host.toString());
328             Channels.close(localChannelReference.getLocalChannel());
329             localChannelReference = null;
330             host = null;
331             packet = null;
332             logger.debug("Bad Protocol", e);
333             future.setResult(new R66Result(e, null, true,
334                     ErrorCode.TransferError, null));
335             future.setFailure(e);
336             return ErrorCode.Internal;
337         }
338         packet = null;
339         host = null;
340         future.awaitUninterruptibly();
341         logger.info("Request done with "+(future.isSuccess()?"success":"error"));
342 
343         Channels.close(localChannelReference.getLocalChannel());
344         localChannelReference = null;
345         R66Result result = future.getResult();
346         if (result != null) {
347             return result.code;
348         }
349         return ErrorCode.Internal;
350     }
351     /**
352      * @param args
353      */
354     public static void main(String[] args) {
355         InternalLoggerFactory.setDefaultFactory(new GgSlf4JLoggerFactory(null));
356         if (logger == null) {
357             logger = GgInternalLoggerFactory.getLogger(RequestTransfer.class);
358         }
359         if (! getParams(args)) {
360             logger.error("Wrong initialization");
361             if (DbConstant.admin != null && DbConstant.admin.isConnected) {
362                 DbConstant.admin.close();
363             }
364             ChannelUtils.stopLogger();
365             System.exit(1);
366         }
367         int value = 99;
368         try {
369             Configuration.configuration.pipelineInit();
370             NetworkTransaction networkTransaction = new NetworkTransaction();
371             R66Future result = new R66Future(true);
372             RequestTransfer requestTransfer =
373                 new RequestTransfer(result, sspecialId, srequested, srequester,
374                         scancel, sstop, srestart,
375                         networkTransaction);
376             requestTransfer.run();
377             result.awaitUninterruptibly();
378             R66Result finalValue = result.getResult();
379             if (scancel || sstop || srestart) {
380                 if (scancel) {
381                     if (result.isSuccess()) {
382                         value = 0;
383                         logger.warn("Transfer already finished:\n    "+
384                                 finalValue.runner.toShortString());
385                     } else {
386                         switch (finalValue.code) {
387                             case CompleteOk:
388                                 value = 0;
389                                 logger.warn("Transfer cancel requested and done:\n    "+
390                                         finalValue.runner.toShortString());
391                                 break;
392                             case TransferOk:
393                                 value = 3;
394                                 logger.warn("Transfer cancel requested but already finished:\n    "+
395                                         finalValue.runner.toShortString());
396                                 break;
397                             default:
398                                 value = 4;
399                                 logger.error("Transfer cancel requested but internal error:\n    "+
400                                         finalValue.runner.toShortString());
401                                 break;
402                         }
403                     }
404                 } else if (sstop) {
405                     switch (finalValue.code) {
406                         case CompleteOk:
407                             value = 0;
408                             logger.warn("Transfer stop requested and done:\n    "+
409                                 finalValue.runner.toShortString());
410                             break;
411                         case TransferOk:
412                             value = 0;
413                             logger.warn("Transfer stop requested but already finished:\n    "+
414                                 finalValue.runner.toShortString());
415                             break;
416                         default:
417                             value = 3;
418                             logger.error("Transfer stop requested but internal error:\n    "+
419                                 finalValue.runner.toShortString());
420                             break;
421                     }
422                 } else if (srestart) {
423                     switch (finalValue.code) {
424                         case QueryStillRunning:
425                             value = 0;
426                             logger.warn("Transfer restart requested but already active and running:\n    "+
427                                 finalValue.runner.toShortString());
428                             break;
429                         case Running:
430                             value = 0;
431                             logger.warn("Transfer restart requested but already running:\n    "+
432                                 finalValue.runner.toShortString());
433                             break;
434                         case PreProcessingOk:
435                             value = 0;
436                             logger.warn("Transfer restart requested and restarted:\n    "+
437                                 finalValue.runner.toShortString());
438                             break;
439                         case CompleteOk:
440                             value = 4;
441                             logger.warn("Transfer restart requested but already finished:\n    "+
442                                 finalValue.runner.toShortString());
443                             break;
444                         case RemoteError:
445                             value = 5;
446                             logger.error("Transfer restart requested but remote error:\n    "+
447                                 finalValue.runner.toShortString());
448                             break;
449                         case PassThroughMode:
450                             value = 6;
451                             logger.warn("Transfer not restarted since it is in PassThrough mode:\n    "+
452                                 finalValue.runner.toShortString());
453                             break;
454                         default:
455                             value = 3;
456                             logger.error("Transfer restart requested but internal error:\n    "+
457                                 finalValue.runner.toShortString());
458                             break;
459                     }
460                 }
461             } else {
462                 value = 0;
463                 // Only request
464                 logger.warn("Transfer information:\n    "+
465                                 finalValue.runner.toShortString());
466             }
467         } finally {
468             if (DbConstant.admin != null) {
469                 DbConstant.admin.close();
470             }
471             System.exit(value);
472         }
473     }
474 
475 }