View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.protocol.utils;
22  
23  import goldengate.common.database.DbAdmin;
24  import goldengate.common.file.DataBlock;
25  import goldengate.common.logging.GgInternalLogger;
26  import goldengate.common.logging.GgInternalLoggerFactory;
27  import goldengate.common.logging.GgSlf4JLoggerFactory;
28  
29  import java.net.InetAddress;
30  import java.net.InetSocketAddress;
31  
32  import openr66.context.R66FiniteDualStates;
33  import openr66.context.task.localexec.LocalExecClient;
34  import openr66.database.DbConstant;
35  import openr66.database.data.DbTaskRunner;
36  import openr66.protocol.configuration.Configuration;
37  import openr66.protocol.exception.OpenR66ProtocolPacketException;
38  import openr66.protocol.localhandler.LocalChannelReference;
39  import openr66.protocol.localhandler.packet.AbstractLocalPacket;
40  import openr66.protocol.localhandler.packet.DataPacket;
41  import openr66.protocol.localhandler.packet.EndTransferPacket;
42  import openr66.protocol.localhandler.packet.LocalPacketFactory;
43  import openr66.protocol.localhandler.packet.RequestPacket;
44  import openr66.protocol.networkhandler.NetworkTransaction;
45  import openr66.protocol.networkhandler.packet.NetworkPacket;
46  
47  import org.jboss.netty.buffer.ChannelBuffer;
48  import org.jboss.netty.buffer.ChannelBuffers;
49  import org.jboss.netty.channel.Channel;
50  import org.jboss.netty.channel.ChannelFactory;
51  import org.jboss.netty.channel.ChannelFuture;
52  import org.jboss.netty.channel.Channels;
53  import org.jboss.netty.channel.group.ChannelGroupFuture;
54  import org.jboss.netty.channel.group.ChannelGroupFutureListener;
55  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
56  import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
57  import org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler;
58  import org.jboss.netty.handler.traffic.TrafficCounter;
59  import org.slf4j.LoggerFactory;
60  
61  import ch.qos.logback.classic.LoggerContext;
62  
63  /**
64   * Channel Utils
65   * @author Frederic Bregier
66   */
67  public class ChannelUtils extends Thread {
68      /**
69       * Internal Logger
70       */
71      private static final GgInternalLogger logger = GgInternalLoggerFactory
72              .getLogger(ChannelUtils.class);
73  
74      public static final Integer NOCHANNEL = Integer.MIN_VALUE;
75  
76      /**
77       * Get the Remote InetAddress
78       *
79       * @param channel
80       * @return the remote InetAddress
81       */
82      public static InetAddress getRemoteInetAddress(Channel channel) {
83          InetSocketAddress socketAddress = (InetSocketAddress) channel
84                  .getRemoteAddress();
85          if (socketAddress == null) {
86              socketAddress = new InetSocketAddress(20);
87          }
88          return socketAddress.getAddress();
89      }
90  
91      /**
92       * Get the Local InetAddress
93       *
94       * @param channel
95       * @return the local InetAddress
96       */
97      public static InetAddress getLocalInetAddress(Channel channel) {
98          final InetSocketAddress socketAddress = (InetSocketAddress) channel
99                  .getLocalAddress();
100         return socketAddress.getAddress();
101     }
102 
103     /**
104      * Get the Remote InetSocketAddress
105      *
106      * @param channel
107      * @return the remote InetSocketAddress
108      */
109     public static InetSocketAddress getRemoteInetSocketAddress(Channel channel) {
110         return (InetSocketAddress) channel.getRemoteAddress();
111     }
112 
113     /**
114      * Get the Local InetSocketAddress
115      *
116      * @param channel
117      * @return the local InetSocketAddress
118      */
119     public static InetSocketAddress getLocalInetSocketAddress(Channel channel) {
120         return (InetSocketAddress) channel.getLocalAddress();
121     }
122 
123     /**
124      * Finalize resources attached to handlers
125      *
126      * @author Frederic Bregier
127      */
128     private static class R66ChannelGroupFutureListener implements
129             ChannelGroupFutureListener {
130         OrderedMemoryAwareThreadPoolExecutor pool;
131         String name;
132         ChannelFactory channelFactory;
133 
134         public R66ChannelGroupFutureListener(
135                 String name,
136                 OrderedMemoryAwareThreadPoolExecutor pool,
137                 ChannelFactory channelFactory) {
138             this.name = name;
139             this.pool = pool;
140             this.channelFactory = channelFactory;
141         }
142 
143         public void operationComplete(ChannelGroupFuture future)
144                 throws Exception {
145             logger.info("Start with shutdown external resources for "+name);
146             if (pool != null) {
147                 pool.shutdownNow();
148             }
149             if (channelFactory != null) {
150                 channelFactory.releaseExternalResources();
151             }
152             logger.info("Done with shutdown "+name);
153         }
154     }
155 
156     /**
157      * Terminate all registered channels
158      *
159      * @return the number of previously registered network channels
160      */
161     private static int terminateCommandChannels() {
162         final int result = Configuration.configuration.getServerChannelGroup()
163                 .size();
164         logger.info("ServerChannelGroup: " + result);
165         Configuration.configuration.getServerChannelGroup().close()
166                 .addListener(
167                         new R66ChannelGroupFutureListener(
168                                 "ServerChannelGroup",
169                                 Configuration.configuration
170                                         .getServerPipelineExecutor(),
171                                 Configuration.configuration
172                                         .getServerChannelFactory()));
173         return result;
174     }
175     /**
176      * Terminate all registered Http channels
177      *
178      * @return the number of previously registered http network channels
179      */
180     private static int terminateHttpChannels() {
181         final int result = Configuration.configuration.getHttpChannelGroup()
182                 .size();
183         logger.debug("HttpChannelGroup: " + result);
184         Configuration.configuration.getHttpChannelGroup().close()
185                 .addListener(
186                         new R66ChannelGroupFutureListener(
187                                 "HttpChannelGroup",
188                                 null,
189                                 Configuration.configuration
190                                         .getHttpChannelFactory()));
191         Configuration.configuration.getHttpsChannelFactory().releaseExternalResources();
192         return result;
193     }
194     /**
195      * Return the current number of network connections
196      *
197      * @param configuration
198      * @return the current number of network connections
199      */
200     public static int nbCommandChannels(Configuration configuration) {
201         return configuration.getServerChannelGroup().size();
202     }
203 
204     /**
205      *
206      * @param channel
207      */
208     public static void close(Channel channel) {
209         try {
210             Thread.sleep(Configuration.WAITFORNETOP);
211         } catch (InterruptedException e) {
212             Thread.currentThread().interrupt();
213         }
214         Channels.close(channel);
215     }
216 
217     /**
218      *
219      * @param localChannelReference
220      * @param block
221      * @return the ChannelFuture of this write operation
222      * @throws OpenR66ProtocolPacketException
223      */
224     public static ChannelFuture writeBackDataBlock(
225             LocalChannelReference localChannelReference, DataBlock block)
226             throws OpenR66ProtocolPacketException {
227         ChannelBuffer md5 = ChannelBuffers.EMPTY_BUFFER;
228         DbTaskRunner runner = localChannelReference.getSession().getRunner();
229         if (RequestPacket.isMD5Mode(runner.getMode())) {
230             md5 = FileUtils.getHash(block.getBlock());
231         }
232         localChannelReference.sessionNewState(R66FiniteDualStates.DATAS);
233         DataPacket data = new DataPacket(runner.getRank(), block.getBlock()
234                 .copy(), md5);
235         ChannelFuture future = writeAbstractLocalPacket(localChannelReference, data, false);
236         runner.incrementRank();
237         return future;
238     }
239 
240     /**
241      * Write the EndTransfer
242      *
243      * @param localChannelReference
244      * @throws OpenR66ProtocolPacketException
245      */
246     public static void writeEndTransfer(
247             LocalChannelReference localChannelReference)
248     throws OpenR66ProtocolPacketException {
249         EndTransferPacket packet = new EndTransferPacket(
250                 LocalPacketFactory.REQUESTPACKET);
251         localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
252         writeAbstractLocalPacket(localChannelReference, packet, false);
253     }
254     /**
255      * Write an AbstractLocalPacket to the network Channel
256      * @param localChannelReference
257      * @param packet
258      * @param wait
259      * @return the ChannelFuture on write operation
260      * @throws OpenR66ProtocolPacketException
261      */
262     public static ChannelFuture writeAbstractLocalPacket(
263             LocalChannelReference localChannelReference, AbstractLocalPacket packet,
264             boolean wait)
265     throws OpenR66ProtocolPacketException {
266         NetworkPacket networkPacket;
267         try {
268             networkPacket = new NetworkPacket(localChannelReference
269                     .getLocalId(), localChannelReference.getRemoteId(), packet);
270         } catch (OpenR66ProtocolPacketException e) {
271             logger.error("Cannot construct message from " + packet.toString(),
272                     e);
273             throw e;
274         }
275         if (wait) {
276             ChannelFuture future = Channels.write(localChannelReference.getNetworkChannel(), networkPacket);
277             try {
278                 return future.await();
279             } catch (InterruptedException e) {
280                 return future;
281             }
282         } else {
283             return Channels.write(localChannelReference.getNetworkChannel(), networkPacket);
284         }
285     }
286 
287     /**
288      * Write an AbstractLocalPacket to the Local Channel
289      * @param localChannelReference
290      * @param packet
291      * @return the ChannelFuture on write operation
292      * @throws OpenR66ProtocolPacketException
293      */
294     public static ChannelFuture writeAbstractLocalPacketToLocal(
295             LocalChannelReference localChannelReference, AbstractLocalPacket packet)
296     throws OpenR66ProtocolPacketException {
297         return Channels.write(localChannelReference.getLocalChannel(), packet);
298     }
299     
300 
301     /**
302      * Compute Wait for Traffic in Write (ugly turn around)
303      * @param localChannelReference
304      * @param size
305      * @return the wait in ms
306      */
307     public static final long willBeWaitingWriting(LocalChannelReference localChannelReference, int size) {
308         ChannelTrafficShapingHandler cts = localChannelReference.getChannelTrafficShapingHandler();
309         return willBeWaitingWriting(cts, size);
310     }
311 
312     /**
313      * Compute Wait for Traffic in Write (ugly turn around)
314      * @param cts
315      * @param size
316      * @return the wait in ms
317      */
318     public static final long willBeWaitingWriting(ChannelTrafficShapingHandler cts, int size) {
319         long currentTime = System.currentTimeMillis();
320         if (cts != null && Configuration.configuration.serverChannelWriteLimit > 0) {
321             TrafficCounter tc = cts.getTrafficCounter();
322             if (tc != null) {
323                 long wait = waitTraffic(Configuration.configuration.serverChannelWriteLimit, 
324                         tc.getCurrentWrittenBytes()+size, 
325                         tc.getLastTime(), currentTime);
326                 if (wait > 0) {
327                     return wait;
328                 }
329             }
330         }
331         if (Configuration.configuration.serverGlobalWriteLimit > 0) {
332             GlobalTrafficShapingHandler gts = Configuration.configuration.getGlobalTrafficShapingHandler();
333             if (gts != null) {
334                 TrafficCounter tc = gts.getTrafficCounter();
335                 if (tc != null) {
336                     long wait = waitTraffic(Configuration.configuration.serverGlobalWriteLimit, 
337                                 tc.getCurrentWrittenBytes()+size, 
338                                 tc.getLastTime(), currentTime);
339                     if (wait > 0) {
340                         return wait;
341                     }
342                 }
343             }
344         }
345         return 0;
346     }
347     
348     private static final long waitTraffic(long limit, long bytes, long lastTime,
349             long curtime) {
350         long interval = curtime - lastTime;
351         if (interval == 0) {
352             // Time is too short, so just lets continue
353             return 0;
354         }
355         return ((bytes * 1000 / limit - interval) / 10 ) * 10;
356     }
357 
358     /**
359      * Exit global ChannelFactory
360      */
361     public static void exit() {
362         Configuration.configuration.constraintLimitHandler.release();
363         // First try to StopAll
364         TransferUtils.stopSelectedTransfers(DbConstant.admin.session, 0,
365                 null, null, null, null, null, null, null, null, null, true, true, true);
366         Configuration.configuration.isShutdown = true;
367         Configuration.configuration.prepareServerStop();
368         final long delay = Configuration.configuration.TIMEOUTCON;
369         // Inform others that shutdown
370         Configuration.configuration.getLocalTransaction()
371                 .shutdownLocalChannels();
372         logger.warn("Exit: Give a delay of " + delay + " ms");
373         try {
374             Thread.sleep(delay);
375         } catch (final InterruptedException e) {
376         }
377         NetworkTransaction.closeRetrieveExecutors();
378         Configuration.configuration.getLocalTransaction().debugPrintActiveLocalChannels();
379         Configuration.configuration.getGlobalTrafficShapingHandler()
380                 .releaseExternalResources();
381         logger.debug("Exit Shutdown Command");
382         terminateCommandChannels();
383         logger.debug("Exit Shutdown Local");
384         Configuration.configuration.getLocalTransaction().closeAll();
385         logger.debug("Exit Shutdown Http");
386         terminateHttpChannels();
387         if (Configuration.configuration.useLocalExec) {
388             LocalExecClient.releaseResources();
389         }
390         DbAdmin.closeAllConnection();
391         Configuration.configuration.serverStop();
392         System.err.println("Exit end of Shutdown");
393         Thread.currentThread().interrupt();
394     }
395 
396     public static void stopLogger() {
397         if (GgInternalLoggerFactory.getDefaultFactory() instanceof GgSlf4JLoggerFactory) {
398             LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
399             lc.stop();
400         }
401     }
402     /**
403      * This function is the top function to be called when the server is to be
404      * shutdown.
405      */
406     @Override
407     public void run() {
408         OpenR66SignalHandler.terminate(false);
409     }
410 }