View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.protocol.test;
22  
23  import goldengate.common.database.DbSession;
24  import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
25  import goldengate.common.database.exception.GoldenGateDatabaseException;
26  import goldengate.common.file.DataBlock;
27  import goldengate.common.logging.GgInternalLoggerFactory;
28  import openr66.client.RecvThroughHandler;
29  import openr66.client.SendThroughClient;
30  import openr66.commander.ClientRunner;
31  import openr66.context.ErrorCode;
32  import openr66.context.R66Result;
33  import openr66.context.task.exception.OpenR66RunnerErrorException;
34  import openr66.database.DbConstant;
35  import openr66.database.data.DbRule;
36  import openr66.database.data.DbTaskRunner;
37  import openr66.protocol.configuration.Configuration;
38  import openr66.protocol.exception.OpenR66DatabaseGlobalException;
39  import openr66.protocol.exception.OpenR66Exception;
40  import openr66.protocol.exception.OpenR66ProtocolBusinessException;
41  import openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
42  import openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
43  import openr66.protocol.exception.OpenR66ProtocolPacketException;
44  import openr66.protocol.exception.OpenR66ProtocolSystemException;
45  import openr66.protocol.localhandler.LocalChannelReference;
46  import openr66.protocol.localhandler.packet.RequestPacket;
47  import openr66.protocol.networkhandler.NetworkTransaction;
48  import openr66.protocol.utils.R66Future;
49  
50  import org.jboss.netty.buffer.ChannelBuffer;
51  
52  /**
53   * <b>WARNING: This class is not functional neither integrated</b><br>
54   * 
55   * Test class for Send Through to another R66 Server as forward<br>
56   * Only a subpart of SenThroughClient is to be made since 
57   * steps 1-2 and steps 7-8 are only for client, not for server.
58   *
59   * 3) Prepare the request of transfer:<br>
60   * <tt>     R66Future futureReq = new R66Future(true);</tt><br>
61   * <tt>     TestSendThroughForward transaction = new TestSendThroughForward(futureReq,...);</tt><br>
62   * <tt>     if (! transaction.initiateRequest()) { error }</tt><br>
63   * <br>
64   * 4) Once initiateRequest() gives true, you are ready to send the data in through mode
65   *  using the TestRecvThroughForwardHandler:<br>
66   * <br>
67   * 5) Once you have finished, so this is the last block, you have to do the following:<br>
68   * If the last block is not empty:<br>
69   * <tt>     DataBlock block = transaction.transformToDataBlock(data);</tt><br>
70   * <tt>     block.setEOF(true);</tt><br>
71   * <tt>     futureWrite = transaction.writeWhenPossible(block);</tt><br>
72   * <tt>     futureWrite.awaitUninterruptibly();</tt><br>
73   * <br>
74   * If the last block is empty, it is already handled by TestRecvThroughForwardHandler<br>
75   * <br>
76   * 6) If everything is in success:<br>
77   * <tt>     transaction.finalizeRequest();</tt><br>
78   * <br>
79   * And now wait for the transfer to finish:<br>
80   * <tt>     futureReq.awaitUninterruptibly();</tt><br>
81   * <tt>     R66Result result = futureReq.getResult();</tt><br>
82   * <br>
83   *
84   * @author Frederic Bregier
85   *
86   */
87  public class TestSendThroughForward extends SendThroughClient {
88      public TestRecvThroughForwardHandler handler;
89      public DbSession dbSession;
90      public volatile boolean foundEOF = false;
91      protected DbTaskRunner sourceRunner;
92      
93      public static class TestRecvThroughForwardHandler extends RecvThroughHandler {
94  
95          protected TestSendThroughForward client;
96          /* (non-Javadoc)
97           * @see openr66.client.RecvThroughHandler#writeChannelBuffer(org.jboss.netty.buffer.ChannelBuffer)
98           */
99          @Override
100         public void writeChannelBuffer(ChannelBuffer buffer)
101                 throws OpenR66ProtocolBusinessException {
102             DataBlock block = new DataBlock();
103             if (buffer.readableBytes() <= 0) {
104              // last block
105                 block.setEOF(true);
106             } else {
107                 block.setBlock(buffer);
108             }
109             try {
110                 client.writeWhenPossible(block).await();
111             } catch (OpenR66RunnerErrorException e) {
112                 client.transferInError(e);
113             } catch (OpenR66ProtocolPacketException e) {
114                 client.transferInError(e);
115             } catch (OpenR66ProtocolSystemException e) {
116                 client.transferInError(e);
117             } catch (InterruptedException e) {
118                 client.transferInError(new OpenR66ProtocolSystemException(e));
119             }
120             if (block.isEOF()) {
121                 client.finalizeRequest();
122                 client.foundEOF = true;
123             }
124         }
125 
126     }
127 
128     /**
129      * @param future
130      * @param remoteHost
131      * @param filename
132      * @param rulename
133      * @param fileinfo
134      * @param isMD5
135      * @param blocksize
136      * @param networkTransaction
137      * @param idt Id Transfer if any temptative already exists
138      * @param dbSession
139      * @param runner (recv runner)
140      */
141     public TestSendThroughForward(R66Future future, String remoteHost,
142             String filename, String rulename, String fileinfo, boolean isMD5,
143             int blocksize, NetworkTransaction networkTransaction, long idt,
144             DbSession dbSession, DbTaskRunner runner) {
145         super(future, remoteHost, filename, rulename, fileinfo, isMD5, blocksize,
146                 idt, networkTransaction);
147         handler = new TestRecvThroughForwardHandler();
148         handler.client = this;
149         this.dbSession = dbSession;
150         this.sourceRunner = runner;
151     }
152 
153     /* (non-Javadoc)
154      * @see openr66.client.SendThroughClient#initiateRequest()
155      */
156     @Override
157     public boolean initiateRequest() {
158         if (logger == null) {
159             logger = GgInternalLoggerFactory.getLogger(TestSendThroughForward.class);
160         }
161         DbRule rule;
162         try {
163             rule = new DbRule(DbConstant.admin.session, rulename);
164         } catch (GoldenGateDatabaseException e) {
165             logger.error("Cannot get Rule: "+rulename, e);
166             future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
167                     ErrorCode.Internal,null));
168             future.setFailure(e);
169             return false;
170         }
171         int mode = rule.mode;
172         if (isMD5) {
173             mode = RequestPacket.getModeMD5(mode);
174         }
175         RequestPacket request = new RequestPacket(rulename,
176                 mode, filename, blocksize, sourceRunner.getRank(),
177                 id, fileinfo);
178         // Not isRecv since it is the requester, so send => isSender is true
179         boolean isSender = true;
180         try {
181             try {
182                 // no delay
183                 taskRunner =
184                     new DbTaskRunner(DbConstant.admin.session,rule,isSender,request,remoteHost,null);
185             } catch (GoldenGateDatabaseException e) {
186                 logger.error("Cannot get task", e);
187                 future.setResult(new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
188                         ErrorCode.Internal, null));
189                 future.setFailure(e);
190                 return false;
191             }
192             ClientRunner runner = new ClientRunner(networkTransaction, taskRunner, future);
193             runner.setRecvThroughHandler(handler);
194             runner.setSendThroughMode();
195             OpenR66ProtocolNotYetConnectionException exc = null;
196             for (int i = 0; i < Configuration.RETRYNB; i++) {
197                 try {
198                     localChannelReference = runner.initRequest();
199                     exc = null;
200                     break;
201                 } catch (OpenR66RunnerErrorException e) {
202                     logger.error("Cannot Transfer", e);
203                     future.setResult(new R66Result(e, null, true,
204                             ErrorCode.Internal, taskRunner));
205                     future.setFailure(e);
206                     return false;
207                 } catch (OpenR66ProtocolNoConnectionException e) {
208                     logger.error("Cannot Connect", e);
209                     future.setResult(new R66Result(e, null, true,
210                             ErrorCode.ConnectionImpossible, taskRunner));
211                     future.setFailure(e);
212                     return false;
213                 } catch (OpenR66ProtocolPacketException e) {
214                     logger.error("Bad Protocol", e);
215                     future.setResult(new R66Result(e, null, true,
216                             ErrorCode.TransferError, taskRunner));
217                     future.setFailure(e);
218                     return false;
219                 } catch (OpenR66ProtocolNotYetConnectionException e) {
220                     logger.debug("Not Yet Connected", e);
221                     exc = e;
222                     continue;
223                 }
224             }
225             if (exc!= null) {
226                 taskRunner.setLocalChannelReference(new LocalChannelReference());
227                 logger.error("Cannot Connect", exc);
228                 future.setResult(new R66Result(exc, null, true,
229                         ErrorCode.ConnectionImpossible, taskRunner));
230                 future.setFailure(exc);
231                 return false;
232             }
233             try {
234                 localChannelReference.waitReadyForSendThrough();
235             } catch (OpenR66Exception e) {
236                 logger.error("Cannot Transfer", e);
237                 future.setResult(new R66Result(e, null, true,
238                         ErrorCode.Internal, taskRunner));
239                 future.setFailure(e);
240                 return false;
241             }
242             if (taskRunner.getRank() < sourceRunner.getRank()) {
243                 sourceRunner.setRankAtStartup(taskRunner.getRank());
244             }
245             // now start the send from external data
246             return true;
247         } finally {
248             if (taskRunner != null) {
249                 // not delete but sourceRunner and taskRunner should be stopped
250                 // and taskRunner not allowed to be restarted alone
251                 if (future.isFailed()) {
252                     taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
253                     try {
254                         taskRunner.update();
255                     } catch (GoldenGateDatabaseException e) {
256                     }
257                 }
258             }
259         }
260     }
261 
262     /* (non-Javadoc)
263      * @see openr66.client.SendThroughClient#finalizeRequest()
264      */
265     @Override
266     public void finalizeRequest() {
267         if (foundEOF) {
268             return;
269         }
270         super.finalizeRequest();
271     }
272 
273     /* (non-Javadoc)
274      * @see openr66.client.SendThroughClient#transferInError(openr66.protocol.exception.OpenR66Exception)
275      */
276     @Override
277     public void transferInError(OpenR66Exception e) {
278         super.transferInError(e);
279     }
280     
281 }