ARTICLE

Introduction to RSocket

From Spring Boot in Practice by Somnath Musib

This article explores Rsocket and how it interacts with Spring Boot.

Take 35% off Spring Boot in Practice by entering fccmusib into the discount code box at checkout at manning.com.

In this article, we’ll explore the RSocket protocol and its use with Spring Boot.

RSocket is an application protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte stream transports such as Aeron. RSocket allows the following four communication models as shown in figure 1:

Figure 1. Communication models in RSocket protocol. In the Fire-and-Forget pattern, a client sends one message and expect no response from the server. In the Request-Response pattern, the client sends one message and receives one back from the server. In the Request-Stream pattern, a client sends one message and expects a stream of messages in response from the server. In the Channel pattern, the client and server send streams of messages to each other

In RSocket, once the initial handshake between the client and server is done, the “client” vs “server” distinction is lost as both sides can independently initiate one of the interactions as specified in figure 1.

The RSocket protocol has few key features and offers several benefits:

  • Reactive Streams semantics for streaming requests interactions Request-Stream and Channel. Support for backpressure signal between requester and responder. This allows a requester to slow down a responder at the source. Thus, reduces reliance on network layer congestion control and network-level buffering
  • Support for Request throttling to reduce the number of possible messages. This can be done after sending a LEASE frame to limit the total number of requests allowed by other ends for a given time
  • Fragmentation and re-assembly of large messages
  • Keepalive through heartbeat messages

Next, let us demonstrate how to use the RSocket protocol in a Spring Boot application. We’ll implement all four interaction patterns as shown in figure 1.

TECHNIQUE: Developing applications using RSocket and Spring Boot

Problem

You learned about the RSocket protocol and need to use it in a Spring Boot application

Solution

Spring Framework provides support for the RSocket protocol in the spring-messaging module. Spring Boot provides the spring-boot-starter-rsocket starter dependency that includes the relevant dependencies to use RSocket in a Spring Boot application.

Source Code

If you want to skip to the end and see the final version of the Spring Boot project, it is available at https://github.com/spring-boot-in-practice/repo/tree/main/ch08/spring-boot-rsocket

To begin, let us create a new Spring Boot project with the dependencies as shown in listing 1:

Listing 1. The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.manning.sbip.ch08</groupId>
<artifactId>spring-boot-rsocket</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-rsocket</name>
<description>Spring Boot RSocket</description>
<properties>
<java.version>16</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

The notable dependency in listing 1 is the spring-boot-starter-rsocket dependency. This transitively includes the other required dependencies such as spring-messaging, rsocket-core, etc.

In the application.properties file, let us include the following properties as shown listing 2:

Listing 2. application.properties configuration

spring.rsocket.server.port=7000
spring.main.lazy-initialization=true

The first property sets the TCP port for the RSocket server to 7000 and the second property enables the Spring Boot’s lazy initialization.

In this Spring Boot application, we’ll continue with the Course domain object. The updated Course model is shown in Listing 3:

Listing 3. The Course Domain Class

package com.manning.sbip.ch08.model;

import java.time.Instant;
import java.util.UUID;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class Course {

private UUID courseId = UUID.randomUUID();
private long created = Instant.now().getEpochSecond();
private String courseName;

public Course(String courseName) {
this.courseName = courseName;
}
}

The Course class has a courseId field which is a random UUID, a created field that captures the course creation time and a courseName field that is supplied by the user.

Next, let us define the CourseController class that contains the routes for all four interaction models as specified in figure 1. Listing 4 shows the CourseController class:

Listing 4. The Course Controller

package com.manning.sbip.ch08.controller;

import java.time.Duration;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import com.manning.sbip.ch08.model.Course;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@Controller
public class CourseController {

# Implements the request-response interaction pattern. The user is expected to supply a course and this endpoint echos it back to the caller
@MessageMapping("request-response")
public Mono<Course> requestResponse(final Course course) {
log.info("Received request-response course details {} ", course);
return Mono.just(new Course("Your course name: " + course.getCourseName()));
}

# Implements the fire-forget interaction pattern. The user is expected to supply a course and expects nothing. Thus, we are returning an empty Mono.
@MessageMapping("fire-and-forget")
public Mono<Void> fireAndForget(final Course course) {
log.info("Received fire-and-forget course details {} ", course);
return Mono.empty();
}

# Implements the request-stream interaction pattern. The user is expected to supply a course and this endpoint returns a stream of course with modified course name in an interval of one second
@MessageMapping("request-stream")
public Flux<Course> requestStream(final Course course) {
log.info("Received request-stream course details {} ", course);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Course("Your course name: " + course.getCourseName() + ". Response #" + index))
.log();
}

