This repository hosts a complete workshop on Reactor Netty.

When finishing the workshop one will know how to create and utilize:

  • UDP server and client

  • TCP server and client

  • HTTP server and client

Reference documentation can be useful while creating those servers and clients:

Prerequisites

  • Java 8

  • Java IDE installed with Maven support

How to start

  • Clone/fork this repository https://github.com/violetagg/reactor-netty-workshop

  • Import the project as Maven one in your IDE

  • Make sure that the language level is set to Java 8 in your IDE project settings

  • Follow this script to create HTTP/TCP/UDP server and client

  • Fix the TODO one by one in the tests under io.spring.workshop.reactornetty package in order to make the unit tests green

  • The solution is available in the complete branch for comparison. Each step of this workshop has its companion commit in the git history with a detailed commit message.

UDP server and client

If one wants to utilize the UDP protocol one will need to create a UDP servers that can send packages to each other or even UDP clients that can connect to specific UDP servers. Reactor Netty provides easy to use and configure UdpServer and UdpClient, they hide most of the Netty functionality that is needed in order to create UDP server and client, and in addition add Reactive Streams backpressure.

Creating UDP server

UdpServer allows to build, configure and materialize a UDP server. Invoking UdpServer.create() one can prepare the UDP server for configuration. Having already a UdpServer instance, one can start configuring the host, port, the IO handler etc. For configuration purposes, an immutable builder pattern is used. In the example below the UDP server will be bound on port 8080 and will not use a random port.

UdpServer.create()  // Prepares a UDP server for configuration.
         .port(0)   // Configures the port number as zero, this will let the system
                    // pick up an ephemeral port when binding the server.
         .port(8080)// Configures the port number as 8080.
         .bind()    // Binds the UDP server and returns a Mono<Connection>.
         .block();  // Blocks and waits the server to finish initialising.

When finished with the server configuration, invoking UdpServer.bind(), one will bind the server and Mono<Connection> will be return, subscribing to this Publisher one can react on a successfully finished operation or handle issues that might happen. On the other hand cancelling this Mono, the underlying binding will be aborted. If one do not need to interact with the Mono, there is UdpServer.bindNow(Duration) which is a convenient method for binding the server and obtaining the Connection. The Connection that is emitted as a result of a successfully bound server, holds contextual information for the underlying channel and provides a non-blocking resource disposing API. Disposing resource is very important so once the server is not necessary anymore, it must be disposed by the user via Connection.dispose() or Connection.disposeNow(). The difference between these two methods is that the second one will release the resources and close the underlying channel in a blocking fashion.

Configuring UDP server

Configuring the Wire Logger

There are use case when one wants to enable a wire logger in order to understand what’s wrong with the server. For that purposes UdpServer.wiretap can be used and this will use reactor.netty.udp.UdpServer category with DEBUG level. There are also UdpServer.wiretap(String) and UdpServer.wiretap(String, LogLevel) so that one can change the category and the log level.

Configuring the Event Loop Group

By default the UDP server will use Event Loop Group where the number of the worker threads will be the number of processors available to the runtime on init (but with a minimum value of 4). When one needs a different configuration, one of the LoopResource.create methods can be used in order to configure a new Event Loop Group. Once the Event Loop Group is configured, the new configuration can be provided using one of the UdpServer.runOn.

Add UDP server IO handler that will echo the received package

UdpServer.handle(BiFunction<UdpInbound, UdpOutbound, Publisher<Void>>) should be used if one wants to attach IO handler that will process the incoming packages and will eventually send packages as reply. UdpInbound is used to receive bytes from the peer where UdpInbound.receiveObject() returns the pure inbound Flux, while UdpInbound.receive() returns ByteBufFlux which provides an extra API to handle the incoming traffic. UdpOutbound is used to send bytes to the peer, listen for any error returned by the write operation and close on terminal signal (complete|error). If more than one Publisher is attached (multiple calls to UdpOutbound.send* methods), completion occurs when all publishers complete. The Publisher<Void> that has to be returned as a result represents the sequence of the operations that will be applied for the incoming and outgoing traffic. The packages that will be received and that will be send are handled by io.netty.channel.socket.DatagramPacket. For example if one wants to transform an incoming package and then to prepare a new one for sending, the snippet bellow can be used:

if (o instanceof DatagramPacket) {
    // Incoming DatagramPacket
    DatagramPacket p = (DatagramPacket) o;
    ByteBuf buf1 = Unpooled.copiedBuffer("Hello ", CharsetUtil.UTF_8);
    // Creates a new ByteBuf using the incoming DatagramPacket content.
    ByteBuf buf2 = Unpooled.copiedBuffer(buf1, p.content()
                                                .retain());
    // Creates a new DatagramPacket with the ByteBuf and the sender
    // information from the incoming DatagramPacket.
    return new DatagramPacket(buf2, p.sender());
}

Creating UDP client

