View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.client;
22  
23  import goldengate.common.database.exception.GoldenGateDatabaseException;
24  import goldengate.common.file.DataBlock;
25  import goldengate.common.logging.GgInternalLoggerFactory;
26  import openr66.commander.ClientRunner;
27  import openr66.context.ErrorCode;
28  import openr66.context.R66FiniteDualStates;
29  import openr66.context.R66Result;
30  import openr66.context.task.exception.OpenR66RunnerErrorException;
31  import openr66.database.DbConstant;
32  import openr66.database.data.DbRule;
33  import openr66.database.data.DbTaskRunner;
34  import openr66.protocol.configuration.Configuration;
35  import openr66.protocol.exception.OpenR66DatabaseGlobalException;
36  import openr66.protocol.exception.OpenR66Exception;
37  import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
38  import openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
39  import openr66.protocol.exception.OpenR66ProtocolPacketException;
40  import openr66.protocol.exception.OpenR66ProtocolSystemException;
41  import openr66.protocol.localhandler.LocalChannelReference;
42  import openr66.protocol.localhandler.RetrieveRunner;
43  import openr66.protocol.localhandler.packet.EndRequestPacket;
44  import openr66.protocol.localhandler.packet.ErrorPacket;
45  import openr66.protocol.localhandler.packet.RequestPacket;
46  import openr66.protocol.networkhandler.NetworkTransaction;
47  import openr66.protocol.test.TestSendThroughClient;
48  import openr66.protocol.utils.ChannelUtils;
49  import openr66.protocol.utils.R66Future;
50  
51  import org.jboss.netty.buffer.ChannelBuffers;
52  import org.jboss.netty.channel.ChannelFuture;
53  
54  /**
55   * Class for Send Through client
56   *
57   * This class does not include the real file transfer since it is up to the business project
58   * to implement how to read new data to be sent to the remote host. If an error occurs,
59   * no transfer log is kept.
60   *
61   * 1) Configuration must have been loaded<br>
62   * <br>
63   * 2) Pipeline and NetworkTransaction must have been initiated:<br>
64   * <tt>     Configuration.configuration.pipelineInit();</tt><br>
65   * <tt>     NetworkTransaction networkTransaction = new NetworkTransaction();</tt><br>
66   * <br>
67   * 3) Prepare the request of transfer:<br>
68   * <tt>     R66Future futureReq = new R66Future(true);</tt><br>
69   * <tt>     SendThroughClient transaction = new SendThroughClient(futureReq,...);</tt><br>
70   * <tt>     if (! transaction.initiateRequest()) { error }</tt><br>
71   * <br>
72   * 4) Once initiateRequest() gives true, you are ready to send the data in through mode like:<br>
73   * <tt>     byte[] data = readOrGetInSomeWayData();</tt><br>
74   * <tt>     DataBlock block = transaction.transformToDataBlock(data);</tt><br>
75   * <tt>     futureWrite = transaction.writeWhenPossible(block);</tt><br>
76   * <br>
77   * 5) Once you have finished, so this is the last block, you have to do the following:<br>
78   * If the last block is not empty:<br>
79   * <tt>     DataBlock block = transaction.transformToDataBlock(data);</tt><br>
80   * <tt>     block.setEOF(true);</tt><br>
81   * Or if the last block is empty:<br>
82   * <tt>     DataBlock block = transaction.transformToDataBlock(null);</tt><br>
83   * Then <br>
84   * <tt>     futureWrite = transaction.writeWhenPossible(block);</tt><br>
85   * <tt>     futureWrite.awaitUninterruptibly();</tt><br>
86   * <br>
87   * 6) If everything is in success:<br>
88   * <tt>     transaction.finalizeRequest();</tt><br>
89   * <br>
90   * And now wait for the transfer to finish:<br>
91   * <tt>     futureReq.awaitUninterruptibly();</tt><br>
92   * <tt>     R66Result result = futureReq.getResult();</tt><br>
93   * <br>
94   * 7) If there is the need to re-do, just re-execute the steps from 3 to 6.<br>
95   * Don't forget at the very end to finish the global structure (steps 3 to 6 no more executed):<br>
96   * <tt>     networkTransaction.closeAll();</tt><br>
97   * <br>
98   * 8) In case of errors during steps 4 or 5 (and only those), call the following:<br>
99   * <tt>     transaction.transferInError(openR66Exception);</tt><br>
100  * <br>
101  * @see TestSendThroughClient {@link TestSendThroughClient} Class as example of usage
102  *
103  * @author Frederic Bregier
104  *
105  */
106 public abstract class SendThroughClient extends AbstractTransfer {
107     protected final NetworkTransaction networkTransaction;
108     protected LocalChannelReference localChannelReference;
109     protected DbTaskRunner taskRunner = null;
110     /**
111      * @param future
112      * @param remoteHost
113      * @param filename
114      * @param rulename
115      * @param fileinfo
116      * @param isMD5
117      * @param blocksize
118      * @param networkTransaction
119      * @param id
120      */
121     public SendThroughClient(R66Future future, String remoteHost,
122             String filename, String rulename, String fileinfo, boolean isMD5,
123             int blocksize, long id, NetworkTransaction networkTransaction) {
124         super(SendThroughClient.class,
125                 future, filename, rulename, fileinfo, isMD5, remoteHost, blocksize, id, null);
126         this.networkTransaction = networkTransaction;
127     }
128     /**
129      * DO NOT CALL THIS!
130      */
131     public void run() {
132         logger.error("DO NOT call this method for this class");
133     }
134     /**
135      * Prior to call this method, the pipeline and NetworkTransaction must have been initialized.
136      * It is the responsibility of the caller to finish all network resources.
137      * Note that this is only the first part of the execution for this client.
138      *
139      * @return True if the initiate of the request is OK, else False
140      */
141     public boolean initiateRequest() {
142         if (logger == null) {
143             logger = GgInternalLoggerFactory.getLogger(SendThroughClient.class);
144         }
145         DbRule rule;
146         try {
147             rule = new DbRule(DbConstant.admin.session, rulename);
148         } catch (GoldenGateDatabaseException e) {
149             logger.error("Cannot get Rule: "+rulename, e);
150             future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
151                     ErrorCode.Internal,null));
152             future.setFailure(e);
153             return false;
154         }
155         int mode = rule.mode;
156         if (isMD5) {
157             mode = RequestPacket.getModeMD5(mode);
158         }
159         RequestPacket request = new RequestPacket(rulename,
160                 mode, filename, blocksize, 0,
161                 id, fileinfo);
162         // Not isRecv since it is the requester, so send => isSender is true
163         boolean isSender = true;
164         try {
165             try {
166                 // no starttime since immediate
167                 taskRunner =
168                     new DbTaskRunner(DbConstant.admin.session,rule,isSender,request,remoteHost, null);
169             } catch (GoldenGateDatabaseException e) {
170                 logger.error("Cannot get task", e);
171                 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
172                         ErrorCode.Internal, null));
173                 future.setFailure(e);
174                 return false;
175             }
176             ClientRunner runner = new ClientRunner(networkTransaction, taskRunner, future);
177             runner.setSendThroughMode();
178             OpenR66ProtocolNotYetConnectionException exc = null;
179             for (int i = 0; i < Configuration.RETRYNB; i++) {
180                 try {
181                     localChannelReference = runner.initRequest();
182                     exc = null;
183                     break;
184                 } catch (OpenR66RunnerErrorException e) {
185                     logger.error("Cannot Transfer", e);
186                     future.setResult(new R66Result(e, null, true,
187                             ErrorCode.Internal, taskRunner));
188                     future.setFailure(e);
189                     return false;
190                 } catch (OpenR66ProtocolNoConnectionException e) {
191                     logger.error("Cannot Connect", e);
192                     future.setResult(new R66Result(e, null, true,
193                             ErrorCode.ConnectionImpossible, taskRunner));
194                     future.setFailure(e);
195                     return false;
196                 } catch (OpenR66ProtocolPacketException e) {
197                     logger.error("Bad Protocol", e);
198                     future.setResult(new R66Result(e, null, true,
199                             ErrorCode.TransferError, taskRunner));
200                     future.setFailure(e);
201                     return false;
202                 } catch (OpenR66ProtocolNotYetConnectionException e) {
203                     logger.debug("Not Yet Connected", e);
204                     exc = e;
205                     continue;
206                 }
207             }
208             if (exc!= null) {
209                 taskRunner.setLocalChannelReference(new LocalChannelReference());
210                 logger.error("Cannot Connect", exc);
211                 future.setResult(new R66Result(exc, null, true,
212                         ErrorCode.ConnectionImpossible, taskRunner));
213                 future.setFailure(exc);
214                 return false;
215             }
216             try {
217                 localChannelReference.waitReadyForSendThrough();
218             } catch (OpenR66Exception e) {
219                 logger.error("Cannot Transfer", e);
220                 future.setResult(new R66Result(e, null, true,
221                         ErrorCode.Internal, taskRunner));
222                 future.setFailure(e);
223                 return false;
224             }
225             // now start the send from external data
226             return true;
227         } finally {
228             if (taskRunner != null) {
229                 if (future.isFailed()) {
230                     try {
231                         taskRunner.delete();
232                     } catch (GoldenGateDatabaseException e) {
233                     }
234                 }
235             }
236         }
237     }
238     /**
239      * Finalize the request
240      */
241     public void finalizeRequest() {
242         try {
243             try {
244                 ChannelUtils.writeEndTransfer(localChannelReference);
245             } catch (OpenR66ProtocolPacketException e) {
246                 // An error occurs!
247                 try {
248                     localChannelReference.getSession().setFinalizeTransfer(
249                             false,
250                             new R66Result(e, localChannelReference.getSession(), false,
251                                     ErrorCode.Internal, taskRunner));
252                 } catch (OpenR66RunnerErrorException e1) {
253                     transferInError(e1);
254                     return;
255                 } catch (OpenR66ProtocolSystemException e1) {
256                     transferInError(e1);
257                     return;
258                 }
259             }
260             localChannelReference.getFutureEndTransfer().awaitUninterruptibly();
261             logger.debug("Await future End Transfer done: " +
262                     localChannelReference.getFutureEndTransfer().isSuccess());
263             if (localChannelReference.getFutureEndTransfer().isSuccess()) {
264                 // send a validation
265                 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
266                 EndRequestPacket validPacket = new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
267                 if (localChannelReference.getSession().getExtendedProtocol() &&
268                         localChannelReference.getSession().getBusinessObject() != null &&
269                         localChannelReference.getSession().getBusinessObject().getInfo() != null && localChannelReference.getSession().getBusinessObject().getInfo() != null) {
270                     validPacket.setOptional(localChannelReference.getSession().getBusinessObject().getInfo());
271                 }
272                 try {
273                     ChannelUtils.writeAbstractLocalPacket(localChannelReference, validPacket, true);
274                 } catch (OpenR66ProtocolPacketException e) {
275                 }
276                 if (!localChannelReference.getFutureRequest().awaitUninterruptibly(
277                     Configuration.configuration.TIMEOUTCON)) {
278                     // valid it however
279                     localChannelReference.validateRequest(localChannelReference.getFutureEndTransfer().getResult());
280                 }
281                 if (taskRunner != null && taskRunner.isSelfRequested()) {
282                     ChannelUtils.close(localChannelReference.getLocalChannel());
283                 }
284             } else {
285                 transferInError(null);
286             }
287         } finally {
288             if (taskRunner != null) {
289                 if ((future.isDone() && (!future.isSuccess())) || nolog) {
290                     try {
291                         taskRunner.delete();
292                     } catch (GoldenGateDatabaseException e) {
293                     }
294                 }
295             }
296         }
297     }
298     /**
299      * To be used in case of error after a correct initiate of the request
300      * @param e
301      */
302     public void transferInError(OpenR66Exception e) {
303         if (!localChannelReference.getFutureEndTransfer().getResult().isAnswered) {
304             R66Result result = new R66Result(e, localChannelReference.getSession(), true,
305                     ErrorCode.TransferError, taskRunner);
306             logger.error("Transfer in error", e);
307             localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
308             ErrorPacket error = new ErrorPacket("Transfer in error",
309                     ErrorCode.TransferError.getCode(), ErrorPacket.FORWARDCLOSECODE);
310             try {
311                 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error, true);
312             } catch (OpenR66ProtocolPacketException e1) {
313             }
314             localChannelReference.invalidateRequest(result);
315         }
316         ChannelUtils.close(localChannelReference.getLocalChannel());
317     }
318     /**
319      * Write the next block when the channel is ready to prevent OOM
320      * @param block
321      * @return the ChannelFuture on the write operation
322      *
323      * @throws OpenR66RunnerErrorException
324      * @throws OpenR66ProtocolPacketException
325      * @throws OpenR66ProtocolSystemException
326      */
327     public ChannelFuture writeWhenPossible(DataBlock block)
328     throws OpenR66RunnerErrorException, OpenR66ProtocolPacketException, OpenR66ProtocolSystemException {
329         return RetrieveRunner.writeWhenPossible(block, localChannelReference);
330     }
331     /**
332      * Utility method for send through mode
333      * @param data the data byte, if null it is the last block
334      * @return the DataBlock associated to the data
335      */
336     public DataBlock transformToDataBlock(byte []data) {
337         DataBlock block = new DataBlock();
338         if (data == null) {
339             // last block
340             block.setEOF(true);
341         } else {
342             block.setBlock(ChannelBuffers.wrappedBuffer(data));
343         }
344         return block;
345     }
346 }