1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package goldengate.ftp.core.data.handler;
22
23 import goldengate.ftp.core.command.FtpArgumentCode.TransferMode;
24 import goldengate.ftp.core.command.FtpArgumentCode.TransferStructure;
25 import goldengate.ftp.core.command.FtpArgumentCode.TransferSubType;
26 import goldengate.ftp.core.command.FtpArgumentCode.TransferType;
27 import goldengate.ftp.core.config.FtpConfiguration;
28
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelPipelineFactory;
31 import org.jboss.netty.channel.Channels;
32 import org.jboss.netty.handler.execution.ExecutionHandler;
33 import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
34
35
36
37
38
39
40
41 public class FtpDataPipelineFactory implements ChannelPipelineFactory {
42
43
44
45 public static final String CODEC_MODE = "MODE";
46
47
48
49
50 public static final String CODEC_LIMIT = "LIMITATION";
51
52
53
54
55 public static final String CODEC_TYPE = "TYPE";
56
57
58
59
60 public static final String CODEC_STRUCTURE = "STRUCTURE";
61
62
63
64
65 public static final String PIPELINE_EXECUTOR = "pipelineExecutor";
66
67
68
69
70 public static final String HANDLER = "handler";
71
72 private static final FtpDataTypeCodec ftpDataTypeCodec = new FtpDataTypeCodec(
73 TransferType.ASCII, TransferSubType.NONPRINT);
74
75 private static final FtpDataStructureCodec ftpDataStructureCodec = new FtpDataStructureCodec(
76 TransferStructure.FILE);
77
78
79
80
81 private final Class<? extends DataBusinessHandler> dataBusinessHandler;
82
83
84
85
86 private final FtpConfiguration configuration;
87
88
89
90
91 private final boolean isActive;
92
93
94
95
96
97
98
99
100 public FtpDataPipelineFactory(
101 Class<? extends DataBusinessHandler> dataBusinessHandler,
102 FtpConfiguration configuration, boolean active) {
103 this.dataBusinessHandler = dataBusinessHandler;
104 this.configuration = configuration;
105 isActive = active;
106 }
107
108
109
110
111
112
113 public ChannelPipeline getPipeline() throws Exception {
114 ChannelPipeline pipeline = Channels.pipeline();
115
116 pipeline.addFirst(CODEC_MODE, new FtpDataModeCodec(TransferMode.STREAM,
117 TransferStructure.FILE));
118 pipeline
119 .addLast(CODEC_LIMIT, configuration
120 .getFtpInternalConfiguration()
121 .getGlobalTrafficShapingHandler());
122 ChannelTrafficShapingHandler limitChannel =
123 configuration
124 .getFtpInternalConfiguration()
125 .newChannelTrafficShapingHandler();
126 if (limitChannel != null) {
127 pipeline.addLast(CODEC_LIMIT + "CHANNEL", limitChannel);
128 }
129 pipeline.addLast(CODEC_TYPE, ftpDataTypeCodec);
130 pipeline.addLast(CODEC_STRUCTURE, ftpDataStructureCodec);
131
132 pipeline.addLast(PIPELINE_EXECUTOR, new ExecutionHandler(configuration
133 .getFtpInternalConfiguration().getDataPipelineExecutor()));
134
135 DataBusinessHandler newbusiness = dataBusinessHandler.newInstance();
136 DataNetworkHandler newNetworkHandler = new DataNetworkHandler(
137 configuration, newbusiness, isActive);
138 pipeline.addLast(HANDLER, newNetworkHandler);
139 return pipeline;
140 }
141 }