UdpClient allows to build, configure and materialize a UDP client. Invoking UdpClient.create() one can prepare the UDP client for configuration. Having already a UdpClient instance, one can start configuring the host, port, the IO handler etc. For configuration purposes, the same immutable builder pattern is used as in UdpServer.

When finished with the client configuration, invoking UdpClient.connect(), one will connect the client and Mono<Connection> will be return, subscribing to this Publisher one can react on a successfully finished operation or handle issues that might happen. On the other hand cancelling this Mono, the underlying connecting operation will be aborted. If one do not need to interact with the Mono, there is UdpClient.connectNow(Duration) which is a convenient method for connecting the client and obtaining the Connection. As already described in the UDP server section, disposing the resources can be done via Connection.dispose() or Connection.disposeNow().

Configuring UDP client

Configuring the Wire Logger

UdpClient.wiretap can be used for wire logging and this will use reactor.netty.udp.UdpClient category with DEBUG level. There are also UdpClient.wiretap(String) and UdpClient.wiretap(String, LogLevel) so that one can change the category and the log level.

Configuring the Event Loop Group

The default configuration for the Event Loop Group is the same as in UDP server. When one needs a different configuration, UdpClient.runOn methods can be used.

Add UDP client IO handler that will send a package and will react on an incoming packages

UdpClient.handle(BiFunction<UdpInbound, UdpOutbound, Publisher<Void>>) should be used if one wants to attach IO handler that will process the incoming packages and will eventually send packages as reply. Here as a convenience UdpOutbound.send* (e.g. UdpOutbound.sendString) methods can be used instead of UdpOutbound.sendObject as the client is connected to exactly one UDP server. The same is also for using UdpInbound.receive() instead of UdpInbound.receiveObject().

Utilize Mono<Connection>

In the section for the UDP server creation was described that as a result of UdpServer.bind one will receive Mono<Connection> which will emit (complete|error) signals. Utilizing this Mono one can send a datagram package (the snippet below) as soon as the UDP server is bound successfully.

DatagramChannel udp = DatagramChannel.open();
udp.configureBlocking(true);
udp.connect(new InetSocketAddress(server1.address().getPort()));

byte[] data = new byte[1024];
new Random().nextBytes(data);
for (int i = 0; i < 4; i++) {
    udp.write(ByteBuffer.wrap(data));
}

udp.close();

TCP server and client

If one wants to utilize the TCP protocol one will need to create a TCP servers that can send packages to the connected clients or TCP clients that can connect to specific TCP servers. Reactor Netty provides easy to use and configure TcpServer and TcpClient, they hide most of the Netty functionality that is needed in order to create with TCP server and client, and in addition add Reactive Streams backpressure.

Creating TCP server

TcpServer allows to build, configure and materialize a TCP server. Invoking TcpServer.create() one can prepare the TCP server for configuration. Having already a TcpServer instance, one can start configuring the host, port, the IO handler etc. For configuration purposes, the same immutable builder pattern is used as in UdpServer.

When finished with the server configuration, invoking TcpServer.bind, one will bind the server and Mono<DisposableServer> will be return, subscribing to this Publisher one can react on a successfully finished operation or handle issues that might happen. On the other hand cancelling this Mono, the underlying connecting operation will be aborted. If one do not need to interact with the Mono, there is TcpServer.bindNow(Duration) which is a convenient method for binding the server and obtaining the DisposableServer. DisposableServer holds contextual information for the underlying server. Disposing the resources can be done via DisposableServer.dispose() or DisposableServer.disposeNow().

Enabling SSL support for TCP server

TcpServer provides several convenient methods for configuring SSL:

  • TcpServer.secure(SslContext) where SslContext is already configured

  • TcpServer.secure(Consumer<? super SslProvider.SslContextSpec>) where the SSL configuration customization can be done via the passed builder.

Add TCP server IO handler that will send a file

TcpServer.handle(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>) should be used if one wants to attach IO handler that will process the incoming messages and will eventually send messages as a reply. NettyInbound is used to receive bytes from the peer where NettyInbound.receiveObject() returns the pure inbound Flux, while NettyInbound.receive() returns ByteBufFlux which provides an extra API to handle the incoming traffic. NettyOutbound is used to send bytes to the peer, listen for any error returned by the write operation and close on terminal signal (complete|error). If more than one Publisher is attached (multiple calls to NettyOutbound.send* methods), completion occurs when all publishers complete. The Publisher<Void> that has to be returned as a result represents the sequence of the operations that will be applied for the incoming and outgoing traffic. For example if one wants to send a file to the client where the file name is received as an incoming package, the snippet bellow can be used:

.handle((in, out) ->
        in.receive()
          .asString()
          .flatMap(s -> {
              try {
                  Path file = Paths.get(getClass().getResource(s).toURI());
                  return out.sendFile(file)
                            .then();
              } catch (URISyntaxException e) {
                  return Mono.error(e);
              }
          }))

Creating TCP client

