1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.client;
22
23 import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
24 import goldengate.common.database.exception.GoldenGateDatabaseException;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27 import goldengate.common.logging.GgSlf4JLoggerFactory;
28
29 import java.net.SocketAddress;
30
31 import openr66.configuration.FileBasedConfiguration;
32 import openr66.context.ErrorCode;
33 import openr66.context.R66FiniteDualStates;
34 import openr66.context.R66Result;
35 import openr66.context.authentication.R66Auth;
36 import openr66.context.task.exception.OpenR66RunnerErrorException;
37 import openr66.database.DbConstant;
38 import openr66.database.data.DbHostAuth;
39 import openr66.database.data.DbTaskRunner;
40 import openr66.protocol.configuration.Configuration;
41 import openr66.protocol.exception.OpenR66DatabaseGlobalException;
42 import openr66.protocol.exception.OpenR66Exception;
43 import openr66.protocol.exception.OpenR66ProtocolPacketException;
44 import openr66.protocol.localhandler.LocalChannelReference;
45 import openr66.protocol.localhandler.packet.LocalPacketFactory;
46 import openr66.protocol.localhandler.packet.ValidPacket;
47 import openr66.protocol.networkhandler.NetworkTransaction;
48 import openr66.protocol.utils.ChannelUtils;
49 import openr66.protocol.utils.R66Future;
50
51 import org.jboss.netty.channel.Channels;
52 import org.jboss.netty.logging.InternalLoggerFactory;
53
54
55
56
57
58
59
60 public class RequestTransfer implements Runnable {
61
62
63
64 static volatile GgInternalLogger logger;
65
66 protected final NetworkTransaction networkTransaction;
67 final R66Future future;
68 final long specialId;
69 String requested = null;
70 String requester = null;
71 boolean cancel = false;
72 boolean stop = false;
73 boolean restart = false;
74
75 static long sspecialId;
76 static String srequested = null;
77 static String srequester = null;
78 static boolean scancel = false;
79 static boolean sstop = false;
80 static boolean srestart = false;
81
82
83
84
85
86
87 protected static boolean getParams(String []args) {
88 if (args.length < 5) {
89 logger
90 .error("Needs at least 5 arguments:\n" +
91 " the XML client configuration file,\n" +
92 " '-id' the transfer Id,\n" +
93 " '-to' the requested host Id or '-from' the requester host Id " +
94 "(localhost will be the opposite),\n" +
95 "Other options (only one):\n" +
96 " '-cancel' to cancel completely the transfer,\n" +
97 " '-stop' to stop the transfer (maybe restarted),\n" +
98 " '-restart' to restart if possible a transfer");
99 return false;
100 }
101 if (! FileBasedConfiguration
102 .setClientConfigurationFromXml(Configuration.configuration, args[0])) {
103 logger
104 .error("Needs a correct configuration file as first argument");
105 return false;
106 }
107 for (int i = 1; i < args.length; i++) {
108 if (args[i].equalsIgnoreCase("-id")) {
109 i++;
110 sspecialId = Long.parseLong(args[i]);
111 } else if (args[i].equalsIgnoreCase("-to")) {
112 i++;
113 srequested = args[i];
114 try {
115 srequester = Configuration.configuration.getHostId(DbConstant.admin.session,
116 srequested);
117 } catch (GoldenGateDatabaseException e) {
118 logger.error("Cannot get Host Id: "+srequester,e);
119 return false;
120 }
121 } else if (args[i].equalsIgnoreCase("-from")) {
122 i++;
123 srequester = args[i];
124 try {
125 srequested = Configuration.configuration.getHostId(DbConstant.admin.session,
126 srequester);
127 } catch (GoldenGateDatabaseException e) {
128 logger.error("Cannot get Host Id: "+srequested,e);
129 return false;
130 }
131 } else if (args[i].equalsIgnoreCase("-cancel")) {
132 scancel = true;
133 } else if (args[i].equalsIgnoreCase("-stop")) {
134 sstop = true;
135 } else if (args[i].equalsIgnoreCase("-restart")) {
136 srestart = true;
137 }
138 }
139 if ((scancel && srestart) || (scancel && sstop) || (srestart && sstop)) {
140 logger.error("Cannot cancel or restart or stop at the same time");
141 return false;
142 }
143 if (sspecialId == DbConstant.ILLEGALVALUE || srequested == null) {
144 logger.error("TransferId and Requested/Requester HostId must be set");
145 return false;
146 }
147
148 return true;
149 }
150
151
152
153
154
155
156
157
158
159
160
161
/**
 * Creates a request on an existing transfer. The arguments are only stored;
 * nothing is sent until {@link #run()} is called.
 *
 * run() tests the three action flags in the order cancel, stop, restart and
 * acts on the first one that is set; when none is set it only reports the
 * transfer information.
 *
 * @param future the future that will be completed with the outcome
 * @param specialId the Id of the transfer
 * @param requested the requested host Id of the transfer
 * @param requester the requester host Id of the transfer
 * @param cancel true to request a cancel
 * @param stop true to request a stop
 * @param restart true to request a restart
 * @param networkTransaction the network transaction used to reach the remote host
 */
public RequestTransfer(R66Future future, long specialId, String requested, String requester,
        boolean cancel, boolean stop, boolean restart,
        NetworkTransaction networkTransaction) {
    // Collaborators
    this.networkTransaction = networkTransaction;
    this.future = future;

    // Identification of the transfer
    this.specialId = specialId;
    this.requester = requester;
    this.requested = requested;

    // Requested action
    this.restart = restart;
    this.stop = stop;
    this.cancel = cancel;
}
174
175
176 public void run() {
177 if (logger == null) {
178 logger = GgInternalLoggerFactory.getLogger(RequestTransfer.class);
179 }
180 DbTaskRunner runner = null;
181 try {
182 runner = new DbTaskRunner(DbConstant.admin.session,null,null,
183 specialId,requester,requested);
184 } catch (GoldenGateDatabaseException e) {
185 logger.error("Cannot find the transfer");
186 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
187 ErrorCode.Internal, null));
188 future.setFailure(e);
189 return;
190 }
191 if (cancel || stop || restart) {
192 if (cancel) {
193
194 if (runner.isAllDone()) {
195
196 setDone(runner);
197 logger.info("Transfer already finished: "+runner.toString());
198 future.setResult(new R66Result(null,true,ErrorCode.TransferOk, runner));
199 future.getResult().runner = runner;
200 future.setSuccess();
201 return;
202 } else {
203
204 ErrorCode code = sendValid(LocalPacketFactory.CANCELPACKET);
205 switch (code) {
206 case CompleteOk:
207 logger.info("Transfer cancel requested and done: {}",
208 runner);
209 break;
210 case TransferOk:
211 logger.info("Transfer cancel requested but already finished: {}",
212 runner);
213 break;
214 default:
215 logger.info("Transfer cancel requested but internal error: {}",
216 runner);
217 break;
218 }
219 }
220 } else if (stop) {
221
222
223 ErrorCode code = sendValid(LocalPacketFactory.STOPPACKET);
224 switch (code) {
225 case CompleteOk:
226 logger.info("Transfer stop requested and done: {}",runner);
227 break;
228 case TransferOk:
229 logger.info("Transfer stop requested but already finished: {}",
230 runner);
231 break;
232 default:
233 logger.info("Transfer stop requested but internal error: {}",
234 runner);
235 break;
236 }
237 } else if (restart) {
238
239 ErrorCode code = sendValid(LocalPacketFactory.VALIDPACKET);
240 switch (code) {
241 case QueryStillRunning:
242 logger.info("Transfer restart requested but already active and running: {}",
243 runner);
244 break;
245 case Running:
246 logger.info("Transfer restart requested but already running: {}",
247 runner);
248 break;
249 case PreProcessingOk:
250 logger.info("Transfer restart requested and restarted: {}",
251 runner);
252 break;
253 case CompleteOk:
254 logger.info("Transfer restart requested but already finished: {}",
255 runner);
256 break;
257 case RemoteError:
258 logger.info("Transfer restart requested but remote error: {}",
259 runner);
260 break;
261 case PassThroughMode:
262 logger.info("Transfer not restarted since it is in PassThrough mode: {}",
263 runner);
264 break;
265 default:
266 logger.info("Transfer restart requested but internal error: {}",
267 runner);
268 break;
269 }
270 }
271 } else {
272
273 logger.info("Transfer information:\n "+runner.toShortString());
274 future.setResult(new R66Result(null,true,runner.getErrorInfo(),runner));
275 future.setSuccess();
276 }
277 }
278
279
280
281
282 private void setDone(DbTaskRunner runner) {
283 if (runner.getUpdatedInfo() != UpdatedInfo.DONE) {
284 runner.changeUpdatedInfo(UpdatedInfo.DONE);
285 try {
286 runner.saveStatus();
287 } catch (OpenR66RunnerErrorException e) {
288 }
289 }
290 }
291 private ErrorCode sendValid(byte code) {
292 DbHostAuth host;
293 host = R66Auth.getServerAuth(DbConstant.admin.session,
294 this.requester);
295 if (host == null) {
296 logger.error("Requested host cannot be found: "+this.requester);
297 OpenR66Exception e =
298 new OpenR66RunnerErrorException("Requested host cannot be found");
299 future.setResult(new R66Result(
300 e,
301 null, true,
302 ErrorCode.TransferError, null));
303 future.setFailure(e);
304 return ErrorCode.Internal;
305 }
306 SocketAddress socketAddress = host.getSocketAddress();
307 boolean isSSL = host.isSsl();
308
309 LocalChannelReference localChannelReference = networkTransaction
310 .createConnectionWithRetry(socketAddress,isSSL,future);
311 socketAddress = null;
312 if (localChannelReference == null) {
313 logger.debug("Cannot connect to "+host.toString());
314 host = null;
315 future.setResult(new R66Result(null, true,
316 ErrorCode.ConnectionImpossible, null));
317 future.cancel();
318 return ErrorCode.Internal;
319 }
320 ValidPacket packet = new ValidPacket("Request on Transfer",
321 this.requested+" "+this.requester+" "+this.specialId,
322 code);
323 localChannelReference.sessionNewState(R66FiniteDualStates.VALIDOTHER);
324 try {
325 ChannelUtils.writeAbstractLocalPacket(localChannelReference, packet, false);
326 } catch (OpenR66ProtocolPacketException e) {
327 logger.error("Cannot transfer request to "+host.toString());
328 Channels.close(localChannelReference.getLocalChannel());
329 localChannelReference = null;
330 host = null;
331 packet = null;
332 logger.debug("Bad Protocol", e);
333 future.setResult(new R66Result(e, null, true,
334 ErrorCode.TransferError, null));
335 future.setFailure(e);
336 return ErrorCode.Internal;
337 }
338 packet = null;
339 host = null;
340 future.awaitUninterruptibly();
341 logger.info("Request done with "+(future.isSuccess()?"success":"error"));
342
343 Channels.close(localChannelReference.getLocalChannel());
344 localChannelReference = null;
345 R66Result result = future.getResult();
346 if (result != null) {
347 return result.code;
348 }
349 return ErrorCode.Internal;
350 }
351
352
353
354 public static void main(String[] args) {
355 InternalLoggerFactory.setDefaultFactory(new GgSlf4JLoggerFactory(null));
356 if (logger == null) {
357 logger = GgInternalLoggerFactory.getLogger(RequestTransfer.class);
358 }
359 if (! getParams(args)) {
360 logger.error("Wrong initialization");
361 if (DbConstant.admin != null && DbConstant.admin.isConnected) {
362 DbConstant.admin.close();
363 }
364 ChannelUtils.stopLogger();
365 System.exit(1);
366 }
367 int value = 99;
368 try {
369 Configuration.configuration.pipelineInit();
370 NetworkTransaction networkTransaction = new NetworkTransaction();
371 R66Future result = new R66Future(true);
372 RequestTransfer requestTransfer =
373 new RequestTransfer(result, sspecialId, srequested, srequester,
374 scancel, sstop, srestart,
375 networkTransaction);
376 requestTransfer.run();
377 result.awaitUninterruptibly();
378 R66Result finalValue = result.getResult();
379 if (scancel || sstop || srestart) {
380 if (scancel) {
381 if (result.isSuccess()) {
382 value = 0;
383 logger.warn("Transfer already finished:\n "+
384 finalValue.runner.toShortString());
385 } else {
386 switch (finalValue.code) {
387 case CompleteOk:
388 value = 0;
389 logger.warn("Transfer cancel requested and done:\n "+
390 finalValue.runner.toShortString());
391 break;
392 case TransferOk:
393 value = 3;
394 logger.warn("Transfer cancel requested but already finished:\n "+
395 finalValue.runner.toShortString());
396 break;
397 default:
398 value = 4;
399 logger.error("Transfer cancel requested but internal error:\n "+
400 finalValue.runner.toShortString());
401 break;
402 }
403 }
404 } else if (sstop) {
405 switch (finalValue.code) {
406 case CompleteOk:
407 value = 0;
408 logger.warn("Transfer stop requested and done:\n "+
409 finalValue.runner.toShortString());
410 break;
411 case TransferOk:
412 value = 0;
413 logger.warn("Transfer stop requested but already finished:\n "+
414 finalValue.runner.toShortString());
415 break;
416 default:
417 value = 3;
418 logger.error("Transfer stop requested but internal error:\n "+
419 finalValue.runner.toShortString());
420 break;
421 }
422 } else if (srestart) {
423 switch (finalValue.code) {
424 case QueryStillRunning:
425 value = 0;
426 logger.warn("Transfer restart requested but already active and running:\n "+
427 finalValue.runner.toShortString());
428 break;
429 case Running:
430 value = 0;
431 logger.warn("Transfer restart requested but already running:\n "+
432 finalValue.runner.toShortString());
433 break;
434 case PreProcessingOk:
435 value = 0;
436 logger.warn("Transfer restart requested and restarted:\n "+
437 finalValue.runner.toShortString());
438 break;
439 case CompleteOk:
440 value = 4;
441 logger.warn("Transfer restart requested but already finished:\n "+
442 finalValue.runner.toShortString());
443 break;
444 case RemoteError:
445 value = 5;
446 logger.error("Transfer restart requested but remote error:\n "+
447 finalValue.runner.toShortString());
448 break;
449 case PassThroughMode:
450 value = 6;
451 logger.warn("Transfer not restarted since it is in PassThrough mode:\n "+
452 finalValue.runner.toShortString());
453 break;
454 default:
455 value = 3;
456 logger.error("Transfer restart requested but internal error:\n "+
457 finalValue.runner.toShortString());
458 break;
459 }
460 }
461 } else {
462 value = 0;
463
464 logger.warn("Transfer information:\n "+
465 finalValue.runner.toShortString());
466 }
467 } finally {
468 if (DbConstant.admin != null) {
469 DbConstant.admin.close();
470 }
471 System.exit(value);
472 }
473 }
474
475 }