# Implements the channel interaction pattern. The user is expected to supply a stream and this endpoint returns a stream of course with a modified course name in an interval configured by the user. The user can specify the interval by invoking the delayElements() method in the source Flux. Recall that in channel interaction patterns, both sides can send a stream of data.
@MessageMapping("stream-stream")
public Flux<Course> channel(final Flux<Integer> settings) {
log.info("Received stream-stream (channel) request... ");

return settings
.doOnNext(setting -> log.info("Requested interval is {} seconds", setting))
.doOnCancel(() -> log.warn("Client cancelled the channel"))
.switchMap(setting -> Flux.interval(Duration.ofSeconds(setting)).map(index -> new Course("Spring. Response #"+index)))
.log();
}
}

You can start the application and find that it is running on configured TCP port 7000. We’ll demonstrate two approaches to test the application. First, we can use RSocket Client CLI (RSC) to test the routes. It’s a command-line utility that lets you access the endpoints. You can go through https://github.com/making/rsc for the steps to install this on your machine. Once you’ve installed it, access the request-response route using the command as shown in Listing 5:

Listing 5. Invoking RSocket endpoint with rsc CLI

C:\Users\musib>rsc --debug --request --data "{\"courseName\":\"Spring\"}" --route request-response --stacktrace tcp://localhost:7000
2021-07-29 10:27:54.597 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2021-07-29 10:27:54.607 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 53
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 |ponse |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 63 6f 75 72 73 65 4e 61 6d 65 22 3a 22 53 |{"courseName":"S|
|00000010| 70 72 69 6e 67 22 7d |pring"} |
+--------+-------------------------------------------------+----------------+
2021-07-29 10:27:54.768 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 118
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 63 6f 75 72 73 65 49 64 22 3a 22 32 33 39 |{"courseId":"239|
|00000010| 66 37 65 64 61 2d 65 31 61 64 2d 34 66 30 36 2d |f7eda-e1ad-4f06-|
|00000020| 62 66 30 64 2d 63 38 31 32 61 66 36 66 65 37 61 |bf0d-c812af6fe7a|
|00000030| 63 22 2c 22 63 72 65 61 74 65 64 22 3a 31 36 32 |c","created":162|
|00000040| 37 35 33 34 36 37 34 2c 22 63 6f 75 72 73 65 4e |7534674,"courseN|
|00000050| 61 6d 65 22 3a 22 59 6f 75 72 20 63 6f 75 72 73 |ame":"Your cours|
|00000060| 65 20 6e 61 6d 65 3a 20 53 70 72 69 6e 67 22 7d |e name: Spring"}|
+--------+-------------------------------------------------+----------------+
{"courseId":"239f7eda-e1ad-4f06-bf0d-c812af6fe7ac","created":1627534674,"courseName":"Your course name: Spring"}

We have enabled the debug in the command to print the frame details. As you can see, the first frame send is SETUP and then REQUEST_RESPONSE with some metadata and the payload. Last, it receives the response from the endpoint. In listing 5, we’ve shown how to test request-response with the RSC client. Similarly, you can test other patterns also with RSC. Refer to https://rsocket.io/about/protocol for a detailed understanding of the frame and the protocol.

Next, we can also write the integration test cases to test the endpoint. Listing 6 shows the test case:

Listing 6. Integration test to verify the request-response route

package com.manning.sbip.ch08;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.rsocket.context.LocalRSocketServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;

import com.manning.sbip.ch08.model.Course;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@SpringBootTest
class SpringBootRsocketApplicationTests {

private static RSocketRequester requester;

# Set up the RsocketRequester instance. The RSocketRequester.Builder interface lets us create a requester by connecting to the server
@BeforeAll
public static void setUpOnce(@Autowired RSocketRequester.Builder builder, @LocalRSocketServerPort Integer port,
@Autowired RSocketStrategies rSocketStrategies) {

requester = builder.tcp("localhost", port);
}

@Test
public void testRequestResponse() {
# Send a request
Mono<Course> courseMono = requester
.route("request-response")
.data(new Course("Spring"))
.retrieveMono(Course.class);

# Verify the response
StepVerifier.create(courseMono)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring"))
.verifyComplete();
}
}

In the testRequestResponse() we send a request to the route and validate the expected response. In the requester instance, we set the route path, the data and then retrieve the response. Since this request-response pattern, we expect a single response and it is captured in a Mono. We then use the StepVerifier to consume the response and assert the expected value from the response. Once the verification is done, we complete it with verifyComplete().

Let us now define the remaining test cases in the SpringBootRsocketApplicationTests class. Listing 7 shows the fire and forget endpoint:

Listing 7. Testing Fire and Forget Endpoint

@Test
public void testFireAndForget() {
# Send a request
Mono<Void> courseMono = requester
.route("fire-and-forget")
.data(new Course("Spring"))
.retrieveMono(Void.class);

# Verify the response
StepVerifier
.create(courseMono)
.verifyComplete();
}

Listing 8. shows the test case to test the Request Stream endpoint:

Listing 8. Testing the Request Stream Endpoint

@Test
public void testRequestStream() {
# Send a request and expect a stream of courses as Flux<Course>
Flux<Course> courseFlux = requester
.route("request-stream")
.data(new Course("Spring"))
.retrieveFlux(Course.class);

# Use StepVerifier to verify the response. We retrieve two courses from the stream and then cancel them to indicate we are not interested in further data from the stream.
StepVerifier.create(courseFlux)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring. Response #0"))
.expectNextCount(0)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring. Response #1"))
.thenCancel()
.verify();
}

Listing 9. shows the test case to test the channel endpoint:

Listing 9. Testing the Channel Endpoint

@Test
public void testChannel() {
# Create first setting after 0 seconds. Server starts sending after 2 # seconds.
Mono<Integer> setting1 = Mono.just(Integer.valueOf(2)).delayElement(Duration.ofSeconds(0));
# Create next setting after 3 seconds. Server starts sending in after 1
# second.
Mono<Integer> setting2 = Mono.just(Integer.valueOf(1)).delayElement(Duration.ofSeconds(3));

# Bundle settings into a Flux
Flux<Integer> settings = Flux.concat(setting1, setting2);

# Send a stream of request messages
Flux<Course> stream = requester.route("stream-stream").data(settings).retrieveFlux(Course.class);

# Verify that the response messages contain the expected data
StepVerifier
.create(stream)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Spring. Response #0"))
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Spring. Response #0"))
.thenCancel()
.verify();
}

Discussion

In this technique, we’ve demonstrated the use of the RSocket protocol in a Spring Boot application. We’ve seen the use of the spring-boot-starter-rsocket dependency that puts the necessary dependencies into the application.

The Spring Boot also provides several auto-configuration classes that configure the RSocket in a Spring Boot application. Figure 2 shows these classes:

Figure 2. Spring Boot RSocket auto-configuration classes

RsocketMessagingAutoConfiguration auto-configures the RsocketMessageHandler. This class handles RSocket requests for the methods defined with @ConnectMapping and @MessageMapping annotations.

RsocketRequesterAutoConfiguration auto-configures the RsocketRequester. This class provides a fluent API that can be used to accept and return input and output. It also provides methods to prepare routing and other metadata.

RsocketServerAutoConfiguration auto-configures the RSocket server. We’ve configured the spring.rsocket.server.port property to start the standalone RSocket server at port 7000.

The RsocketStrategiesAutoConfiguration auto-configures RsocketStrategies. This class defines the strategies for use by RSocket requester and responder components. Some of the strategies, for instance, are the decoder and encoder for the messages.

Last, the RsocketSecurityAutoConfiguration auto-configures Spring Security for an RSocket server. Securing the RSocket server with Spring Security is beyond the scope of this text. You can refer to the internet on this subject.

That’s all for this article. If you want to learn more about the book, check it out on Manning’s liveBook platform here.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Manning Publications

Manning Publications

Follow Manning Publications on Medium for free content and exclusive discounts.