TcpClient allows to build, configure and materialize a TCP client. Invoking TcpClient.create() one can prepare the TCP client for configuration. Having already a TcpClient instance, one can start configuring the host, port, the IO handler etc. For configuration purposes, the same immutable builder pattern is used as in UdpServer.

When finished with the client configuration, invoking TcpClient.connect(), one will connect the client and Mono<Connection> will be return, subscribing to this Publisher one can react on a successfully finished operation or handle issues that might happen. On the other hand cancelling this Mono, the underlying connecting operation will be aborted. If one do not need to interact with the Mono, there is TcpClient.connectNow(Duration) which is a convenient method for connecting the client and obtaining the Connection. As already described in the UDP server section, disposing the resources can be done via Connection.dispose() or Connection.disposeNow().

Enabling SSL support for TCP client

TcpClient provides several convenient methods for configuring SSL. When one wants to use the default SSL configuration provided by Reactor Netty TcpClient.secure() can be used. If additional configuration is necessary then one of the following methods can be used:

  • TcpClient.secure(SslContext) where SslContext is already configured

  • TcpClient.secure(Consumer<? super SslProvider.SslContextSpec>) where the SSL configuration customization can be done via the passed builder.

Add TCP client IO handler

TcpClient.handle(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>) should be used if one wants to attach IO handler that will process the incoming messages and will eventually send messages as reply. Here as a convenience NettyOutbound.send* (e.g. NettyOutbound.sendString) methods can be used instead of NettyOutbound.sendObject. The same is also for using NettyInbound.receive() instead of NettyInbound.receiveObject().

HTTP server and client

Creating HTTP server

HttpServer allows to build, configure and materialize a HTTP server. Invoking HttpServer.create() one can prepare the HTTP server for configuration. Having already a HttpServer instance, one can start configuring the host, port, the IO handler, compression etc. For configuration purposes, the same immutable builder pattern is used as in UdpServer.

When finished with the server configuration, invoking HttpServer.bind, one will bind the server and Mono<DisposableServer> will be return, subscribing to this Publisher one can react on a successfully finished operation or handle issues that might happen. On the other hand cancelling this Mono, the underlying connecting operation will be aborted. If one do not need to interact with the Mono, there is HttpServer.bindNow(Duration) which is a convenient method for binding the server and obtaining the DisposableServer. Disposing the resources can be done via DisposableServer.dispose() or DisposableServer.disposeNow().

Defining routes for the HTTP server

In HttpServer one can handle the incoming requests and outgoing responses using HttpServer.handle(BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>) which is similar to the mechanism that was already described for UdpServer/TcpServer. However there is also a possibility to specify concrete routes and HTTP methods that the server will respond. This can be done using HttpServer.route(Consumer<HttpServerRoutes>). Using HttpServerRoutes one can specify the HTTP method, paths etc. For example the snippet below specifies that the server will respond only on POST method, where the path starts with /test and has a path parameter.

.route(routes ->
        routes.post("/test/{param}", (req, res) ->
                res.sendString(req.receive()
                                  .asString()
                                  .map(s -> s + ' ' + req.param("param") + '!'))))

HttpServerRequest provides API for accessing http request attributes as method, path, headers, path parameters etc. as well as to receive the request body. HttpServerResponse provides API for accessing http response attributes as status code, headers, compression etc. as well as to send the response body.

Creating HTTP client

HttpClient allows to build, configure and materialize a HTTP client. Invoking HttpClient.create() one can prepare the HTTP client for configuration. Having already a HttpClient instance, one can start configuring the host, port, headers, compression etc. For configuration purposes, the same immutable builder pattern is used as in UdpServer.

When finished with the client configuration, invoking HttpClient.get|post|…​ methods, one will receive HttpClient.RequestSender and will be able start configuring the HTTP request such as the uri and the request body. HttpClient.RequestSender.send* will end the HTTP request’s configuration and one can start discribing the actions on the HTTP response when it is received on the returned HttpClient.ResponseReceiver, the response body can be obtained via the provided HttpClient.ResponseReceiver.response* methods. As HttpClient.ResponseReceiver API always returns Publisher, the request and response executions are always deferred to the moment when there is a Subscriber that subscribes to the defined sequence. For example in the snippet below block() will subscribe to the defined sequence and in fact will trigger the execution.

In the snippet below can be used to send POST request with a body and received the answer from the server:

HttpClient.create()             // Prepares a HTTP client for configuration.
          .port(server.port())  // Obtain the server's port and provide it as a port to which this
                                // client should connect.
          .wiretap(true)        // Applies a wire logger configuration.
          .headers(h -> h.add("Content-Type", "text/plain")) // Adds headers to the HTTP request.
          .post()              // Specifies that POST method will be used.
          .uri("/test/World")  // Specifies the path.
          .send(ByteBufFlux.fromString(Flux.just("Hello")))  // Sends the request body.
          .responseContent()   // Receives the response body.
          .aggregate()
          .asString()
          .block();