View Javadoc

1   package test.udp.traffic;
2   
3   import java.net.SocketAddress;
4   import java.util.Collections;
5   import java.util.LinkedList;
6   import java.util.Random;
7   import java.util.concurrent.TimeUnit;
8   import java.util.concurrent.locks.Condition;
9   import java.util.concurrent.locks.Lock;
10  import java.util.concurrent.locks.ReentrantLock;
11  
12  import org.jboss.netty.buffer.ChannelBuffer;
13  import org.jboss.netty.buffer.ChannelBuffers;
14  import org.jboss.netty.channel.Channel;
15  import org.jboss.netty.channel.ChannelFuture;
16  import org.jboss.netty.channel.ChannelHandlerContext;
17  import org.jboss.netty.channel.DownstreamMessageEvent;
18  import org.jboss.netty.channel.MessageEvent;
19  import org.jboss.netty.util.HashedWheelTimer;
20  import org.jboss.netty.util.Timeout;
21  import org.jboss.netty.util.Timer;
22  import org.jboss.netty.util.TimerTask;
23  
24  public class LeakyBucketHandler
25  {
26  	public enum OverflowStrategy
27  	{
28  		WAIT,
29  		DISCARD,
30  		IGNORE,
31  		ERROR
32  	}
33  	
34  	// TODO object to queue: ctx and message event, so that we can send up/down pipline using ctx
35  	class QueueEvent
36  	{
37  		MessageEvent _e;
38  		ChannelHandlerContext _ctx;
39  	}
40  	
41  	// maximal bucket size before overflow
42  	int _bucketSize = 2*1024;
43  	// what do we do if bucket overflows ?
44  	OverflowStrategy _overflowStrategy = OverflowStrategy.WAIT;
45  	// maximal output flow rate in bytes/s
46  	double _maxOutputRate = 1024;
47  	 // limit tx buffer size (required for mobile udp)
48  	int _maxBufferSize = 512;
49  	// avoid output of small buffers per tick. if required wait some ticks before output
50  	// output buffers smaller than minBufferSplit, only for complete buffers from the queue.
51  	int _minBufferSplit = 512;
52  	// bucket leak interval
53  	long _timerTick = 100;
54  
55  	
56  	Timer _timer = new HashedWheelTimer(); //TODO will be set in constructor
57  	LinkedList<MessageEvent> _queue = new LinkedList<MessageEvent>();
58  	volatile int _currentBucketSize;
59  	Lock _dataLock = new ReentrantLock();
60  	Condition _queueNotEmpty = _dataLock.newCondition();
61  	Condition _bucketNotFull = _dataLock.newCondition();
62  	volatile int _waitingSize;
63  	volatile boolean _stoped = true;
64  	volatile Timeout _timeout;
65  	volatile int _waitingTicks = 1;
66  	volatile long _maxBytesPerTick = (long)(_maxOutputRate * _timerTick) / 1000;
67  
68  	TimerTask _timerTickTask = new TimerTask()
69  	{
70  
71  		@Override
72  		public void run(Timeout timeout) throws Exception
73  		{
74  			if (_stoped)
75  				return;
76  			doTimeTick();
77  		}
78  		
79  	};
80  	
81  	synchronized public void submit(MessageEvent e)
82  	{
83  		System.out.println("+ input "+((ChannelBuffer)e.getMessage()).readableBytes());
84  		_dataLock.lock();
85  		try
86  		{
87  		ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
88  		// do we have enough space in the buffer ?
89  		if (_currentBucketSize == 0 || buffer.readableBytes() + _currentBucketSize < _bucketSize)
90  		{
91  			// yes ! queue the data
92  			queue(e);
93  		}
94  		else
95  			// no! handle overflow
96  			doOverflow(e);
97  		}
98  		finally
99  		{
100 			_dataLock.unlock();
101 		}
102 		System.out.println("- input");
103 	}
104 
105 	private void queue(MessageEvent e)
106 	{
107 		// note: we are safe, lock was set in caller
108 		System.out.println("+ q");
109 		ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
110 		_currentBucketSize += buffer.readableBytes();
111 		_queue.addFirst(e);
112 		_queueNotEmpty.signal();	
113 		System.out.println("- q");
114 	}
115 
116 	private void doOverflow(MessageEvent e)
117 	{
118 		// TODO log overflow
119 		switch (_overflowStrategy)
120 		{
121 		case WAIT: doWait(e); queue(e); break;
122 		case DISCARD: break;
123 		case IGNORE: queue(e);
124 		case ERROR: throw new RuntimeException("Bucket Overflow");
125 		}
126 	}
127 
128 	private void doWait(MessageEvent e)
129 	{
130 		System.out.println("+ w");
131 		ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
132 		_waitingSize = buffer.readableBytes();
133 		try
134 		{
135 			_bucketNotFull.await();
136 		}
137 		catch (InterruptedException e1)
138 		{
139 			e1.printStackTrace();
140 		}
141 		_waitingSize = 0;
142 		System.out.println("- w");
143 	}
144 	
145 	private void doTimeTick()
146 	{
147 		System.out.println("+ t");
148 		_dataLock.lock();
149 		try
150 		{
151 			while (_queue.isEmpty())
152 				try
153 				{
154 					System.out.println("emtpy");
155 					_queueNotEmpty.await();
156 				}
157 				catch (InterruptedException e1)
158 				{
159 					e1.printStackTrace();
160 				}
161 			System.out.println("working");
162 			// max bytes we can send in this tick
163 			long max = _maxBytesPerTick * _waitingTicks;
164 			System.out.println("max  "+max);
165 			// TODO check if channel writeable
166 			while (max > 0 && !_queue.isEmpty())
167 			{
168 				MessageEvent e = _queue.getLast();
169 				ChannelBuffer b = (ChannelBuffer) e.getMessage();
170 				// how many bytes will we send in this tick ?
171 				int toSendSize = (int) Math.min(max, b.readableBytes());
172 				toSendSize = Math.min(toSendSize, _maxBufferSize);
173 				// if we need to split the current buffer make sure the split is not too small
174 				if (toSendSize < _minBufferSplit && toSendSize != b.readableBytes() && toSendSize != _maxBufferSize)
175 				{
176 					// wait for next tick and allow more bytes in next tick
177 					_waitingTicks++;
178 					break;
179 				}
180 				_waitingTicks = 1;
181 				// do we need to split the current buffer ?
182 				if (toSendSize == b.readableBytes())
183 				{
184 					// no! output the complete buffer
185 					_queue.removeLast();
186 					System.out.println("remove buffer");
187 					output(e, b);
188 				}
189 				else
190 				{
191 					// yes, split the buffer
192 					// output the tail of the current buffer
193 					// TODO when spliting: handle future correctly: create new one
194 					ChannelBuffer toSendBuffer = b.slice(b.writerIndex()-toSendSize, toSendSize);
195 					// keep the rest of the buffer
196 					b.writerIndex(b.writerIndex()-toSendSize);
197 					output(e, toSendBuffer);
198 				}
199 				max -= toSendSize;
200 				_currentBucketSize -= toSendSize;
201 			}
202 			// is someone waiting to add a buffer to the queue ?
203 			if (_waitingSize != 0 && _waitingSize+_currentBucketSize<_bucketSize)
204 				_bucketNotFull.signal();				
205 			
206 		}
207 		finally
208 		{
209 			_dataLock.unlock();
210 		}
211 		if (!_stoped)
212 			setTimer();
213 		System.out.println("- t");
214 	}
215 
216 	private void setTimer()
217 	{
218 		_timeout = _timer.newTimeout(_timerTickTask, _timerTick, TimeUnit.MILLISECONDS);
219 	}
220 	
221 	public void start()
222 	{
223 		if (!_stoped)
224 			return;
225 		_stoped = false;
226 		setTimer();
227 	}
228 	
229 	public void stop()
230 	{
231 		_stoped = true;
232 		if (_timeout != null)
233 			_timeout.cancel();
234 		_timeout = null;
235 		
236 	}
237 
238 	private void output(MessageEvent e, ChannelBuffer b)
239 	{
240 		// TODO output up/downstream using QueueEvent
241 		System.out.println("output "+b.readableBytes());
242 	}
243 	
244 	public void setMaxRate(double rate)
245 	{
246 		_maxOutputRate  = rate;
247 		_maxBytesPerTick = (long)(_maxOutputRate * _timerTick) / 1000;
248 	}
249 	
250 	public void setTimerTick(long timerTick)
251 	{
252 		_timerTick = timerTick;
253 		_maxBytesPerTick = (long)(_maxOutputRate * _timerTick) / 1000;		
254 	}
255 
256 	static class TestMessageEvent implements MessageEvent
257 	{
258 		Object _m;
259 		TestMessageEvent(ChannelBuffer b)
260 		{
261 			_m = b;
262 		}
263 
264 		@Override
265 		public Object getMessage()
266 		{
267 			// TODO Auto-generated method stub
268 			return _m;
269 		}
270 
271 		@Override
272 		public SocketAddress getRemoteAddress()
273 		{
274 			// TODO Auto-generated method stub
275 			return null;
276 		}
277 
278 		@Override
279 		public Channel getChannel()
280 		{
281 			// TODO Auto-generated method stub
282 			return null;
283 		}
284 
285 		@Override
286 		public ChannelFuture getFuture()
287 		{
288 			// TODO Auto-generated method stub
289 			return null;
290 		}
291 		
292 	}
293 	
294 	public static void main(String[] args)
295 	{
296 		ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
297 		for (int i = 0; i< 200; i++)
298 		{
299 			cb.writeInt(i);
300 		}
301 		LeakyBucketHandler h = new LeakyBucketHandler();
302 		h.start();
303 		Random r = new Random(System.currentTimeMillis());
304 		while (true)
305 		{
306 			int size = r.nextInt(800);
307 			if (size == 0)
308 				size = 10;
309 			MessageEvent e = new TestMessageEvent(ChannelBuffers.wrappedBuffer(cb).slice(0, size));
310 			h.submit(e);
311 			try
312 			{
313 				long s = (long) (500*r.nextFloat());
314 				s = (s/10)*10;
315 				System.out.println("sleep "+s);
316 				Thread.sleep(s);
317 			}
318 			catch (InterruptedException e1)
319 			{
320 				// TODO Auto-generated catch block
321 				e1.printStackTrace();
322 			}
323 			
324 		}
325 	}
326 
327 
328 }