1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.protocol.test;
22
23 import goldengate.common.database.DbSession;
24 import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
25 import goldengate.common.database.exception.GoldenGateDatabaseException;
26 import goldengate.common.file.DataBlock;
27 import goldengate.common.logging.GgInternalLoggerFactory;
28 import openr66.client.RecvThroughHandler;
29 import openr66.client.SendThroughClient;
30 import openr66.commander.ClientRunner;
31 import openr66.context.ErrorCode;
32 import openr66.context.R66Result;
33 import openr66.context.task.exception.OpenR66RunnerErrorException;
34 import openr66.database.DbConstant;
35 import openr66.database.data.DbRule;
36 import openr66.database.data.DbTaskRunner;
37 import openr66.protocol.configuration.Configuration;
38 import openr66.protocol.exception.OpenR66DatabaseGlobalException;
39 import openr66.protocol.exception.OpenR66Exception;
40 import openr66.protocol.exception.OpenR66ProtocolBusinessException;
41 import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
42 import openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
43 import openr66.protocol.exception.OpenR66ProtocolPacketException;
44 import openr66.protocol.exception.OpenR66ProtocolSystemException;
45 import openr66.protocol.localhandler.LocalChannelReference;
46 import openr66.protocol.localhandler.packet.RequestPacket;
47 import openr66.protocol.networkhandler.NetworkTransaction;
48 import openr66.protocol.utils.R66Future;
49
50 import org.jboss.netty.buffer.ChannelBuffer;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public class TestSendThroughForward extends SendThroughClient {
88 public TestRecvThroughForwardHandler handler;
89 public DbSession dbSession;
90 public volatile boolean foundEOF = false;
91 protected DbTaskRunner sourceRunner;
92
93 public static class TestRecvThroughForwardHandler extends RecvThroughHandler {
94
95 protected TestSendThroughForward client;
96
97
98
99 @Override
100 public void writeChannelBuffer(ChannelBuffer buffer)
101 throws OpenR66ProtocolBusinessException {
102 DataBlock block = new DataBlock();
103 if (buffer.readableBytes() <= 0) {
104
105 block.setEOF(true);
106 } else {
107 block.setBlock(buffer);
108 }
109 try {
110 client.writeWhenPossible(block).await();
111 } catch (OpenR66RunnerErrorException e) {
112 client.transferInError(e);
113 } catch (OpenR66ProtocolPacketException e) {
114 client.transferInError(e);
115 } catch (OpenR66ProtocolSystemException e) {
116 client.transferInError(e);
117 } catch (InterruptedException e) {
118 client.transferInError(new OpenR66ProtocolSystemException(e));
119 }
120 if (block.isEOF()) {
121 client.finalizeRequest();
122 client.foundEOF = true;
123 }
124 }
125
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 public TestSendThroughForward(R66Future future, String remoteHost,
142 String filename, String rulename, String fileinfo, boolean isMD5,
143 int blocksize, NetworkTransaction networkTransaction, long idt,
144 DbSession dbSession, DbTaskRunner runner) {
145 super(future, remoteHost, filename, rulename, fileinfo, isMD5, blocksize,
146 idt, networkTransaction);
147 handler = new TestRecvThroughForwardHandler();
148 handler.client = this;
149 this.dbSession = dbSession;
150 this.sourceRunner = runner;
151 }
152
153
154
155
156 @Override
157 public boolean initiateRequest() {
158 if (logger == null) {
159 logger = GgInternalLoggerFactory.getLogger(TestSendThroughForward.class);
160 }
161 DbRule rule;
162 try {
163 rule = new DbRule(DbConstant.admin.session, rulename);
164 } catch (GoldenGateDatabaseException e) {
165 logger.error("Cannot get Rule: "+rulename, e);
166 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
167 ErrorCode.Internal,null));
168 future.setFailure(e);
169 return false;
170 }
171 int mode = rule.mode;
172 if (isMD5) {
173 mode = RequestPacket.getModeMD5(mode);
174 }
175 RequestPacket request = new RequestPacket(rulename,
176 mode, filename, blocksize, sourceRunner.getRank(),
177 id, fileinfo);
178
179 boolean isSender = true;
180 try {
181 try {
182
183 taskRunner =
184 new DbTaskRunner(DbConstant.admin.session,rule,isSender,request,remoteHost,null);
185 } catch (GoldenGateDatabaseException e) {
186 logger.error("Cannot get task", e);
187 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
188 ErrorCode.Internal, null));
189 future.setFailure(e);
190 return false;
191 }
192 ClientRunner runner = new ClientRunner(networkTransaction, taskRunner, future);
193 runner.setRecvThroughHandler(handler);
194 runner.setSendThroughMode();
195 OpenR66ProtocolNotYetConnectionException exc = null;
196 for (int i = 0; i < Configuration.RETRYNB; i++) {
197 try {
198 localChannelReference = runner.initRequest();
199 exc = null;
200 break;
201 } catch (OpenR66RunnerErrorException e) {
202 logger.error("Cannot Transfer", e);
203 future.setResult(new R66Result(e, null, true,
204 ErrorCode.Internal, taskRunner));
205 future.setFailure(e);
206 return false;
207 } catch (OpenR66ProtocolNoConnectionException e) {
208 logger.error("Cannot Connect", e);
209 future.setResult(new R66Result(e, null, true,
210 ErrorCode.ConnectionImpossible, taskRunner));
211 future.setFailure(e);
212 return false;
213 } catch (OpenR66ProtocolPacketException e) {
214 logger.error("Bad Protocol", e);
215 future.setResult(new R66Result(e, null, true,
216 ErrorCode.TransferError, taskRunner));
217 future.setFailure(e);
218 return false;
219 } catch (OpenR66ProtocolNotYetConnectionException e) {
220 logger.debug("Not Yet Connected", e);
221 exc = e;
222 continue;
223 }
224 }
225 if (exc!= null) {
226 taskRunner.setLocalChannelReference(new LocalChannelReference());
227 logger.error("Cannot Connect", exc);
228 future.setResult(new R66Result(exc, null, true,
229 ErrorCode.ConnectionImpossible, taskRunner));
230 future.setFailure(exc);
231 return false;
232 }
233 try {
234 localChannelReference.waitReadyForSendThrough();
235 } catch (OpenR66Exception e) {
236 logger.error("Cannot Transfer", e);
237 future.setResult(new R66Result(e, null, true,
238 ErrorCode.Internal, taskRunner));
239 future.setFailure(e);
240 return false;
241 }
242 if (taskRunner.getRank() < sourceRunner.getRank()) {
243 sourceRunner.setRankAtStartup(taskRunner.getRank());
244 }
245
246 return true;
247 } finally {
248 if (taskRunner != null) {
249
250
251 if (future.isFailed()) {
252 taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
253 try {
254 taskRunner.update();
255 } catch (GoldenGateDatabaseException e) {
256 }
257 }
258 }
259 }
260 }
261
262
263
264
265 @Override
266 public void finalizeRequest() {
267 if (foundEOF) {
268 return;
269 }
270 super.finalizeRequest();
271 }
272
273
274
275
276 @Override
277 public void transferInError(OpenR66Exception e) {
278 super.transferInError(e);
279 }
280
281 }