1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 import org.jboss.netty.channel.ChannelHandlerContext;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.Timer;
25 import org.jboss.netty.util.TimerTask;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 public class TrafficCounter {
41
42
43
44 private final AtomicLong currentWrittenBytes = new AtomicLong(0);
45
46
47
48
49 private final AtomicLong currentReadBytes = new AtomicLong(0);
50
51
52
53
54 private final AtomicLong cumulativeWrittenBytes = new AtomicLong(0);
55
56
57
58
59 private final AtomicLong cumulativeReadBytes = new AtomicLong(0);
60
61
62
63
64 private long lastCumulativeTime;
65
66
67
68
69 private long lastWriteThroughput = 0;
70
71
72
73
74 private long lastReadThroughput = 0;
75
76
77
78
79 private final AtomicLong lastTime = new AtomicLong(0);
80
81
82
83
84 private long lastWrittenBytes = 0;
85
86
87
88
89 private long lastReadBytes = 0;
90
91
92
93
94 AtomicLong checkInterval = new AtomicLong(
95 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
96
97
98
99
100
101
102 private final String name;
103
104
105
106
107 private final AbstractTrafficShapingHandler trafficShapingHandler;
108
109
110
111
112 private final Timer timer;
113
114
115
116 private TimerTask timerTask;
117
118
119
120 volatile private Timeout timeout = null;
121
122
123
124
125 AtomicBoolean monitorActive = new AtomicBoolean(false);
126
127
128
129
130
131 private static class TrafficMonitoringTask implements TimerTask {
132
133
134
135 private final AbstractTrafficShapingHandler trafficShapingHandler1;
136
137
138
139
140 private final TrafficCounter counter;
141
142
143
144
145
146 protected TrafficMonitoringTask(
147 AbstractTrafficShapingHandler trafficShapingHandler,
148 TrafficCounter counter) {
149 trafficShapingHandler1 = trafficShapingHandler;
150 this.counter = counter;
151 }
152
153 public void run(Timeout timeout) throws Exception {
154 if (!counter.monitorActive.get()) {
155 return;
156 }
157 long endTime = System.currentTimeMillis();
158 counter.resetAccounting(endTime);
159 if (trafficShapingHandler1 != null) {
160 trafficShapingHandler1.doAccounting(counter);
161 }
162 timeout =
163 counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
164 }
165 }
166
167
168
169
170
171 public void start() {
172 synchronized (lastTime) {
173 if (monitorActive.get()) {
174 return;
175 }
176 lastTime.set(System.currentTimeMillis());
177 if (checkInterval.get() > 0) {
178 monitorActive.set(true);
179 timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
180 timeout =
181 timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
182 }
183 }
184 }
185
186
187
188
189
190 public void stop() {
191 synchronized (lastTime) {
192 if (!monitorActive.get()) {
193 return;
194 }
195 monitorActive.set(false);
196 resetAccounting(System.currentTimeMillis());
197 if (trafficShapingHandler != null) {
198 trafficShapingHandler.doAccounting(this);
199 }
200 if (timeout != null) {
201 timeout.cancel();
202 }
203 }
204 }
205
206
207
208
209
210
211 void resetAccounting(long newLastTime) {
212 synchronized (lastTime) {
213 long interval = newLastTime - lastTime.getAndSet(newLastTime);
214 if (interval == 0) {
215
216 return;
217 }
218 lastReadBytes = currentReadBytes.getAndSet(0);
219 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
220 lastReadThroughput = lastReadBytes / interval * 1000;
221
222 lastWriteThroughput = lastWrittenBytes / interval * 1000;
223
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238 public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
239 Timer timer, String name, long checkInterval) {
240 this.trafficShapingHandler = trafficShapingHandler;
241 this.timer = timer;
242 this.name = name;
243 lastCumulativeTime = System.currentTimeMillis();
244 configure(checkInterval);
245 }
246
247
248
249
250
251
252
253 public void configure(long newcheckInterval) {
254 long newInterval = (newcheckInterval/10)*10;
255 if (checkInterval.get() != newInterval) {
256 checkInterval.set(newInterval);
257 if (newInterval <= 0) {
258 stop();
259
260 lastTime.set(System.currentTimeMillis());
261 } else {
262
263 start();
264 }
265 }
266 }
267
268
269
270
271
272
273
274
275
276
277 void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv)
278 throws InterruptedException {
279 currentReadBytes.addAndGet(recv);
280 cumulativeReadBytes.addAndGet(recv);
281 }
282
283
284
285
286
287
288
289
290 void bytesWriteFlowControl(long write) throws InterruptedException {
291 currentWrittenBytes.addAndGet(write);
292 cumulativeWrittenBytes.addAndGet(write);
293 }
294
295
296
297
298
299
300 public long getCheckInterval() {
301 return checkInterval.get();
302 }
303
304
305
306
307
308 public long getLastReadThroughput() {
309 return lastReadThroughput;
310 }
311
312
313
314
315
316 public long getLastWriteThroughput() {
317 return lastWriteThroughput;
318 }
319
320
321
322
323
324 public long getLastReadBytes() {
325 return lastReadBytes;
326 }
327
328
329
330
331
332 public long getLastWrittenBytes() {
333 return lastWrittenBytes;
334 }
335
336
337
338
339
340 public long getCurrentReadBytes() {
341 return currentReadBytes.get();
342 }
343
344
345
346
347
348 public long getCurrentWrittenBytes() {
349 return currentWrittenBytes.get();
350 }
351
352
353
354
355 public long getLastTime() {
356 return lastTime.get();
357 }
358
359
360
361
362 public long getCumulativeWrittenBytes() {
363 return cumulativeWrittenBytes.get();
364 }
365
366
367
368
369 public long getCumulativeReadBytes() {
370 return cumulativeReadBytes.get();
371 }
372
373
374
375
376
377 public long getLastCumulativeTime() {
378 return lastCumulativeTime;
379 }
380
381
382
383
384 public void resetCumulativeTime() {
385 lastCumulativeTime = System.currentTimeMillis();
386 cumulativeReadBytes.set(0);
387 cumulativeWrittenBytes.set(0);
388 }
389
390
391
392
393 public String getName() {
394 return name;
395 }
396
397
398
399
400 @Override
401 public String toString() {
402 return "Monitor " + name + " Current Speed Read: " +
403 (lastReadThroughput >> 10) + " KB/s, Write: " +
404 (lastWriteThroughput >> 10) + " KB/s Current Read: " +
405 (currentReadBytes.get() >> 10) + " KB Current Write: " +
406 (currentWrittenBytes.get() >> 10) + " KB";
407 }
408 }