Spring WebFlux WebSocket 推送
关于 WebSocket 的资料有一大堆,但是关于 WebFlux WebSocket 的有用资料太少了。由于项目中需要使用,我花了几天时间终于搞通了。今天把其中与推送相关的主要部分总结出来,便于以后参考。
1. 服务端
增加 Maven 依赖:
<!-- Build properties: the Java level and the version numbers used by this POM fragment. -->
<properties>
<java.version>1.8</java.version>
<!-- NOTE(review): spring.version is not referenced by any dependency visible in this snippet; presumably it is consumed by a parent POM or by dependency management. Confirm. -->
<spring.version>5.2.3.RELEASE</spring.version>
<!-- Version applied to the explicitly declared reactor-netty dependency through ${reactor-netty.version}. -->
<reactor-netty.version>0.9.2.RELEASE</reactor-netty.version>
</properties>
<!-- Dependencies of the WebSocket push server. Entries without a version element presumably get their version from Spring Boot dependency management (TODO confirm against the parent POM). -->
<dependencies>
<!-- WebFlux starter with its bundled reactor-netty starter excluded; reactor-netty is declared explicitly below with a pinned version. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- reactor-netty at the version fixed by the reactor-netty.version property. -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>${reactor-netty.version}</version>
</dependency>
<!-- NOTE(review): Undertow starter; presumably this makes Undertow the embedded server in place of the excluded Netty starter. Confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<!-- Test starter, test scope only, with the JUnit vintage engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Lombok, declared optional so it is not passed on to modules depending on this one. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
创建annotation (可以不这样实现,只是这样实现维护更容易些)
/**
 * Marks a class as a WebSocket endpoint and carries the URL path it is served on.
 *
 * <p>The annotation is retained at runtime and may only be placed on types.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface WebSocketMapping {

    /**
     * URL path of the endpoint.
     *
     * @return the configured path, {@code "/ws"} when none is given
     */
    String value() default "/ws";
}
定义映射
/**
 * URL handler mapping that registers every bean annotated with {@code WebSocketMapping}
 * under the path given by the annotation's {@code value()}.
 */
public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping {
    /** Path to handler registrations collected from the application context. */
    private final Map<String, WebSocketHandler> webSocketHandlerMap = new ConcurrentHashMap<>();

    /**
     * Collects all beans annotated with {@code WebSocketMapping}, registers them by path,
     * gives this mapping the highest precedence and then runs the parent initialization.
     *
     * @throws BeansException if the parent initialization fails
     * @throws IllegalStateException if an annotated bean is not a {@code WebSocketHandler},
     *         if the annotation cannot be resolved on the bean class, or if two different
     *         handlers declare the same path
     */
    @Override
    public void initApplicationContext() throws BeansException {
        Map<String, Object> beanMap = obtainApplicationContext().getBeansWithAnnotation(WebSocketMapping.class);
        for (Map.Entry<String, Object> entry : beanMap.entrySet()) {
            registerHandler(entry.getKey(), entry.getValue());
        }
        super.setOrder(Ordered.HIGHEST_PRECEDENCE);
        super.setUrlMap(webSocketHandlerMap);
        super.initApplicationContext();
    }

    /** Validates one annotated bean and records it under its declared path. */
    private void registerHandler(String beanName, Object bean) {
        // A class carrying the annotation without implementing WebSocketHandler is a configuration error.
        if (!(bean instanceof WebSocketHandler)) {
            throw new IllegalStateException(String.format("Controller [%s] doesn't implement WebSocketHandler interface.",
                    bean.getClass().getName()));
        }
        // findAnnotation also searches superclasses and interfaces. The annotation is not @Inherited,
        // so a plain lookup on a proxy subclass of the handler would return null.
        WebSocketMapping annotation = AnnotationUtils.findAnnotation(bean.getClass(), WebSocketMapping.class);
        if (annotation == null) {
            throw new IllegalStateException(String.format(
                    "Cannot resolve @WebSocketMapping on bean '%s' of type [%s].",
                    beanName, bean.getClass().getName()));
        }
        // Fail fast on ambiguous paths instead of letting one handler silently replace another.
        WebSocketHandler existing = webSocketHandlerMap.putIfAbsent(annotation.value(), (WebSocketHandler) bean);
        if (existing != null && existing != bean) {
            throw new IllegalStateException(String.format(
                    "Path '%s' is mapped to both [%s] and [%s].",
                    annotation.value(), existing.getClass().getName(), bean.getClass().getName()));
        }
    }
}
设置mapping和adapter
/**
 * Spring configuration that contributes the two WebSocket infrastructure beans:
 * the handler mapping and the handler adapter.
 */
