1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.exec.database.model;
22
23 import goldengate.common.database.DbPreparedStatement;
24 import goldengate.common.database.DbRequest;
25 import goldengate.common.database.DbSession;
26 import goldengate.common.database.exception.GoldenGateDatabaseNoConnectionException;
27 import goldengate.common.database.exception.GoldenGateDatabaseNoDataException;
28 import goldengate.common.database.exception.GoldenGateDatabaseSqlException;
29
30 import java.sql.SQLException;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import goldengate.ftp.exec.database.DbConstant;
34 import goldengate.ftp.exec.database.data.DbTransferLog;
35
36
37
38
39
40
41 public class DbModelMysql extends goldengate.common.database.model.DbModelMysql {
42
43
44
45
46
47
48
49 public DbModelMysql(String dbserver,
50 String dbuser, String dbpasswd) throws GoldenGateDatabaseNoConnectionException {
51 super(dbserver, dbuser, dbpasswd);
52 }
53 private final ReentrantLock lock = new ReentrantLock();
54
55 @Override
56 public void createTables(DbSession session) throws GoldenGateDatabaseNoConnectionException {
57
58 String createTableH2 = "CREATE TABLE IF NOT EXISTS ";
59 String primaryKey = " PRIMARY KEY ";
60 String notNull = " NOT NULL ";
61
62 DbRequest request = new DbRequest(session);
63
64 String action = createTableH2 + DbTransferLog.table + "(";
65 DbTransferLog.Columns[] acolumns = DbTransferLog.Columns.values();
66 for (int i = 0; i < acolumns.length; i ++) {
67 action += acolumns[i].name() +
68 DBType.getType(DbTransferLog.dbTypes[i]) + notNull + ", ";
69 }
70
71 action += " CONSTRAINT TRANSLOG_PK " + primaryKey + "(";
72 for (int i = DbTransferLog.NBPRKEY; i > 1; i--) {
73 action += acolumns[acolumns.length - i].name() + ",";
74 }
75 action += acolumns[acolumns.length - 1].name() + "))";
76 System.out.println(action);
77 try {
78 request.query(action);
79 } catch (GoldenGateDatabaseNoConnectionException e) {
80 e.printStackTrace();
81 return;
82 } catch (GoldenGateDatabaseSqlException e) {
83 e.printStackTrace();
84 return;
85 } finally {
86 request.close();
87 }
88
89 action = "CREATE INDEX IDX_TRANSLOG ON "+ DbTransferLog.table + "(";
90 DbTransferLog.Columns[] icolumns = DbTransferLog.indexes;
91 for (int i = 0; i < icolumns.length-1; i ++) {
92 action += icolumns[i].name()+ ", ";
93 }
94 action += icolumns[icolumns.length-1].name()+ ")";
95 System.out.println(action);
96 try {
97 request.query(action);
98 } catch (GoldenGateDatabaseNoConnectionException e) {
99 e.printStackTrace();
100 return;
101 } catch (GoldenGateDatabaseSqlException e) {
102 return;
103 } finally {
104 request.close();
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 action = "CREATE TABLE Sequences (name VARCHAR(22) NOT NULL PRIMARY KEY,"+
128 "seq BIGINT NOT NULL)";
129 System.out.println(action);
130 try {
131 request.query(action);
132 } catch (GoldenGateDatabaseNoConnectionException e) {
133 e.printStackTrace();
134 return;
135 } catch (GoldenGateDatabaseSqlException e) {
136 e.printStackTrace();
137 return;
138 } finally {
139 request.close();
140 }
141 action = "INSERT INTO Sequences (name, seq) VALUES ('"+DbTransferLog.fieldseq+"', "+
142 (DbConstant.ILLEGALVALUE + 1)+")";
143 System.out.println(action);
144 try {
145 request.query(action);
146 } catch (GoldenGateDatabaseNoConnectionException e) {
147 e.printStackTrace();
148 return;
149 } catch (GoldenGateDatabaseSqlException e) {
150 e.printStackTrace();
151 return;
152 } finally {
153 request.close();
154 }
155 }
156
157
158
159
160
161
162 @Override
163 public void resetSequence(DbSession session, long newvalue) throws GoldenGateDatabaseNoConnectionException {
164 String action = "UPDATE Sequences SET seq = " + newvalue+
165 " WHERE name = '"+ DbTransferLog.fieldseq + "'";
166 DbRequest request = new DbRequest(session);
167 try {
168 request.query(action);
169 } catch (GoldenGateDatabaseNoConnectionException e) {
170 e.printStackTrace();
171 return;
172 } catch (GoldenGateDatabaseSqlException e) {
173 e.printStackTrace();
174 return;
175 } finally {
176 request.close();
177 }
178 System.out.println(action);
179 }
180
181
182
183
184
185
186 @Override
187 public synchronized long nextSequence(DbSession dbSession)
188 throws GoldenGateDatabaseNoConnectionException,
189 GoldenGateDatabaseSqlException, GoldenGateDatabaseNoDataException {
190 lock.lock();
191 try {
192 long result = DbConstant.ILLEGALVALUE;
193 String action = "SELECT seq FROM Sequences WHERE name = '" +
194 DbTransferLog.fieldseq + "' FOR UPDATE";
195 DbPreparedStatement preparedStatement = new DbPreparedStatement(
196 dbSession);
197 try {
198 dbSession.conn.setAutoCommit(false);
199 } catch (SQLException e1) {
200 }
201 try {
202 preparedStatement.createPrepareStatement(action);
203
204 preparedStatement.executeQuery();
205 if (preparedStatement.getNext()) {
206 try {
207 result = preparedStatement.getResultSet().getLong(1);
208 } catch (SQLException e) {
209 throw new GoldenGateDatabaseSqlException(e);
210 }
211 } else {
212 throw new GoldenGateDatabaseNoDataException(
213 "No sequence found. Must be initialized first");
214 }
215 } finally {
216 preparedStatement.realClose();
217 }
218 action = "UPDATE Sequences SET seq = "+(result+1)+
219 " WHERE name = '"+DbTransferLog.fieldseq+"'";
220 try {
221 preparedStatement.createPrepareStatement(action);
222
223 preparedStatement.executeUpdate();
224 } finally {
225 preparedStatement.realClose();
226 }
227 return result;
228 } finally {
229 try {
230 dbSession.conn.setAutoCommit(true);
231 } catch (SQLException e1) {
232 }
233 lock.unlock();
234 }
235 }
236 }