View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import java.util.concurrent.TimeUnit;
19  import java.util.concurrent.atomic.AtomicBoolean;
20  
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelEvent;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelState;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.MessageEvent;
27  import org.jboss.netty.channel.SimpleChannelHandler;
28  import org.jboss.netty.logging.InternalLogger;
29  import org.jboss.netty.logging.InternalLoggerFactory;
30  import org.jboss.netty.util.DefaultObjectSizeEstimator;
31  import org.jboss.netty.util.ExternalResourceReleasable;
32  import org.jboss.netty.util.ObjectSizeEstimator;
33  import org.jboss.netty.util.Timeout;
34  import org.jboss.netty.util.Timer;
35  import org.jboss.netty.util.TimerTask;
36  
37  /**
38   * AbstractTrafficShapingHandler allows to limit the global bandwidth
39   * (see {@link GlobalTrafficShapingHandler}) or per session
40   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41   * It also allows near-real-time monitoring of the bandwidth: the
42   * {@link TrafficCounter} monitors call back the doAccounting method of this
43   * handler once every checkInterval.<br>
44   * <br>
45   *
46   * An {@link ObjectSizeEstimator} can be passed at construction to specify how
47   * the size of an object being read or written is computed, according to the type
48   * of the object. If not specified, the {@link DefaultObjectSizeEstimator} implementation is used.<br><br>
49   *
50   * If, for any reason, you want to stop the monitoring (accounting) or to change
51   * the read/write limits or the check interval, several methods allow you to do so:<br>
52   * <ul>
53   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56   * <li></li>
57   * </ul>
58   *
59   * @author The Netty Project (netty-dev@lists.jboss.org)
60   * @author Frederic Bregier
61   * @version $Rev: 1225 $, $Date: 2012-05-20 10:48:53 +0200 (dim., 20 mai 2012) $
62   */
63  public abstract class AbstractTrafficShapingHandler extends
64          SimpleChannelHandler implements ExternalResourceReleasable {
65      /**
66       * Internal logger
67       */
68      static InternalLogger logger = InternalLoggerFactory
69              .getInstance(AbstractTrafficShapingHandler.class);
70  
71      /**
72       * Default delay between two checks: 1s
73       */
74      public static final long DEFAULT_CHECK_INTERVAL = 1000;
75  
76      /**
77       * Default minimal time to wait
78       */
79      private static final long MINIMAL_WAIT = 10;
80  
81      /**
82       * Traffic Counter
83       */
84      protected TrafficCounter trafficCounter = null;
85  
86      /**
87       * ObjectSizeEstimator
88       */
89      private ObjectSizeEstimator objectSizeEstimator = null;
90  
91      /**
92       * Timer to associated to any TrafficCounter
93       */
94      protected Timer timer = null;
95      /**
96       * used in releaseExternalResources() to cancel the timer
97       */
98      volatile private Timeout timeout = null;
99      /**
100      * Limit in B/s to apply to write
101      */
102     private long writeLimit = 0;
103 
104     /**
105      * Limit in B/s to apply to read
106      */
107     private long readLimit = 0;
108 
109     /**
110      * Delay between two performance snapshots
111      */
112     protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
113 
114     /**
115      * Boolean associated with the release of this TrafficShapingHandler.
116      * It will be true only once when the releaseExternalRessources is called
117      * to prevent waiting when shutdown.
118      */
119     private final AtomicBoolean release = new AtomicBoolean(false);
120 
121     /**
122      * @param newObjectSizeEstimator
123      * @param newTimer
124      * @param newActive
125      * @param newWriteLimit
126      * @param newReadLimit
127      * @param newCheckInterval
128      */
129      private void init(ObjectSizeEstimator newObjectSizeEstimator,
130              Timer newTimer, long newWriteLimit, long newReadLimit,
131              long newCheckInterval) {
132          objectSizeEstimator = newObjectSizeEstimator;
133          timer = newTimer;
134          writeLimit = newWriteLimit;
135          readLimit = newReadLimit;
136          checkInterval = newCheckInterval;
137          //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
138      }
139 
140     /**
141      *
142      * @param newTrafficCounter the TrafficCounter to set
143      */
144     void setTrafficCounter(TrafficCounter newTrafficCounter) {
145         trafficCounter = newTrafficCounter;
146     }
147 
148     /**
149      * Constructor using default {@link ObjectSizeEstimator}
150      *
151      * @param timer
152      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
153      * @param writeLimit
154      *          0 or a limit in bytes/s
155      * @param readLimit
156      *          0 or a limit in bytes/s
157      * @param checkInterval
158      *          The delay between two computations of performances for
159      *            channels or 0 if no stats are to be computed
160      */
161     public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
162             long readLimit, long checkInterval) {
163         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
164                 checkInterval);
165     }
166 
167     /**
168      * Constructor using the specified ObjectSizeEstimator
169      *
170      * @param objectSizeEstimator
171      *            the {@link ObjectSizeEstimator} that will be used to compute
172      *            the size of the message
173      * @param timer
174      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
175      * @param writeLimit
176      *          0 or a limit in bytes/s
177      * @param readLimit
178      *          0 or a limit in bytes/s
179      * @param checkInterval
180      *          The delay between two computations of performances for
181      *            channels or 0 if no stats are to be computed
182      */
183     public AbstractTrafficShapingHandler(
184             ObjectSizeEstimator objectSizeEstimator, Timer timer,
185             long writeLimit, long readLimit, long checkInterval) {
186         init(objectSizeEstimator, timer, writeLimit, readLimit,
187                 checkInterval);
188     }
189 
190     /**
191      * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
192      *
193      * @param timer
194      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
195      * @param writeLimit
196      *          0 or a limit in bytes/s
197      * @param readLimit
198      *          0 or a limit in bytes/s
199      */
200     public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
201             long readLimit) {
202         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
203                 DEFAULT_CHECK_INTERVAL);
204     }
205 
206     /**
207      * Constructor using the specified ObjectSizeEstimator and using default Check Interval
208      *
209      * @param objectSizeEstimator
210      *            the {@link ObjectSizeEstimator} that will be used to compute
211      *            the size of the message
212      * @param timer
213      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
214      * @param writeLimit
215      *          0 or a limit in bytes/s
216      * @param readLimit
217      *          0 or a limit in bytes/s
218      */
219     public AbstractTrafficShapingHandler(
220             ObjectSizeEstimator objectSizeEstimator, Timer timer,
221             long writeLimit, long readLimit) {
222         init(objectSizeEstimator, timer, writeLimit, readLimit,
223                 DEFAULT_CHECK_INTERVAL);
224     }
225 
226     /**
227      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
228      *
229      * @param timer
230      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
231      */
232     public AbstractTrafficShapingHandler(Timer timer) {
233         init(new DefaultObjectSizeEstimator(), timer, 0, 0,
234                 DEFAULT_CHECK_INTERVAL);
235     }
236 
237     /**
238      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
239      *
240      * @param objectSizeEstimator
241      *            the {@link ObjectSizeEstimator} that will be used to compute
242      *            the size of the message
243      * @param timer
244      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
245      */
246     public AbstractTrafficShapingHandler(
247             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
248         init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
249     }
250 
251     /**
252      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
253      *
254      * @param timer
255      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
256      * @param checkInterval
257      *          The delay between two computations of performances for
258      *            channels or 0 if no stats are to be computed
259      */
260     public AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
261         init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
262     }
263 
264     /**
265      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
266      *
267      * @param objectSizeEstimator
268      *            the {@link ObjectSizeEstimator} that will be used to compute
269      *            the size of the message
270      * @param timer
271      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
272      * @param checkInterval
273      *          The delay between two computations of performances for
274      *            channels or 0 if no stats are to be computed
275      */
276     public AbstractTrafficShapingHandler(
277             ObjectSizeEstimator objectSizeEstimator, Timer timer,
278             long checkInterval) {
279         init(objectSizeEstimator, timer, 0, 0, checkInterval);
280     }
281 
282     /**
283      * Change the underlying limitations and check interval.
284      *
285      * @param newWriteLimit
286      * @param newReadLimit
287      * @param newCheckInterval
288      */
289     public void configure(long newWriteLimit, long newReadLimit,
290             long newCheckInterval) {
291         this.configure(newWriteLimit, newReadLimit);
292         this.configure(newCheckInterval);
293     }
294 
295     /**
296      * Change the underlying limitations.
297      *
298      * @param newWriteLimit
299      * @param newReadLimit
300      */
301     public void configure(long newWriteLimit, long newReadLimit) {
302         writeLimit = newWriteLimit;
303         readLimit = newReadLimit;
304         if (trafficCounter != null) {
305             trafficCounter.resetAccounting(System.currentTimeMillis()+1);
306         }
307     }
308 
309     /**
310      * Change the check interval.
311      *
312      * @param newCheckInterval
313      */
314     public void configure(long newCheckInterval) {
315         checkInterval = newCheckInterval;
316         if (trafficCounter != null) {
317             trafficCounter.configure(checkInterval);
318         }
319     }
320 
321     /**
322      * Called each time the accounting is computed from the TrafficCounters.
323      * This method could be used for instance to implement almost real time accounting.
324      *
325      * @param counter
326      *            the TrafficCounter that computes its performance
327      */
328     protected void doAccounting(TrafficCounter counter) {
329         // NOOP by default
330     }
331 
332     /**
333      * Class to implement setReadable at fix time
334      */
335     private class ReopenReadTimerTask implements TimerTask {
336         ChannelHandlerContext ctx;
337         ReopenReadTimerTask(ChannelHandlerContext ctx) {
338             this.ctx = ctx;
339         }
340         public void run(Timeout timeoutArg) throws Exception {
341             //logger.warn("Start RRTT: "+release.get());
342             if (release.get()) {
343                 return;
344             }
345             /*
346             logger.warn("WAKEUP! "+
347                     (ctx != null && ctx.getChannel() != null &&
348                             ctx.getChannel().isConnected()));
349              */
350             if (ctx != null && ctx.getChannel() != null &&
351                     ctx.getChannel().isConnected()) {
352                 //logger.warn(" setReadable TRUE: ");
353                 // readSuspended = false;
354                 ctx.setAttachment(null);
355                 ctx.getChannel().setReadable(true);
356             }
357         }
358     }
359     /**
360     *
361     * @return the time that should be necessary to wait to respect limit. Can
362     *         be negative time
363     */
364     private long getTimeToWait(long limit, long bytes, long lastTime,
365             long curtime) {
366         long interval = curtime - lastTime;
367         if (interval == 0) {
368             // Time is too short, so just lets continue
369             return 0;
370         }
371         return ((bytes * 1000 / limit - interval)/10)*10;
372     }
373 
374     @Override
375     public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
376             throws Exception {
377         try {
378             long curtime = System.currentTimeMillis();
379             long size = objectSizeEstimator.estimateSize(arg1.getMessage());
380             if (trafficCounter != null) {
381                 trafficCounter.bytesRecvFlowControl(arg0, size);
382                 if (readLimit == 0) {
383                     // no action
384                     return;
385                 }
386                 // compute the number of ms to wait before reopening the channel
387                 long wait = getTimeToWait(readLimit, trafficCounter
388                         .getCurrentReadBytes(), trafficCounter.getLastTime(),
389                         curtime);
390                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
391                     Channel channel = arg0.getChannel();
392                     // try to limit the traffic
393                     if (channel != null && channel.isConnected()) {
394                         // Channel version
395                         if (timer == null) {
396                             // Sleep since no executor
397                             //logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
398                             if (release.get()) {
399                                 return;
400                             }
401                             Thread.sleep(wait);
402                             return;
403                         }
404                         if (arg0.getAttachment() == null) {
405                             // readSuspended = true;
406                             arg0.setAttachment(Boolean.TRUE);
407                             channel.setReadable(false);
408                             //logger.warn("Read will wakeup after "+wait+" ms "+this);
409                             TimerTask timerTask = new ReopenReadTimerTask(arg0);
410                             timeout = 
411                                 timer.newTimeout(timerTask, wait, TimeUnit.MILLISECONDS);
412                         } else {
413                             // should be waiting: but can occurs sometime so as a FIX
414                             //logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
415                             if (release.get()) {
416                                 return;
417                             }
418                             Thread.sleep(wait);
419                         }
420                     } else {
421                         // Not connected or no channel
422                         //logger.warn("Read sleep "+wait+" ms for "+this);
423                         if (release.get()) {
424                             return;
425                         }
426                         Thread.sleep(wait);
427                     }
428                 }
429             }
430         } finally {
431             // The message is then just passed to the next handler
432             super.messageReceived(arg0, arg1);
433         }
434     }
435 
436     @Override
437     public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
438             throws Exception {
439         try {
440             long curtime = System.currentTimeMillis();
441             long size = objectSizeEstimator.estimateSize(arg1.getMessage());
442             if (trafficCounter != null) {
443                 trafficCounter.bytesWriteFlowControl(size);
444                 if (writeLimit == 0) {
445                     return;
446                 }
447                 // compute the number of ms to wait before continue with the channel
448                 long wait = getTimeToWait(writeLimit, trafficCounter.getCurrentWrittenBytes(), 
449                         trafficCounter.getLastTime(),
450                         curtime);
451                 if (wait >= MINIMAL_WAIT) {
452                     // Global or Channel
453                     if (release.get()) {
454                         return;
455                     }
456                     Thread.sleep(wait);
457                 }
458             }
459         } finally {
460             // The message is then just passed to the next handler
461             super.writeRequested(arg0, arg1);
462         }
463     }
464 
465     @Override
466     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
467             throws Exception {
468         if (e instanceof ChannelStateEvent) {
469             ChannelStateEvent cse = (ChannelStateEvent) e;
470             if (cse.getState() == ChannelState.INTEREST_OPS &&
471                     (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
472 
473                 // setReadable(true) requested
474                 boolean readSuspended = ctx.getAttachment() != null;
475                 if (readSuspended) {
476                     // Drop the request silently if this handler has
477                     // set the flag.
478                     e.getFuture().setSuccess();
479                     return;
480                 }
481             }
482         }
483         super.handleDownstream(ctx, e);
484     }
485 
486     /**
487      *
488      * @return the current TrafficCounter (if
489      *         channel is still connected)
490      */
491     public TrafficCounter getTrafficCounter() {
492         return trafficCounter;
493     }
494 
495     /* (non-Javadoc)
496      * @see org.jboss.netty.util.ExternalResourceReleasable#releaseExternalResources()
497      */
498     public void releaseExternalResources() {
499         if (trafficCounter != null) {
500             trafficCounter.stop();
501         }
502         release.set(true);
503         if (timeout != null) {
504             timeout.cancel();
505         }
506     }
507 
508     @Override
509     public String toString() {
510         return "TrafficShaping with Write Limit: " + writeLimit +
511                 " Read Limit: " + readLimit + " and Counter: " +
512                 (trafficCounter != null? trafficCounter.toString() : "none");
513     }
514 }