1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.commandexec.client;
22
23 import goldengate.commandexec.utils.LocalExecDefaultResult;
24 import goldengate.commandexec.utils.LocalExecResult;
25 import goldengate.common.future.GgFuture;
26 import goldengate.common.logging.GgInternalLogger;
27 import goldengate.common.logging.GgInternalLoggerFactory;
28
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34
35
36
37
38
39
40 public class LocalExecClientHandler extends SimpleChannelUpstreamHandler {
41
42
43
44
45 private static final GgInternalLogger logger = GgInternalLoggerFactory
46 .getLogger(LocalExecClientHandler.class);
47
48
49 private LocalExecResult result;
50 private StringBuilder back;
51 private boolean firstMessage = true;
52 private GgFuture future;
53 protected LocalExecClientPipelineFactory factory = null;
54
55
56
57 public LocalExecClientHandler(LocalExecClientPipelineFactory factory) {
58 this.factory = factory;
59 }
60
61
62
63
64 public void initExecClient() {
65 this.result = new LocalExecResult(LocalExecDefaultResult.NoStatus);
66 this.back = new StringBuilder();
67 this.firstMessage = true;
68 this.future = new GgFuture(true);
69 }
70
71
72
73
74 @Override
75 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
76 throws Exception {
77 initExecClient();
78 factory.addChannel(ctx.getChannel());
79 }
80
81
82
83
84 @Override
85 public void channelDisconnected(ChannelHandlerContext ctx,
86 ChannelStateEvent e) throws Exception {
87 this.factory.removeChannel(e.getChannel());
88 }
89
90
91
92
93
94
95
96
97
98
99 @Override
100 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
101 throws Exception {
102 logger.debug("ChannelClosed");
103 if (!future.isDone()) {
104
105 finalizeMessage();
106 }
107 super.channelClosed(ctx, e);
108 }
109
110
111
112 private void finalizeMessage() {
113 if (firstMessage) {
114 logger.warn(result.status+" "+result.result);
115 result.set(LocalExecDefaultResult.NoMessage);
116 } else {
117 result.result = back.toString();
118 }
119 if (result.status < 0) {
120 if (result.exception != null) {
121 this.future.setFailure(result.exception);
122 } else {
123 this.future.cancel();
124 }
125 } else {
126 this.future.setSuccess();
127 }
128 }
129
130
131
132
133 public LocalExecResult waitFor(long delay) {
134 if (delay <= 0) {
135 this.future.awaitUninterruptibly();
136 } else {
137 this.future.awaitUninterruptibly(delay);
138 }
139 result.isSuccess = this.future.isSuccess();
140 return result;
141 }
142
143 @Override
144 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
145
146 String mesg = (String) e.getMessage();
147
148 if (firstMessage) {
149 firstMessage = false;
150 int pos = mesg.indexOf(' ');
151 try {
152 result.status = Integer.parseInt(mesg.substring(0, pos));
153 } catch (NumberFormatException e1) {
154
155 result.set(LocalExecDefaultResult.BadTransmition);
156 back.append(mesg);
157 ctx.getChannel().close();
158 return;
159 }
160 mesg = mesg.substring(pos+1);
161 result.result = mesg;
162 back.append(mesg);
163 } else if (LocalExecDefaultResult.ENDOFCOMMAND.startsWith(mesg)) {
164 logger.debug("Receive End of Command");
165 this.finalizeMessage();
166 } else {
167 back.append('\n');
168 back.append(mesg);
169 }
170 }
171
172 @Override
173 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
174 logger.error("Unexpected exception from downstream while get information: "+firstMessage,
175 e.getCause());
176 if (firstMessage) {
177 firstMessage = false;
178 result.set(LocalExecDefaultResult.BadTransmition);
179 result.exception = (Exception) e.getCause();
180 back = new StringBuilder("Error in LocalExec: ");
181 back.append(result.exception.getMessage());
182 back.append('\n');
183 } else {
184 back.append("\nERROR while receiving answer: ");
185 result.exception = (Exception) e.getCause();
186 back.append(result.exception.getMessage());
187 back.append('\n');
188 }
189 e.getChannel().close();
190 }
191 }