1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package openr66.protocol.localhandler;
22
23 import goldengate.common.file.DataBlock;
24 import goldengate.common.logging.GgInternalLogger;
25 import goldengate.common.logging.GgInternalLoggerFactory;
26
27 import java.util.concurrent.atomic.AtomicBoolean;
28
29 import openr66.context.ErrorCode;
30 import openr66.context.R66FiniteDualStates;
31 import openr66.context.R66Result;
32 import openr66.context.R66Session;
33 import openr66.context.task.exception.OpenR66RunnerErrorException;
34 import openr66.database.data.DbTaskRunner.TASKSTEP;
35 import openr66.protocol.configuration.Configuration;
36 import openr66.protocol.exception.OpenR66Exception;
37 import openr66.protocol.exception.OpenR66ProtocolPacketException;
38 import openr66.protocol.exception.OpenR66ProtocolSystemException;
39 import openr66.protocol.localhandler.packet.EndRequestPacket;
40 import openr66.protocol.localhandler.packet.ErrorPacket;
41 import openr66.protocol.networkhandler.NetworkTransaction;
42 import openr66.protocol.utils.ChannelUtils;
43
44 import org.jboss.netty.buffer.ChannelBuffers;
45 import org.jboss.netty.channel.Channel;
46 import org.jboss.netty.channel.ChannelFuture;
47
48
49
50
51
52
53 public class RetrieveRunner extends Thread {
54
55
56
57 private static final GgInternalLogger logger = GgInternalLoggerFactory
58 .getLogger(RetrieveRunner.class);
59
60 private final R66Session session;
61
62 private final LocalChannelReference localChannelReference;
63
64 private final Channel channel;
65
66 private boolean done = false;
67
68 protected AtomicBoolean running = new AtomicBoolean(true);
69
70 protected RetrieveRunner() {
71
72 this.session = null;
73 this.localChannelReference = null;
74 this.channel = null;
75 }
76
77
78
79
80
81 public RetrieveRunner(R66Session session, Channel channel) {
82 this.session = session;
83 localChannelReference = this.session.getLocalChannelReference();
84 this.channel = channel;
85 }
86
87
88
89
90 public void stopRunner() {
91 running.set(false);
92 }
93
94
95
96
97
98 @Override
99 public void run() {
100 boolean requestValidDone = false;
101 try {
102 Thread.currentThread().setName("RetrieveRunner: " + channel.getId());
103 try {
104 if (session.getRunner().getGloballaststep() == TASKSTEP.POSTTASK.ordinal()) {
105
106 try {
107 ChannelUtils.writeEndTransfer(localChannelReference);
108 } catch (OpenR66ProtocolPacketException e) {
109 transferInError(e);
110 logger.error("End Retrieve in Error");
111 return;
112 }
113 } else {
114 session.getFile().retrieveBlocking(running);
115 }
116 } catch (OpenR66RunnerErrorException e) {
117 transferInError(e);
118 logger.info("End Retrieve in Error");
119 return;
120 } catch (OpenR66ProtocolSystemException e) {
121 transferInError(e);
122 logger.info("End Retrieve in Error");
123 return;
124 }
125 if (running.get()) {
126 try {
127 localChannelReference.getFutureEndTransfer().await();
128 } catch (InterruptedException e1) {
129 }
130 }
131 logger.debug("Await future End Transfer done: " +
132 localChannelReference.getFutureEndTransfer().isSuccess());
133 if (localChannelReference.getFutureEndTransfer().isDone() &&
134 localChannelReference.getFutureEndTransfer().isSuccess()) {
135
136 requestValidDone = true;
137 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
138 EndRequestPacket validPacket = new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
139 if (session.getExtendedProtocol() &&
140 session.getBusinessObject() != null &&
141 session.getBusinessObject().getInfo() != null) {
142 validPacket.setOptional(session.getBusinessObject().getInfo());
143 }
144 try {
145 ChannelUtils.writeAbstractLocalPacket(localChannelReference, validPacket, true);
146 } catch (OpenR66ProtocolPacketException e) {
147 }
148 if (!localChannelReference.getFutureRequest().awaitUninterruptibly(
149 Configuration.configuration.TIMEOUTCON)) {
150
151 session.getRunner().setAllDone();
152 try {
153 session.getRunner().saveStatus();
154 } catch (OpenR66RunnerErrorException e) {
155
156 }
157 localChannelReference.validateRequest(localChannelReference.getFutureEndTransfer().getResult());
158 }
159 if (session.getRunner() != null && session.getRunner().isSelfRequested()) {
160 ChannelUtils.close(localChannelReference.getLocalChannel());
161 }
162 done = true;
163 } else {
164 if (localChannelReference.getFutureEndTransfer().isDone()) {
165
166 if (!localChannelReference.getFutureEndTransfer().getResult().isAnswered) {
167 localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
168 ErrorPacket error = new ErrorPacket(localChannelReference.getErrorMessage(),
169 localChannelReference.getFutureEndTransfer().getResult().code.getCode(),
170 ErrorPacket.FORWARDCLOSECODE);
171 try {
172 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
173 } catch (OpenR66ProtocolPacketException e) {
174 }
175 }
176 }
177 if (!localChannelReference.getFutureRequest().isDone()) {
178 R66Result result = localChannelReference.getFutureEndTransfer().getResult();
179 if (result == null) {
180 result =
181 new R66Result(session, false, ErrorCode.TransferError, session.getRunner());
182 }
183 localChannelReference.invalidateRequest(result);
184 }
185 done = true;
186 logger.info("End Retrieve in Error");
187 }
188 } finally {
189 if (!done) {
190 if (localChannelReference.getFutureEndTransfer().isDone() &&
191 localChannelReference.getFutureEndTransfer().isSuccess()) {
192 if (! requestValidDone) {
193 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
194 EndRequestPacket validPacket = new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
195 if (session.getExtendedProtocol() &&
196 session.getBusinessObject() != null &&
197 session.getBusinessObject().getInfo() != null) {
198 validPacket.setOptional(session.getBusinessObject().getInfo());
199 }
200 try {
201 ChannelUtils.writeAbstractLocalPacket(localChannelReference, validPacket, true);
202 } catch (OpenR66ProtocolPacketException e) {
203 }
204 }
205 session.getRunner().setAllDone();
206 try {
207 session.getRunner().saveStatus();
208 } catch (OpenR66RunnerErrorException e) {
209
210 }
211 localChannelReference.validateRequest(localChannelReference
212 .getFutureEndTransfer().getResult());
213 if (session.getRunner() != null && session.getRunner().isSelfRequested()) {
214 ChannelUtils.close(localChannelReference.getLocalChannel());
215 }
216 } else {
217 if (localChannelReference.getFutureEndTransfer().isDone()) {
218 if (!localChannelReference.getFutureEndTransfer().getResult().isAnswered) {
219 localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
220 ErrorPacket error = new ErrorPacket(localChannelReference.getErrorMessage(),
221 localChannelReference.getFutureEndTransfer().getResult().code.getCode(),
222 ErrorPacket.FORWARDCLOSECODE);
223 try {
224 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
225 } catch (OpenR66ProtocolPacketException e) {
226 }
227 }
228 } else {
229 R66Result result = localChannelReference.getFutureEndTransfer().getResult();
230 if (result == null) {
231 result =
232 new R66Result(session, false, ErrorCode.TransferError, session.getRunner());
233 }
234 localChannelReference.invalidateRequest(result);
235 }
236 }
237 }
238 NetworkTransaction.normalEndRetrieve(localChannelReference);
239 }
240 }
241 private void transferInError(OpenR66Exception e) {
242 R66Result result = new R66Result(e, session, true,
243 ErrorCode.TransferError, session.getRunner());
244 logger.error("Transfer in error", e);
245 session.newState(R66FiniteDualStates.ERROR);
246 ErrorPacket error = new ErrorPacket("Transfer in error",
247 ErrorCode.TransferError.getCode(), ErrorPacket.FORWARDCLOSECODE);
248 try {
249 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
250 } catch (OpenR66ProtocolPacketException e1) {
251 }
252 localChannelReference.invalidateRequest(result);
253 ChannelUtils.close(channel);
254 done = true;
255 }
256
257
258
259
260
261
262
263
264
265 public static ChannelFuture writeWhenPossible(
266 DataBlock block, LocalChannelReference localChannelReference)
267 throws OpenR66ProtocolPacketException, OpenR66RunnerErrorException,
268 OpenR66ProtocolSystemException {
269 return ChannelUtils.writeBackDataBlock(localChannelReference, block);
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289 }
290
291
292
293
294
295
296 public static DataBlock transformToDataBlock(byte []data) {
297 DataBlock block = new DataBlock();
298 if (data == null) {
299
300 block.setEOF(true);
301 } else {
302 block.setBlock(ChannelBuffers.wrappedBuffer(data));
303 }
304 return block;
305 }
306 }