View Javadoc

1   /**
2    * This file is part of GoldenGate Project (named also GoldenGate or GG).
3    * 
4    * Copyright 2009, Frederic Bregier, and individual contributors by the @author
5    * tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    * 
8    * All GoldenGate Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   * 
13   * GoldenGate is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   * 
17   * You should have received a copy of the GNU General Public License along with
18   * GoldenGate . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package openr66.protocol.localhandler;
21  
22  import goldengate.common.logging.GgInternalLogger;
23  import goldengate.common.logging.GgInternalLoggerFactory;
24  
25  import java.util.Collection;
26  import java.util.Iterator;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import openr66.context.ErrorCode;
32  import openr66.context.R66FiniteDualStates;
33  import openr66.context.R66Result;
34  import openr66.context.R66Session;
35  import openr66.context.task.exception.OpenR66RunnerErrorException;
36  import openr66.database.data.DbTaskRunner;
37  import openr66.protocol.configuration.Configuration;
38  import openr66.protocol.exception.OpenR66ProtocolPacketException;
39  import openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
40  import openr66.protocol.exception.OpenR66ProtocolShutdownException;
41  import openr66.protocol.exception.OpenR66ProtocolSystemException;
42  import openr66.protocol.localhandler.packet.LocalPacketFactory;
43  import openr66.protocol.localhandler.packet.StartupPacket;
44  import openr66.protocol.localhandler.packet.ValidPacket;
45  import openr66.protocol.networkhandler.NetworkTransaction;
46  import openr66.protocol.networkhandler.packet.NetworkPacket;
47  import openr66.protocol.utils.R66Future;
48  
49  import org.jboss.netty.bootstrap.ClientBootstrap;
50  import org.jboss.netty.bootstrap.ServerBootstrap;
51  import org.jboss.netty.buffer.ChannelBuffer;
52  import org.jboss.netty.channel.Channel;
53  import org.jboss.netty.channel.ChannelFactory;
54  import org.jboss.netty.channel.ChannelFuture;
55  import org.jboss.netty.channel.ChannelFutureListener;
56  import org.jboss.netty.channel.Channels;
57  import org.jboss.netty.channel.group.ChannelGroup;
58  import org.jboss.netty.channel.group.DefaultChannelGroup;
59  import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
60  import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
61  import org.jboss.netty.channel.local.LocalAddress;
62  import org.jboss.netty.util.Timeout;
63  import org.jboss.netty.util.TimerTask;
64  
65  /**
66   * This class handles Local Transaction connections
67   * 
68   * @author frederic bregier
69   */
70  public class LocalTransaction {
71      /**
72       * Internal Logger
73       */
74      private static final GgInternalLogger logger = GgInternalLoggerFactory
75              .getLogger(LocalTransaction.class);
76  
77      /**
78       * HashMap of LocalChannelReference using LocalChannelId
79       */
80      final ConcurrentHashMap<Integer, LocalChannelReference> localChannelHashMap = new ConcurrentHashMap<Integer, LocalChannelReference>();
81  
82      /**
83       * HashMap of Validation of LocalChannelReference using LocalChannelId
84       */
85      final ConcurrentHashMap<Integer, R66Future> validLocalChannelHashMap = new ConcurrentHashMap<Integer, R66Future>();
86  
87      /**
88       * HashMap of LocalChannelReference using requested_requester_specialId
89       */
90      final ConcurrentHashMap<String, LocalChannelReference> localChannelHashMapExternal = new ConcurrentHashMap<String, LocalChannelReference>();
91  
92      /**
93       * Remover from HashMap
94       */
95      private final ChannelFutureListener remover = new ChannelFutureListener() {
96          public void operationComplete(ChannelFuture future) {
97              remove(future.getChannel());
98          }
99      };
100 
101     private final ChannelFactory channelServerFactory = new DefaultLocalServerChannelFactory();
102 
103     private final ServerBootstrap serverBootstrap = new ServerBootstrap(
104             channelServerFactory);
105 
106     private final Channel serverChannel;
107 
108     private final LocalAddress socketLocalServerAddress = new LocalAddress("0");
109 
110     private final ChannelFactory channelClientFactory = new DefaultLocalClientChannelFactory();
111 
112     private final ClientBootstrap clientBootstrap = new ClientBootstrap(
113             channelClientFactory);
114 
115     private final ChannelGroup localChannelGroup = new DefaultChannelGroup(
116             "LocalChannels");
117 
118     /**
119      * Constructor
120      */
121     public LocalTransaction() {
122         serverBootstrap.setPipelineFactory(new LocalServerPipelineFactory());
123         serverBootstrap.setOption("connectTimeoutMillis",
124                 Configuration.configuration.TIMEOUTCON);
125         serverChannel = serverBootstrap.bind(socketLocalServerAddress);
126         localChannelGroup.add(serverChannel);
127         clientBootstrap.setPipelineFactory(new LocalClientPipelineFactory());
128     }
129 
130     /**
131      * Get the corresponding LocalChannelReference
132      * 
133      * @param remoteId
134      * @param localId
135      * @return the LocalChannelReference
136      * @throws OpenR66ProtocolSystemException
137      */
138     public LocalChannelReference getClient(Integer remoteId, Integer localId)
139             throws OpenR66ProtocolSystemException {
140         LocalChannelReference localChannelReference = getFromId(localId);
141         if (localChannelReference != null) {
142             if (localChannelReference.getRemoteId() != remoteId) {
143                 localChannelReference.setRemoteId(remoteId);
144             }
145             return localChannelReference;
146         }
147         throw new OpenR66ProtocolSystemException(
148                 "Cannot find LocalChannelReference");
149     }
150 
151     /**
152      * Create a new Client
153      * 
154      * @param networkChannel
155      * @param remoteId
156      * @param futureRequest
157      * @return the LocalChannelReference
158      * @throws OpenR66ProtocolSystemException
159      */
160     public LocalChannelReference createNewClient(Channel networkChannel,
161             Integer remoteId, R66Future futureRequest)
162             throws OpenR66ProtocolSystemException {
163         ChannelFuture channelFuture = null;
164         logger.debug("Status LocalChannelServer: {} {}", serverChannel
165                 .getClass().getName(), serverChannel.getConfig()
166                 .getConnectTimeoutMillis() + " " + serverChannel.isBound());
167         R66Future validLCR = new R66Future(true);
168         validLocalChannelHashMap.put(remoteId, validLCR);
169         for (int i = 0; i < Configuration.RETRYNB; i ++) {
170             channelFuture = clientBootstrap.connect(socketLocalServerAddress);
171             try {
172                 channelFuture.await();
173             } catch (InterruptedException e1) {
174                 validLCR.cancel();
175                 validLocalChannelHashMap.remove(remoteId);
176                 logger.error("LocalChannelServer Interrupted: " +
177                         serverChannel.getClass().getName() + " " +
178                         serverChannel.getConfig().getConnectTimeoutMillis() +
179                         " " + serverChannel.isBound());
180                 throw new OpenR66ProtocolSystemException(
181                         "Interruption - Cannot connect to local handler: " +
182                                 socketLocalServerAddress + " " +
183                                 serverChannel.isBound() + " " + serverChannel,
184                         e1);
185             }
186             if (channelFuture.isSuccess()) {
187                 final Channel channel = channelFuture.getChannel();
188                 localChannelGroup.add(channel);
189                 final LocalChannelReference localChannelReference = new LocalChannelReference(
190                         channel, networkChannel, remoteId, futureRequest);
191                 logger.debug("Create LocalChannel entry: " + i + " {}",
192                         localChannelReference);
193                 channel.getCloseFuture().addListener(remover);
194                 localChannelHashMap.put(channel.getId(), localChannelReference);
195                 try {
196                     NetworkTransaction.addLocalChannelToNetworkChannel(
197                             networkChannel, channel);
198                 } catch (OpenR66ProtocolRemoteShutdownException e) {
199                     validLCR.cancel();
200                     validLocalChannelHashMap.remove(remoteId);
201                     Channels.close(channel);
202                     throw new OpenR66ProtocolSystemException(
203                             "Cannot connect to local handler", e);
204                 }
205                 // Now send first a Startup message
206                 StartupPacket startup = new StartupPacket(
207                         localChannelReference.getLocalId());
208                 try {
209                     Channels.write(channel, startup).await();
210                 } catch (InterruptedException e) {
211                     logger.error("Can't connect to local server due to interruption" + i);
212                     validLCR.cancel();
213                     validLocalChannelHashMap.remove(remoteId);
214                     throw new OpenR66ProtocolSystemException(
215                             "Cannot connect to local handler", e);
216                 }
217                 validLCR.setSuccess();
218                 return localChannelReference;
219             } else {
220                 logger.error("Can't connect to local server " + i);
221             }
222             try {
223                 Thread.sleep(Configuration.RETRYINMS);
224             } catch (InterruptedException e) {
225                 validLCR.cancel();
226                 validLocalChannelHashMap.remove(remoteId);
227                 throw new OpenR66ProtocolSystemException(
228                         "Cannot connect to local handler", e);
229             }
230         }
231         validLCR.cancel();
232         validLocalChannelHashMap.remove(remoteId);
233         logger.error("LocalChannelServer: " +
234                 serverChannel.getClass().getName() + " " +
235                 serverChannel.getConfig().getConnectTimeoutMillis() + " " +
236                 serverChannel.isBound());
237         throw new OpenR66ProtocolSystemException(
238                 "Cannot connect to local handler: " + socketLocalServerAddress +
239                         " " + serverChannel.isBound() + " " + serverChannel,
240                 channelFuture.getCause());
241     }
242 
243     /**
244      * 
245      * @param id
246      * @return the LocalChannelReference
247      */
248     public LocalChannelReference getFromId(Integer id) {
249         for (int i = 0; i < Configuration.RETRYNB * 4; i ++) {
250             LocalChannelReference lcr = localChannelHashMap.get(id);
251             if (lcr == null) {
252                 /*
253                  * R66Future future = validLocalChannelHashMap.get(id);
254                  * logger.debug("DEBUG Future ValidLocalChannel: not found: " +
255                  * id + (future != null)); if (future != null) { try {
256                  * future.await(Configuration.configuration.TIMEOUTCON); } catch
257                  * (InterruptedException e) { return
258                  * localChannelHashMap.get(id); }
259                  * logger.debug("DEBUG Future ValidLocalChannel: " + id +
260                  * future.isDone() + ":" + future.isSuccess()); if
261                  * (future.isSuccess()) { return localChannelHashMap.get(id); }
262                  * else if (future.isFailed()) { return null; } } else {
263                  * logger.debug("DEBUG Future ValidLocalChannel: Sleep" + id);
264                  * try { Thread.sleep(Configuration.RETRYINMS); } catch
265                  * (InterruptedException e) { } continue; }
266                  */
267                 try {
268                     Thread.sleep(Configuration.RETRYINMS * 2);
269                 } catch (InterruptedException e) {
270                 }
271             } else {
272                 return lcr;
273             }
274         }
275         return localChannelHashMap.get(id);
276     }
277 
278     /**
279      * Remove one local channel
280      * 
281      * @param channel
282      */
283     public void remove(Channel channel) {
284         LocalChannelReference localChannelReference = localChannelHashMap
285                 .remove(channel.getId());
286         if (localChannelReference != null) {
287             logger.debug("Remove LocalChannel");
288             R66Future validLCR = validLocalChannelHashMap
289                     .remove(localChannelReference.getRemoteId());
290             if (validLCR != null) {
291                 validLCR.cancel();
292             }
293             DbTaskRunner runner = null;
294             if (localChannelReference.getSession() != null) {
295                 runner = localChannelReference.getSession().getRunner();
296             }
297             R66Result result = new R66Result(
298                     new OpenR66ProtocolSystemException(
299                             "While closing Local Channel"),
300                     localChannelReference.getSession(), false,
301                     ErrorCode.ConnectionImpossible, runner);
302             localChannelReference.validateConnection(false, result);
303             if (localChannelReference.getSession() != null) {
304                 if (runner != null) {
305                     String key = runner.getKey();
306                     localChannelHashMapExternal.remove(key);
307                 }
308             }
309         }
310     }
311 
312     /**
313      * 
314      * @param runner
315      * @param lcr
316      */
317     public void setFromId(DbTaskRunner runner, LocalChannelReference lcr) {
318         String key = runner.getKey();
319         localChannelHashMapExternal.put(key, lcr);
320     }
321 
322     /**
323      * 
324      * @param key
325      *            as "requested requester specialId"
326      * @return the LocalChannelReference
327      */
328     public LocalChannelReference getFromRequest(String key) {
329         return localChannelHashMapExternal.get(key);
330     }
331 
332     /**
333      * 
334      * @return the number of active local channels
335      */
336     public int getNumberLocalChannel() {
337         return localChannelHashMap.size();
338     }
339 
340     private static class CloseLocalChannelsFromNetworkChannelTast implements TimerTask {
341 
342         LocalTransaction localTransaction;
343         AtomicInteger semaphore;
344         LocalChannelReference localChannelReference;
345         boolean analysis;
346         
347         public CloseLocalChannelsFromNetworkChannelTast(
348                 LocalTransaction localTransaction,
349                 AtomicInteger semaphore,
350                 LocalChannelReference localChannelReference) {
351             this.localTransaction = localTransaction;
352             this.semaphore = semaphore;
353             this.localChannelReference = localChannelReference;
354             analysis = true;
355         }
356         
357         public void run(Timeout timeout) {
358             // give a chance for the LocalChannel to stop normally
359             if (analysis) {
360                 boolean wait = false;
361                 if (!localChannelReference.getFutureRequest().isDone()) {
362                     if (localChannelReference.getFutureValidRequest().isDone() &&
363                             localChannelReference.getFutureValidRequest()
364                                     .isFailed()) {
365                         logger.debug("Already currently on finalize");
366                         wait = true;
367                     } else {
368                         R66Result finalValue = new R66Result(
369                                 localChannelReference.getSession(), true,
370                                 ErrorCode.Shutdown, null);
371                         if (localChannelReference.getSession() != null) {
372                             try {
373                                 localChannelReference.getSession()
374                                         .tryFinalizeRequest(finalValue);
375                             } catch (OpenR66RunnerErrorException e) {
376                             } catch (OpenR66ProtocolSystemException e) {
377                             }
378                         }
379                     }
380                 }
381                 if (wait) {
382                     this.analysis = false;
383                     Configuration.configuration.getTimerClose().newTimeout(this, 
384                             Configuration.RETRYINMS * 10, TimeUnit.MILLISECONDS);
385                     return;
386                 }
387             }
388             logger.debug("Will close local channel");
389             try {
390                 Channels.close(localChannelReference.getLocalChannel()).await();
391             } catch (InterruptedException e) {
392             }
393             localTransaction.remove(localChannelReference.getLocalChannel());
394             semaphore.decrementAndGet();
395         }
396         
397     }
398     /**
399      * Close all Local Channels from the NetworkChannel
400      * 
401      * @param networkChannel
402      */
403     public void closeLocalChannelsFromNetworkChannel(Channel networkChannel) {
404         Collection<LocalChannelReference> collection = localChannelHashMap
405                 .values();
406         AtomicInteger semaphore = new AtomicInteger();
407         Iterator<LocalChannelReference> iterator = collection.iterator();
408         while (iterator.hasNext()) {
409             LocalChannelReference localChannelReference = iterator.next();
410             if (localChannelReference.getNetworkChannel().compareTo(
411                     networkChannel) == 0) {
412                 semaphore.incrementAndGet();
413                 CloseLocalChannelsFromNetworkChannelTast task = 
414                     new CloseLocalChannelsFromNetworkChannelTast(this, 
415                             semaphore, localChannelReference);
416                 Configuration.configuration.getTimerClose().newTimeout(task, 
417                         Configuration.RETRYINMS * 10, TimeUnit.MILLISECONDS);
418             }
419         }
420         while (true) {
421             if (semaphore.get() == 0) {
422                 break;
423             }
424             try {
425                 Thread.sleep(Configuration.RETRYINMS*2);
426             } catch (InterruptedException e) {
427                 break;
428             }
429         }
430     }
431 
432     /**
433      * Debug function (while shutdown for instance)
434      */
435     public void debugPrintActiveLocalChannels() {
436         Collection<LocalChannelReference> collection = localChannelHashMap
437                 .values();
438         Iterator<LocalChannelReference> iterator = collection.iterator();
439         while (iterator.hasNext()) {
440             LocalChannelReference localChannelReference = iterator.next();
441             logger.debug("Will close local channel: {}", localChannelReference);
442             logger.debug(
443                     " Containing: {}",
444                     (localChannelReference.getSession() != null? localChannelReference
445                             .getSession() : "no session"));
446         }
447     }
448 
449     /**
450      * Informs all remote client that the server is shutting down
451      */
452     public void shutdownLocalChannels() {
453         Collection<LocalChannelReference> collection = localChannelHashMap
454                 .values();
455         Iterator<LocalChannelReference> iterator = collection.iterator();
456         ValidPacket packet = new ValidPacket("Shutdown forced", null,
457                 LocalPacketFactory.SHUTDOWNPACKET);
458         ChannelBuffer buffer = null;
459         while (iterator.hasNext()) {
460             LocalChannelReference localChannelReference = iterator.next();
461             logger.debug("Inform Shutdown {}", localChannelReference);
462             packet.setSmiddle(null);
463             // If a transfer is running, save the current rank and inform remote
464             // host
465             if (localChannelReference.getSession() != null) {
466                 R66Session session = localChannelReference.getSession();
467                 DbTaskRunner runner = session.getRunner();
468                 if (runner != null && runner.isInTransfer()) {
469                     if (!runner.isSender()) {
470                         int newrank = runner.getRank();
471                         packet.setSmiddle(Integer.toString(newrank));
472                     }
473                     // Save File status
474                     try {
475                         runner.saveStatus();
476                     } catch (OpenR66RunnerErrorException e) {
477                     }
478                     R66Result result = new R66Result(
479                             new OpenR66ProtocolShutdownException(), session,
480                             true, ErrorCode.Shutdown, runner);
481                     result.other = packet;
482                     try {
483                         buffer = packet.getLocalPacket();
484                     } catch (OpenR66ProtocolPacketException e1) {
485                     }
486                     localChannelReference
487                             .sessionNewState(R66FiniteDualStates.SHUTDOWN);
488                     NetworkPacket message = new NetworkPacket(
489                             localChannelReference.getLocalId(),
490                             localChannelReference.getRemoteId(),
491                             packet.getType(), buffer);
492                     try {
493                         Channels.write(localChannelReference.getNetworkChannel(),
494                                 message).await();
495                     } catch (InterruptedException e1) {
496                     }
497                     try {
498                         session.setFinalizeTransfer(false, result);
499                     } catch (OpenR66RunnerErrorException e) {
500                     } catch (OpenR66ProtocolSystemException e) {
501                     }
502                 }
503                 Channels.close(localChannelReference.getLocalChannel());
504                 continue;
505             }
506             try {
507                 buffer = packet.getLocalPacket();
508             } catch (OpenR66ProtocolPacketException e1) {
509             }
510             NetworkPacket message = new NetworkPacket(
511                     localChannelReference.getLocalId(),
512                     localChannelReference.getRemoteId(), packet.getType(),
513                     buffer);
514             Channels.write(localChannelReference.getNetworkChannel(), message);
515         }
516     }
517 
518     /**
519      * Close All Local Channels
520      */
521     public void closeAll() {
522         logger.debug("close All Local Channels");
523         localChannelGroup.close().awaitUninterruptibly();
524         clientBootstrap.releaseExternalResources();
525         channelClientFactory.releaseExternalResources();
526         serverBootstrap.releaseExternalResources();
527         channelServerFactory.releaseExternalResources();
528     }
529 
530 }