1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.client;
22
23 import goldengate.common.database.exception.GoldenGateDatabaseException;
24 import goldengate.common.file.DataBlock;
25 import goldengate.common.logging.GgInternalLoggerFactory;
26 import openr66.commander.ClientRunner;
27 import openr66.context.ErrorCode;
28 import openr66.context.R66FiniteDualStates;
29 import openr66.context.R66Result;
30 import openr66.context.task.exception.OpenR66RunnerErrorException;
31 import openr66.database.DbConstant;
32 import openr66.database.data.DbRule;
33 import openr66.database.data.DbTaskRunner;
34 import openr66.protocol.configuration.Configuration;
35 import openr66.protocol.exception.OpenR66DatabaseGlobalException;
36 import openr66.protocol.exception.OpenR66Exception;
37 import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
38 import openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
39 import openr66.protocol.exception.OpenR66ProtocolPacketException;
40 import openr66.protocol.exception.OpenR66ProtocolSystemException;
41 import openr66.protocol.localhandler.LocalChannelReference;
42 import openr66.protocol.localhandler.RetrieveRunner;
43 import openr66.protocol.localhandler.packet.EndRequestPacket;
44 import openr66.protocol.localhandler.packet.ErrorPacket;
45 import openr66.protocol.localhandler.packet.RequestPacket;
46 import openr66.protocol.networkhandler.NetworkTransaction;
47 import openr66.protocol.test.TestSendThroughClient;
48 import openr66.protocol.utils.ChannelUtils;
49 import openr66.protocol.utils.R66Future;
50
51 import org.jboss.netty.buffer.ChannelBuffers;
52 import org.jboss.netty.channel.ChannelFuture;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public abstract class SendThroughClient extends AbstractTransfer {
107 protected final NetworkTransaction networkTransaction;
108 protected LocalChannelReference localChannelReference;
109 protected DbTaskRunner taskRunner = null;
110
111
112
113
114
115
116
117
118
119
120
121 public SendThroughClient(R66Future future, String remoteHost,
122 String filename, String rulename, String fileinfo, boolean isMD5,
123 int blocksize, long id, NetworkTransaction networkTransaction) {
124 super(SendThroughClient.class,
125 future, filename, rulename, fileinfo, isMD5, remoteHost, blocksize, id, null);
126 this.networkTransaction = networkTransaction;
127 }
128
129
130
131 public void run() {
132 logger.error("DO NOT call this method for this class");
133 }
134
135
136
137
138
139
140
141 public boolean initiateRequest() {
142 if (logger == null) {
143 logger = GgInternalLoggerFactory.getLogger(SendThroughClient.class);
144 }
145 DbRule rule;
146 try {
147 rule = new DbRule(DbConstant.admin.session, rulename);
148 } catch (GoldenGateDatabaseException e) {
149 logger.error("Cannot get Rule: "+rulename, e);
150 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
151 ErrorCode.Internal,null));
152 future.setFailure(e);
153 return false;
154 }
155 int mode = rule.mode;
156 if (isMD5) {
157 mode = RequestPacket.getModeMD5(mode);
158 }
159 RequestPacket request = new RequestPacket(rulename,
160 mode, filename, blocksize, 0,
161 id, fileinfo);
162
163 boolean isSender = true;
164 try {
165 try {
166
167 taskRunner =
168 new DbTaskRunner(DbConstant.admin.session,rule,isSender,request,remoteHost, null);
169 } catch (GoldenGateDatabaseException e) {
170 logger.error("Cannot get task", e);
171 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
172 ErrorCode.Internal, null));
173 future.setFailure(e);
174 return false;
175 }
176 ClientRunner runner = new ClientRunner(networkTransaction, taskRunner, future);
177 runner.setSendThroughMode();
178 OpenR66ProtocolNotYetConnectionException exc = null;
179 for (int i = 0; i < Configuration.RETRYNB; i++) {
180 try {
181 localChannelReference = runner.initRequest();
182 exc = null;
183 break;
184 } catch (OpenR66RunnerErrorException e) {
185 logger.error("Cannot Transfer", e);
186 future.setResult(new R66Result(e, null, true,
187 ErrorCode.Internal, taskRunner));
188 future.setFailure(e);
189 return false;
190 } catch (OpenR66ProtocolNoConnectionException e) {
191 logger.error("Cannot Connect", e);
192 future.setResult(new R66Result(e, null, true,
193 ErrorCode.ConnectionImpossible, taskRunner));
194 future.setFailure(e);
195 return false;
196 } catch (OpenR66ProtocolPacketException e) {
197 logger.error("Bad Protocol", e);
198 future.setResult(new R66Result(e, null, true,
199 ErrorCode.TransferError, taskRunner));
200 future.setFailure(e);
201 return false;
202 } catch (OpenR66ProtocolNotYetConnectionException e) {
203 logger.debug("Not Yet Connected", e);
204 exc = e;
205 continue;
206 }
207 }
208 if (exc!= null) {
209 taskRunner.setLocalChannelReference(new LocalChannelReference());
210 logger.error("Cannot Connect", exc);
211 future.setResult(new R66Result(exc, null, true,
212 ErrorCode.ConnectionImpossible, taskRunner));
213 future.setFailure(exc);
214 return false;
215 }
216 try {
217 localChannelReference.waitReadyForSendThrough();
218 } catch (OpenR66Exception e) {
219 logger.error("Cannot Transfer", e);
220 future.setResult(new R66Result(e, null, true,
221 ErrorCode.Internal, taskRunner));
222 future.setFailure(e);
223 return false;
224 }
225
226 return true;
227 } finally {
228 if (taskRunner != null) {
229 if (future.isFailed()) {
230 try {
231 taskRunner.delete();
232 } catch (GoldenGateDatabaseException e) {
233 }
234 }
235 }
236 }
237 }
238
239
240
241 public void finalizeRequest() {
242 try {
243 try {
244 ChannelUtils.writeEndTransfer(localChannelReference);
245 } catch (OpenR66ProtocolPacketException e) {
246
247 try {
248 localChannelReference.getSession().setFinalizeTransfer(
249 false,
250 new R66Result(e, localChannelReference.getSession(), false,
251 ErrorCode.Internal, taskRunner));
252 } catch (OpenR66RunnerErrorException e1) {
253 transferInError(e1);
254 return;
255 } catch (OpenR66ProtocolSystemException e1) {
256 transferInError(e1);
257 return;
258 }
259 }
260 localChannelReference.getFutureEndTransfer().awaitUninterruptibly();
261 logger.debug("Await future End Transfer done: " +
262 localChannelReference.getFutureEndTransfer().isSuccess());
263 if (localChannelReference.getFutureEndTransfer().isSuccess()) {
264
265 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
266 EndRequestPacket validPacket = new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
267 if (localChannelReference.getSession().getExtendedProtocol() &&
268 localChannelReference.getSession().getBusinessObject() != null &&
269 localChannelReference.getSession().getBusinessObject().getInfo() != null && localChannelReference.getSession().getBusinessObject().getInfo() != null) {
270 validPacket.setOptional(localChannelReference.getSession().getBusinessObject().getInfo());
271 }
272 try {
273 ChannelUtils.writeAbstractLocalPacket(localChannelReference, validPacket, true);
274 } catch (OpenR66ProtocolPacketException e) {
275 }
276 if (!localChannelReference.getFutureRequest().awaitUninterruptibly(
277 Configuration.configuration.TIMEOUTCON)) {
278
279 localChannelReference.validateRequest(localChannelReference.getFutureEndTransfer().getResult());
280 }
281 if (taskRunner != null && taskRunner.isSelfRequested()) {
282 ChannelUtils.close(localChannelReference.getLocalChannel());
283 }
284 } else {
285 transferInError(null);
286 }
287 } finally {
288 if (taskRunner != null) {
289 if ((future.isDone() && (!future.isSuccess())) || nolog) {
290 try {
291 taskRunner.delete();
292 } catch (GoldenGateDatabaseException e) {
293 }
294 }
295 }
296 }
297 }
298
299
300
301
302 public void transferInError(OpenR66Exception e) {
303 if (!localChannelReference.getFutureEndTransfer().getResult().isAnswered) {
304 R66Result result = new R66Result(e, localChannelReference.getSession(), true,
305 ErrorCode.TransferError, taskRunner);
306 logger.error("Transfer in error", e);
307 localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
308 ErrorPacket error = new ErrorPacket("Transfer in error",
309 ErrorCode.TransferError.getCode(), ErrorPacket.FORWARDCLOSECODE);
310 try {
311 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
312 } catch (OpenR66ProtocolPacketException e1) {
313 }
314 localChannelReference.invalidateRequest(result);
315 }
316 ChannelUtils.close(localChannelReference.getLocalChannel());
317 }
318
319
320
321
322
323
324
325
326
327 public ChannelFuture writeWhenPossible(DataBlock block)
328 throws OpenR66RunnerErrorException, OpenR66ProtocolPacketException, OpenR66ProtocolSystemException {
329 return RetrieveRunner.writeWhenPossible(block, localChannelReference);
330 }
331
332
333
334
335
336 public DataBlock transformToDataBlock(byte []data) {
337 DataBlock block = new DataBlock();
338 if (data == null) {
339
340 block.setEOF(true);
341 } else {
342 block.setBlock(ChannelBuffers.wrappedBuffer(data));
343 }
344 return block;
345 }
346 }