Skip to content

Commit c49ebbf

Browse files
committed
Clean up example shutdown behavior
Signed-off-by: jeroen.veltman <jeroen.veltman@nextend.nl>
1 parent fa95373 commit c49ebbf

30 files changed

Lines changed: 782 additions & 472 deletions

rsocket-examples/src/main/java/io/rsocket/examples/transport/h3/channel/ChannelEchoClient.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.rsocket.core.RSocketConnector;
2323
import io.rsocket.core.RSocketServer;
2424
import io.rsocket.examples.transport.h3.Http3TransportFactory;
25+
import io.rsocket.transport.netty.server.CloseableChannel;
2526
import io.rsocket.util.DefaultPayload;
2627
import java.time.Duration;
2728
import org.slf4j.Logger;
@@ -31,6 +32,7 @@
3132
public final class ChannelEchoClient {
3233

3334
private static final Logger logger = LoggerFactory.getLogger(ChannelEchoClient.class);
35+
private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(5);
3436

3537
public static void main(String[] args) {
3638

@@ -42,19 +44,26 @@ public static void main(String[] args) {
4244
.map(s -> "Echo: " + s)
4345
.map(DefaultPayload::create));
4446

45-
RSocketServer.create(echoAcceptor).bindNow(Http3TransportFactory.server("localhost", 7000));
47+
CloseableChannel server =
48+
RSocketServer.create(echoAcceptor).bindNow(Http3TransportFactory.server("localhost", 7000));
4649

4750
RSocket socket =
4851
RSocketConnector.connectWith(Http3TransportFactory.client("localhost", 7000)).block();
4952

50-
socket
51-
.requestChannel(
52-
Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello")))
53-
.map(Payload::getDataUtf8)
54-
.doOnNext(logger::debug)
55-
.take(10)
56-
.doFinally(signalType -> socket.dispose())
57-
.then()
58-
.block();
53+
try {
54+
socket
55+
.requestChannel(
56+
Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello")))
57+
.map(Payload::getDataUtf8)
58+
.doOnNext(logger::debug)
59+
.take(10)
60+
.then()
61+
.block();
62+
} finally {
63+
socket.dispose();
64+
socket.onClose().block(CLOSE_TIMEOUT);
65+
server.dispose();
66+
server.onClose().block(CLOSE_TIMEOUT);
67+
}
5968
}
6069
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/h3/fnf/TaskProcessingWithServerSideNotificationsExample.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.rsocket.core.RSocketConnector;
2323
import io.rsocket.core.RSocketServer;
2424
import io.rsocket.examples.transport.h3.Http3TransportFactory;
25+
import io.rsocket.examples.transport.support.ExampleLifecycle;
26+
import io.rsocket.transport.netty.server.CloseableChannel;
2527
import io.rsocket.util.DefaultPayload;
2628
import java.time.Duration;
2729
import java.util.concurrent.BlockingQueue;
@@ -54,8 +56,9 @@ public static void main(String[] args) throws InterruptedException {
5456
BackgroundWorker backgroundWorker =
5557
new BackgroundWorker(tasksProcessor.asFlux(), idToCompletedTasksMap, idToRSocketMap);
5658

57-
RSocketServer.create(new TasksAcceptor(tasksProcessor, idToCompletedTasksMap, idToRSocketMap))
58-
.bindNow(Http3TransportFactory.server(9991));
59+
CloseableChannel server =
60+
RSocketServer.create(new TasksAcceptor(tasksProcessor, idToCompletedTasksMap, idToRSocketMap))
61+
.bindNow(Http3TransportFactory.server(9991));
5962

6063
Logger logger = LoggerFactory.getLogger("RSocket.Client.ID[Test]");
6164

@@ -71,24 +74,34 @@ public static void main(String[] args) throws InterruptedException {
7174
}))
7275
.connect(Http3TransportFactory.client(9991));
7376

74-
RSocket rSocketRequester1 = rSocketMono.block();
77+
RSocket rSocketRequester1 = null;
78+
RSocket rSocketRequester2 = null;
79+
try {
80+
rSocketRequester1 = rSocketMono.block();
7581

76-
for (int i = 0; i < 10; i++) {
77-
rSocketRequester1.fireAndForget(DefaultPayload.create("task" + i)).block();
78-
}
82+
for (int i = 0; i < 10; i++) {
83+
rSocketRequester1.fireAndForget(DefaultPayload.create("task" + i)).block();
84+
}
7985

80-
Thread.sleep(4000);
86+
Thread.sleep(4000);
8187

82-
rSocketRequester1.dispose();
83-
logger.info("Disposed");
88+
ExampleLifecycle.close(rSocketRequester1);
89+
logger.info("Disposed");
8490

85-
Thread.sleep(4000);
91+
Thread.sleep(4000);
8692

87-
RSocket rSocketRequester2 = rSocketMono.block();
93+
rSocketRequester2 = rSocketMono.block();
8894

89-
logger.info("Reconnected");
95+
logger.info("Reconnected");
9096

91-
Thread.sleep(10000);
97+
Thread.sleep(10000);
98+
} finally {
99+
ExampleLifecycle.close(rSocketRequester2);
100+
ExampleLifecycle.close(rSocketRequester1);
101+
tasksProcessor.tryEmitComplete();
102+
backgroundWorker.cancel();
103+
ExampleLifecycle.close(server);
104+
}
92105
}
93106

94107
static class BackgroundWorker extends BaseSubscriber<Task> {
@@ -103,7 +116,6 @@ static class BackgroundWorker extends BaseSubscriber<Task> {
103116
this.idToCompletedTasksMap = idToCompletedTasksMap;
104117
this.idToRSocketMap = idToRSocketMap;
105118

106-
// mimic a long running task processing
107119
taskProducer
108120
.concatMap(
109121
t ->

rsocket-examples/src/main/java/io/rsocket/examples/transport/h3/loadbalancer/RoundRobinRSocketLoadbalancerExample.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.rsocket.loadbalance.LoadbalanceRSocketClient;
2222
import io.rsocket.loadbalance.LoadbalanceTarget;
2323
import io.rsocket.examples.transport.h3.Http3TransportFactory;
24+
import io.rsocket.examples.transport.support.ExampleLifecycle;
2425
import io.rsocket.transport.netty.server.CloseableChannel;
2526
import io.rsocket.util.DefaultPayload;
2627
import java.time.Duration;
@@ -98,12 +99,19 @@ public static void main(String[] args) {
9899
RSocketClient rSocketClient =
99100
LoadbalanceRSocketClient.builder(producer).roundRobinLoadbalanceStrategy().build();
100101

101-
for (int i = 0; i < 10000; i++) {
102-
try {
103-
rSocketClient.requestResponse(Mono.just(DefaultPayload.create("test" + i))).block();
104-
} catch (Throwable t) {
105-
// no ops
102+
try {
103+
for (int i = 0; i < 10000; i++) {
104+
try {
105+
rSocketClient.requestResponse(Mono.just(DefaultPayload.create("test" + i))).block();
106+
} catch (Throwable t) {
107+
// no ops
108+
}
106109
}
110+
} finally {
111+
ExampleLifecycle.close(rSocketClient);
112+
ExampleLifecycle.close(server1);
113+
ExampleLifecycle.close(server2);
114+
ExampleLifecycle.close(server3);
107115
}
108116
}
109117
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/h3/metadata/routing/CompositeMetadataExample.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.rsocket.metadata.TaggingMetadataCodec;
3131
import io.rsocket.metadata.WellKnownMimeType;
3232
import io.rsocket.examples.transport.h3.Http3TransportFactory;
33+
import io.rsocket.examples.transport.support.ExampleLifecycle;
34+
import io.rsocket.transport.netty.server.CloseableChannel;
3335
import io.rsocket.util.ByteBufPayload;
3436
import java.util.Collections;
3537
import java.util.Objects;
@@ -41,22 +43,24 @@ public class CompositeMetadataExample {
4143
static final Logger logger = LoggerFactory.getLogger(CompositeMetadataExample.class);
4244

4345
public static void main(String[] args) {
44-
RSocketServer.create(
45-
SocketAcceptor.forRequestResponse(
46-
payload -> {
47-
final String route = decodeRoute(payload.sliceMetadata());
46+
CloseableChannel server =
47+
RSocketServer.create(
48+
SocketAcceptor.forRequestResponse(
49+
payload -> {
50+
final String route = decodeRoute(payload.sliceMetadata());
4851

49-
logger.info("Received RequestResponse[route={}]", route);
52+
logger.info("Received RequestResponse[route={}]", route);
5053

51-
payload.release();
54+
payload.release();
5255

53-
if ("my.test.route".equals(route)) {
54-
return Mono.just(ByteBufPayload.create("Hello From My Test Route"));
55-
}
56+
if ("my.test.route".equals(route)) {
57+
return Mono.just(ByteBufPayload.create("Hello From My Test Route"));
58+
}
5659

57-
return Mono.error(new IllegalArgumentException("Route " + route + " not found"));
58-
}))
59-
.bindNow(Http3TransportFactory.server("localhost", 7000));
60+
return Mono.error(
61+
new IllegalArgumentException("Route " + route + " not found"));
62+
}))
63+
.bindNow(Http3TransportFactory.server("localhost", 7000));
6064

6165
RSocket socket =
6266
RSocketConnector.create()
@@ -78,12 +82,18 @@ public static void main(String[] args) {
7882
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
7983
routeMetadata);
8084

81-
socket
82-
.requestResponse(
83-
ByteBufPayload.create(
84-
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), compositeMetadata))
85-
.log()
86-
.block();
85+
try {
86+
socket
87+
.requestResponse(
88+
ByteBufPayload.create(
89+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"),
90+
compositeMetadata))
91+
.log()
92+
.block();
93+
} finally {
94+
ExampleLifecycle.close(socket);
95+
ExampleLifecycle.close(server);
96+
}
8797
}
8898

8999
static String decodeRoute(ByteBuf metadata) {

rsocket-examples/src/main/java/io/rsocket/examples/transport/h3/metadata/routing/RoutingMetadataExample.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.rsocket.metadata.TaggingMetadataCodec;
2828
import io.rsocket.metadata.WellKnownMimeType;
2929
import io.rsocket.examples.transport.h3.Http3TransportFactory;
30+
import io.rsocket.examples.transport.support.ExampleLifecycle;
31+
import io.rsocket.transport.netty.server.CloseableChannel;
3032
import io.rsocket.util.ByteBufPayload;
3133
import java.util.Collections;
3234
import org.slf4j.Logger;
@@ -37,22 +39,24 @@ public class RoutingMetadataExample {
3739
static final Logger logger = LoggerFactory.getLogger(RoutingMetadataExample.class);
3840

3941
public static void main(String[] args) {
40-
RSocketServer.create(
41-
SocketAcceptor.forRequestResponse(
42-
payload -> {
43-
final String route = decodeRoute(payload.sliceMetadata());
42+
CloseableChannel server =
43+
RSocketServer.create(
44+
SocketAcceptor.forRequestResponse(
45+
payload -> {
46+
final String route = decodeRoute(payload.sliceMetadata());
4447

45-
logger.info("Received RequestResponse[route={}]", route);
48+
logger.info("Received RequestResponse[route={}]", route);
4649

47-
payload.release();
50+
payload.release();
4851

49-
if ("my.test.route".equals(route)) {
50-
return Mono.just(ByteBufPayload.create("Hello From My Test Route"));
51-
}
52+
if ("my.test.route".equals(route)) {
53+
return Mono.just(ByteBufPayload.create("Hello From My Test Route"));
54+
}
5255

53-
return Mono.error(new IllegalArgumentException("Route " + route + " not found"));
54-
}))
55-
.bindNow(Http3TransportFactory.server("localhost", 7000));
56+
return Mono.error(
57+
new IllegalArgumentException("Route " + route + " not found"));
58+
}))
59+
.bindNow(Http3TransportFactory.server("localhost", 7000));
5660

5761
RSocket socket =
5862
RSocketConnector.create()
@@ -66,12 +70,17 @@ public static void main(String[] args) {
6670
final ByteBuf routeMetadata =
6771
TaggingMetadataCodec.createTaggingContent(
6872
ByteBufAllocator.DEFAULT, Collections.singletonList("my.test.route"));
69-
socket
70-
.requestResponse(
71-
ByteBufPayload.create(
72-
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), routeMetadata))
73-
.log()
74-
.block();
73+
try {
74+
socket
75+
.requestResponse(
76+
ByteBufPayload.create(
77+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), routeMetadata))
78+
.log()
79+
.block();
80+
} finally {
81+
ExampleLifecycle.close(socket);
82+
ExampleLifecycle.close(server);
83+
}
7584
}
7685

7786
static String decodeRoute(ByteBuf metadata) {

0 commit comments

Comments
 (0)