1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelState;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.channel.SimpleChannelHandler;
28 import org.jboss.netty.logging.InternalLogger;
29 import org.jboss.netty.logging.InternalLoggerFactory;
30 import org.jboss.netty.util.DefaultObjectSizeEstimator;
31 import org.jboss.netty.util.ExternalResourceReleasable;
32 import org.jboss.netty.util.ObjectSizeEstimator;
33 import org.jboss.netty.util.Timeout;
34 import org.jboss.netty.util.Timer;
35 import org.jboss.netty.util.TimerTask;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public abstract class AbstractTrafficShapingHandler extends
64 SimpleChannelHandler implements ExternalResourceReleasable {
65
66
67
68 static InternalLogger logger = InternalLoggerFactory
69 .getInstance(AbstractTrafficShapingHandler.class);
70
71
72
73
74 public static final long DEFAULT_CHECK_INTERVAL = 1000;
75
76
77
78
79 private static final long MINIMAL_WAIT = 10;
80
81
82
83
84 protected TrafficCounter trafficCounter = null;
85
86
87
88
89 private ObjectSizeEstimator objectSizeEstimator = null;
90
91
92
93
94 protected Timer timer = null;
95
96
97
98 volatile private Timeout timeout = null;
99
100
101
102 private long writeLimit = 0;
103
104
105
106
107 private long readLimit = 0;
108
109
110
111
112 protected long checkInterval = DEFAULT_CHECK_INTERVAL;
113
114
115
116
117
118
119 private final AtomicBoolean release = new AtomicBoolean(false);
120
121
122
123
124
125
126
127
128
129 private void init(ObjectSizeEstimator newObjectSizeEstimator,
130 Timer newTimer, long newWriteLimit, long newReadLimit,
131 long newCheckInterval) {
132 objectSizeEstimator = newObjectSizeEstimator;
133 timer = newTimer;
134 writeLimit = newWriteLimit;
135 readLimit = newReadLimit;
136 checkInterval = newCheckInterval;
137
138 }
139
140
141
142
143
144 void setTrafficCounter(TrafficCounter newTrafficCounter) {
145 trafficCounter = newTrafficCounter;
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
162 long readLimit, long checkInterval) {
163 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
164 checkInterval);
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 public AbstractTrafficShapingHandler(
184 ObjectSizeEstimator objectSizeEstimator, Timer timer,
185 long writeLimit, long readLimit, long checkInterval) {
186 init(objectSizeEstimator, timer, writeLimit, readLimit,
187 checkInterval);
188 }
189
190
191
192
193
194
195
196
197
198
199
200 public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
201 long readLimit) {
202 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
203 DEFAULT_CHECK_INTERVAL);
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219 public AbstractTrafficShapingHandler(
220 ObjectSizeEstimator objectSizeEstimator, Timer timer,
221 long writeLimit, long readLimit) {
222 init(objectSizeEstimator, timer, writeLimit, readLimit,
223 DEFAULT_CHECK_INTERVAL);
224 }
225
226
227
228
229
230
231
232 public AbstractTrafficShapingHandler(Timer timer) {
233 init(new DefaultObjectSizeEstimator(), timer, 0, 0,
234 DEFAULT_CHECK_INTERVAL);
235 }
236
237
238
239
240
241
242
243
244
245
246 public AbstractTrafficShapingHandler(
247 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
248 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
249 }
250
251
252
253
254
255
256
257
258
259
260 public AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
261 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
262 }
263
264
265
266
267
268
269
270
271
272
273
274
275
276 public AbstractTrafficShapingHandler(
277 ObjectSizeEstimator objectSizeEstimator, Timer timer,
278 long checkInterval) {
279 init(objectSizeEstimator, timer, 0, 0, checkInterval);
280 }
281
282
283
284
285
286
287
288
289 public void configure(long newWriteLimit, long newReadLimit,
290 long newCheckInterval) {
291 this.configure(newWriteLimit, newReadLimit);
292 this.configure(newCheckInterval);
293 }
294
295
296
297
298
299
300
301 public void configure(long newWriteLimit, long newReadLimit) {
302 writeLimit = newWriteLimit;
303 readLimit = newReadLimit;
304 if (trafficCounter != null) {
305 trafficCounter.resetAccounting(System.currentTimeMillis()+1);
306 }
307 }
308
309
310
311
312
313
314 public void configure(long newCheckInterval) {
315 checkInterval = newCheckInterval;
316 if (trafficCounter != null) {
317 trafficCounter.configure(checkInterval);
318 }
319 }
320
321
322
323
324
325
326
327
328 protected void doAccounting(TrafficCounter counter) {
329
330 }
331
332
333
334
335 private class ReopenReadTimerTask implements TimerTask {
336 ChannelHandlerContext ctx;
337 ReopenReadTimerTask(ChannelHandlerContext ctx) {
338 this.ctx = ctx;
339 }
340 public void run(Timeout timeoutArg) throws Exception {
341
342 if (release.get()) {
343 return;
344 }
345
346
347
348
349
350 if (ctx != null && ctx.getChannel() != null &&
351 ctx.getChannel().isConnected()) {
352
353
354 ctx.setAttachment(null);
355 ctx.getChannel().setReadable(true);
356 }
357 }
358 }
359
360
361
362
363
364 private long getTimeToWait(long limit, long bytes, long lastTime,
365 long curtime) {
366 long interval = curtime - lastTime;
367 if (interval == 0) {
368
369 return 0;
370 }
371 return ((bytes * 1000 / limit - interval)/10)*10;
372 }
373
374 @Override
375 public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
376 throws Exception {
377 try {
378 long curtime = System.currentTimeMillis();
379 long size = objectSizeEstimator.estimateSize(arg1.getMessage());
380 if (trafficCounter != null) {
381 trafficCounter.bytesRecvFlowControl(arg0, size);
382 if (readLimit == 0) {
383
384 return;
385 }
386
387 long wait = getTimeToWait(readLimit, trafficCounter
388 .getCurrentReadBytes(), trafficCounter.getLastTime(),
389 curtime);
390 if (wait >= MINIMAL_WAIT) {
391 Channel channel = arg0.getChannel();
392
393 if (channel != null && channel.isConnected()) {
394
395 if (timer == null) {
396
397
398 if (release.get()) {
399 return;
400 }
401 Thread.sleep(wait);
402 return;
403 }
404 if (arg0.getAttachment() == null) {
405
406 arg0.setAttachment(Boolean.TRUE);
407 channel.setReadable(false);
408
409 TimerTask timerTask = new ReopenReadTimerTask(arg0);
410 timeout =
411 timer.newTimeout(timerTask, wait, TimeUnit.MILLISECONDS);
412 } else {
413
414
415 if (release.get()) {
416 return;
417 }
418 Thread.sleep(wait);
419 }
420 } else {
421
422
423 if (release.get()) {
424 return;
425 }
426 Thread.sleep(wait);
427 }
428 }
429 }
430 } finally {
431
432 super.messageReceived(arg0, arg1);
433 }
434 }
435
436 @Override
437 public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
438 throws Exception {
439 try {
440 long curtime = System.currentTimeMillis();
441 long size = objectSizeEstimator.estimateSize(arg1.getMessage());
442 if (trafficCounter != null) {
443 trafficCounter.bytesWriteFlowControl(size);
444 if (writeLimit == 0) {
445 return;
446 }
447
448 long wait = getTimeToWait(writeLimit, trafficCounter.getCurrentWrittenBytes(),
449 trafficCounter.getLastTime(),
450 curtime);
451 if (wait >= MINIMAL_WAIT) {
452
453 if (release.get()) {
454 return;
455 }
456 Thread.sleep(wait);
457 }
458 }
459 } finally {
460
461 super.writeRequested(arg0, arg1);
462 }
463 }
464
465 @Override
466 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
467 throws Exception {
468 if (e instanceof ChannelStateEvent) {
469 ChannelStateEvent cse = (ChannelStateEvent) e;
470 if (cse.getState() == ChannelState.INTEREST_OPS &&
471 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
472
473
474 boolean readSuspended = ctx.getAttachment() != null;
475 if (readSuspended) {
476
477
478 e.getFuture().setSuccess();
479 return;
480 }
481 }
482 }
483 super.handleDownstream(ctx, e);
484 }
485
486
487
488
489
490
491 public TrafficCounter getTrafficCounter() {
492 return trafficCounter;
493 }
494
495
496
497
498 public void releaseExternalResources() {
499 if (trafficCounter != null) {
500 trafficCounter.stop();
501 }
502 release.set(true);
503 if (timeout != null) {
504 timeout.cancel();
505 }
506 }
507
508 @Override
509 public String toString() {
510 return "TrafficShaping with Write Limit: " + writeLimit +
511 " Read Limit: " + readLimit + " and Counter: " +
512 (trafficCounter != null? trafficCounter.toString() : "none");
513 }
514 }