User:DukeAnt/Netty

Netty
Developer: Netty project community
Stable release: 4.0.15.Final[1] (January 21, 2014)
Preview release: 5.0.0.Alpha1 (December 22, 2013)
Written in: Java
Operating system: Cross-platform
Type: Enterprise Integration Patterns, message-oriented middleware
License: Apache License 2.0
Website: netty.io

Netty is a non-blocking I/O (NIO) client-server framework for developing Java network applications such as protocol servers and clients. Its asynchronous, event-driven design and supporting tools simplify network programming tasks such as writing TCP and UDP socket servers.[2] Netty includes an implementation of the reactor pattern.

Besides being an asynchronous network application framework, Netty also includes built-in HTTP protocol support, including the ability to run inside a servlet container, support for WebSockets, integration with Google Protocol Buffers, SSL/TLS support, support for SPDY protocol and support for message compression. Netty has been around since before 2004.[3]

As of version 4.0.0Alpha, Netty also supports NIO.2 as a backend, alongside NIO and blocking Java sockets.

Netty TCP Example

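The first step is to create a bootstrap. The bootstrap holds important objects such as the ChannelFactory and the ChannelPipelineFactory. The code in this example uses the Netty 3.x API (the org.jboss.netty packages), which differs from the 4.x API.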

public class NettyServer {

	private final ServerBootstrap bootstrap;

	public NettyServer() {

The next step is to create the thread pools Netty will use. Netty lets us choose at runtime how our Channels are created. Constructed without arguments, the ChannelFactory sets up the same cached thread pools shown below by default, but for this example we create them explicitly.

		final ExecutorService bossThreadPool = Executors.newCachedThreadPool();
		final ExecutorService workerThreadPool = Executors.newCachedThreadPool();

The reason to use ExecutorServices is that threads are expensive to create. A cached thread pool reduces the work the Java Virtual Machine has to do by keeping idle threads around and reusing them, instead of deallocating a thread and allocating a new one for every task.
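The reuse described above can be observed with the standard library alone. The following is a minimal sketch, not part of the Netty example: ThreadReuseDemo and threadsCreatedFor are hypothetical names, and the counting ThreadFactory exists only to make thread creation visible. It runs short tasks one after another on a cached thread pool and reports how many threads the pool had to create.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class; it is not part of the Netty example. */
final class ThreadReuseDemo {

	/** Runs taskCount short tasks sequentially and returns how many threads the pool created. */
	static int threadsCreatedFor(int taskCount) {
		final AtomicInteger created = new AtomicInteger();
		// Count every thread the pool asks the factory to create.
		ThreadFactory countingFactory = new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				created.incrementAndGet();
				return new Thread(r);
			}
		};
		ExecutorService pool = Executors.newCachedThreadPool(countingFactory);
		try {
			for (int i = 0; i < taskCount; i++) {
				// get() blocks until the task has finished, so tasks never overlap.
				pool.submit(new Runnable() {
					@Override
					public void run() { /* simulated short unit of work */ }
				}).get();
			}
			return created.get();
		} catch (InterruptedException | ExecutionException ex) {
			throw new IllegalStateException(ex);
		} finally {
			pool.shutdown();
		}
	}

	public static void main(String[] args) {
		System.out.println("Threads created for 100 tasks: " + threadsCreatedFor(100));
	}
}
```

The exact count depends on timing, but it never exceeds the number of tasks and is usually far smaller, because an idle thread picks up the next task instead of a new one being created.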

The ChannelFactory is responsible for the I/O Netty performs. The two thread pools supplied through the ExecutorServices (one for the boss threads that accept connections, one for the worker threads that perform the I/O) let you tune how your application performs. Much of Netty's speed comes from its use of the Java NIO libraries. You can replace the Nio classes with their Oio (blocking I/O) counterparts, which can be useful when designing mobile applications for Android, where some consider NIO too heavyweight and its use bad practice.

		final ChannelFactory channelFactory = 
				new NioServerSocketChannelFactory(bossThreadPool, workerThreadPool);
		
		
		this.bootstrap = new ServerBootstrap(channelFactory);

We supply the ChannelFactory to the ServerBootstrap through its constructor. Next we set the maximum frame length, in bytes, that the delimiter-based frame decoder will buffer while waiting for a delimiter.

		final int ALLOWED_CHARACTER_BUFFER_SIZE = 8192;

Now, the most important part! We supply the ServerBootstrap with a ChannelPipelineFactory and override the interface's getPipeline() method. Inside this method we define our handlers. Since Netty passes data around as ChannelBuffers, for simplicity we use the StringDecoder and StringEncoder handlers to decode and encode our I/O data. We also supply a DelimiterBasedFrameDecoder, which splits the incoming byte stream into frames at each delimiter.

Consider the following ChannelBuffer data:

           
		[N] [e] [t] [t] [y] [ ] [ ] [ ] [ ] [ ] [ ] [ ] ... [ ] [ ] [ ]

If the handler had received only the five letters above, it would wait until the delimiter character (the line delimiter) arrived on the stream. Now we append a '\n' to our buffer.

		[N] [e] [t] [t] [y] [\n] [ ] [ ] [ ] [ ] [ ] [ ] ... [ ] [ ] [ ]

The frame is now complete and is passed on by the frame decoder. We receive it in the messageReceived(...) or handleUpstream(...) method of our handler(s); outgoing messages go through handleDownstream(...).
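The buffering behaviour illustrated by the two diagrams can be sketched in plain Java. This is an illustrative stand-in, not Netty's implementation: LineFramer and feed are hypothetical names, and the sketch only recognises '\n' as the delimiter. Bytes accumulate until a delimiter arrives, and only then is a complete frame released.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a delimiter-based frame decoder; not a Netty class. */
final class LineFramer {

	private final int maxFrameLength;
	private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

	LineFramer(int maxFrameLength) {
		this.maxFrameLength = maxFrameLength;
	}

	/** Adds newly received bytes and returns every frame completed by a '\n'. */
	List<String> feed(byte[] received) {
		List<String> frames = new ArrayList<String>();
		for (byte b : received) {
			if (b == '\n') {
				// Delimiter found: emit the buffered bytes as one frame, without the delimiter.
				frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
				pending.reset();
			} else {
				if (pending.size() >= maxFrameLength) {
					throw new IllegalStateException("frame exceeds " + maxFrameLength + " bytes");
				}
				pending.write(b);
			}
		}
		return frames;
	}

	public static void main(String[] args) {
		LineFramer framer = new LineFramer(8192);
		System.out.println(framer.feed("Netty".getBytes(StandardCharsets.UTF_8))); // prints []
		System.out.println(framer.feed("\n".getBytes(StandardCharsets.UTF_8)));    // prints [Netty]
	}
}
```

The maxFrameLength argument plays the same role as ALLOWED_CHARACTER_BUFFER_SIZE in the example: it bounds how much data may pile up while no delimiter has been seen.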

		this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(
					// Upstream: must come before the StringDecoder so that it splits raw ChannelBuffers into lines
					new DelimiterBasedFrameDecoder(ALLOWED_CHARACTER_BUFFER_SIZE, Delimiters.lineDelimiter()),
					new StringDecoder(CharsetUtil.UTF_8), // Upstream: ChannelBuffer frame -> String
					new StringEncoder(CharsetUtil.UTF_8), // Downstream: String -> ChannelBuffer
					new MyMessageHandler() // Custom ChannelUpstreamHandler
				);
			}
		});

Binding the ServerBootstrap returns the Channel that accepts incoming connections. We keep a reference to it so that we can check whether the bind succeeded.

		final int LISTEN_PORT = 53233;
		Channel acceptor = this.bootstrap.bind(new InetSocketAddress(LISTEN_PORT));

We make sure the Server could bind to the port by calling the isBound() method in the Channel class.

		if(!acceptor.isBound()) {
			System.err.println("Server :: Error! Unable to bind to port " + LISTEN_PORT);
			System.exit(-1);
		}
		
		System.out.println("Server :: Successfully bound to port " + LISTEN_PORT + "!" +
				"\nAwaiting new connections...");
	}

It is good practice to release your resources when you are done with the ServerBootstrap. The releaseExternalResources() method shuts down the boss and worker thread pools.

	public void shutdownServer() {
		this.bootstrap.releaseExternalResources();
	}

The server is a separate application with its own main method, which simply creates a NettyServer instance. For simplicity, the client gets a main class of its own.

	public static void main(String[] args) {
		new NettyServer();
	}
}

Next up is our custom ChannelHandler. As the name of its superclass shows, it is an UpstreamHandler. An UpstreamHandler receives what the server receives; a DownstreamHandler, in contrast, intercepts what the server is about to send. Depending on where a handler sits in the pipeline, it sees a message at a different stage of processing.

|------|              |------|            |------|
| DSH1 |->-("ABCDE")->| DSH2 |->-("BCD")->| DSH3 |->-("DCB")->[WEB]
|------|              |------|            |------|

Above are three DownstreamHandlers, each with a specific task. The first (DSH1) simply sends the String "ABCDE" down the stream. The second (DSH2) intercepts the output of the previous handler and applies its own logic, which in this case is to remove the vowels, leaving "BCD". The third (DSH3) intercepts that message and reverses the order of its letters, producing "DCB". When there are no DownstreamHandlers left in the ChannelPipeline, the output is sent to the remote peer.
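The chaining idea in the diagram can be sketched without Netty. In this minimal illustration, HandlerChainSketch, Step, REMOVE_VOWELS, REVERSE and run are all hypothetical names rather than Netty API; each step receives the output of the previous one, just as DSH2 and DSH3 do above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a handler chain; the names are illustrative and not Netty API. */
final class HandlerChainSketch {

	/** One processing step: takes the previous step's message and returns the transformed message. */
	interface Step {
		String handle(String message);
	}

	/** Plays the role of DSH2: removes the vowels. */
	static final Step REMOVE_VOWELS = new Step() {
		@Override
		public String handle(String message) {
			return message.replaceAll("[AEIOUaeiou]", "");
		}
	};

	/** Plays the role of DSH3: reverses the order of the letters. */
	static final Step REVERSE = new Step() {
		@Override
		public String handle(String message) {
			return new StringBuilder(message).reverse().toString();
		}
	};

	/** Passes the message through each step in list order. */
	static String run(String message, List<Step> steps) {
		String current = message;
		for (Step step : steps) {
			current = step.handle(current);
		}
		return current;
	}

	public static void main(String[] args) {
		List<Step> steps = new ArrayList<Step>();
		steps.add(REMOVE_VOWELS);
		steps.add(REVERSE);
		System.out.println(run("ABCDE", steps)); // prints DCB
	}
}
```

Swapping the two steps gives the same result for this input, but in general the order of the handlers determines what each one sees, which is why the position of a handler in the pipeline matters.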

The same principle applies to UpstreamHandlers. To combine the functionality of SimpleChannelDownstreamHandler and SimpleChannelUpstreamHandler, use the SimpleChannelHandler class. It implements both the downstream and the upstream interface, which allows the handler to manage messages going both ways.

In this example, the SimpleChannelUpstreamHandler will be used. We create a new class below.

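public class MyMessageHandler extends SimpleChannelUpstreamHandler {

	// Used by exceptionCaught(...). Assumption: a java.util.logging.Logger; the original does not show how logger is declared.
	private static final Logger logger = Logger.getLogger(MyMessageHandler.class.getName());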

The messageReceived method is called for every message that reaches this UpstreamHandler. Below we use MessageEvent's getMessage() to retrieve the message it has intercepted.

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		System.out.println("Server :: Received a new message saying: " + e.getMessage());

We can also use MessageEvent's getChannel() to retrieve the Channel object created by the ChannelFactory we instantiated in the server class. We can then use the Channel to perform a write() operation, which travels downstream through the pipeline (in Netty 3.x, from the last handler to the first). It is important to append a newline, '\n', to every message: the DelimiterBasedFrameDecoder on the receiving side only forwards a frame once it has seen the delimiter.

		e.getChannel().write("Hello, client! Your IP is " + e.getRemoteAddress() + "!\n" +
				"We received your message saying: " + e.getMessage() + "\n");

We must not forget to call super.messageReceived(...). If we do not, the message is not forwarded to any handlers further along the pipeline.

		super.messageReceived(ctx, e);
	}

We also override the method that handles exceptions. It is good practice to log the errors that occur in your handlers.

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		logger.log(Level.SEVERE, e.getCause().toString());
		/* We always call the method superclass. */
		super.exceptionCaught(ctx, e);
	}

We can handle new connections specifically, for example by adding the Channel to a ChannelGroup.

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("Server :: " + e.getChannel().getRemoteAddress() + " has connected!");
		/* We always call the method superclass. */
		super.channelConnected(ctx, e);
	}

We can also handle disconnections. Here we could remove the Channel from the ChannelGroup mentioned above.

	@Override
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("Server :: " + e.getChannel().getRemoteAddress() + " has disconnected from the Server.");
		/* We always call the method superclass. */
		super.channelDisconnected(ctx, e);
	}
}

It is time to create the Client. We do this in a new class.

public class NettyClient {

Very little differs between the client and the server class. The main change is the bootstrap: the client uses ClientBootstrap instead of ServerBootstrap.

	private final ClientBootstrap bootstrap;

In our constructor we initialize all the functionality for this example.

	public NettyClient() {
		final ExecutorService bossThreadPool = Executors.newCachedThreadPool();
		final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
		
		final ChannelFactory factory = new NioClientSocketChannelFactory(bossThreadPool, workerThreadPool);

Everything is the same except for the bootstrap class. Here we construct the ClientBootstrap with the same kind of argument as the ServerBootstrap.

		this.bootstrap = new ClientBootstrap(factory);
		
		final int ALLOWED_CHARACTER_BUFFER_SIZE = 8192;
		
		this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(
					// The frame decoder comes first so that it splits raw ChannelBuffers into lines
					new DelimiterBasedFrameDecoder(ALLOWED_CHARACTER_BUFFER_SIZE, Delimiters.lineDelimiter()),
					new StringDecoder(CharsetUtil.UTF_8),
					new StringEncoder(CharsetUtil.UTF_8),
					
					/* We also add our Client's own ChannelHandler. */
					new ClientChannelHandler()
				);
			}
		});

Instead of defining only the port, we also define the host name or IP address to connect to. In this case the server runs on the same machine as the client.

		final String HOSTNAME = "localhost";
		final int LISTEN_PORT = 53233;

Connecting is an asynchronous operation, so we have to invoke ChannelFuture.awaitUninterruptibly() to wait for it to complete. (Tip: it is a good habit to pass a timeout to awaitUninterruptibly so that the application cannot hang indefinitely. Better still is to register a ChannelFutureListener callback on the future.)

		ChannelFuture connection = this.bootstrap.connect(
			new InetSocketAddress(HOSTNAME, LISTEN_PORT)
		);

We call isSuccess() to determine the outcome of the connection attempt. If it returns true, we can send messages through the connection.

		if(!connection.awaitUninterruptibly().isSuccess()) {
			System.err.println("Client :: Unable to connect to host " + HOSTNAME + ":" + LISTEN_PORT);
			System.exit(-1);
		}

		System.out.println("Client :: Successfully connected to host " + HOSTNAME + ":" + LISTEN_PORT);
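The two alternatives mentioned in the tip, waiting with a timeout and reacting through a callback, can be sketched with the standard library's CompletableFuture as a stand-in for a ChannelFuture. This is an analogy under stated assumptions rather than Netty code: ConnectWaitSketch and awaitSuccess are hypothetical names. In Netty 3.x the corresponding calls are ChannelFuture.awaitUninterruptibly(long, TimeUnit) and ChannelFuture.addListener(ChannelFutureListener).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch; CompletableFuture stands in for Netty's ChannelFuture. */
final class ConnectWaitSketch {

	/** Waits at most timeoutMillis for the attempt; returns false on timeout or failure. */
	static boolean awaitSuccess(CompletableFuture<String> attempt, long timeoutMillis) {
		try {
			attempt.get(timeoutMillis, TimeUnit.MILLISECONDS);
			return true;
		} catch (TimeoutException | ExecutionException e) {
			return false;
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
			return false;
		}
	}

	public static void main(String[] args) {
		// Option 1: bounded wait. This attempt never completes, so the wait gives up after the timeout.
		CompletableFuture<String> neverCompletes = new CompletableFuture<>();
		System.out.println(awaitSuccess(neverCompletes, 100)); // prints false

		// Option 2: callback. No thread blocks; the callback runs when the attempt completes.
		CompletableFuture<String> attempt = new CompletableFuture<>();
		attempt.whenComplete((result, error) ->
			System.out.println(error == null ? "connected: " + result : "failed: " + error));
		attempt.complete("localhost:53233"); // prints connected: localhost:53233
	}
}
```

The callback form is generally preferable inside event-driven code, because blocking a thread that also processes I/O events can stall every connection that thread serves.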

We create a Channel called comLink. The ChannelFuture will provide us with a Channel that can be used to write messages across the stream.

		Channel comLink = connection.getChannel();

We output the String message through the DownstreamHandlers down to the Sink where it is sent to the Server.

		comLink.write("Hello, Server!\n");
	}

For the sake of simplicity, I made a private class inside the Client class to handle the Upstream.

	private class ClientChannelHandler extends SimpleChannelUpstreamHandler {
		
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
			/* Extremely simple variant of a handler. Only for testing the communication
			 * between the two applications. */
			System.out.println("Received a message from the server: " + e.getMessage());
			super.messageReceived(ctx, e);
		}
		
	}

We launch the application after the Server has started and then try to connect.

	public static void main(String[] args) {
		new NettyClient();
	}
}

What you now have is a very simple application made with Netty. The complete source is available for review on GitHub.

References

  1. Downloads. Netty.io. Retrieved 2014-02-17.
  2. Netty Project. Netty Project Community.
  3. Netty 2 1.0 Released.
