1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package openr66.commander;
21
22 import goldengate.common.database.data.AbstractDbData;
23 import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
24 import goldengate.common.database.exception.GoldenGateDatabaseException;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27
28 import java.net.SocketAddress;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31
32 import openr66.client.RecvThroughHandler;
33 import openr66.context.ErrorCode;
34 import openr66.context.R66FiniteDualStates;
35 import openr66.context.R66Result;
36 import openr66.context.authentication.R66Auth;
37 import openr66.context.task.exception.OpenR66RunnerErrorException;
38 import openr66.database.DbConstant;
39 import openr66.database.data.DbHostAuth;
40 import openr66.database.data.DbTaskRunner;
41 import openr66.database.data.DbTaskRunner.TASKSTEP;
42 import openr66.protocol.configuration.Configuration;
43 import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
44 import openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
45 import openr66.protocol.exception.OpenR66ProtocolPacketException;
46 import openr66.protocol.localhandler.LocalChannelReference;
47 import openr66.protocol.localhandler.packet.RequestPacket;
48 import openr66.protocol.networkhandler.NetworkTransaction;
49 import openr66.protocol.utils.ChannelUtils;
50 import openr66.protocol.utils.R66Future;
51 import openr66.protocol.utils.TransferUtils;
52
53 import org.jboss.netty.channel.Channels;
54
55
56
57
58
59
60
61 public class ClientRunner extends Thread {
62
63
64
65 private static final GgInternalLogger logger = GgInternalLoggerFactory
66 .getLogger(ClientRunner.class);
67
68 private static final ConcurrentHashMap<String, Integer> taskRunnerRetryHashMap = new ConcurrentHashMap<String, Integer>();
69
70 public static ConcurrentLinkedQueue<ClientRunner> activeRunners = null;
71
72 private final NetworkTransaction networkTransaction;
73
74 private final DbTaskRunner taskRunner;
75
76 private final R66Future futureRequest;
77
78 private RecvThroughHandler handler = null;
79
80 private boolean isSendThroughMode = false;
81
82 private LocalChannelReference localChannelReference = null;
83
84 public ClientRunner(NetworkTransaction networkTransaction,
85 DbTaskRunner taskRunner, R66Future futureRequest) {
86 this.networkTransaction = networkTransaction;
87 this.taskRunner = taskRunner;
88 this.futureRequest = futureRequest;
89 }
90
91
92
93
94 public NetworkTransaction getNetworkTransaction() {
95 return networkTransaction;
96 }
97
98
99
100
101 public DbTaskRunner getTaskRunner() {
102 return taskRunner;
103 }
104
105
106
107
108 public LocalChannelReference getLocalChannelReference() {
109 return localChannelReference;
110 }
111
112
113
114
115
116
117 @Override
118 public void run() {
119 if (Configuration.configuration.isShutdown) {
120 taskRunner.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
121 try {
122 taskRunner.update();
123 } catch (GoldenGateDatabaseException e) {
124 }
125 return;
126 }
127 try {
128 if (activeRunners != null) {
129 activeRunners.add(this);
130 }
131 R66Future transfer;
132 try {
133 transfer = this.runTransfer();
134 } catch (OpenR66RunnerErrorException e) {
135 logger.error("Runner Error: {} {}", e.getMessage(),
136 taskRunner.toShortString());
137 return;
138 } catch (OpenR66ProtocolNoConnectionException e) {
139 logger.error("No connection Error {}", e.getMessage());
140 if (localChannelReference != null) {
141 localChannelReference.setErrorMessage(
142 ErrorCode.ConnectionImpossible.mesg,
143 ErrorCode.ConnectionImpossible);
144 }
145 taskRunner.setErrorTask(localChannelReference);
146 try {
147 taskRunner.saveStatus();
148 taskRunner.run();
149 } catch (OpenR66RunnerErrorException e1) {
150 this.changeUpdatedInfo(UpdatedInfo.INERROR,
151 ErrorCode.ConnectionImpossible);
152 }
153 return;
154 } catch (OpenR66ProtocolPacketException e) {
155 logger.error("Protocol Error", e);
156 return;
157 } catch (OpenR66ProtocolNotYetConnectionException e) {
158 logger.warn("No connection warning {}", e.getMessage());
159 return;
160 }
161 R66Result result = transfer.getResult();
162 if (result != null) {
163 if (result.code == ErrorCode.QueryAlreadyFinished) {
164 logger.warn("TRANSFER RESULT:\n " +
165 (transfer.isSuccess()? "SUCCESS" : "FAILURE") +
166 "\n " + ErrorCode.QueryAlreadyFinished.mesg +
167 ":" +
168 (result != null? result.toString() : "no result"));
169 } else {
170 if (transfer.isSuccess()) {
171 logger.info("TRANSFER RESULT:\n SUCCESS\n " +
172 (result != null? result.toString()
173 : "no result"));
174 } else {
175 logger.error("TRANSFER RESULT:\n FAILURE\n " +
176 (result != null? result.toString()
177 : "no result"));
178 }
179 }
180 } else {
181 if (transfer.isSuccess()) {
182 logger.warn("TRANSFER REQUESTED RESULT:\n SUCCESS\n no result");
183 } else {
184 logger.error("TRANSFER REQUESTED RESULT:\n FAILURE\n no result");
185 }
186 }
187 transfer = null;
188 Thread.currentThread().setName(
189 "Finished_" + Thread.currentThread().getName());
190 } finally {
191 if (activeRunners != null) {
192 activeRunners.remove(this);
193 }
194 }
195 }
196
197
198
199
200
201
202
203 public boolean incrementTaskRunerTry(DbTaskRunner runner, int limit) {
204 String key = runner.getKey();
205 Integer tries = taskRunnerRetryHashMap.get(key);
206 logger.debug("try to find integer: " + tries);
207 if (tries == null) {
208 tries = new Integer(1);
209 } else {
210 tries = tries + 1;
211 }
212 if (limit <= tries) {
213 taskRunnerRetryHashMap.remove(key);
214 return false;
215 } else {
216 taskRunnerRetryHashMap.put(key, tries);
217 return true;
218 }
219 }
220
221
222
223
224
225
226
227
228
229
230
231 public R66Future runTransfer() throws OpenR66RunnerErrorException,
232 OpenR66ProtocolNoConnectionException,
233 OpenR66ProtocolPacketException,
234 OpenR66ProtocolNotYetConnectionException {
235 logger.debug("Start attempt Transfer");
236 localChannelReference = initRequest();
237 try {
238 localChannelReference.getFutureValidRequest().await();
239 } catch (InterruptedException e) {
240 }
241 if (localChannelReference.getFutureValidRequest().isSuccess()) {
242 return finishTransfer(true, localChannelReference);
243 } else if (localChannelReference.getFutureValidRequest().getResult().code == ErrorCode.ServerOverloaded) {
244 return tryAgainTransferOnOverloaded(true, localChannelReference);
245 } else
246 return finishTransfer(true, localChannelReference);
247 }
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262 public R66Future tryAgainTransferOnOverloaded(boolean retry,
263 LocalChannelReference localChannelReference)
264 throws OpenR66RunnerErrorException,
265 OpenR66ProtocolNoConnectionException,
266 OpenR66ProtocolPacketException,
267 OpenR66ProtocolNotYetConnectionException {
268 if (this.localChannelReference == null) {
269 this.localChannelReference = localChannelReference;
270 }
271 boolean incRetry = incrementTaskRunerTry(taskRunner,
272 Configuration.RETRYNB);
273 logger.debug("tryAgainTransferOnOverloaded: " + retry + ":" + incRetry);
274 switch (taskRunner.getUpdatedInfo()) {
275 case DONE:
276 case INERROR:
277 case INTERRUPTED:
278 break;
279 default:
280 this.changeUpdatedInfo(UpdatedInfo.INERROR,
281 ErrorCode.ServerOverloaded);
282 }
283
284 if (retry && incRetry) {
285 try {
286 Thread.sleep(Configuration.configuration.constraintLimitHandler
287 .getSleepTime());
288 } catch (InterruptedException e) {
289 }
290 return runTransfer();
291 } else {
292 if (localChannelReference == null) {
293 taskRunner
294 .setLocalChannelReference(new LocalChannelReference());
295 }
296 taskRunner.getLocalChannelReference().setErrorMessage(
297 ErrorCode.ConnectionImpossible.mesg,
298 ErrorCode.ConnectionImpossible);
299 this.taskRunner.setErrorTask(localChannelReference);
300 this.taskRunner.run();
301 throw new OpenR66ProtocolNoConnectionException(
302 "End of retry on ServerOverloaded");
303 }
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 public R66Future finishTransfer(boolean retry,
320 LocalChannelReference localChannelReference)
321 throws OpenR66RunnerErrorException {
322 if (this.localChannelReference == null) {
323 this.localChannelReference = localChannelReference;
324 }
325 R66Future transfer = localChannelReference.getFutureRequest();
326 try {
327 transfer.await();
328 } catch (InterruptedException e1) {
329 }
330 taskRunnerRetryHashMap.remove(taskRunner.getKey());
331 logger.info("Request done with {}", (transfer.isSuccess()? "success"
332 : "error"));
333 Channels.close(localChannelReference.getLocalChannel());
334
335 if (transfer.isSuccess()) {
336 try {
337 taskRunner.select();
338 } catch (GoldenGateDatabaseException e) {
339 logger.debug("Not a problem but cannot find at the end the task");
340 taskRunner.setFrom(transfer.runner);
341 }
342 this.changeUpdatedInfo(UpdatedInfo.DONE, ErrorCode.CompleteOk);
343 } else {
344 try {
345 taskRunner.select();
346 } catch (GoldenGateDatabaseException e) {
347 logger.debug("Not a problem but cannot find at the end the task");
348 taskRunner.setFrom(transfer.runner);
349 }
350
351 if (transfer.getResult() == null) {
352 switch (taskRunner.getUpdatedInfo()) {
353 case DONE:
354 R66Result ok = new R66Result(null, true,
355 ErrorCode.CompleteOk, taskRunner);
356 transfer.setResult(ok);
357 transfer.setSuccess();
358 this.changeUpdatedInfo(UpdatedInfo.DONE,
359 ErrorCode.CompleteOk);
360 break;
361 case INERROR:
362 case INTERRUPTED:
363 default:
364 R66Result error = new R66Result(null, true,
365 ErrorCode.Internal, taskRunner);
366 transfer.setResult(error);
367 transfer.cancel();
368 this.changeUpdatedInfo(UpdatedInfo.INERROR,
369 ErrorCode.Internal);
370 }
371 return transfer;
372 }
373 if (transfer.getResult().code == ErrorCode.QueryAlreadyFinished) {
374
375 logger.warn("WARN QueryAlreadyFinished:\n " +
376 transfer.toString() + "\n " +
377 taskRunner.toShortString());
378 try {
379 TransferUtils.finalizeTaskWithNoSession(taskRunner,
380 localChannelReference);
381 } catch (OpenR66RunnerErrorException e) {
382 this.taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
383 try {
384 this.taskRunner.update();
385 } catch (GoldenGateDatabaseException e1) {
386 }
387 }
388 } else {
389 switch (taskRunner.getUpdatedInfo()) {
390 case DONE:
391 case INERROR:
392 case INTERRUPTED:
393 break;
394 default:
395 this.changeUpdatedInfo(UpdatedInfo.INERROR,
396 transfer.getResult().code);
397 }
398 }
399 }
400 return transfer;
401 }
402
403
404
405
406
407
408
409
410
411
412 public LocalChannelReference initRequest()
413 throws OpenR66ProtocolNoConnectionException,
414 OpenR66RunnerErrorException, OpenR66ProtocolPacketException,
415 OpenR66ProtocolNotYetConnectionException {
416 this.changeUpdatedInfo(UpdatedInfo.RUNNING, ErrorCode.Running);
417 long id = taskRunner.getSpecialId();
418 String tid;
419 if (id == DbConstant.ILLEGALVALUE) {
420 tid = taskRunner.getRuleId() + "_" + taskRunner.getMode() +
421 "_NEWTRANSFER";
422 } else {
423 tid = taskRunner.getRuleId() + "_" + taskRunner.getMode() + "_" +
424 id;
425 }
426 Thread.currentThread().setName(tid);
427 logger.debug("Will run {}", this.taskRunner);
428 boolean restartPost = false;
429 if (taskRunner.getGloballaststep() == TASKSTEP.POSTTASK.ordinal()) {
430
431 if (!taskRunner.isSelfRequested()) {
432
433 restartPost = true;
434 }
435 }
436 if (taskRunner.isSelfRequested()) {
437
438 logger.warn("Requested host cannot initiate itself the request");
439 this.changeUpdatedInfo(UpdatedInfo.INERROR,
440 ErrorCode.LoopSelfRequestedHost);
441 throw new OpenR66ProtocolNoConnectionException(
442 "Requested host cannot initiate itself the request");
443 }
444 DbHostAuth host = R66Auth.getServerAuth(DbConstant.admin.session,
445 taskRunner.getRequested());
446 if (host == null) {
447 logger.error("Requested host cannot be found: " +
448 taskRunner.getRequested());
449 this.changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.NotKnownHost);
450 throw new OpenR66ProtocolNoConnectionException(
451 "Requested host cannot be found " +
452 taskRunner.getRequested());
453 }
454 if (host.isClient()) {
455 logger.debug("Cannot initiate a connection with a client: {}", host);
456 this.changeUpdatedInfo(UpdatedInfo.INERROR,
457 ErrorCode.ConnectionImpossible);
458 throw new OpenR66ProtocolNoConnectionException(
459 "Cannot connect to client " + host.toString());
460 }
461 SocketAddress socketAddress = host.getSocketAddress();
462 boolean isSSL = host.isSsl();
463
464 LocalChannelReference localChannelReference = networkTransaction
465 .createConnectionWithRetry(socketAddress, isSSL, futureRequest);
466 taskRunner.setLocalChannelReference(localChannelReference);
467 socketAddress = null;
468 if (localChannelReference == null) {
469
470
471 String retry;
472 if (incrementTaskRunerTry(taskRunner, Configuration.RETRYNB)) {
473 logger.debug("Will retry since Cannot connect to {}", host);
474 retry = " but will retry";
475
476 try {
477 Thread.sleep(Configuration.configuration.delayRetry);
478 } catch (InterruptedException e) {
479 }
480 this.changeUpdatedInfo(UpdatedInfo.TOSUBMIT,
481 ErrorCode.ConnectionImpossible);
482 throw new OpenR66ProtocolNotYetConnectionException(
483 "Cannot connect to server " + host.toString() + retry);
484 } else {
485 logger.debug(
486 "Will not retry since limit of connection attemtps is reached for {}",
487 host);
488 retry = " and retries limit is reached so stop here";
489 this.changeUpdatedInfo(UpdatedInfo.INERROR,
490 ErrorCode.ConnectionImpossible);
491 taskRunner
492 .setLocalChannelReference(new LocalChannelReference());
493 throw new OpenR66ProtocolNoConnectionException(
494 "Cannot connect to server " + host.toString() + retry);
495 }
496 }
497 if (handler != null) {
498 localChannelReference.setRecvThroughHandler(handler);
499 }
500 localChannelReference.setSendThroughMode(isSendThroughMode);
501 if (restartPost) {
502 RequestPacket request = taskRunner.getRequest();
503 logger.debug("Will send request {} ", request);
504 localChannelReference.setClientRunner(this);
505 localChannelReference.sessionNewState(R66FiniteDualStates.REQUESTR);
506 try {
507 ChannelUtils.writeAbstractLocalPacket(localChannelReference,
508 request, false);
509 } catch (OpenR66ProtocolPacketException e) {
510
511 logger.warn("Cannot transfer request to " + host.toString());
512 this.changeUpdatedInfo(UpdatedInfo.INTERRUPTED,
513 ErrorCode.Internal);
514 Channels.close(localChannelReference.getLocalChannel());
515 localChannelReference = null;
516 host = null;
517 request = null;
518 throw e;
519 }
520 logger.debug("Wait for request to {}", host);
521 request = null;
522 host = null;
523 return localChannelReference;
524 }
525
526
527 if (!taskRunner.isSender() &&
528 (taskRunner.getGloballaststep() == TASKSTEP.TRANSFERTASK
529 .ordinal())) {
530 logger.debug(
531 "Requester is not Sender so decrease if possible the rank {}",
532 taskRunner);
533 taskRunner.restartRank();
534 taskRunner.saveStatus();
535 logger.debug(
536 "Requester is not Sender so new rank is " +
537 taskRunner.getRank() + " {}", taskRunner);
538 }
539 RequestPacket request = taskRunner.getRequest();
540 logger.debug("Will send request {} {}", request, localChannelReference);
541 localChannelReference.setClientRunner(this);
542 localChannelReference.sessionNewState(R66FiniteDualStates.REQUESTR);
543 try {
544 ChannelUtils.writeAbstractLocalPacket(localChannelReference,
545 request, false);
546 } catch (OpenR66ProtocolPacketException e) {
547
548 logger.warn("Cannot transfer request to " + host.toString());
549 this.changeUpdatedInfo(UpdatedInfo.INTERRUPTED, ErrorCode.Internal);
550 Channels.close(localChannelReference.getLocalChannel());
551 localChannelReference = null;
552 host = null;
553 request = null;
554 throw e;
555 }
556 logger.debug("Wait for request to {}", host);
557 request = null;
558 host = null;
559 return localChannelReference;
560 }
561
562
563
564
565
566
567 public void changeUpdatedInfo(AbstractDbData.UpdatedInfo info,
568 ErrorCode code) {
569 this.taskRunner.changeUpdatedInfo(info);
570 this.taskRunner.setErrorExecutionStatus(code);
571 try {
572 this.taskRunner.update();
573 } catch (GoldenGateDatabaseException e) {
574 }
575 }
576
577
578
579
580
581 public void setRecvThroughHandler(RecvThroughHandler handler) {
582 this.handler = handler;
583 }
584 public void setSendThroughMode() {
585 isSendThroughMode = true;
586 }
587 public boolean getSendThroughMode() {
588 return isSendThroughMode;
589 }
590 }