ARTICLE

Introduction to RSocket

From Spring Boot in Practice by Somnath Musib

Figure 1. Communication models in RSocket protocol. In the Fire-and-Forget pattern, a client sends one message and expect no response from the server. In the Request-Response pattern, the client sends one message and receives one back from the server. In the Request-Stream pattern, a client sends one message and expects a stream of messages in response from the server. In the Channel pattern, the client and server send streams of messages to each other
  • Reactive Streams semantics for streaming requests interactions Request-Stream and Channel. Support for backpressure signal between requester and responder. This allows a requester to slow down a responder at the source. Thus, reduces reliance on network layer congestion control and network-level buffering
  • Support for Request throttling to reduce the number of possible messages. This can be done after sending a LEASE frame to limit the total number of requests allowed by other ends for a given time
  • Fragmentation and re-assembly of large messages
  • Keepalive through heartbeat messages

TECHNIQUE: Developing applications using RSocket and Spring Boot

Problem

Source Code

If you want to skip to the end and see the final version of the Spring Boot project, it is available at https://github.com/spring-boot-in-practice/repo/tree/main/ch08/spring-boot-rsocket

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.manning.sbip.ch08</groupId>
<artifactId>spring-boot-rsocket</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-rsocket</name>
<description>Spring Boot RSocket</description>
<properties>
<java.version>16</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>
spring.rsocket.server.port=7000
spring.main.lazy-initialization=true
package com.manning.sbip.ch08.model;

import java.time.Instant;
import java.util.UUID;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class Course {

private UUID courseId = UUID.randomUUID();
private long created = Instant.now().getEpochSecond();
private String courseName;

public Course(String courseName) {
this.courseName = courseName;
}
}
package com.manning.sbip.ch08.controller;

import java.time.Duration;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import com.manning.sbip.ch08.model.Course;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@Controller
public class CourseController {

# Implements the request-response interaction pattern. The user is expected to supply a course and this endpoint echos it back to the caller
@MessageMapping("request-response")
public Mono<Course> requestResponse(final Course course) {
log.info("Received request-response course details {} ", course);
return Mono.just(new Course("Your course name: " + course.getCourseName()));
}

# Implements the fire-forget interaction pattern. The user is expected to supply a course and expects nothing. Thus, we are returning an empty Mono.
@MessageMapping("fire-and-forget")
public Mono<Void> fireAndForget(final Course course) {
log.info("Received fire-and-forget course details {} ", course);
return Mono.empty();
}

# Implements the request-stream interaction pattern. The user is expected to supply a course and this endpoint returns a stream of course with modified course name in an interval of one second
@MessageMapping("request-stream")
public Flux<Course> requestStream(final Course course) {
log.info("Received request-stream course details {} ", course);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Course("Your course name: " + course.getCourseName() + ". Response #" + index))
.log();
}

