View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.protocol.localhandler;
22  
23  import goldengate.common.file.DataBlock;
24  import goldengate.common.logging.GgInternalLogger;
25  import goldengate.common.logging.GgInternalLoggerFactory;
26  
27  import java.util.concurrent.atomic.AtomicBoolean;
28  
29  import openr66.context.ErrorCode;
30  import openr66.context.R66FiniteDualStates;
31  import openr66.context.R66Result;
32  import openr66.context.R66Session;
33  import openr66.context.task.exception.OpenR66RunnerErrorException;
34  import openr66.database.data.DbTaskRunner.TASKSTEP;
35  import openr66.protocol.configuration.Configuration;
36  import openr66.protocol.exception.OpenR66Exception;
37  import openr66.protocol.exception.OpenR66ProtocolPacketException;
38  import openr66.protocol.exception.OpenR66ProtocolSystemException;
39  import openr66.protocol.localhandler.packet.EndRequestPacket;
40  import openr66.protocol.localhandler.packet.ErrorPacket;
41  import openr66.protocol.networkhandler.NetworkTransaction;
42  import openr66.protocol.utils.ChannelUtils;
43  
44  import org.jboss.netty.buffer.ChannelBuffers;
45  import org.jboss.netty.channel.Channel;
46  import org.jboss.netty.channel.ChannelFuture;
47  
48  /**
49   * Retrieve transfer runner
50   * @author Frederic Bregier
51   *
52   */
53  public class RetrieveRunner extends Thread {
54      /**
55       * Internal Logger
56       */
57      private static final GgInternalLogger logger = GgInternalLoggerFactory
58              .getLogger(RetrieveRunner.class);
59  
60      private final R66Session session;
61  
62      private final LocalChannelReference localChannelReference;
63  
64      private final Channel channel;
65  
66      private boolean done = false;
67  
68      protected AtomicBoolean running = new AtomicBoolean(true);
69  
70      protected RetrieveRunner() {
71          // empty constructor
72          this.session = null;
73          this.localChannelReference = null;
74          this.channel = null;
75      }
76      /**
77       *
78       * @param session
79       * @param channel local channel
80       */
81      public RetrieveRunner(R66Session session, Channel channel) {
82          this.session = session;
83          localChannelReference = this.session.getLocalChannelReference();
84          this.channel = channel;
85      }
86  
87      /**
88       * Try to stop the runner
89       */
90      public void stopRunner() {
91          running.set(false);
92      }
93      /*
94       * (non-Javadoc)
95       *
96       * @see java.lang.Runnable#run()
97       */
98      @Override
99      public void run() {
100         boolean requestValidDone = false;
101         try {
102             Thread.currentThread().setName("RetrieveRunner: " + channel.getId());
103             try {
104                 if (session.getRunner().getGloballaststep() == TASKSTEP.POSTTASK.ordinal()) {
105                     // restart from PostTask global step so just end now
106                     try {
107                         ChannelUtils.writeEndTransfer(localChannelReference);
108                     } catch (OpenR66ProtocolPacketException e) {
109                         transferInError(e);
110                         logger.error("End Retrieve in Error");
111                         return;
112                     }
113                 } else {
114                     session.getFile().retrieveBlocking(running);
115                 }
116             } catch (OpenR66RunnerErrorException e) {
117                 transferInError(e);
118                 logger.info("End Retrieve in Error");
119                 return;
120             } catch (OpenR66ProtocolSystemException e) {
121                 transferInError(e);
122                 logger.info("End Retrieve in Error");
123                 return;
124             }
125             if (running.get()) {
126                 try {
127                     localChannelReference.getFutureEndTransfer().await();
128                 } catch (InterruptedException e1) {
129                 }
130             }
131             logger.debug("Await future End Transfer done: " +
132                     localChannelReference.getFutureEndTransfer().isSuccess());
133             if (localChannelReference.getFutureEndTransfer().isDone() &&
134                     localChannelReference.getFutureEndTransfer().isSuccess()) {
135                 // send a validation
136                 requestValidDone = true;
137                 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
138                 EndRequestPacket validPacket = new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
139                 if (session.getExtendedProtocol() &&
140                         session.getBusinessObject() != null && 
141                         session.getBusinessObject().getInfo() != null) {
142                     validPacket.setOptional(session.getBusinessObject().getInfo());
143                 }
144                 try {
145                     ChannelUtils.writeAbstractLocalPacket(localChannelReference, validPacket, true);
146                 } catch (OpenR66ProtocolPacketException e) {
147                 }
148                 if (!localChannelReference.getFutureRequest().awaitUninterruptibly(
149                         Configuration.configuration.TIMEOUTCON)) {
150                     // valid it however
151                     session.getRunner().setAllDone();
152                     try {
153                         session.getRunner().saveStatus();
154                     } catch (OpenR66RunnerErrorException e) {
155                         // ignore
156                     }
157                     localChannelReference.validateRequest(localChannelReference.getFutureEndTransfer().getResult());
158                 }
159                 if (session.getRunner() != null && session.getRunner().isSelfRequested()) {
160                     ChannelUtils.close(localChannelReference.getLocalChannel());
161                 }
162                 done = true;
163             } else {
164                 if (localChannelReference.getFutureEndTransfer().isDone()) {
165                     // Done and Not Success => error
166                     if (!localChannelReference.getFutureEndTransfer().getResult().isAnswered) {
167                         localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
168                         ErrorPacket error = new ErrorPacket(localChannelReference.getErrorMessage(),
169                                 localChannelReference.getFutureEndTransfer().getResult().code.getCode(), 
170                                 ErrorPacket.FORWARDCLOSECODE);
171                         try {
172                             ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
173                         } catch (OpenR66ProtocolPacketException e) {
174                         }
175                     }
176                 }
177                 if (!localChannelReference.getFutureRequest().isDone()) {
178                     R66Result result = localChannelReference.getFutureEndTransfer().getResult();
179                     if (result == null) {
180                         result =
181                             new R66Result(session, false, ErrorCode.TransferError, session.getRunner());
182                     }
183                     localChannelReference.invalidateRequest(result);
184                 }
185                 done = true;
186                 logger.info("End Retrieve in Error");
187             }
188         } finally {
189             if (!done) {
190                 if (localChannelReference.getFutureEndTransfer().isDone() &&
191                         localChannelReference.getFutureEndTransfer().isSuccess()) {
192                     if (! requestValidDone) {
193                         localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
194                         EndRequestPacket validPacket = new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
195                         if (session.getExtendedProtocol() &&
196                                 session.getBusinessObject() != null && 
197                                 session.getBusinessObject().getInfo() != null) {
198                             validPacket.setOptional(session.getBusinessObject().getInfo());
199                         }
200                         try {
201                             ChannelUtils.writeAbstractLocalPacket(localChannelReference, validPacket, true);
202                         } catch (OpenR66ProtocolPacketException e) {
203                         }
204                     }
205                     session.getRunner().setAllDone();
206                     try {
207                         session.getRunner().saveStatus();
208                     } catch (OpenR66RunnerErrorException e) {
209                         // ignore
210                     }
211                     localChannelReference.validateRequest(localChannelReference
212                             .getFutureEndTransfer().getResult());
213                     if (session.getRunner() != null && session.getRunner().isSelfRequested()) {
214                         ChannelUtils.close(localChannelReference.getLocalChannel());
215                     }
216                 } else {
217                     if (localChannelReference.getFutureEndTransfer().isDone()) {
218                         if (!localChannelReference.getFutureEndTransfer().getResult().isAnswered) {
219                             localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
220                             ErrorPacket error = new ErrorPacket(localChannelReference.getErrorMessage(),
221                                     localChannelReference.getFutureEndTransfer().getResult().code.getCode(), 
222                                     ErrorPacket.FORWARDCLOSECODE);
223                             try {
224                                 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
225                             } catch (OpenR66ProtocolPacketException e) {
226                             }
227                         }
228                     } else {
229                         R66Result result = localChannelReference.getFutureEndTransfer().getResult();
230                         if (result == null) {
231                             result =
232                                 new R66Result(session, false, ErrorCode.TransferError, session.getRunner());
233                         }
234                         localChannelReference.invalidateRequest(result);
235                     }
236                 }
237             }
238             NetworkTransaction.normalEndRetrieve(localChannelReference);
239         }
240     }
241     private void transferInError(OpenR66Exception e) {
242         R66Result result = new R66Result(e, session, true,
243                 ErrorCode.TransferError, session.getRunner());
244         logger.error("Transfer in error", e);
245         session.newState(R66FiniteDualStates.ERROR);
246         ErrorPacket error = new ErrorPacket("Transfer in error",
247                 ErrorCode.TransferError.getCode(), ErrorPacket.FORWARDCLOSECODE);
248         try {
249             ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
250         } catch (OpenR66ProtocolPacketException e1) {
251         }
252         localChannelReference.invalidateRequest(result);
253         ChannelUtils.close(channel);
254         done = true;
255     }
256     /**
257      * Write the next block when the channel is ready to prevent OOM
258      * @param block
259      * @param localChannelReference
260      * @return the ChannelFuture on the write operation
261      * @throws OpenR66ProtocolPacketException
262      * @throws OpenR66RunnerErrorException
263      * @throws OpenR66ProtocolSystemException
264      */
265     public static ChannelFuture writeWhenPossible(
266             DataBlock block, LocalChannelReference localChannelReference)
267         throws OpenR66ProtocolPacketException, OpenR66RunnerErrorException,
268             OpenR66ProtocolSystemException {
269         return ChannelUtils.writeBackDataBlock(localChannelReference, block);
270         // XXX Keep this in case the bug comes back
271         /*
272         // Test if channel is writable in order to prevent OOM
273         if (! localChannelReference.getNetworkChannel().isWritable()) {
274             return ChannelUtils.writeBackDataBlock(localChannelReference, block);
275         } else if (Configuration.configuration.anyBandwidthLimitation) {
276             // Patch to limit the impact when no real reason to wait for writing
277             // double computation of traffic but ok
278             long wait = ChannelUtils.willBeWaitingWriting(localChannelReference, block.getByteCount());
279             if (wait == 0) {
280                 ChannelUtils.writeBackDataBlock(localChannelReference, block);
281                 return Channels.succeededFuture(localChannelReference.getNetworkChannel());
282             }
283             return ChannelUtils.writeBackDataBlock(localChannelReference, block);
284         } else {
285             ChannelUtils.writeBackDataBlock(localChannelReference, block);
286             return Channels.succeededFuture(localChannelReference.getNetworkChannel());
287         }
288         */
289     }
290 
291     /**
292      * Utility method for send through mode
293      * @param data the data byte, if null it is the last block
294      * @return the DataBlock associated to the data
295      */
296     public static DataBlock transformToDataBlock(byte []data) {
297         DataBlock block = new DataBlock();
298         if (data == null) {
299             // last block
300             block.setEOF(true);
301         } else {
302             block.setBlock(ChannelBuffers.wrappedBuffer(data));
303         }
304         return block;
305     }
306 }