View Javadoc

1   /**
2      This file is part of GoldenGate Project (named also GoldenGate or GG).
3   
4      Copyright 2009, Frederic Bregier, and individual contributors by the @author
5      tags. See the COPYRIGHT.txt in the distribution for a full listing of
6      individual contributors.
7   
8      All GoldenGate Project is free software: you can redistribute it and/or 
9      modify it under the terms of the GNU General Public License as published 
10     by the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12  
13     GoldenGate is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17  
18     You should have received a copy of the GNU General Public License
19     along with GoldenGate .  If not, see <http://www.gnu.org/licenses/>.
20   */
21  package openr66.protocol.utils;
22  
23  import goldengate.common.command.exception.CommandAbstractException;
24  import goldengate.common.database.DbPreparedStatement;
25  import goldengate.common.database.DbSession;
26  import goldengate.common.database.data.AbstractDbData.UpdatedInfo;
27  import goldengate.common.database.exception.GoldenGateDatabaseException;
28  import goldengate.common.logging.GgInternalLogger;
29  import goldengate.common.logging.GgInternalLoggerFactory;
30  
31  import java.sql.Timestamp;
32  
33  import openr66.client.RequestTransfer;
34  import openr66.commander.ClientRunner;
35  import openr66.commander.CommanderNoDb;
36  import openr66.context.ErrorCode;
37  import openr66.context.R66FiniteDualStates;
38  import openr66.context.R66Result;
39  import openr66.context.R66Session;
40  import openr66.context.filesystem.R66File;
41  import openr66.context.task.exception.OpenR66RunnerErrorException;
42  import openr66.database.data.DbTaskRunner;
43  import openr66.database.data.DbTaskRunner.TASKSTEP;
44  import openr66.protocol.configuration.Configuration;
45  import openr66.protocol.exception.OpenR66ProtocolSystemException;
46  import openr66.protocol.localhandler.LocalChannelReference;
47  import openr66.protocol.localhandler.packet.ErrorPacket;
48  
49  /**
50   * Utility class for transfers
51   *
52   * @author Frederic Bregier
53   *
54   */
55  public class TransferUtils {
56      /**
57       * Internal Logger
58       */
59      private static final GgInternalLogger logger = GgInternalLoggerFactory
60              .getLogger(TransferUtils.class);
61  
62      /**
63       * Try to restart one Transfer Runner Task
64       * @param taskRunner
65       * @return the associated Result
66       * @throws GoldenGateDatabaseException
67       */
68      public static R66Result restartTransfer(DbTaskRunner taskRunner, LocalChannelReference lcr) throws GoldenGateDatabaseException {
69          R66Result finalResult = new R66Result(null, true, ErrorCode.InitOk, taskRunner);
70          if (lcr != null) {
71              finalResult.code = ErrorCode.QueryStillRunning;
72              finalResult.other = "Transfer is still running so not restartable";
73          } else {
74              if (taskRunner.isSendThrough()) {
75                  // XXX FIXME TODO cannot be restarted... Really?
76                  if (false) {
77                      finalResult.code = ErrorCode.PassThroughMode;
78                      finalResult.other = "Transfer cannot be restarted since it is in PassThrough mode";
79                      return finalResult;
80                  }
81              }
82              // Transfer is not running
83              // but maybe need action on database
84              try {
85                  if (taskRunner.restart(true)) {
86                      finalResult.code = ErrorCode.PreProcessingOk;
87                      finalResult.other = "Transfer is restarted";
88                  } else {
89                      if (taskRunner.isSelfRequested() &&
90                              (taskRunner.getGloballaststep() < TASKSTEP.POSTTASK.ordinal())) {
91                          // send a VALID packet with VALID code to the requester
92                          R66Future result = new R66Future(true);
93                          RequestTransfer requestTransfer =
94                              new RequestTransfer(result, taskRunner.getSpecialId(),
95                                      taskRunner.getRequested(), taskRunner.getRequester(),
96                                      false, false, true,
97                                      Configuration.configuration.getInternalRunner().
98                                          getNetworkTransaction());
99                          requestTransfer.run();
100                         result.awaitUninterruptibly();
101                         R66Result finalValue = result.getResult();
102                         switch (finalValue.code) {
103                             case QueryStillRunning:
104                                 finalResult.code = ErrorCode.QueryStillRunning;
105                                 finalResult.other = "Transfer restart requested but already active and running";
106                                 break;
107                             case Running:
108                                 finalResult.code = ErrorCode.Running;
109                                 finalResult.other = "Transfer restart requested but already running";
110                                 break;
111                             case PreProcessingOk:
112                                 finalResult.code = ErrorCode.PreProcessingOk;
113                                 finalResult.other = "Transfer restart requested and restarted";
114                                 break;
115                             case CompleteOk:
116                                 finalResult.code = ErrorCode.CompleteOk;
117                                 finalResult.other = "Transfer restart requested but already finished so try to run Post Action";
118                                 taskRunner.setPostTask();
119                                 TransferUtils.finalizeTaskWithNoSession(taskRunner, lcr);
120                                 taskRunner.setErrorExecutionStatus(ErrorCode.QueryAlreadyFinished);
121                                 taskRunner.update();
122                                 break;
123                             case RemoteError:
124                                 finalResult.code = ErrorCode.RemoteError;
125                                 finalResult.other = "Transfer restart requested but remote error";
126                                 break;
127                             default:
128                                 finalResult.code = ErrorCode.Internal;
129                                 finalResult.other = "Transfer restart requested but internal error";
130                                 break;
131                         }
132                     } else {
133                         finalResult.code = ErrorCode.CompleteOk;
134                         finalResult.other = "Transfer is finished so not restartable";
135                         taskRunner.setPostTask();
136                         TransferUtils.finalizeTaskWithNoSession(taskRunner, lcr);
137                         taskRunner.setErrorExecutionStatus(ErrorCode.QueryAlreadyFinished);
138                         taskRunner.update();
139                     }
140                 }
141             } catch (OpenR66RunnerErrorException e) {
142                 finalResult.code = ErrorCode.PreProcessingOk;
143                 finalResult.other = "Transfer is restarted";
144             }
145         }
146         return finalResult;
147     }
148 
149     private static void stopOneTransfer(DbTaskRunner taskRunner,
150             StringBuilder builder, R66Session session, String body) {
151         LocalChannelReference lcr =
152             Configuration.configuration.getLocalTransaction().
153             getFromRequest(taskRunner.getKey());
154         ErrorCode result;
155         ErrorCode code = ErrorCode.StoppedTransfer;
156         if (lcr != null) {
157             int rank = taskRunner.getRank();
158             lcr.sessionNewState(R66FiniteDualStates.ERROR);
159             ErrorPacket perror = new ErrorPacket("Transfer Stopped at "+rank,
160                     code.getCode(), ErrorPacket.FORWARDCLOSECODE);
161             try {
162                 //XXX ChannelUtils.writeAbstractLocalPacket(lcr, perror);
163                 // inform local instead of remote
164                 ChannelUtils.writeAbstractLocalPacketToLocal(lcr, perror);
165             } catch (Exception e) {
166             }
167             result = ErrorCode.StoppedTransfer;
168         } else {
169             // Transfer is not running
170             // if in ERROR already just ignore it
171             if (taskRunner.getUpdatedInfo() == UpdatedInfo.INERROR) {
172                 result = ErrorCode.TransferError;
173             } else {
174                 // the database saying it is not stopped
175                 result = ErrorCode.TransferError;
176                 if (taskRunner != null) {
177                     if (taskRunner.stopOrCancelRunner(code)) {
178                         result = ErrorCode.StoppedTransfer;
179                     }
180                 }
181             }
182         }
183         ErrorCode last = taskRunner.getErrorInfo();
184         taskRunner.setErrorExecutionStatus(result);
185         if (builder != null) {
186             builder.append(taskRunner.toSpecializedHtml(session, body,
187                 lcr != null ? "Active" : "NotActive"));
188         }
189         taskRunner.setErrorExecutionStatus(last);
190     }
191     /**
192      * Stop all selected transfers
193      * @param dbSession
194      * @param limit
195      * @param builder
196      * @param session
197      * @param body
198      * @param startid
199      * @param stopid
200      * @param tstart
201      * @param tstop
202      * @param rule
203      * @param req
204      * @param pending
205      * @param transfer
206      * @param error
207      * @return the associated StringBuilder if the one given as parameter is not null
208      */
209     public static StringBuilder stopSelectedTransfers(DbSession dbSession, int limit,
210             StringBuilder builder, R66Session session, String body,
211             String startid, String stopid, Timestamp tstart, Timestamp tstop, String rule,
212             String req, boolean pending, boolean transfer, boolean error) {
213         if (dbSession == null || dbSession.isDisconnected) {
214             // do it without DB
215             if (ClientRunner.activeRunners != null) {
216                 for (ClientRunner runner: ClientRunner.activeRunners) {
217                     DbTaskRunner taskRunner = runner.getTaskRunner();
218                     stopOneTransfer(taskRunner, builder, session, body);
219                 }
220             }
221             if (CommanderNoDb.todoList != null) {
222                 CommanderNoDb.todoList.clear();
223             }
224             return builder;
225         }
226         DbPreparedStatement preparedStatement = null;
227         try {
228             preparedStatement =
229                 DbTaskRunner.getFilterPrepareStatement(dbSession, limit, true,
230                         startid, stopid, tstart, tstop, rule, req,
231                         pending, transfer, error, false, false);
232             preparedStatement.executeQuery();
233             int i = 0;
234             while (preparedStatement.getNext()) {
235                 i++;
236                 DbTaskRunner taskRunner = DbTaskRunner.getFromStatement(preparedStatement);
237                 stopOneTransfer(taskRunner, builder, session, body);
238             }
239             preparedStatement.realClose();
240             return builder;
241         } catch (GoldenGateDatabaseException e) {
242             if (preparedStatement != null) {
243                 preparedStatement.realClose();
244             }
245             logger.error("OpenR66 Error {}",e.getMessage());
246             return null;
247         }
248     }
249 
250     /**
251      * Finalize a local task since only Post action has to be done
252      * @param taskRunner
253      * @param localChannelReference
254      * @throws OpenR66RunnerErrorException
255      */
256     public static void finalizeTaskWithNoSession(DbTaskRunner taskRunner,
257             LocalChannelReference localChannelReference)
258     throws OpenR66RunnerErrorException {
259         R66Session session = new R66Session();
260         session.setStatus(50);
261         String remoteId = taskRunner.isSelfRequested() ?
262                 taskRunner.getRequester() :
263                     taskRunner.getRequested();
264         session.getAuth().specialNoSessionAuth(false, remoteId);
265         session.setNoSessionRunner(taskRunner, localChannelReference);
266         if (taskRunner.isSender()) {
267             // Change dir
268             try {
269                 session.getDir().changeDirectory(taskRunner.getRule().sendPath);
270             } catch (CommandAbstractException e) {
271                 throw new OpenR66RunnerErrorException(e);
272             }
273         } else {
274             // Change dir
275             try {
276                 session.getDir().changeDirectory(taskRunner.getRule().workPath);
277             } catch (CommandAbstractException e) {
278                 throw new OpenR66RunnerErrorException(e);
279             }
280         }
281         try {
282             session.setFileAfterPreRunner(false);
283         } catch (OpenR66RunnerErrorException e) {
284             logger.error("Cannot recreate file: {}",taskRunner.getFilename());
285             taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
286             taskRunner.setErrorExecutionStatus(ErrorCode.FileNotFound);
287             try {
288                 taskRunner.update();
289             } catch (GoldenGateDatabaseException e1) {
290             }
291             throw new OpenR66RunnerErrorException("Cannot recreate file", e);
292         }
293         R66File file = session.getFile();
294         R66Result finalValue = new R66Result(null, true, ErrorCode.CompleteOk, taskRunner);
295         finalValue.file = file;
296         finalValue.runner = taskRunner;
297         taskRunner.finishTransferTask(ErrorCode.TransferOk);
298         try {
299             taskRunner.finalizeTransfer(localChannelReference, file, finalValue, true);
300         } catch (OpenR66ProtocolSystemException e) {
301             logger.error("Cannot validate runner:\n    {}",taskRunner.toShortString());
302             taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
303             taskRunner.setErrorExecutionStatus(ErrorCode.Internal);
304             try {
305                 taskRunner.update();
306             } catch (GoldenGateDatabaseException e1) {
307             }
308             throw new OpenR66RunnerErrorException("Cannot validate runner", e);
309         }
310     }
311 }