1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package goldengate.common.database;
21
22 import java.sql.Connection;
23 import java.sql.SQLException;
24 import java.util.ArrayDeque;
25 import java.util.Iterator;
26 import java.util.Queue;
27 import java.util.concurrent.Semaphore;
28 import java.util.concurrent.TimeUnit;
29
30 import javax.sql.ConnectionEvent;
31 import javax.sql.ConnectionEventListener;
32 import javax.sql.ConnectionPoolDataSource;
33 import javax.sql.PooledConnection;
34
35 import org.jboss.netty.util.Timeout;
36 import org.jboss.netty.util.Timer;
37 import org.jboss.netty.util.TimerTask;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class DbConnectionPool {
56 private ConnectionPoolDataSource dataSource;
57
58 private int maxConnections;
59
60 private int timeout;
61
62 private long timeOutForceClose = 300000;
63
64
65 private Semaphore semaphore;
66
67 private Queue<Con> recycledConnections;
68
69 private int activeConnections;
70
71 private PoolConnectionEventListener poolConnectionEventListener;
72
73 private boolean isDisposed;
74
75 static class Con {
76 final PooledConnection pooledCon;
77
78 long lastRecyle;
79
80 Con(PooledConnection pooledCon) {
81 this.pooledCon = pooledCon;
82 lastRecyle = System.currentTimeMillis();
83 }
84
85 @Override
86 public boolean equals(Object o) {
87 if (this == o) return true;
88 if (o == null || ! (o instanceof Con)) return false;
89
90 Con con = (Con) o;
91
92 return pooledCon.equals(con.pooledCon);
93 }
94
95 @Override
96 public int hashCode() {
97 return pooledCon.hashCode();
98 }
99 }
100
101
102
103
104
105
106 private static class TimerTaskCheckConnections implements TimerTask {
107 DbConnectionPool pool;
108 Timer timer;
109 long delay;
110
111
112
113
114
115
116
117 private TimerTaskCheckConnections(Timer timer, long delay, DbConnectionPool pool) {
118 if (pool == null || timer == null || delay < 1000) {
119 throw new IllegalArgumentException("Invalid values. Need pool, timer and delay >= 1000");
120 }
121 this.pool = pool;
122 this.timer = timer;
123 this.delay = delay;
124 }
125
126 public void run(Timeout timeout) throws Exception {
127 Iterator<Con> conIterator = pool.recycledConnections.iterator();
128 long now = System.currentTimeMillis();
129 while (conIterator.hasNext()) {
130 Con c = conIterator.next();
131 if (c.lastRecyle + pool.timeOutForceClose < now) {
132 conIterator.remove();
133 pool.closeConnectionNoEx(c.pooledCon);
134 } else {
135 try {
136 if (! c.pooledCon.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
137 conIterator.remove();
138 pool.closeConnectionNoEx(c.pooledCon);
139 }
140 } catch (SQLException e) {
141 conIterator.remove();
142 pool.closeConnectionNoEx(c.pooledCon);
143 }
144 }
145 }
146 timer.newTimeout(this, delay, TimeUnit.MILLISECONDS);
147 }
148
149 }
150
151
152
153
154 public synchronized void freeIdleConnections() {
155 Iterator<Con> conIterator = recycledConnections.iterator();
156 long now = System.currentTimeMillis();
157 while (conIterator.hasNext()) {
158 Con c = conIterator.next();
159 if (c.lastRecyle + timeOutForceClose < now) {
160 conIterator.remove();
161 closeConnectionNoEx(c.pooledCon);
162 }
163 }
164 }
165
166
167
168
169
170 public static class TimeoutException extends RuntimeException {
171 private static final long serialVersionUID = 1;
172
173 public TimeoutException() {
174 super("Timeout while waiting for a free database connection.");
175 }
176 }
177
178
179
180
181
182
183
184
185 public DbConnectionPool(ConnectionPoolDataSource dataSource) {
186 this(dataSource, 0, DbConstant.DELAYMAXCONNECTION);
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200 public DbConnectionPool(ConnectionPoolDataSource dataSource, Timer timer, long delay) {
201 this(dataSource, 0, (int) (delay/1000));
202 timer.newTimeout(new TimerTaskCheckConnections(timer, delay, this),
203 delay, TimeUnit.MILLISECONDS);
204 }
205
206
207
208
209
210
211
212
213
214
215
216 public DbConnectionPool(ConnectionPoolDataSource dataSource,
217 int maxConnections) {
218 this(dataSource, maxConnections, DbConstant.DELAYMAXCONNECTION);
219 }
220
221
222
223
224
225
226
227
228
229
230
231 public DbConnectionPool(ConnectionPoolDataSource dataSource,
232 int maxConnections, int timeout) {
233 this.dataSource = dataSource;
234 this.maxConnections = maxConnections;
235 this.timeout = timeout;
236 if (maxConnections != 0) {
237
238
239 if (timeout <= 0) {
240 throw new IllegalArgumentException("Invalid timeout value.");
241 }
242 semaphore = new Semaphore(maxConnections, true);
243 }
244 recycledConnections = new ArrayDeque<Con>();
245 poolConnectionEventListener = new PoolConnectionEventListener();
246 }
247
248
249
250
251
252 public int getMaxConnections() {
253 return this.maxConnections;
254 }
255
256
257
258
259
260 public long getLoginTimeout() {
261 return this.timeout;
262 }
263
264
265
266
267
268 public long getTimeoutForceClose() {
269 return this.timeOutForceClose;
270 }
271
272
273
274
275
276
277
278 public synchronized void dispose() throws SQLException {
279 if (isDisposed) return;
280 isDisposed = true;
281 SQLException e = null;
282 while (!recycledConnections.isEmpty()) {
283 Con c = recycledConnections.remove();
284 PooledConnection pconn = c.pooledCon;
285 try {
286 pconn.close();
287 } catch (SQLException e2) {
288 if (e == null) e = e2;
289 }
290 }
291 if (e != null) throw e;
292 }
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308 public Connection getConnection() throws SQLException {
309
310
311 synchronized (this) {
312 if (isDisposed)
313 throw new IllegalStateException(
314 "Connection pool has been disposed.");
315 }
316 if (semaphore != null) {
317 try {
318 if (!semaphore.tryAcquire(timeout, TimeUnit.SECONDS))
319 throw new TimeoutException();
320 } catch (InterruptedException e) {
321 throw new RuntimeException(
322 "Interrupted while waiting for a database connection.",
323 e);
324 }
325 }
326 boolean ok = false;
327 try {
328 Connection conn = getConnection2();
329 ok = true;
330 return conn;
331 } finally {
332 if (semaphore != null) {
333 if (!ok) semaphore.release();
334 }
335 }
336 }
337
338 private synchronized Connection getConnection2() throws SQLException {
339 if (isDisposed)
340 throw new IllegalStateException(
341 "Connection pool has been disposed.");
342
343 long time = System.currentTimeMillis() + timeout*1000;
344 while (true) {
345 PooledConnection pconn;
346 if (!recycledConnections.isEmpty()) {
347 pconn = recycledConnections.remove().pooledCon;
348 } else {
349 pconn = dataSource.getPooledConnection();
350 }
351
352 Connection conn = pconn.getConnection();
353 if (conn.isValid(DbConstant.VALIDTESTDURATION)) {
354 activeConnections ++;
355 pconn.addConnectionEventListener(poolConnectionEventListener);
356 assertInnerState();
357 return conn;
358 }
359 if (time > System.currentTimeMillis()) {
360
361 break;
362 }
363 }
364
365 throw new SQLException("Could not get a valid connection before timeout");
366 }
367
368 private synchronized void recycleConnection(PooledConnection pconn) {
369 if (isDisposed) {
370 disposeConnection(pconn);
371 return;
372 }
373 try {
374 if (!pconn.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
375 disposeConnection(pconn);
376 return;
377 }
378 } catch (SQLException e) {
379 disposeConnection(pconn);
380 return;
381 }
382 if (activeConnections <= 0) throw new AssertionError();
383 activeConnections --;
384 if (semaphore != null) {
385 semaphore.release();
386 }
387 recycledConnections.add(new Con(pconn));
388 assertInnerState();
389 }
390
391 private synchronized void disposeConnection(PooledConnection pconn) {
392 if (activeConnections <= 0) throw new AssertionError();
393 activeConnections --;
394 if (semaphore != null) {
395 semaphore.release();
396 }
397 closeConnectionNoEx(pconn);
398 assertInnerState();
399 }
400
401 private void closeConnectionNoEx(PooledConnection pconn) {
402 try {
403 pconn.close();
404 } catch (SQLException e) {
405
406 }
407 }
408
409 private void assertInnerState() {
410 if (activeConnections < 0) throw new AssertionError();
411 if (semaphore != null) {
412 if (activeConnections + recycledConnections.size() > maxConnections)
413 throw new AssertionError();
414 if (activeConnections + semaphore.availablePermits() > maxConnections)
415 throw new AssertionError();
416 }
417 }
418
419 private class PoolConnectionEventListener implements
420 ConnectionEventListener {
421 public void connectionClosed(ConnectionEvent event) {
422 PooledConnection pconn = (PooledConnection) event.getSource();
423 pconn.removeConnectionEventListener(this);
424 recycleConnection(pconn);
425 }
426
427 public void connectionErrorOccurred(ConnectionEvent event) {
428 PooledConnection pconn = (PooledConnection) event.getSource();
429 pconn.removeConnectionEventListener(this);
430 disposeConnection(pconn);
431 }
432 }
433
434
435
436
437
438
439
440
441
442 public synchronized int getActiveConnections() {
443 return activeConnections;
444 }
445 }