1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package openr66.protocol.localhandler;
21
22 import goldengate.common.logging.GgInternalLogger;
23 import goldengate.common.logging.GgInternalLoggerFactory;
24
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import openr66.context.ErrorCode;
32 import openr66.context.R66FiniteDualStates;
33 import openr66.context.R66Result;
34 import openr66.context.R66Session;
35 import openr66.context.task.exception.OpenR66RunnerErrorException;
36 import openr66.database.data.DbTaskRunner;
37 import openr66.protocol.configuration.Configuration;
38 import openr66.protocol.exception.OpenR66ProtocolPacketException;
39 import openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
40 import openr66.protocol.exception.OpenR66ProtocolShutdownException;
41 import openr66.protocol.exception.OpenR66ProtocolSystemException;
42 import openr66.protocol.localhandler.packet.LocalPacketFactory;
43 import openr66.protocol.localhandler.packet.StartupPacket;
44 import openr66.protocol.localhandler.packet.ValidPacket;
45 import openr66.protocol.networkhandler.NetworkTransaction;
46 import openr66.protocol.networkhandler.packet.NetworkPacket;
47 import openr66.protocol.utils.R66Future;
48
49 import org.jboss.netty.bootstrap.ClientBootstrap;
50 import org.jboss.netty.bootstrap.ServerBootstrap;
51 import org.jboss.netty.buffer.ChannelBuffer;
52 import org.jboss.netty.channel.Channel;
53 import org.jboss.netty.channel.ChannelFactory;
54 import org.jboss.netty.channel.ChannelFuture;
55 import org.jboss.netty.channel.ChannelFutureListener;
56 import org.jboss.netty.channel.Channels;
57 import org.jboss.netty.channel.group.ChannelGroup;
58 import org.jboss.netty.channel.group.DefaultChannelGroup;
59 import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
60 import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
61 import org.jboss.netty.channel.local.LocalAddress;
62 import org.jboss.netty.util.Timeout;
63 import org.jboss.netty.util.TimerTask;
64
65
66
67
68
69
70 public class LocalTransaction {
71
72
73
74 private static final GgInternalLogger logger = GgInternalLoggerFactory
75 .getLogger(LocalTransaction.class);
76
77
78
79
80 final ConcurrentHashMap<Integer, LocalChannelReference> localChannelHashMap = new ConcurrentHashMap<Integer, LocalChannelReference>();
81
82
83
84
85 final ConcurrentHashMap<Integer, R66Future> validLocalChannelHashMap = new ConcurrentHashMap<Integer, R66Future>();
86
87
88
89
90 final ConcurrentHashMap<String, LocalChannelReference> localChannelHashMapExternal = new ConcurrentHashMap<String, LocalChannelReference>();
91
92
93
94
95 private final ChannelFutureListener remover = new ChannelFutureListener() {
96 public void operationComplete(ChannelFuture future) {
97 remove(future.getChannel());
98 }
99 };
100
101 private final ChannelFactory channelServerFactory = new DefaultLocalServerChannelFactory();
102
103 private final ServerBootstrap serverBootstrap = new ServerBootstrap(
104 channelServerFactory);
105
106 private final Channel serverChannel;
107
108 private final LocalAddress socketLocalServerAddress = new LocalAddress("0");
109
110 private final ChannelFactory channelClientFactory = new DefaultLocalClientChannelFactory();
111
112 private final ClientBootstrap clientBootstrap = new ClientBootstrap(
113 channelClientFactory);
114
115 private final ChannelGroup localChannelGroup = new DefaultChannelGroup(
116 "LocalChannels");
117
118
119
120
/**
 * Sets up the local transport: installs the client and server pipeline
 * factories, applies the configured connection timeout to the server
 * bootstrap, binds the local server and registers its channel in the
 * local channel group.
 */
public LocalTransaction() {
    // Pipelines first: both sides must be configured before any connection.
    clientBootstrap.setPipelineFactory(new LocalClientPipelineFactory());
    serverBootstrap.setPipelineFactory(new LocalServerPipelineFactory());
    serverBootstrap.setOption("connectTimeoutMillis",
            Configuration.configuration.TIMEOUTCON);

    // Bind the unique local server and track it with the other local channels.
    serverChannel = serverBootstrap.bind(socketLocalServerAddress);
    localChannelGroup.add(serverChannel);
}
129
130
131
132
133
134
135
136
137
138 public LocalChannelReference getClient(Integer remoteId, Integer localId)
139 throws OpenR66ProtocolSystemException {
140 LocalChannelReference localChannelReference = getFromId(localId);
141 if (localChannelReference != null) {
142 if (localChannelReference.getRemoteId() != remoteId) {
143 localChannelReference.setRemoteId(remoteId);
144 }
145 return localChannelReference;
146 }
147 throw new OpenR66ProtocolSystemException(
148 "Cannot find LocalChannelReference");
149 }
150
151
152
153
154
155
156
157
158
159
160 public LocalChannelReference createNewClient(Channel networkChannel,
161 Integer remoteId, R66Future futureRequest)
162 throws OpenR66ProtocolSystemException {
163 ChannelFuture channelFuture = null;
164 logger.debug("Status LocalChannelServer: {} {}", serverChannel
165 .getClass().getName(), serverChannel.getConfig()
166 .getConnectTimeoutMillis() + " " + serverChannel.isBound());
167 R66Future validLCR = new R66Future(true);
168 validLocalChannelHashMap.put(remoteId, validLCR);
169 for (int i = 0; i < Configuration.RETRYNB; i ++) {
170 channelFuture = clientBootstrap.connect(socketLocalServerAddress);
171 try {
172 channelFuture.await();
173 } catch (InterruptedException e1) {
174 validLCR.cancel();
175 validLocalChannelHashMap.remove(remoteId);
176 logger.error("LocalChannelServer Interrupted: " +
177 serverChannel.getClass().getName() + " " +
178 serverChannel.getConfig().getConnectTimeoutMillis() +
179 " " + serverChannel.isBound());
180 throw new OpenR66ProtocolSystemException(
181 "Interruption - Cannot connect to local handler: " +
182 socketLocalServerAddress + " " +
183 serverChannel.isBound() + " " + serverChannel,
184 e1);
185 }
186 if (channelFuture.isSuccess()) {
187 final Channel channel = channelFuture.getChannel();
188 localChannelGroup.add(channel);
189 final LocalChannelReference localChannelReference = new LocalChannelReference(
190 channel, networkChannel, remoteId, futureRequest);
191 logger.debug("Create LocalChannel entry: " + i + " {}",
192 localChannelReference);
193 channel.getCloseFuture().addListener(remover);
194 localChannelHashMap.put(channel.getId(), localChannelReference);
195 try {
196 NetworkTransaction.addLocalChannelToNetworkChannel(
197 networkChannel, channel);
198 } catch (OpenR66ProtocolRemoteShutdownException e) {
199 validLCR.cancel();
200 validLocalChannelHashMap.remove(remoteId);
201 Channels.close(channel);
202 throw new OpenR66ProtocolSystemException(
203 "Cannot connect to local handler", e);
204 }
205
206 StartupPacket startup = new StartupPacket(
207 localChannelReference.getLocalId());
208 try {
209 Channels.write(channel, startup).await();
210 } catch (InterruptedException e) {
211 logger.error("Can't connect to local server due to interruption" + i);
212 validLCR.cancel();
213 validLocalChannelHashMap.remove(remoteId);
214 throw new OpenR66ProtocolSystemException(
215 "Cannot connect to local handler", e);
216 }
217 validLCR.setSuccess();
218 return localChannelReference;
219 } else {
220 logger.error("Can't connect to local server " + i);
221 }
222 try {
223 Thread.sleep(Configuration.RETRYINMS);
224 } catch (InterruptedException e) {
225 validLCR.cancel();
226 validLocalChannelHashMap.remove(remoteId);
227 throw new OpenR66ProtocolSystemException(
228 "Cannot connect to local handler", e);
229 }
230 }
231 validLCR.cancel();
232 validLocalChannelHashMap.remove(remoteId);
233 logger.error("LocalChannelServer: " +
234 serverChannel.getClass().getName() + " " +
235 serverChannel.getConfig().getConnectTimeoutMillis() + " " +
236 serverChannel.isBound());
237 throw new OpenR66ProtocolSystemException(
238 "Cannot connect to local handler: " + socketLocalServerAddress +
239 " " + serverChannel.isBound() + " " + serverChannel,
240 channelFuture.getCause());
241 }
242
243
244
245
246
247
248 public LocalChannelReference getFromId(Integer id) {
249 for (int i = 0; i < Configuration.RETRYNB * 4; i ++) {
250 LocalChannelReference lcr = localChannelHashMap.get(id);
251 if (lcr == null) {
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267 try {
268 Thread.sleep(Configuration.RETRYINMS * 2);
269 } catch (InterruptedException e) {
270 }
271 } else {
272 return lcr;
273 }
274 }
275 return localChannelHashMap.get(id);
276 }
277
278
279
280
281
282
283 public void remove(Channel channel) {
284 LocalChannelReference localChannelReference = localChannelHashMap
285 .remove(channel.getId());
286 if (localChannelReference != null) {
287 logger.debug("Remove LocalChannel");
288 R66Future validLCR = validLocalChannelHashMap
289 .remove(localChannelReference.getRemoteId());
290 if (validLCR != null) {
291 validLCR.cancel();
292 }
293 DbTaskRunner runner = null;
294 if (localChannelReference.getSession() != null) {
295 runner = localChannelReference.getSession().getRunner();
296 }
297 R66Result result = new R66Result(
298 new OpenR66ProtocolSystemException(
299 "While closing Local Channel"),
300 localChannelReference.getSession(), false,
301 ErrorCode.ConnectionImpossible, runner);
302 localChannelReference.validateConnection(false, result);
303 if (localChannelReference.getSession() != null) {
304 if (runner != null) {
305 String key = runner.getKey();
306 localChannelHashMapExternal.remove(key);
307 }
308 }
309 }
310 }
311
312
313
314
315
316
317 public void setFromId(DbTaskRunner runner, LocalChannelReference lcr) {
318 String key = runner.getKey();
319 localChannelHashMapExternal.put(key, lcr);
320 }
321
322
323
324
325
326
327
328 public LocalChannelReference getFromRequest(String key) {
329 return localChannelHashMapExternal.get(key);
330 }
331
332
333
334
335
336 public int getNumberLocalChannel() {
337 return localChannelHashMap.size();
338 }
339
340 private static class CloseLocalChannelsFromNetworkChannelTast implements TimerTask {
341
342 LocalTransaction localTransaction;
343 AtomicInteger semaphore;
344 LocalChannelReference localChannelReference;
345 boolean analysis;
346
347 public CloseLocalChannelsFromNetworkChannelTast(
348 LocalTransaction localTransaction,
349 AtomicInteger semaphore,
350 LocalChannelReference localChannelReference) {
351 this.localTransaction = localTransaction;
352 this.semaphore = semaphore;
353 this.localChannelReference = localChannelReference;
354 analysis = true;
355 }
356
357 public void run(Timeout timeout) {
358
359 if (analysis) {
360 boolean wait = false;
361 if (!localChannelReference.getFutureRequest().isDone()) {
362 if (localChannelReference.getFutureValidRequest().isDone() &&
363 localChannelReference.getFutureValidRequest()
364 .isFailed()) {
365 logger.debug("Already currently on finalize");
366 wait = true;
367 } else {
368 R66Result finalValue = new R66Result(
369 localChannelReference.getSession(), true,
370 ErrorCode.Shutdown, null);
371 if (localChannelReference.getSession() != null) {
372 try {
373 localChannelReference.getSession()
374 .tryFinalizeRequest(finalValue);
375 } catch (OpenR66RunnerErrorException e) {
376 } catch (OpenR66ProtocolSystemException e) {
377 }
378 }
379 }
380 }
381 if (wait) {
382 this.analysis = false;
383 Configuration.configuration.getTimerClose().newTimeout(this,
384 Configuration.RETRYINMS * 10, TimeUnit.MILLISECONDS);
385 return;
386 }
387 }
388 logger.debug("Will close local channel");
389 try {
390 Channels.close(localChannelReference.getLocalChannel()).await();
391 } catch (InterruptedException e) {
392 }
393 localTransaction.remove(localChannelReference.getLocalChannel());
394 semaphore.decrementAndGet();
395 }
396
397 }
398
399
400
401
402
403 public void closeLocalChannelsFromNetworkChannel(Channel networkChannel) {
404 Collection<LocalChannelReference> collection = localChannelHashMap
405 .values();
406 AtomicInteger semaphore = new AtomicInteger();
407 Iterator<LocalChannelReference> iterator = collection.iterator();
408 while (iterator.hasNext()) {
409 LocalChannelReference localChannelReference = iterator.next();
410 if (localChannelReference.getNetworkChannel().compareTo(
411 networkChannel) == 0) {
412 semaphore.incrementAndGet();
413 CloseLocalChannelsFromNetworkChannelTast task =
414 new CloseLocalChannelsFromNetworkChannelTast(this,
415 semaphore, localChannelReference);
416 Configuration.configuration.getTimerClose().newTimeout(task,
417 Configuration.RETRYINMS * 10, TimeUnit.MILLISECONDS);
418 }
419 }
420 while (true) {
421 if (semaphore.get() == 0) {
422 break;
423 }
424 try {
425 Thread.sleep(Configuration.RETRYINMS*2);
426 } catch (InterruptedException e) {
427 break;
428 }
429 }
430 }
431
432
433
434
435 public void debugPrintActiveLocalChannels() {
436 Collection<LocalChannelReference> collection = localChannelHashMap
437 .values();
438 Iterator<LocalChannelReference> iterator = collection.iterator();
439 while (iterator.hasNext()) {
440 LocalChannelReference localChannelReference = iterator.next();
441 logger.debug("Will close local channel: {}", localChannelReference);
442 logger.debug(
443 " Containing: {}",
444 (localChannelReference.getSession() != null? localChannelReference
445 .getSession() : "no session"));
446 }
447 }
448
449
450
451
452 public void shutdownLocalChannels() {
453 Collection<LocalChannelReference> collection = localChannelHashMap
454 .values();
455 Iterator<LocalChannelReference> iterator = collection.iterator();
456 ValidPacket packet = new ValidPacket("Shutdown forced", null,
457 LocalPacketFactory.SHUTDOWNPACKET);
458 ChannelBuffer buffer = null;
459 while (iterator.hasNext()) {
460 LocalChannelReference localChannelReference = iterator.next();
461 logger.debug("Inform Shutdown {}", localChannelReference);
462 packet.setSmiddle(null);
463
464
465 if (localChannelReference.getSession() != null) {
466 R66Session session = localChannelReference.getSession();
467 DbTaskRunner runner = session.getRunner();
468 if (runner != null && runner.isInTransfer()) {
469 if (!runner.isSender()) {
470 int newrank = runner.getRank();
471 packet.setSmiddle(Integer.toString(newrank));
472 }
473
474 try {
475 runner.saveStatus();
476 } catch (OpenR66RunnerErrorException e) {
477 }
478 R66Result result = new R66Result(
479 new OpenR66ProtocolShutdownException(), session,
480 true, ErrorCode.Shutdown, runner);
481 result.other = packet;
482 try {
483 buffer = packet.getLocalPacket();
484 } catch (OpenR66ProtocolPacketException e1) {
485 }
486 localChannelReference
487 .sessionNewState(R66FiniteDualStates.SHUTDOWN);
488 NetworkPacket message = new NetworkPacket(
489 localChannelReference.getLocalId(),
490 localChannelReference.getRemoteId(),
491 packet.getType(), buffer);
492 try {
493 Channels.write(localChannelReference.getNetworkChannel(),
494 message).await();
495 } catch (InterruptedException e1) {
496 }
497 try {
498 session.setFinalizeTransfer(false, result);
499 } catch (OpenR66RunnerErrorException e) {
500 } catch (OpenR66ProtocolSystemException e) {
501 }
502 }
503 Channels.close(localChannelReference.getLocalChannel());
504 continue;
505 }
506 try {
507 buffer = packet.getLocalPacket();
508 } catch (OpenR66ProtocolPacketException e1) {
509 }
510 NetworkPacket message = new NetworkPacket(
511 localChannelReference.getLocalId(),
512 localChannelReference.getRemoteId(), packet.getType(),
513 buffer);
514 Channels.write(localChannelReference.getNetworkChannel(), message);
515 }
516 }
517
518
519
520
521 public void closeAll() {
522 logger.debug("close All Local Channels");
523 localChannelGroup.close().awaitUninterruptibly();
524 clientBootstrap.releaseExternalResources();
525 channelClientFactory.releaseExternalResources();
526 serverBootstrap.releaseExternalResources();
527 channelServerFactory.releaseExternalResources();
528 }
529
530 }