# Implements the channel interaction pattern. The user is expected to supply a stream and this endpoint returns a stream of course with a modified course name in an interval configured by the user. The user can specify the interval by invoking the delayElements() method in the source Flux. Recall that in channel interaction patterns, both sides can send a stream of data.
@MessageMapping("stream-stream")
public Flux<Course> channel(final Flux<Integer> settings) {
log.info("Received stream-stream (channel) request... ");

return settings
.doOnNext(setting -> log.info("Requested interval is {} seconds", setting))
.doOnCancel(() -> log.warn("Client cancelled the channel"))
.switchMap(setting -> Flux.interval(Duration.ofSeconds(setting)).map(index -> new Course("Spring. Response #"+index)))
.log();
}
}
C:\Users\musib>rsc --debug --request --data "{\"courseName\":\"Spring\"}" --route request-response --stacktrace tcp://localhost:7000
2021-07-29 10:27:54.597 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2021-07-29 10:27:54.607 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 53
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 |ponse |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 63 6f 75 72 73 65 4e 61 6d 65 22 3a 22 53 |{"courseName":"S|
|00000010| 70 72 69 6e 67 22 7d |pring"} |
+--------+-------------------------------------------------+----------------+
2021-07-29 10:27:54.768 DEBUG 17700 --- [actor-tcp-nio-2] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 118
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 63 6f 75 72 73 65 49 64 22 3a 22 32 33 39 |{"courseId":"239|
|00000010| 66 37 65 64 61 2d 65 31 61 64 2d 34 66 30 36 2d |f7eda-e1ad-4f06-|
|00000020| 62 66 30 64 2d 63 38 31 32 61 66 36 66 65 37 61 |bf0d-c812af6fe7a|
|00000030| 63 22 2c 22 63 72 65 61 74 65 64 22 3a 31 36 32 |c","created":162|
|00000040| 37 35 33 34 36 37 34 2c 22 63 6f 75 72 73 65 4e |7534674,"courseN|
|00000050| 61 6d 65 22 3a 22 59 6f 75 72 20 63 6f 75 72 73 |ame":"Your cours|
|00000060| 65 20 6e 61 6d 65 3a 20 53 70 72 69 6e 67 22 7d |e name: Spring"}|
+--------+-------------------------------------------------+----------------+
{"courseId":"239f7eda-e1ad-4f06-bf0d-c812af6fe7ac","created":1627534674,"courseName":"Your course name: Spring"}
package com.manning.sbip.ch08;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.rsocket.context.LocalRSocketServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;

import com.manning.sbip.ch08.model.Course;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@SpringBootTest
class SpringBootRsocketApplicationTests {

private static RSocketRequester requester;

# Set up the RsocketRequester instance. The RSocketRequester.Builder interface lets us create a requester by connecting to the server
@BeforeAll
public static void setUpOnce(@Autowired RSocketRequester.Builder builder, @LocalRSocketServerPort Integer port,
@Autowired RSocketStrategies rSocketStrategies) {

requester = builder.tcp("localhost", port);
}

@Test
public void testRequestResponse() {
# Send a request
Mono<Course> courseMono = requester
.route("request-response")
.data(new Course("Spring"))
.retrieveMono(Course.class);

# Verify the response
StepVerifier.create(courseMono)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring"))
.verifyComplete();
}
}
@Test
public void testFireAndForget() {
# Send a request
Mono<Void> courseMono = requester
.route("fire-and-forget")
.data(new Course("Spring"))
.retrieveMono(Void.class);

# Verify the response
StepVerifier
.create(courseMono)
.verifyComplete();
}
@Test
public void testRequestStream() {
# Send a request and expect a stream of courses as Flux<Course>
Flux<Course> courseFlux = requester
.route("request-stream")
.data(new Course("Spring"))
.retrieveFlux(Course.class);

# Use StepVerifier to verify the response. We retrieve two courses from the stream and then cancel them to indicate we are not interested in further data from the stream.
StepVerifier.create(courseFlux)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring. Response #0"))
.expectNextCount(0)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Your course name: Spring. Response #1"))
.thenCancel()
.verify();
}
@Test
public void testChannel() {
# Create first setting after 0 seconds. Server starts sending after 2 # seconds.
Mono<Integer> setting1 = Mono.just(Integer.valueOf(2)).delayElement(Duration.ofSeconds(0));
# Create next setting after 3 seconds. Server starts sending in after 1
# second.
Mono<Integer> setting2 = Mono.just(Integer.valueOf(1)).delayElement(Duration.ofSeconds(3));

# Bundle settings into a Flux
Flux<Integer> settings = Flux.concat(setting1, setting2);

# Send a stream of request messages
Flux<Course> stream = requester.route("stream-stream").data(settings).retrieveFlux(Course.class);

# Verify that the response messages contain the expected data
StepVerifier
.create(stream)
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Spring. Response #0"))
.consumeNextWith(course -> assertThat(course.getCourseName()).isEqualTo("Spring. Response #0"))
.thenCancel()
.verify();
}

Discussion

In this technique, we’ve demonstrated the use of the RSocket protocol in a Spring Boot application. We’ve seen the use of the spring-boot-starter-rsocket dependency that puts the necessary dependencies into the application.

Figure 2. Spring Boot RSocket auto-configuration classes

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Manning Publications

Follow Manning Publications on Medium for free content and exclusive discounts.