View Javadoc

1   /**
2    * This file is part of GoldenGate Project (named also GoldenGate or GG).
3    * 
4    * Copyright 2009, Frederic Bregier, and individual contributors by the @author
5    * tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    * 
8    * All GoldenGate Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   * 
13   * GoldenGate is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   * 
17   * You should have received a copy of the GNU General Public License along with
18   * GoldenGate . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package goldengate.common.database;
21  
22  import java.sql.Connection;
23  import java.sql.SQLException;
24  import java.util.ArrayDeque;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.Semaphore;
28  import java.util.concurrent.TimeUnit;
29  
30  import javax.sql.ConnectionEvent;
31  import javax.sql.ConnectionEventListener;
32  import javax.sql.ConnectionPoolDataSource;
33  import javax.sql.PooledConnection;
34  
35  import org.jboss.netty.util.Timeout;
36  import org.jboss.netty.util.Timer;
37  import org.jboss.netty.util.TimerTask;
38  
39  /**
40   * 
41   * A simple standalone JDBC connection pool manager.
42   * <p/>
43   * The public methods of this class are thread-safe.
44   * <p/>
45   * Note that JDBC4 is needed and isValid() must be implemented (not yet in PostgreSQL as of April 2012)
46   * <p/>
47   * 
48   * @author Christian d'Heureuse, Inventec Informatik AG, Zurich, Switzerland<br>
49   *         Multi-licensed: EPL/LGPL/MPL.
50   * @author Frederic Bregier <br>
51   *         Add TimerTask support to close after some "delay" any still connected
52   *         sessions
53   * 
54   */
55  public class DbConnectionPool {
56      private ConnectionPoolDataSource dataSource;
57  
58      private int maxConnections;
59  
60      private int timeout;
61  
62      private long timeOutForceClose = 300000; // 5 minutes
63  
64      // private PrintWriter logWriter;
65      private Semaphore semaphore;
66  
67      private Queue<Con> recycledConnections;
68  
69      private int activeConnections;
70  
71      private PoolConnectionEventListener poolConnectionEventListener;
72  
73      private boolean isDisposed;
74  
75      static class Con {
76          final PooledConnection pooledCon;
77  
78          long lastRecyle;
79  
80          Con(PooledConnection pooledCon) {
81              this.pooledCon = pooledCon;
82              lastRecyle = System.currentTimeMillis();
83          }
84  
85          @Override
86          public boolean equals(Object o) {
87              if (this == o) return true;
88              if (o == null || ! (o instanceof Con)) return false;
89  
90              Con con = (Con) o;
91  
92              return pooledCon.equals(con.pooledCon);
93          }
94  
95          @Override
96          public int hashCode() {
97              return pooledCon.hashCode();
98          }
99      }
100 
101     /**
102      * Class to check validity of connections in the pool
103      * @author Frederic Bregier
104      *
105      */
106     private static class TimerTaskCheckConnections implements TimerTask {
107         DbConnectionPool pool;
108         Timer timer;
109         long delay;
110 
111         /**
112          * 
113          * @param timer
114          * @param delay
115          * @param pool
116          */
117         private TimerTaskCheckConnections(Timer timer, long delay, DbConnectionPool pool) {
118             if (pool == null || timer == null || delay < 1000) {
119                 throw new IllegalArgumentException("Invalid values. Need pool, timer and delay >= 1000");
120             }
121             this.pool = pool;
122             this.timer = timer;
123             this.delay = delay;
124         }
125 
126         public void run(Timeout timeout) throws Exception {
127             Iterator<Con> conIterator = pool.recycledConnections.iterator();
128             long now = System.currentTimeMillis();
129             while (conIterator.hasNext()) {
130                 Con c = conIterator.next();
131                 if (c.lastRecyle + pool.timeOutForceClose < now) {
132                     conIterator.remove();
133                     pool.closeConnectionNoEx(c.pooledCon);
134                 } else {
135                     try {
136                         if (! c.pooledCon.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
137                             conIterator.remove();
138                             pool.closeConnectionNoEx(c.pooledCon);
139                         }
140                     } catch (SQLException e) {
141                         conIterator.remove();
142                         pool.closeConnectionNoEx(c.pooledCon);
143                     }
144                 }
145             }
146             timer.newTimeout(this, delay, TimeUnit.MILLISECONDS);
147         }
148 
149     }
150 
151     /**
152      * Release all idle connections
153      */
154     public synchronized void freeIdleConnections() {
155         Iterator<Con> conIterator = recycledConnections.iterator();
156         long now = System.currentTimeMillis();
157         while (conIterator.hasNext()) {
158             Con c = conIterator.next();
159             if (c.lastRecyle + timeOutForceClose < now) {
160                 conIterator.remove();
161                 closeConnectionNoEx(c.pooledCon);
162             }
163         }
164     }
165 
166     /**
167      * Thrown in when no free connection becomes available within
168      * <code>timeout</code> seconds.
169      */
170     public static class TimeoutException extends RuntimeException {
171         private static final long serialVersionUID = 1;
172 
173         public TimeoutException() {
174             super("Timeout while waiting for a free database connection.");
175         }
176     }
177 
178     /**
179      * Constructs a MiniConnectionPoolManager object with no timeout and no
180      * limit.
181      * 
182      * @param dataSource
183      *            the data source for the connections.
184      */
185     public DbConnectionPool(ConnectionPoolDataSource dataSource) {
186         this(dataSource, 0, DbConstant.DELAYMAXCONNECTION);
187     }
188 
189     /**
190      * Constructs a MiniConnectionPoolManager object with no timeout and no
191      * limit.
192      * 
193      * @param dataSource
194      *            the data source for the connections.
195      * @param timer
196      * @param delay in ms period of time to check existing connections and limit to get a 
197      *          new connection
198      *            
199      */
200     public DbConnectionPool(ConnectionPoolDataSource dataSource, Timer timer, long delay) {
201         this(dataSource, 0, (int) (delay/1000));
202         timer.newTimeout(new TimerTaskCheckConnections(timer, delay, this), 
203                 delay, TimeUnit.MILLISECONDS);
204     }
205 
206 
207     /**
208      * Constructs a MiniConnectionPoolManager object with a timeout of 
209      * DbConstant.DELAYMAXCONNECTION seconds.
210      * 
211      * @param dataSource
212      *            the data source for the connections.
213      * @param maxConnections
214      *            the maximum number of connections. 0 means no limit
215      */
216     public DbConnectionPool(ConnectionPoolDataSource dataSource,
217             int maxConnections) {
218         this(dataSource, maxConnections, DbConstant.DELAYMAXCONNECTION);
219     }
220 
221     /**
222      * Constructs a ConnectionPool object.
223      * 
224      * @param dataSource
225      *            the data source for the connections.
226      * @param maxConnections
227      *            the maximum number of connections. 0 means no limit
228      * @param timeout
229      *            the maximum time in seconds to wait for a free connection.
230      */
231     public DbConnectionPool(ConnectionPoolDataSource dataSource,
232             int maxConnections, int timeout) {
233         this.dataSource = dataSource;
234         this.maxConnections = maxConnections;
235         this.timeout = timeout;
236         if (maxConnections != 0) {
237             // if (maxConnections < 1) throw new
238             // IllegalArgumentException("Invalid maxConnections value.");
239             if (timeout <= 0) {
240                 throw new IllegalArgumentException("Invalid timeout value.");
241             }
242             semaphore = new Semaphore(maxConnections, true);
243         }
244         recycledConnections = new ArrayDeque<Con>();
245         poolConnectionEventListener = new PoolConnectionEventListener();
246     }
247 
248     /**
249      * 
250      * @return the max number of connections
251      */
252     public int getMaxConnections() {
253         return this.maxConnections;
254     }
255     
256     /**
257      * 
258      * @return the Login Timeout in second
259      */
260     public long getLoginTimeout() {
261         return this.timeout;
262     }
263     
264     /**
265      * 
266      * @return the Force Close Timeout in ms
267      */
268     public long getTimeoutForceClose() {
269         return this.timeOutForceClose;
270     }
271 
272     /**
273      * Closes all unused pooled connections.
274      * 
275      * @throws java.sql.SQLException
276      *             //
277      */
278     public synchronized void dispose() throws SQLException {
279         if (isDisposed) return;
280         isDisposed = true;
281         SQLException e = null;
282         while (!recycledConnections.isEmpty()) {
283             Con c = recycledConnections.remove();
284             PooledConnection pconn = c.pooledCon;
285             try {
286                 pconn.close();
287             } catch (SQLException e2) {
288                 if (e == null) e = e2;
289             }
290         }
291         if (e != null) throw e;
292     }
293 
294     /**
295      * Retrieves a connection from the connection pool. If
296      * <code>maxConnections</code> connections are already in use, the method
297      * waits until a connection becomes available or <code>timeout</code>
298      * seconds elapsed. When the application is finished using the connection,
299      * it must close it in order to return it to the pool.
300      * 
301      * @return a new Connection object.
302      * @throws TimeoutException
303      *             when no connection becomes available within
304      *             <code>timeout</code> seconds.
305      * @throws java.sql.SQLException
306      *             //
307      */
308     public Connection getConnection() throws SQLException {
309         // This routine is unsynchronized, because semaphore.tryAcquire() may
310         // block.
311         synchronized (this) {
312             if (isDisposed)
313                 throw new IllegalStateException(
314                         "Connection pool has been disposed.");
315         }
316         if (semaphore != null) {
317             try {
318                 if (!semaphore.tryAcquire(timeout, TimeUnit.SECONDS))
319                     throw new TimeoutException();
320             } catch (InterruptedException e) {
321                 throw new RuntimeException(
322                         "Interrupted while waiting for a database connection.",
323                         e);
324             }
325         }
326         boolean ok = false;
327         try {
328             Connection conn = getConnection2();
329             ok = true;
330             return conn;
331         } finally {
332             if (semaphore != null) {
333                 if (!ok) semaphore.release();
334             }
335         }
336     }
337 
338     private synchronized Connection getConnection2() throws SQLException {
339         if (isDisposed)
340             throw new IllegalStateException(
341                     "Connection pool has been disposed."); // test again with
342                                                            // lock
343         long time = System.currentTimeMillis() + timeout*1000;
344         while (true) {
345             PooledConnection pconn;
346             if (!recycledConnections.isEmpty()) {
347                 pconn = recycledConnections.remove().pooledCon;
348             } else {
349                 pconn = dataSource.getPooledConnection();
350             }
351 
352             Connection conn = pconn.getConnection();
353             if (conn.isValid(DbConstant.VALIDTESTDURATION)) {
354                 activeConnections ++;
355                 pconn.addConnectionEventListener(poolConnectionEventListener);
356                 assertInnerState();
357                 return conn;
358             }
359             if (time > System.currentTimeMillis()) {
360                 // too long
361                 break;
362             }
363         }
364 
365         throw new SQLException("Could not get a valid connection before timeout");
366     }
367 
368     private synchronized void recycleConnection(PooledConnection pconn) {
369         if (isDisposed) {
370             disposeConnection(pconn);
371             return;
372         }
373         try {
374             if (!pconn.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
375                 disposeConnection(pconn);
376                 return;
377             }
378         } catch (SQLException e) {
379             disposeConnection(pconn);
380             return;
381         }
382         if (activeConnections <= 0) throw new AssertionError();
383         activeConnections --;
384         if (semaphore != null) {
385             semaphore.release();
386         }
387         recycledConnections.add(new Con(pconn));
388         assertInnerState();
389     }
390 
391     private synchronized void disposeConnection(PooledConnection pconn) {
392         if (activeConnections <= 0) throw new AssertionError();
393         activeConnections --;
394         if (semaphore != null) {
395             semaphore.release();
396         }
397         closeConnectionNoEx(pconn);
398         assertInnerState();
399     }
400 
401     private void closeConnectionNoEx(PooledConnection pconn) {
402         try {
403             pconn.close();
404         } catch (SQLException e) {
405             //
406         }
407     }
408 
409     private void assertInnerState() {
410         if (activeConnections < 0) throw new AssertionError();
411         if (semaphore != null) {
412             if (activeConnections + recycledConnections.size() > maxConnections)
413                 throw new AssertionError();
414             if (activeConnections + semaphore.availablePermits() > maxConnections)
415                 throw new AssertionError();
416         }
417     }
418 
419     private class PoolConnectionEventListener implements
420             ConnectionEventListener {
421         public void connectionClosed(ConnectionEvent event) {
422             PooledConnection pconn = (PooledConnection) event.getSource();
423             pconn.removeConnectionEventListener(this);
424             recycleConnection(pconn);
425         }
426 
427         public void connectionErrorOccurred(ConnectionEvent event) {
428             PooledConnection pconn = (PooledConnection) event.getSource();
429             pconn.removeConnectionEventListener(this);
430             disposeConnection(pconn);
431         }
432     }
433 
434     /**
435      * Returns the number of active (open) connections of this pool. This is the
436      * number of <code>Connection</code> objects that have been issued by
437      * {@link #getConnection()} for which <code>Connection.close()</code> has
438      * not yet been called.
439      * 
440      * @return the number of active connections.
441      */
442     public synchronized int getActiveConnections() {
443         return activeConnections;
444     }
445 }