1 package test.udp.traffic;
2
3 import java.net.SocketAddress;
4 import java.util.Collections;
5 import java.util.LinkedList;
6 import java.util.Random;
7 import java.util.concurrent.TimeUnit;
8 import java.util.concurrent.locks.Condition;
9 import java.util.concurrent.locks.Lock;
10 import java.util.concurrent.locks.ReentrantLock;
11
12 import org.jboss.netty.buffer.ChannelBuffer;
13 import org.jboss.netty.buffer.ChannelBuffers;
14 import org.jboss.netty.channel.Channel;
15 import org.jboss.netty.channel.ChannelFuture;
16 import org.jboss.netty.channel.ChannelHandlerContext;
17 import org.jboss.netty.channel.DownstreamMessageEvent;
18 import org.jboss.netty.channel.MessageEvent;
19 import org.jboss.netty.util.HashedWheelTimer;
20 import org.jboss.netty.util.Timeout;
21 import org.jboss.netty.util.Timer;
22 import org.jboss.netty.util.TimerTask;
23
24 public class LeakyBucketHandler
25 {
26 public enum OverflowStrategy
27 {
28 WAIT,
29 DISCARD,
30 IGNORE,
31 ERROR
32 }
33
34
35 class QueueEvent
36 {
37 MessageEvent _e;
38 ChannelHandlerContext _ctx;
39 }
40
41
42 int _bucketSize = 2*1024;
43
44 OverflowStrategy _overflowStrategy = OverflowStrategy.WAIT;
45
46 double _maxOutputRate = 1024;
47
48 int _maxBufferSize = 512;
49
50
51 int _minBufferSplit = 512;
52
53 long _timerTick = 100;
54
55
56 Timer _timer = new HashedWheelTimer();
57 LinkedList<MessageEvent> _queue = new LinkedList<MessageEvent>();
58 volatile int _currentBucketSize;
59 Lock _dataLock = new ReentrantLock();
60 Condition _queueNotEmpty = _dataLock.newCondition();
61 Condition _bucketNotFull = _dataLock.newCondition();
62 volatile int _waitingSize;
63 volatile boolean _stoped = true;
64 volatile Timeout _timeout;
65 volatile int _waitingTicks = 1;
66 volatile long _maxBytesPerTick = (long)(_maxOutputRate * _timerTick) / 1000;
67
68 TimerTask _timerTickTask = new TimerTask()
69 {
70
71 @Override
72 public void run(Timeout timeout) throws Exception
73 {
74 if (_stoped)
75 return;
76 doTimeTick();
77 }
78
79 };
80
81 synchronized public void submit(MessageEvent e)
82 {
83 System.out.println("+ input "+((ChannelBuffer)e.getMessage()).readableBytes());
84 _dataLock.lock();
85 try
86 {
87 ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
88
89 if (_currentBucketSize == 0 || buffer.readableBytes() + _currentBucketSize < _bucketSize)
90 {
91
92 queue(e);
93 }
94 else
95
96 doOverflow(e);
97 }
98 finally
99 {
100 _dataLock.unlock();
101 }
102 System.out.println("- input");
103 }
104
105 private void queue(MessageEvent e)
106 {
107
108 System.out.println("+ q");
109 ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
110 _currentBucketSize += buffer.readableBytes();
111 _queue.addFirst(e);
112 _queueNotEmpty.signal();
113 System.out.println("- q");
114 }
115
116 private void doOverflow(MessageEvent e)
117 {
118
119 switch (_overflowStrategy)
120 {
121 case WAIT: doWait(e); queue(e); break;
122 case DISCARD: break;
123 case IGNORE: queue(e);
124 case ERROR: throw new RuntimeException("Bucket Overflow");
125 }
126 }
127
128 private void doWait(MessageEvent e)
129 {
130 System.out.println("+ w");
131 ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
132 _waitingSize = buffer.readableBytes();
133 try
134 {
135 _bucketNotFull.await();
136 }
137 catch (InterruptedException e1)
138 {
139 e1.printStackTrace();
140 }
141 _waitingSize = 0;
142 System.out.println("- w");
143 }
144
145 private void doTimeTick()
146 {
147 System.out.println("+ t");
148 _dataLock.lock();
149 try
150 {
151 while (_queue.isEmpty())
152 try
153 {
154 System.out.println("emtpy");
155 _queueNotEmpty.await();
156 }
157 catch (InterruptedException e1)
158 {
159 e1.printStackTrace();
160 }
161 System.out.println("working");
162
163 long max = _maxBytesPerTick * _waitingTicks;
164 System.out.println("max "+max);
165
166 while (max > 0 && !_queue.isEmpty())
167 {
168 MessageEvent e = _queue.getLast();
169 ChannelBuffer b = (ChannelBuffer) e.getMessage();
170
171 int toSendSize = (int) Math.min(max, b.readableBytes());
172 toSendSize = Math.min(toSendSize, _maxBufferSize);
173
174 if (toSendSize < _minBufferSplit && toSendSize != b.readableBytes() && toSendSize != _maxBufferSize)
175 {
176
177 _waitingTicks++;
178 break;
179 }
180 _waitingTicks = 1;
181
182 if (toSendSize == b.readableBytes())
183 {
184
185 _queue.removeLast();
186 System.out.println("remove buffer");
187 output(e, b);
188 }
189 else
190 {
191
192
193
194 ChannelBuffer toSendBuffer = b.slice(b.writerIndex()-toSendSize, toSendSize);
195
196 b.writerIndex(b.writerIndex()-toSendSize);
197 output(e, toSendBuffer);
198 }
199 max -= toSendSize;
200 _currentBucketSize -= toSendSize;
201 }
202
203 if (_waitingSize != 0 && _waitingSize+_currentBucketSize<_bucketSize)
204 _bucketNotFull.signal();
205
206 }
207 finally
208 {
209 _dataLock.unlock();
210 }
211 if (!_stoped)
212 setTimer();
213 System.out.println("- t");
214 }
215
216 private void setTimer()
217 {
218 _timeout = _timer.newTimeout(_timerTickTask, _timerTick, TimeUnit.MILLISECONDS);
219 }
220
221 public void start()
222 {
223 if (!_stoped)
224 return;
225 _stoped = false;
226 setTimer();
227 }
228
229 public void stop()
230 {
231 _stoped = true;
232 if (_timeout != null)
233 _timeout.cancel();
234 _timeout = null;
235
236 }
237
238 private void output(MessageEvent e, ChannelBuffer b)
239 {
240
241 System.out.println("output "+b.readableBytes());
242 }
243
244 public void setMaxRate(double rate)
245 {
246 _maxOutputRate = rate;
247 _maxBytesPerTick = (long)(_maxOutputRate * _timerTick) / 1000;
248 }
249
250 public void setTimerTick(long timerTick)
251 {
252 _timerTick = timerTick;
253 _maxBytesPerTick = (long)(_maxOutputRate * _timerTick) / 1000;
254 }
255
256 static class TestMessageEvent implements MessageEvent
257 {
258 Object _m;
259 TestMessageEvent(ChannelBuffer b)
260 {
261 _m = b;
262 }
263
264 @Override
265 public Object getMessage()
266 {
267
268 return _m;
269 }
270
271 @Override
272 public SocketAddress getRemoteAddress()
273 {
274
275 return null;
276 }
277
278 @Override
279 public Channel getChannel()
280 {
281
282 return null;
283 }
284
285 @Override
286 public ChannelFuture getFuture()
287 {
288
289 return null;
290 }
291
292 }
293
294 public static void main(String[] args)
295 {
296 ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
297 for (int i = 0; i< 200; i++)
298 {
299 cb.writeInt(i);
300 }
301 LeakyBucketHandler h = new LeakyBucketHandler();
302 h.start();
303 Random r = new Random(System.currentTimeMillis());
304 while (true)
305 {
306 int size = r.nextInt(800);
307 if (size == 0)
308 size = 10;
309 MessageEvent e = new TestMessageEvent(ChannelBuffers.wrappedBuffer(cb).slice(0, size));
310 h.submit(e);
311 try
312 {
313 long s = (long) (500*r.nextFloat());
314 s = (s/10)*10;
315 System.out.println("sleep "+s);
316 Thread.sleep(s);
317 }
318 catch (InterruptedException e1)
319 {
320
321 e1.printStackTrace();
322 }
323
324 }
325 }
326
327
328 }