Introduction to RSocket
From Spring Boot in Practice by Somnath Musib
This article explores Rsocket and how it interacts with Spring Boot.
In this article, we’ll explore the RSocket protocol and its use with Spring Boot.
RSocket is an application protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte stream transports such as Aeron. RSocket allows the following four communication models as shown in figure 1:
In RSocket, once the initial handshake between the client and server is done, the “client” vs “server” distinction is lost as both sides can independently initiate one of the interactions as specified in figure 1.
The RSocket protocol has few key features and offers several benefits:
- Reactive Streams semantics for streaming requests interactions Request-Stream and Channel. Support for backpressure signal between requester and responder. This allows a requester to slow down a responder at the source. Thus, reduces reliance on network layer congestion control and network-level buffering
- Support for Request throttling to reduce the number of possible messages. This can be done after sending a
frame to limit the total number of requests allowed by other ends for a given time - Fragmentation and re-assembly of large messages
- Keepalive through heartbeat messages
Next, let us demonstrate how to use the RSocket protocol in a Spring Boot application. We’ll implement all four interaction patterns as shown in figure 1.
TECHNIQUE: Developing applications using RSocket and Spring Boot
You learned about the RSocket protocol and need to use it in a Spring Boot application
Spring Framework provides support for the RSocket protocol in the spring-messaging
module. Spring Boot provides the spring-boot-starter-rsocket
starter dependency that includes the relevant dependencies to use RSocket in a Spring Boot application.
To begin, let us create a new Spring Boot project with the dependencies as shown in listing 1:
Listing 1. The pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="" xmlns:xsi=""
<relativePath/> <!-- lookup parent from repository -->
<description>Spring Boot RSocket</description>
The notable dependency in listing 1 is the spring-boot-starter-rsocket dependency. This transitively includes the other required dependencies such as spring-messaging
, rsocket-core,
In the file, let us include the following properties as shown listing 2:
Listing 2. configuration
The first property sets the TCP port for the RSocket server to 7000 and the second property enables the Spring Boot’s lazy initialization.
In this Spring Boot application, we’ll continue with the Course
domain object. The updated Course model is shown in Listing 3:
Listing 3. The Course Domain Class
package com.manning.sbip.ch08.model;
import java.time.Instant;
import java.util.UUID;
import lombok.Data;
import lombok.NoArgsConstructor;
public class Course {
private UUID courseId = UUID.randomUUID();
private long created =;
private String courseName;
public Course(String courseName) {
this.courseName = courseName;
The Course
class has a courseId
field which is a random UUID, a created
field that captures the course creation time and a courseName
field that is supplied by the user.
Next, let us define the CourseController
class that contains the routes for all four interaction models as specified in figure 1. Listing 4 shows the CourseController
Listing 4. The Course Controller
package com.manning.sbip.ch08.controller;
import java.time.Duration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import com.manning.sbip.ch08.model.Course;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CourseController {
# Implements the request-response interaction pattern. The user is expected to supply a course and this endpoint echos it back to the caller
public Mono<Course> requestResponse(final Course course) {"Received request-response course details {} ", course);
return Mono.just(new Course("Your course name: " + course.getCourseName()));
# Implements the fire-forget interaction pattern. The user is expected to supply a course and expects nothing. Thus, we are returning an empty Mono.
public Mono<Void> fireAndForget(final Course course) {"Received fire-and-forget course details {} ", course);
return Mono.empty();
# Implements the request-stream interaction pattern. The user is expected to supply a course and this endpoint returns a stream of course with modified course name in an interval of one second
public Flux<Course> requestStream(final Course course) {"Received request-stream course details {} ", course);
return Flux
.map(index -> new Course("Your course name: " + course.getCourseName() + ". Response #" + index))
# Implements the channel interaction pattern. The user is expected to supply a stream and this endpoint returns a stream of course with a modified course name in an interval configured by the user. The user can specify the interval by invoking the delayElements() method in the source Flux. Recall that in channel interaction patterns, both sides can send a stream of data.
public Flux<Course> channel(final Flux<Integer> settings) {"Received stream-stream (channel) request... ");
return settings
.doOnNext(setting ->"Requested interval is {} seconds", setting))
.doOnCancel(() -> log.warn("Client cancelled the channel"))
.switchMap(setting -> Flux.interval(Duration.ofSeconds(setting)).map(index -> new Course("Spring. Response #"+index)))
You can start the application and find that it is running on configured TCP port 7000. We’ll demonstrate two approaches to test the application. First, we can use RSocket Client CLI (RSC) to test the routes. It’s a command-line utility that lets you access the endpoints. You can go through for the steps to install this on your machine. Once you’ve installed it, access the request-response
route using the command as shown in Listing 5:
Listing 5. Invoking RSocket endpoint with rsc CLI
C:\Users\musib>rsc --debug --request --data "{\"courseName\":\"Spring\"}" --route request-response --stacktrace tcp://localhost:7000
2021-07-29 10:27:54.597 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
2021-07-29 10:27:54.607 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 53
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 |ponse |
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
|00000000| 7b 22 63 6f 75 72 73 65 4e 61 6d 65 22 3a 22 53 |{"courseName":"S|
|00000010| 70 72 69 6e 67 22 7d |pring"} |
2021-07-29 10:27:54.768 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 118
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
|00000000| 7b 22 63 6f 75 72 73 65 49 64 22 3a 22 32 33 39 |{"courseId":"239|
|00000010| 66 37 65 64 61 2d 65 31 61 64 2d 34 66 30 36 2d |f7eda-e1ad-4f06-|
|00000020| 62 66 30 64 2d 63 38 31 32 61 66 36 66 65 37 61 |bf0d-c812af6fe7a|
|00000030| 63 22 2c 22 63 72 65 61 74 65 64 22 3a 31 36 32 |c","created":162|
|00000040| 37 35 33 34 36 37 34 2c 22 63 6f 75 72 73 65 4e |7534674,"courseN|
|00000050| 61 6d 65 22 3a 22 59 6f 75 72 20 63 6f 75 72 73 |ame":"Your cours|
|00000060| 65 20 6e 61 6d 65 3a 20 53 70 72 69 6e 67 22 7d |e name: Spring"}|
{"courseId":"239f7eda-e1ad-4f06-bf0d-c812af6fe7ac","created":1627534674,"courseName":"Your course name: Spring"}
We have enabled the debug in the command to print the frame details. As you can see, the first frame send is SETUP
with some metadata and the payload. Last, it receives the response from the endpoint. In listing 5, we’ve shown how to test request-response with the RSC client. Similarly, you can test other patterns also with RSC. Refer to for a detailed understanding of the frame and the protocol.
Next, we can also write the integration test cases to test the endpoint. Listing 6 shows the test case:
Listing 6. Integration test to verify the request-response route
package com.manning.sbip.ch08;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.rsocket.context.LocalRSocketServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import com.manning.sbip.ch08.model.Course;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
class SpringBootRsocketApplicationTests {
private static RSocketRequester requester;
# Set up the RsocketRequester instance. The RSocketRequester.Builder interface lets us create a requester by connecting to the server
public static void setUpOnce(@Autowired RSocketRequester.Builder builder, @LocalRSocketServerPort Integer port,
@Autowired RSocketStrategies rSocketStrategies) {
requester = builder.tcp("localhost", port);
public void testRequestResponse() {
# Send a request
Mono<Course> courseMono = requester
.data(new Course("Spring"))
# Verify the response
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring"))
In the testRequestResponse()
we send a request to the route and validate the expected response. In the requester instance, we set the route path, the data and then retrieve the response. Since this request-response
pattern, we expect a single response and it is captured in a Mono
. We then use the StepVerifier
to consume the response and assert the expected value from the response. Once the verification is done, we complete it with verifyComplete().
Let us now define the remaining test cases in the SpringBootRsocketApplicationTests
class. Listing 7 shows the fire and forget endpoint:
Listing 7. Testing Fire and Forget Endpoint
public void testFireAndForget() {
# Send a request
Mono<Void> courseMono = requester
.data(new Course("Spring"))
# Verify the response
Listing 8. shows the test case to test the Request Stream endpoint:
Listing 8. Testing the Request Stream Endpoint
public void testRequestStream() {
# Send a request and expect a stream of courses as Flux<Course>
Flux<Course> courseFlux = requester
.data(new Course("Spring"))
# Use StepVerifier to verify the response. We retrieve two courses from the stream and then cancel them to indicate we are not interested in further data from the stream.
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring. Response #0"))
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring. Response #1"))
Listing 9. shows the test case to test the channel endpoint:
Listing 9. Testing the Channel Endpoint
public void testChannel() {
# Create first setting after 0 seconds. Server starts sending after 2 # seconds.
Mono<Integer> setting1 = Mono.just(Integer.valueOf(2)).delayElement(Duration.ofSeconds(0));
# Create next setting after 3 seconds. Server starts sending in after 1
# second.
Mono<Integer> setting2 = Mono.just(Integer.valueOf(1)).delayElement(Duration.ofSeconds(3));
# Bundle settings into a Flux
Flux<Integer> settings = Flux.concat(setting1, setting2);
# Send a stream of request messages
Flux<Course> stream = requester.route("stream-stream").data(settings).retrieveFlux(Course.class);
# Verify that the response messages contain the expected data
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Spring. Response #0"))
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Spring. Response #0"))
In this technique, we’ve demonstrated the use of the RSocket
protocol in a Spring Boot application. We’ve seen the use of the spring-boot-starter-rsocket
dependency that puts the necessary dependencies into the application.
The Spring Boot also provides several auto-configuration classes that configure the RSocket in a Spring Boot application. Figure 2 shows these classes:
RsocketMessagingAutoConfiguration auto-configures the RsocketMessageHandler. This class handles RSocket requests for the methods defined with @ConnectMapping and @MessageMapping annotations.
RsocketRequesterAutoConfiguration auto-configures the RsocketRequester. This class provides a fluent API that can be used to accept and return input and output. It also provides methods to prepare routing and other metadata.
RsocketServerAutoConfiguration auto-configures the RSocket server. We’ve configured the spring.rsocket.server.port property to start the standalone RSocket server at port 7000.
The RsocketStrategiesAutoConfiguration auto-configures RsocketStrategies. This class defines the strategies for use by RSocket requester and responder components. Some of the strategies, for instance, are the decoder and encoder for the messages.
Last, the RsocketSecurityAutoConfiguration auto-configures Spring Security for an RSocket server. Securing the RSocket server with Spring Security is beyond the scope of this text. You can refer to the internet on this subject.
