1 /*
2 * Copyright 2009 Red Hat, Inc.
3 *
4 * Red Hat licenses this file to you under the Apache License, version 2.0
5 * (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelState;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.channel.SimpleChannelHandler;
28 import org.jboss.netty.logging.InternalLogger;
29 import org.jboss.netty.logging.InternalLoggerFactory;
30 import org.jboss.netty.util.DefaultObjectSizeEstimator;
31 import org.jboss.netty.util.ExternalResourceReleasable;
32 import org.jboss.netty.util.ObjectSizeEstimator;
33 import org.jboss.netty.util.Timeout;
34 import org.jboss.netty.util.Timer;
35 import org.jboss.netty.util.TimerTask;
36
37 /**
38 * AbstractTrafficShapingHandler allows to limit the global bandwidth
39 * (see {@link GlobalTrafficShapingHandler}) or per session
40 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41 * It allows too to implement an almost real time monitoring of the bandwidth using
42 * the monitors from {@link TrafficCounter} that will call back every checkInterval
43 * the method doAccounting of this handler.<br>
44 * <br>
45 *
46 * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47 * is the size of the object to be read or write accordingly to the type of
48 * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49 *
50 * If you want for any particular reasons to stop the monitoring (accounting) or to change
51 * the read/write limit or the check interval, several methods allow that for you:<br>
52 * <ul>
53 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56 * <li></li>
57 * </ul>
58 *
59 * @author The Netty Project (netty-dev@lists.jboss.org)
60 * @author Frederic Bregier
61 * @version $Rev: 1225 $, $Date: 2012-05-20 10:48:53 +0200 (dim., 20 mai 2012) $
62 */
63 public abstract class AbstractTrafficShapingHandler extends
64 SimpleChannelHandler implements ExternalResourceReleasable {
65 /**
66 * Internal logger
67 */
68 static InternalLogger logger = InternalLoggerFactory
69 .getInstance(AbstractTrafficShapingHandler.class);
70
71 /**
72 * Default delay between two checks: 1s
73 */
74 public static final long DEFAULT_CHECK_INTERVAL = 1000;
75
76 /**
77 * Default minimal time to wait
78 */
79 private static final long MINIMAL_WAIT = 10;
80
81 /**
82 * Traffic Counter
83 */
84 protected TrafficCounter trafficCounter = null;
85
86 /**
87 * ObjectSizeEstimator
88 */
89 private ObjectSizeEstimator objectSizeEstimator = null;
90
91 /**
92 * Timer to associated to any TrafficCounter
93 */
94 protected Timer timer = null;
95 /**
96 * used in releaseExternalResources() to cancel the timer
97 */
98 volatile private Timeout timeout = null;
99 /**
100 * Limit in B/s to apply to write
101 */
102 private long writeLimit = 0;
103
104 /**
105 * Limit in B/s to apply to read
106 */
107 private long readLimit = 0;
108
109 /**
110 * Delay between two performance snapshots
111 */
112 protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
113
114 /**
115 * Boolean associated with the release of this TrafficShapingHandler.
116 * It will be true only once when the releaseExternalRessources is called
117 * to prevent waiting when shutdown.
118 */
119 private final AtomicBoolean release = new AtomicBoolean(false);
120
121 /**
122 * @param newObjectSizeEstimator
123 * @param newTimer
124 * @param newActive
125 * @param newWriteLimit
126 * @param newReadLimit
127 * @param newCheckInterval
128 */
129 private void init(ObjectSizeEstimator newObjectSizeEstimator,
130 Timer newTimer, long newWriteLimit, long newReadLimit,
131 long newCheckInterval) {
132 objectSizeEstimator = newObjectSizeEstimator;
133 timer = newTimer;
134 writeLimit = newWriteLimit;
135 readLimit = newReadLimit;
136 checkInterval = newCheckInterval;
137 //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
138 }
139
140 /**
141 *
142 * @param newTrafficCounter the TrafficCounter to set
143 */
144 void setTrafficCounter(TrafficCounter newTrafficCounter) {
145 trafficCounter = newTrafficCounter;
146 }
147
148 /**
149 * Constructor using default {@link ObjectSizeEstimator}
150 *
151 * @param timer
152 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
153 * @param writeLimit
154 * 0 or a limit in bytes/s
155 * @param readLimit
156 * 0 or a limit in bytes/s
157 * @param checkInterval
158 * The delay between two computations of performances for
159 * channels or 0 if no stats are to be computed
160 */
161 public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
162 long readLimit, long checkInterval) {
163 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
164 checkInterval);
165 }
166
167 /**
168 * Constructor using the specified ObjectSizeEstimator
169 *
170 * @param objectSizeEstimator
171 * the {@link ObjectSizeEstimator} that will be used to compute
172 * the size of the message
173 * @param timer
174 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
175 * @param writeLimit
176 * 0 or a limit in bytes/s
177 * @param readLimit
178 * 0 or a limit in bytes/s
179 * @param checkInterval
180 * The delay between two computations of performances for
181 * channels or 0 if no stats are to be computed
182 */
183 public AbstractTrafficShapingHandler(
184 ObjectSizeEstimator objectSizeEstimator, Timer timer,
185 long writeLimit, long readLimit, long checkInterval) {
186 init(objectSizeEstimator, timer, writeLimit, readLimit,
187 checkInterval);
188 }
189
190 /**
191 * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
192 *
193 * @param timer
194 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
195 * @param writeLimit
196 * 0 or a limit in bytes/s
197 * @param readLimit
198 * 0 or a limit in bytes/s
199 */
200 public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
201 long readLimit) {
202 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
203 DEFAULT_CHECK_INTERVAL);
204 }
205
206 /**
207 * Constructor using the specified ObjectSizeEstimator and using default Check Interval
208 *
209 * @param objectSizeEstimator
210 * the {@link ObjectSizeEstimator} that will be used to compute
211 * the size of the message
212 * @param timer
213 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
214 * @param writeLimit
215 * 0 or a limit in bytes/s
216 * @param readLimit
217 * 0 or a limit in bytes/s
218 */
219 public AbstractTrafficShapingHandler(
220 ObjectSizeEstimator objectSizeEstimator, Timer timer,
221 long writeLimit, long readLimit) {
222 init(objectSizeEstimator, timer, writeLimit, readLimit,
223 DEFAULT_CHECK_INTERVAL);
224 }
225
226 /**
227 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
228 *
229 * @param timer
230 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
231 */
232 public AbstractTrafficShapingHandler(Timer timer) {
233 init(new DefaultObjectSizeEstimator(), timer, 0, 0,
234 DEFAULT_CHECK_INTERVAL);
235 }
236
237 /**
238 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
239 *
240 * @param objectSizeEstimator
241 * the {@link ObjectSizeEstimator} that will be used to compute
242 * the size of the message
243 * @param timer
244 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
245 */
246 public AbstractTrafficShapingHandler(
247 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
248 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
249 }
250
251 /**
252 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
253 *
254 * @param timer
255 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
256 * @param checkInterval
257 * The delay between two computations of performances for
258 * channels or 0 if no stats are to be computed
259 */
260 public AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
261 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
262 }
263
264 /**
265 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
266 *
267 * @param objectSizeEstimator
268 * the {@link ObjectSizeEstimator} that will be used to compute
269 * the size of the message
270 * @param timer
271 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
272 * @param checkInterval
273 * The delay between two computations of performances for
274 * channels or 0 if no stats are to be computed
275 */
276 public AbstractTrafficShapingHandler(
277 ObjectSizeEstimator objectSizeEstimator, Timer timer,
278 long checkInterval) {
279 init(objectSizeEstimator, timer, 0, 0, checkInterval);
280 }
281
282 /**
283 * Change the underlying limitations and check interval.
284 *
285 * @param newWriteLimit
286 * @param newReadLimit
287 * @param newCheckInterval
288 */
289 public void configure(long newWriteLimit, long newReadLimit,
290 long newCheckInterval) {
291 this.configure(newWriteLimit, newReadLimit);
292 this.configure(newCheckInterval);
293 }
294
295 /**
296 * Change the underlying limitations.
297 *
298 * @param newWriteLimit
299 * @param newReadLimit
300 */
301 public void configure(long newWriteLimit, long newReadLimit) {
302 writeLimit = newWriteLimit;
303 readLimit = newReadLimit;
304 if (trafficCounter != null) {
305 trafficCounter.resetAccounting(System.currentTimeMillis()+1);
306 }
307 }
308
309 /**
310 * Change the check interval.
311 *
312 * @param newCheckInterval
313 */
314 public void configure(long newCheckInterval) {
315 checkInterval = newCheckInterval;
316 if (trafficCounter != null) {
317 trafficCounter.configure(checkInterval);
318 }
319 }
320
321 /**
322 * Called each time the accounting is computed from the TrafficCounters.
323 * This method could be used for instance to implement almost real time accounting.
324 *
325 * @param counter
326 * the TrafficCounter that computes its performance
327 */
328 protected void doAccounting(TrafficCounter counter) {
329 // NOOP by default
330 }
331
332 /**
333 * Class to implement setReadable at fix time
334 */
335 private class ReopenReadTimerTask implements TimerTask {
336 ChannelHandlerContext ctx;
337 ReopenReadTimerTask(ChannelHandlerContext ctx) {
338 this.ctx = ctx;
339 }
340 public void run(Timeout timeoutArg) throws Exception {
341 //logger.warn("Start RRTT: "+release.get());
342 if (release.get()) {
343 return;
344 }
345 /*
346 logger.warn("WAKEUP! "+
347 (ctx != null && ctx.getChannel() != null &&
348 ctx.getChannel().isConnected()));
349 */
350 if (ctx != null && ctx.getChannel() != null &&
351 ctx.getChannel().isConnected()) {
352 //logger.warn(" setReadable TRUE: ");
353 // readSuspended = false;
354 ctx.setAttachment(null);
355 ctx.getChannel().setReadable(true);
356 }
357 }
358 }
359 /**
360 *
361 * @return the time that should be necessary to wait to respect limit. Can
362 * be negative time
363 */
364 private long getTimeToWait(long limit, long bytes, long lastTime,
365 long curtime) {
366 long interval = curtime - lastTime;
367 if (interval == 0) {
368 // Time is too short, so just lets continue
369 return 0;
370 }
371 return ((bytes * 1000 / limit - interval)/10)*10;
372 }
373
374 @Override
375 public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
376 throws Exception {
377 try {
378 long curtime = System.currentTimeMillis();
379 long size = objectSizeEstimator.estimateSize(arg1.getMessage());
380 if (trafficCounter != null) {
381 trafficCounter.bytesRecvFlowControl(arg0, size);
382 if (readLimit == 0) {
383 // no action
384 return;
385 }
386 // compute the number of ms to wait before reopening the channel
387 long wait = getTimeToWait(readLimit, trafficCounter
388 .getCurrentReadBytes(), trafficCounter.getLastTime(),
389 curtime);
390 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
391 Channel channel = arg0.getChannel();
392 // try to limit the traffic
393 if (channel != null && channel.isConnected()) {
394 // Channel version
395 if (timer == null) {
396 // Sleep since no executor
397 //logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
398 if (release.get()) {
399 return;
400 }
401 Thread.sleep(wait);
402 return;
403 }
404 if (arg0.getAttachment() == null) {
405 // readSuspended = true;
406 arg0.setAttachment(Boolean.TRUE);
407 channel.setReadable(false);
408 //logger.warn("Read will wakeup after "+wait+" ms "+this);
409 TimerTask timerTask = new ReopenReadTimerTask(arg0);
410 timeout =
411 timer.newTimeout(timerTask, wait, TimeUnit.MILLISECONDS);
412 } else {
413 // should be waiting: but can occurs sometime so as a FIX
414 //logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
415 if (release.get()) {
416 return;
417 }
418 Thread.sleep(wait);
419 }
420 } else {
421 // Not connected or no channel
422 //logger.warn("Read sleep "+wait+" ms for "+this);
423 if (release.get()) {
424 return;
425 }
426 Thread.sleep(wait);
427 }
428 }
429 }
430 } finally {
431 // The message is then just passed to the next handler
432 super.messageReceived(arg0, arg1);
433 }
434 }
435
436 @Override
437 public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
438 throws Exception {
439 try {
440 long curtime = System.currentTimeMillis();
441 long size = objectSizeEstimator.estimateSize(arg1.getMessage());
442 if (trafficCounter != null) {
443 trafficCounter.bytesWriteFlowControl(size);
444 if (writeLimit == 0) {
445 return;
446 }
447 // compute the number of ms to wait before continue with the channel
448 long wait = getTimeToWait(writeLimit, trafficCounter.getCurrentWrittenBytes(),
449 trafficCounter.getLastTime(),
450 curtime);
451 if (wait >= MINIMAL_WAIT) {
452 // Global or Channel
453 if (release.get()) {
454 return;
455 }
456 Thread.sleep(wait);
457 }
458 }
459 } finally {
460 // The message is then just passed to the next handler
461 super.writeRequested(arg0, arg1);
462 }
463 }
464
465 @Override
466 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
467 throws Exception {
468 if (e instanceof ChannelStateEvent) {
469 ChannelStateEvent cse = (ChannelStateEvent) e;
470 if (cse.getState() == ChannelState.INTEREST_OPS &&
471 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
472
473 // setReadable(true) requested
474 boolean readSuspended = ctx.getAttachment() != null;
475 if (readSuspended) {
476 // Drop the request silently if this handler has
477 // set the flag.
478 e.getFuture().setSuccess();
479 return;
480 }
481 }
482 }
483 super.handleDownstream(ctx, e);
484 }
485
486 /**
487 *
488 * @return the current TrafficCounter (if
489 * channel is still connected)
490 */
491 public TrafficCounter getTrafficCounter() {
492 return trafficCounter;
493 }
494
495 /* (non-Javadoc)
496 * @see org.jboss.netty.util.ExternalResourceReleasable#releaseExternalResources()
497 */
498 public void releaseExternalResources() {
499 if (trafficCounter != null) {
500 trafficCounter.stop();
501 }
502 release.set(true);
503 if (timeout != null) {
504 timeout.cancel();
505 }
506 }
507
508 @Override
509 public String toString() {
510 return "TrafficShaping with Write Limit: " + writeLimit +
511 " Read Limit: " + readLimit + " and Counter: " +
512 (trafficCounter != null? trafficCounter.toString() : "none");
513 }
514 }