1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.protocol.utils;
22
23 import goldengate.common.database.DbAdmin;
24 import goldengate.common.file.DataBlock;
25 import goldengate.common.logging.GgInternalLogger;
26 import goldengate.common.logging.GgInternalLoggerFactory;
27 import goldengate.common.logging.GgSlf4JLoggerFactory;
28
29 import java.net.InetAddress;
30 import java.net.InetSocketAddress;
31
32 import openr66.context.R66FiniteDualStates;
33 import openr66.context.task.localexec.LocalExecClient;
34 import openr66.database.DbConstant;
35 import openr66.database.data.DbTaskRunner;
36 import openr66.protocol.configuration.Configuration;
37 import openr66.protocol.exception.OpenR66ProtocolPacketException;
38 import openr66.protocol.localhandler.LocalChannelReference;
39 import openr66.protocol.localhandler.packet.AbstractLocalPacket;
40 import openr66.protocol.localhandler.packet.DataPacket;
41 import openr66.protocol.localhandler.packet.EndTransferPacket;
42 import openr66.protocol.localhandler.packet.LocalPacketFactory;
43 import openr66.protocol.localhandler.packet.RequestPacket;
44 import openr66.protocol.networkhandler.NetworkTransaction;
45 import openr66.protocol.networkhandler.packet.NetworkPacket;
46
47 import org.jboss.netty.buffer.ChannelBuffer;
48 import org.jboss.netty.buffer.ChannelBuffers;
49 import org.jboss.netty.channel.Channel;
50 import org.jboss.netty.channel.ChannelFactory;
51 import org.jboss.netty.channel.ChannelFuture;
52 import org.jboss.netty.channel.Channels;
53 import org.jboss.netty.channel.group.ChannelGroupFuture;
54 import org.jboss.netty.channel.group.ChannelGroupFutureListener;
55 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
56 import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
57 import org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler;
58 import org.jboss.netty.handler.traffic.TrafficCounter;
59 import org.slf4j.LoggerFactory;
60
61 import ch.qos.logback.classic.LoggerContext;
62
63
64
65
66
67 public class ChannelUtils extends Thread {
68
69
70
71 private static final GgInternalLogger logger = GgInternalLoggerFactory
72 .getLogger(ChannelUtils.class);
73
74 public static final Integer NOCHANNEL = Integer.MIN_VALUE;
75
76
77
78
79
80
81
82 public static InetAddress getRemoteInetAddress(Channel channel) {
83 InetSocketAddress socketAddress = (InetSocketAddress) channel
84 .getRemoteAddress();
85 if (socketAddress == null) {
86 socketAddress = new InetSocketAddress(20);
87 }
88 return socketAddress.getAddress();
89 }
90
91
92
93
94
95
96
97 public static InetAddress getLocalInetAddress(Channel channel) {
98 final InetSocketAddress socketAddress = (InetSocketAddress) channel
99 .getLocalAddress();
100 return socketAddress.getAddress();
101 }
102
103
104
105
106
107
108
109 public static InetSocketAddress getRemoteInetSocketAddress(Channel channel) {
110 return (InetSocketAddress) channel.getRemoteAddress();
111 }
112
113
114
115
116
117
118
119 public static InetSocketAddress getLocalInetSocketAddress(Channel channel) {
120 return (InetSocketAddress) channel.getLocalAddress();
121 }
122
123
124
125
126
127
128 private static class R66ChannelGroupFutureListener implements
129 ChannelGroupFutureListener {
130 OrderedMemoryAwareThreadPoolExecutor pool;
131 String name;
132 ChannelFactory channelFactory;
133
134 public R66ChannelGroupFutureListener(
135 String name,
136 OrderedMemoryAwareThreadPoolExecutor pool,
137 ChannelFactory channelFactory) {
138 this.name = name;
139 this.pool = pool;
140 this.channelFactory = channelFactory;
141 }
142
143 public void operationComplete(ChannelGroupFuture future)
144 throws Exception {
145 logger.info("Start with shutdown external resources for "+name);
146 if (pool != null) {
147 pool.shutdownNow();
148 }
149 if (channelFactory != null) {
150 channelFactory.releaseExternalResources();
151 }
152 logger.info("Done with shutdown "+name);
153 }
154 }
155
156
157
158
159
160
161 private static int terminateCommandChannels() {
162 final int result = Configuration.configuration.getServerChannelGroup()
163 .size();
164 logger.info("ServerChannelGroup: " + result);
165 Configuration.configuration.getServerChannelGroup().close()
166 .addListener(
167 new R66ChannelGroupFutureListener(
168 "ServerChannelGroup",
169 Configuration.configuration
170 .getServerPipelineExecutor(),
171 Configuration.configuration
172 .getServerChannelFactory()));
173 return result;
174 }
175
176
177
178
179
180 private static int terminateHttpChannels() {
181 final int result = Configuration.configuration.getHttpChannelGroup()
182 .size();
183 logger.debug("HttpChannelGroup: " + result);
184 Configuration.configuration.getHttpChannelGroup().close()
185 .addListener(
186 new R66ChannelGroupFutureListener(
187 "HttpChannelGroup",
188 null,
189 Configuration.configuration
190 .getHttpChannelFactory()));
191 Configuration.configuration.getHttpsChannelFactory().releaseExternalResources();
192 return result;
193 }
194
195
196
197
198
199
200 public static int nbCommandChannels(Configuration configuration) {
201 return configuration.getServerChannelGroup().size();
202 }
203
204
205
206
207
208 public static void close(Channel channel) {
209 try {
210 Thread.sleep(Configuration.WAITFORNETOP);
211 } catch (InterruptedException e) {
212 Thread.currentThread().interrupt();
213 }
214 Channels.close(channel);
215 }
216
217
218
219
220
221
222
223
224 public static ChannelFuture writeBackDataBlock(
225 LocalChannelReference localChannelReference, DataBlock block)
226 throws OpenR66ProtocolPacketException {
227 ChannelBuffer md5 = ChannelBuffers.EMPTY_BUFFER;
228 DbTaskRunner runner = localChannelReference.getSession().getRunner();
229 if (RequestPacket.isMD5Mode(runner.getMode())) {
230 md5 = FileUtils.getHash(block.getBlock());
231 }
232 localChannelReference.sessionNewState(R66FiniteDualStates.DATAS);
233 DataPacket data = new DataPacket(runner.getRank(), block.getBlock()
234 .copy(), md5);
235 ChannelFuture future = writeAbstractLocalPacket(localChannelReference, data, false);
236 runner.incrementRank();
237 return future;
238 }
239
240
241
242
243
244
245
246 public static void writeEndTransfer(
247 LocalChannelReference localChannelReference)
248 throws OpenR66ProtocolPacketException {
249 EndTransferPacket packet = new EndTransferPacket(
250 LocalPacketFactory.REQUESTPACKET);
251 localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
252 writeAbstractLocalPacket(localChannelReference, packet, false);
253 }
254
255
256
257
258
259
260
261
262 public static ChannelFuture writeAbstractLocalPacket(
263 LocalChannelReference localChannelReference, AbstractLocalPacket packet,
264 boolean wait)
265 throws OpenR66ProtocolPacketException {
266 NetworkPacket networkPacket;
267 try {
268 networkPacket = new NetworkPacket(localChannelReference
269 .getLocalId(), localChannelReference.getRemoteId(), packet);
270 } catch (OpenR66ProtocolPacketException e) {
271 logger.error("Cannot construct message from " + packet.toString(),
272 e);
273 throw e;
274 }
275 if (wait) {
276 ChannelFuture future = Channels.write(localChannelReference.getNetworkChannel(), networkPacket);
277 try {
278 return future.await();
279 } catch (InterruptedException e) {
280 return future;
281 }
282 } else {
283 return Channels.write(localChannelReference.getNetworkChannel(), networkPacket);
284 }
285 }
286
287
288
289
290
291
292
293
294 public static ChannelFuture writeAbstractLocalPacketToLocal(
295 LocalChannelReference localChannelReference, AbstractLocalPacket packet)
296 throws OpenR66ProtocolPacketException {
297 return Channels.write(localChannelReference.getLocalChannel(), packet);
298 }
299
300
301
302
303
304
305
306
307 public static final long willBeWaitingWriting(LocalChannelReference localChannelReference, int size) {
308 ChannelTrafficShapingHandler cts = localChannelReference.getChannelTrafficShapingHandler();
309 return willBeWaitingWriting(cts, size);
310 }
311
312
313
314
315
316
317
318 public static final long willBeWaitingWriting(ChannelTrafficShapingHandler cts, int size) {
319 long currentTime = System.currentTimeMillis();
320 if (cts != null && Configuration.configuration.serverChannelWriteLimit > 0) {
321 TrafficCounter tc = cts.getTrafficCounter();
322 if (tc != null) {
323 long wait = waitTraffic(Configuration.configuration.serverChannelWriteLimit,
324 tc.getCurrentWrittenBytes()+size,
325 tc.getLastTime(), currentTime);
326 if (wait > 0) {
327 return wait;
328 }
329 }
330 }
331 if (Configuration.configuration.serverGlobalWriteLimit > 0) {
332 GlobalTrafficShapingHandler gts = Configuration.configuration.getGlobalTrafficShapingHandler();
333 if (gts != null) {
334 TrafficCounter tc = gts.getTrafficCounter();
335 if (tc != null) {
336 long wait = waitTraffic(Configuration.configuration.serverGlobalWriteLimit,
337 tc.getCurrentWrittenBytes()+size,
338 tc.getLastTime(), currentTime);
339 if (wait > 0) {
340 return wait;
341 }
342 }
343 }
344 }
345 return 0;
346 }
347
348 private static final long waitTraffic(long limit, long bytes, long lastTime,
349 long curtime) {
350 long interval = curtime - lastTime;
351 if (interval == 0) {
352
353 return 0;
354 }
355 return ((bytes * 1000 / limit - interval) / 10 ) * 10;
356 }
357
358
359
360
361 public static void exit() {
362 Configuration.configuration.constraintLimitHandler.release();
363
364 TransferUtils.stopSelectedTransfers(DbConstant.admin.session, 0,
365 null, null, null, null, null, null, null, null, null, true, true, true);
366 Configuration.configuration.isShutdown = true;
367 Configuration.configuration.prepareServerStop();
368 final long delay = Configuration.configuration.TIMEOUTCON;
369
370 Configuration.configuration.getLocalTransaction()
371 .shutdownLocalChannels();
372 logger.warn("Exit: Give a delay of " + delay + " ms");
373 try {
374 Thread.sleep(delay);
375 } catch (final InterruptedException e) {
376 }
377 NetworkTransaction.closeRetrieveExecutors();
378 Configuration.configuration.getLocalTransaction().debugPrintActiveLocalChannels();
379 Configuration.configuration.getGlobalTrafficShapingHandler()
380 .releaseExternalResources();
381 logger.debug("Exit Shutdown Command");
382 terminateCommandChannels();
383 logger.debug("Exit Shutdown Local");
384 Configuration.configuration.getLocalTransaction().closeAll();
385 logger.debug("Exit Shutdown Http");
386 terminateHttpChannels();
387 if (Configuration.configuration.useLocalExec) {
388 LocalExecClient.releaseResources();
389 }
390 DbAdmin.closeAllConnection();
391 Configuration.configuration.serverStop();
392 System.err.println("Exit end of Shutdown");
393 Thread.currentThread().interrupt();
394 }
395
396 public static void stopLogger() {
397 if (GgInternalLoggerFactory.getDefaultFactory() instanceof GgSlf4JLoggerFactory) {
398 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
399 lc.stop();
400 }
401 }
402
403
404
405
406 @Override
407 public void run() {
408 OpenR66SignalHandler.terminate(false);
409 }
410 }