@Configuration
public class WebSocketConfiguration {

    /**
     * Exposes a {@code WebSocketHandlerMapping} as the {@code HandlerMapping} bean.
     *
     * @return a new handler mapping instance
     */
    @Bean
    public HandlerMapping webSocketMapping() {
        final HandlerMapping mapping = new WebSocketHandlerMapping();
        return mapping;
    }

    /**
     * Exposes the adapter bean for WebSocket handlers.
     *
     * @return a new adapter instance created with its no-argument constructor
     */
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        final WebSocketHandlerAdapter adapter = new WebSocketHandlerAdapter();
        return adapter;
    }
}
至此,配置相关的部分就完成了。接下来编写 handler:
@Component
@WebSocketMapping("/hello-ws")
@Slf4j
public class HelloWordWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
return session.receive().doOnSubscribe(s -> {
log.info("发起连接:{}", s);
//可以将 emitterProcessor 放入一个给客户端发送消息的类中
send(session,emitterProcessor);
session.send(emitterProcessor.map(session::textMessage)).toProcessor();
}).doOnTerminate(() -> {
//连接中断,关闭session等信息
log.info("doOnTerminate");
}).doOnComplete(() -> {
//处理完成
log.info("doOnComplete");
}).doOnCancel(() -> {
log.info("doOnCancel");
}).doOnNext(message -> {
//持续接收消息处理
if (message.getType().equals(WebSocketMessage.Type.BINARY)) {
log.info("收到二进制消息");
} else if (message.getType().equals(WebSocketMessage.Type.TEXT)) {
String content = message.getPayloadAsText();
log.info("收到文本消息 :{}", content);
} else if (message.getType().equals(WebSocketMessage.Type.PING)) {
log.info("收到ping消息");
} else if (message.getType().equals(WebSocketMessage.Type.PONG)) {
log.info("收到pong消息");
}
}).doOnError(e -> log.error("doOnError", e)).doOnRequest(r -> log.info("doOnRequest,{}", r)).then();
}
}).doOnError(e -> log.error("doOnError", e)).doOnRequest(r -> log.info("doOnRequest,{}", r)).then();
}
private void send(WebSocketSession session,EmitterProcessor<String> emitterProcessor) {
//只做演示使用,不要这样写
try {
for (int i=0; i<10; i++){
log.info("发送消息给客户端: {} ",i);
emitterProcessor.onNext(String.format("给客户端%s 推送消息 %d",session.getId(),i));
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
log.error("InterruptedException",e);
}
}
}
2.客户端
/**
 * Command-line demo client: connects to the {@code /hello-ws} endpoint, sends a text
 * message every seven seconds and prints every text payload it receives.
 */
public class WSClient {

    /**
     * Runs the demo, waiting at most twenty seconds for the received stream to finish.
     *
     * @param args unused
     */
    public static void main(final String[] args) {
        // Endless source of outgoing messages, one element every 7 seconds.
        Flux<String> outgoing = Flux.<String>generate(emitter ->
                        emitter.next(String.format("{ message: 'got message', date: '%s' }", new Date())))
                .delayElements(Duration.ofSeconds(7));

        // Received payloads are relayed through this processor so main can block on them.
        EmitterProcessor<String> incoming = EmitterProcessor.create();

        final WebSocketClient wsClient = new ReactorNettyWebSocketClient();
        final URI endpoint = URI.create("ws://localhost:8005/hello-ws");

        Mono<Void> connection = wsClient.execute(endpoint, session -> {
            Mono<Void> sending = session.send(outgoing.map(session::textMessage));
            // subscribeWith subscribes immediately, so receiving starts as soon as the session handler runs.
            EmitterProcessor<String> received = session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .subscribeWith(incoming);
            return sending.thenMany(received.then()).then();
        });

        // The connection is opened only once something subscribes to the incoming stream.
        incoming.doOnSubscribe(subscription -> connection.subscribe())
                .doOnNext(System.out::println)
                .blockLast(Duration.ofSeconds(20));
    }
}