1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.http2;
17
18 import java.util.Queue;
19 import java.util.concurrent.atomic.AtomicLong;
20
21 import org.jboss.netty.buffer.ChannelBuffer;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelDownstreamHandler;
24 import org.jboss.netty.channel.ChannelEvent;
25 import org.jboss.netty.channel.ChannelHandlerContext;
26 import org.jboss.netty.channel.ChannelStateEvent;
27 import org.jboss.netty.channel.ChannelUpstreamHandler;
28 import org.jboss.netty.handler.codec.PrematureChannelClosureException;
29 import org.jboss.netty.util.internal.QueueFactory;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class HttpClientCodec implements ChannelUpstreamHandler,
50 ChannelDownstreamHandler {
51
52
53 final Queue<HttpMethod> queue = QueueFactory.createQueue(HttpMethod.class);
54
55
56 volatile boolean done;
57
58 private final HttpRequestEncoder encoder = new Encoder();
59
60 private final HttpResponseDecoder decoder;
61
62 private final AtomicLong requestResponseCounter = new AtomicLong(0);
63
64 private final boolean failOnMissingResponse;
65
66
67
68
69
70
71
72 public HttpClientCodec() {
73 this(4096, 8192, 8192, false);
74 }
75
76
77
78
79 public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize,
80 int maxChunkSize) {
81 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
82 }
83
84
85
86
87 public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize,
88 int maxChunkSize, boolean failOnMissingResponse) {
89 decoder = new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
90 this.failOnMissingResponse = failOnMissingResponse;
91 }
92
93 @Override
94 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
95 throws Exception {
96 decoder.handleUpstream(ctx, e);
97 }
98
99 @Override
100 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
101 throws Exception {
102 encoder.handleDownstream(ctx, e);
103 }
104
105 private final class Encoder extends HttpRequestEncoder {
106
107 Encoder() {
108 }
109
110 @Override
111 protected Object encode(ChannelHandlerContext ctx, Channel channel,
112 Object msg) throws Exception {
113 if (msg instanceof HttpRequest && !done) {
114 queue.offer(((HttpRequest) msg).getMethod());
115 }
116
117 Object obj = super.encode(ctx, channel, msg);
118
119 if (failOnMissingResponse) {
120
121 if (msg instanceof HttpRequest &&
122 !((HttpRequest) msg).isChunked()) {
123 requestResponseCounter.incrementAndGet();
124 } else if (msg instanceof HttpChunk &&
125 ((HttpChunk) msg).isLast()) {
126
127 requestResponseCounter.incrementAndGet();
128 }
129 }
130
131 return obj;
132
133 }
134 }
135
136 private final class Decoder extends HttpResponseDecoder {
137
138 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
139 super(maxInitialLineLength, maxHeaderSize, maxChunkSize);
140 }
141
142 @Override
143 protected Object decode(ChannelHandlerContext ctx, Channel channel,
144 ChannelBuffer buffer, State state) throws Exception {
145 if (done) {
146 return buffer.readBytes(actualReadableBytes());
147 } else {
148 Object msg = super.decode(ctx, channel, buffer, state);
149 if (failOnMissingResponse) {
150 decrement(msg);
151 }
152 return msg;
153 }
154 }
155
156 private void decrement(Object msg) {
157 if (msg == null) {
158 return;
159 }
160
161
162 if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) {
163 requestResponseCounter.decrementAndGet();
164 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
165 requestResponseCounter.decrementAndGet();
166 } else if (msg instanceof Object[]) {
167
168
169 requestResponseCounter.decrementAndGet();
170 }
171 }
172
173 @Override
174 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
175 final int statusCode = ((HttpResponse) msg).getStatus().getCode();
176 if (statusCode == 100) {
177
178 return true;
179 }
180
181
182
183 HttpMethod method = queue.poll();
184
185 char firstChar = method.getName().charAt(0);
186 switch (firstChar) {
187 case 'H':
188
189
190
191
192 if (HttpMethod.HEAD.equals(method)) {
193 return true;
194
195
196
197
198
199
200
201
202
203
204
205
206
207 }
208 break;
209 case 'C':
210
211 if (statusCode == 200) {
212 if (HttpMethod.CONNECT.equals(method)) {
213
214 done = true;
215 queue.clear();
216 return true;
217 }
218 }
219 break;
220 }
221
222 return super.isContentAlwaysEmpty(msg);
223 }
224
225 @Override
226 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
227 throws Exception {
228 super.channelClosed(ctx, e);
229
230 if (failOnMissingResponse) {
231 long missingResponses = requestResponseCounter.get();
232 if (missingResponses > 0) {
233 throw new PrematureChannelClosureException(
234 "Channel closed but still missing " +
235 missingResponses + " response(s)");
236 }
237 }
238 }
239
240 }
241 }