This repository hosts a complete workshop on Reactor Netty
.
When finishing the workshop one will know how to create and utilize:
-
UDP
server and client -
TCP
server and client -
HTTP
server and client
Reference documentation can be useful while creating those servers and clients:
-
Netty documentation (Reactor Netty uses Netty 4.1.x)
How to start
-
Clone/fork this repository https://github.com/violetagg/reactor-netty-workshop
-
Import the project as
Maven
one in yourIDE
-
Make sure that the language level is set to
Java 8
in yourIDE
project settings -
Follow this script to create
HTTP
/TCP
/UDP
server and client -
Fix the
TODO
one by one in the tests underio.spring.workshop.reactornetty
package in order to make the unit tests green -
The solution is available in the
complete
branch for comparison. Each step of this workshop has its companion commit in the git history with a detailed commit message.
UDP server and client
If one wants to utilize the UDP
protocol one will need to create a UDP
servers that can send packages
to each other or even UDP
clients that can connect to specific UDP
servers.
Reactor Netty
provides easy to use and configure UdpServer
and UdpClient
, they hide most of the
Netty
functionality that is needed in order to create UDP
server and client, and in addition add
Reactive Streams
backpressure.
Creating UDP server
UdpServer
allows to build, configure and materialize a UDP
server.
Invoking UdpServer.create()
one can prepare the UDP
server for configuration. Having already a UdpServer
instance, one can start configuring the host, port, the IO handler etc.
For configuration purposes, an immutable builder pattern is used.
In the example below the UDP
server will be bound on port 8080
and will not use a random port.
UdpServer.create() // Prepares a UDP server for configuration.
.port(0) // Configures the port number as zero, this will let the system
// pick up an ephemeral port when binding the server.
.port(8080)// Configures the port number as 8080.
.bind() // Binds the UDP server and returns a Mono<Connection>.
.block(); // Blocks and waits the server to finish initialising.
When finished with the server configuration, invoking UdpServer.bind()
, one will bind the server
and Mono<Connection>
will be return, subscribing to this Publisher
one can react
on a successfully finished operation or handle issues that might happen. On the other hand
cancelling this Mono
, the underlying binding will be aborted. If one do not need to interact with the Mono
,
there is UdpServer.bindNow(Duration)
which is a convenient method for binding the server and obtaining the
Connection
.
The Connection
that is emitted as a result of a successfully bound server, holds contextual information
for the underlying channel and provides a non-blocking resource disposing API.
Disposing resource is very important so once the server is not necessary anymore, it must be disposed
by the user via Connection.dispose()
or Connection.disposeNow()
. The difference between these two methods is
that the second one will release the resources and close the underlying channel in a blocking fashion.
Configuring UDP server
Configuring the Wire Logger
There are use case when one wants to enable a wire logger in order to understand what’s wrong with the server.
For that purposes UdpServer.wiretap
can be used and this will use reactor.netty.udp.UdpServer
category
with DEBUG
level. There are also UdpServer.wiretap(String)
and UdpServer.wiretap(String, LogLevel)
so that
one can change the category and the log level.
Configuring the Event Loop Group
By default the UDP
server will use Event Loop Group where the number of the worker threads will be
the number of processors available to the runtime on init (but with a minimum value of 4). When
one needs a different configuration, one of the LoopResource.create
methods can be used in order to configure a new
Event Loop Group. Once the Event Loop Group is configured, the new configuration can be provided using one of the
UdpServer.runOn
.
Add UDP server IO handler that will echo the received package
UdpServer.handle(BiFunction<UdpInbound, UdpOutbound, Publisher<Void>>)
should be used if one wants to attach IO
handler that will process the incoming packages and will eventually send packages as reply.
UdpInbound
is used to receive bytes from the peer where UdpInbound.receiveObject()
returns the pure inbound
Flux
, while UdpInbound.receive()
returns ByteBufFlux
which provides an extra API to handle the incoming traffic.
UdpOutbound
is used to send bytes to the peer, listen for any error returned by the write operation
and close on terminal signal (complete|error). If more than one Publisher
is attached
(multiple calls to UdpOutbound.send*
methods), completion occurs when all publishers complete.
The Publisher<Void>
that has to be returned as a result represents the sequence of the operations that will be
applied for the incoming and outgoing traffic.
The packages that will be received and that will be send are handled by io.netty.channel.socket.DatagramPacket
.
For example if one wants to transform an incoming package and then to prepare a new one for sending, the snippet bellow
can be used:
if (o instanceof DatagramPacket) {
// Incoming DatagramPacket
DatagramPacket p = (DatagramPacket) o;
ByteBuf buf1 = Unpooled.copiedBuffer("Hello ", CharsetUtil.UTF_8);
// Creates a new ByteBuf using the incoming DatagramPacket content.
ByteBuf buf2 = Unpooled.copiedBuffer(buf1, p.content()
.retain());
// Creates a new DatagramPacket with the ByteBuf and the sender
// information from the incoming DatagramPacket.
return new DatagramPacket(buf2, p.sender());
}
Creating UDP client
UdpClient
allows to build, configure and materialize a UDP
client.
Invoking UdpClient.create()
one can prepare the UDP
client for configuration. Having already a UdpClient
instance, one can start configuring the host, port, the IO handler etc.
For configuration purposes, the same immutable builder pattern is used as in UdpServer
.
When finished with the client configuration, invoking UdpClient.connect()
, one will connect the client
and Mono<Connection>
will be return, subscribing to this Publisher
one can react
on a successfully finished operation or handle issues that might happen. On the other hand
cancelling this Mono
, the underlying connecting operation will be aborted. If one do not need to interact with the
Mono
, there is UdpClient.connectNow(Duration)
which is a convenient method for connecting the client and obtaining
the Connection
.
As already described in the UDP
server section, disposing the resources can be done via Connection.dispose()
or Connection.disposeNow()
.
Configuring UDP client
Add UDP client IO handler that will send a package and will react on an incoming packages
UdpClient.handle(BiFunction<UdpInbound, UdpOutbound, Publisher<Void>>)
should be used if one wants to attach IO
handler that will process the incoming packages and will eventually send packages as reply.
Here as a convenience UdpOutbound.send*
(e.g. UdpOutbound.sendString
) methods can be used instead of
UdpOutbound.sendObject
as the client is connected to exactly one UDP
server. The same is also for
using UdpInbound.receive()
instead of UdpInbound.receiveObject()
.
Utilize Mono<Connection>
In the section for the UDP
server creation was described that as a result of UdpServer.bind
one will
receive Mono<Connection>
which will emit (complete|error) signals.
Utilizing this Mono
one can send a datagram package (the snippet below) as soon as the UDP
server is
bound successfully.
DatagramChannel udp = DatagramChannel.open();
udp.configureBlocking(true);
udp.connect(new InetSocketAddress(server1.address().getPort()));
byte[] data = new byte[1024];
new Random().nextBytes(data);
for (int i = 0; i < 4; i++) {
udp.write(ByteBuffer.wrap(data));
}
udp.close();
TCP server and client
If one wants to utilize the TCP
protocol one will need to create a TCP
servers that can send packages
to the connected clients or TCP
clients that can connect to specific TCP
servers.
Reactor Netty
provides easy to use and configure TcpServer
and TcpClient
, they hide most of the
Netty
functionality that is needed in order to create with TCP
server and client, and in addition add
Reactive Streams
backpressure.
Creating TCP server
TcpServer
allows to build, configure and materialize a TCP
server.
Invoking TcpServer.create()
one can prepare the TCP
server for configuration. Having already a TcpServer
instance, one can start configuring the host, port, the IO handler etc.
For configuration purposes, the same immutable builder pattern is used as in UdpServer
.
When finished with the server configuration, invoking TcpServer.bind
, one will bind the server
and Mono<DisposableServer>
will be return, subscribing to this Publisher
one can react
on a successfully finished operation or handle issues that might happen. On the other hand
cancelling this Mono
, the underlying connecting operation will be aborted. If one do not need to interact with the
Mono
, there is TcpServer.bindNow(Duration)
which is a convenient method for binding the server and obtaining
the DisposableServer
.
DisposableServer
holds contextual information for the underlying server.
Disposing the resources can be done via DisposableServer.dispose()
or DisposableServer.disposeNow()
.
Enabling SSL support for TCP server
TcpServer
provides several convenient methods for configuring SSL:
-
TcpServer.secure(SslContext)
where SslContext is already configured -
TcpServer.secure(Consumer<? super SslProvider.SslContextSpec>)
where the SSL configuration customization can be done via the passed builder.
Add TCP server IO handler that will send a file
TcpServer.handle(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>)
should be used if one wants to attach IO
handler that will process the incoming messages and will eventually send messages as a reply.
NettyInbound
is used to receive bytes from the peer where NettyInbound.receiveObject()
returns the pure inbound
Flux
, while NettyInbound.receive()
returns ByteBufFlux
which provides an extra API to handle the incoming traffic.
NettyOutbound
is used to send bytes to the peer, listen for any error returned by the write operation
and close on terminal signal (complete|error). If more than one Publisher
is attached
(multiple calls to NettyOutbound.send*
methods), completion occurs when all publishers complete.
The Publisher<Void>
that has to be returned as a result represents the sequence of the operations that will be
applied for the incoming and outgoing traffic.
For example if one wants to send a file to the client where the file name is received as an incoming package,
the snippet bellow can be used:
.handle((in, out) ->
in.receive()
.asString()
.flatMap(s -> {
try {
Path file = Paths.get(getClass().getResource(s).toURI());
return out.sendFile(file)
.then();
} catch (URISyntaxException e) {
return Mono.error(e);
}
}))
Creating TCP client
TcpClient
allows to build, configure and materialize a TCP
client.
Invoking TcpClient.create()
one can prepare the TCP
client for configuration. Having already a TcpClient
instance, one can start configuring the host, port, the IO handler etc.
For configuration purposes, the same immutable builder pattern is used as in UdpServer
.
When finished with the client configuration, invoking TcpClient.connect()
, one will connect the client
and Mono<Connection>
will be return, subscribing to this Publisher
one can react
on a successfully finished operation or handle issues that might happen. On the other hand
cancelling this Mono
, the underlying connecting operation will be aborted. If one do not need to interact with the
Mono
, there is TcpClient.connectNow(Duration)
which is a convenient method for connecting the client and obtaining
the Connection
.
As already described in the UDP
server section, disposing the resources can be done via Connection.dispose()
or Connection.disposeNow()
.
Enabling SSL support for TCP client
TcpClient
provides several convenient methods for configuring SSL.
When one wants to use the default SSL configuration provided by Reactor Netty TcpClient.secure()
can be used.
If additional configuration is necessary then one of the following methods can be used:
-
TcpClient.secure(SslContext)
where SslContext is already configured -
TcpClient.secure(Consumer<? super SslProvider.SslContextSpec>)
where the SSL configuration customization can be done via the passed builder.
Add TCP client IO handler
TcpClient.handle(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>)
should be used if one wants to attach IO
handler that will process the incoming messages and will eventually send messages as reply.
Here as a convenience NettyOutbound.send*
(e.g. NettyOutbound.sendString
) methods can be used instead of
NettyOutbound.sendObject
. The same is also for using NettyInbound.receive()
instead of
NettyInbound.receiveObject()
.
HTTP server and client
Creating HTTP server
HttpServer
allows to build, configure and materialize a HTTP
server.
Invoking HttpServer.create()
one can prepare the HTTP
server for configuration. Having already a HttpServer
instance, one can start configuring the host, port, the IO handler, compression etc.
For configuration purposes, the same immutable builder pattern is used as in UdpServer
.
When finished with the server configuration, invoking HttpServer.bind
, one will bind the server
and Mono<DisposableServer>
will be return, subscribing to this Publisher
one can react
on a successfully finished operation or handle issues that might happen. On the other hand
cancelling this Mono
, the underlying connecting operation will be aborted. If one do not need to interact with the
Mono
, there is HttpServer.bindNow(Duration)
which is a convenient method for binding the server and obtaining
the DisposableServer
.
Disposing the resources can be done via DisposableServer.dispose()
or DisposableServer.disposeNow()
.
Defining routes for the HTTP server
In HttpServer
one can handle the incoming requests and outgoing responses using
HttpServer.handle(BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>)
which is similar to the
mechanism that was already described for UdpServer
/TcpServer
. However there is also a possibility to specify
concrete routes and HTTP methods that the server will respond. This can be done using
HttpServer.route(Consumer<HttpServerRoutes>)
. Using HttpServerRoutes
one can specify the HTTP method, paths etc.
For example the snippet below specifies that the server will respond only on POST
method, where the path starts with
/test
and has a path parameter.
.route(routes ->
routes.post("/test/{param}", (req, res) ->
res.sendString(req.receive()
.asString()
.map(s -> s + ' ' + req.param("param") + '!'))))
HttpServerRequest
provides API for accessing http request attributes as method, path, headers, path parameters etc.
as well as to receive the request body.
HttpServerResponse
provides API for accessing http response attributes as status code, headers, compression etc.
as well as to send the response body.
Creating HTTP client
HttpClient
allows to build, configure and materialize a HTTP
client.
Invoking HttpClient.create()
one can prepare the HTTP
client for configuration. Having already a HttpClient
instance, one can start configuring the host, port, headers, compression etc.
For configuration purposes, the same immutable builder pattern is used as in UdpServer
.
When finished with the client configuration, invoking HttpClient.get|post|…
methods, one will receive
HttpClient.RequestSender
and will be able start configuring the HTTP
request such as the uri and the request body.
HttpClient.RequestSender.send*
will end the HTTP request’s configuration and one can start discribing the actions
on the HTTP
response when it is received on the returned HttpClient.ResponseReceiver
, the response body can be obtained via the provided
HttpClient.ResponseReceiver.response*
methods. As HttpClient.ResponseReceiver
API always returns Publisher
,
the request and response executions are always deferred to the moment when there is a Subscriber
that subscribes to the defined sequence. For example in the snippet below block()
will subscribe to the defined
sequence and in fact will trigger the execution.
In the snippet below can be used to send POST request with a body and received the answer from the server:
HttpClient.create() // Prepares a HTTP client for configuration.
.port(server.port()) // Obtain the server's port and provide it as a port to which this
// client should connect.
.wiretap(true) // Applies a wire logger configuration.
.headers(h -> h.add("Content-Type", "text/plain")) // Adds headers to the HTTP request.
.post() // Specifies that POST method will be used.
.uri("/test/World") // Specifies the path.
.send(ByteBufFlux.fromString(Flux.just("Hello"))) // Sends the request body.
.responseContent() // Receives the response body.
.aggregate()
.asString()
.block();