Spring WebFlux WebSocket Push

Posted by 淙嶙 on 2020-11-16 in Spring

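There is plenty of material about WebSocket, but very little of it is useful for WebSocket on WebFlux. I needed it for a project and spent several days getting it to work, so this post summarizes the main parts related to server push for future reference.

1. Server

Add the Maven dependencies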

    <properties>
        <java.version>1.8</java.version>
        <spring.version>5.2.3.RELEASE</spring.version>
        <reactor-netty.version>0.9.2.RELEASE</reactor-netty.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-reactor-netty</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>${reactor-netty.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

Create an annotation (this is optional; it just makes the mappings easier to maintain)

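import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)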
@Target(ElementType.TYPE)
public @interface WebSocketMapping {
    String value() default "/ws";
}

Define the handler mapping

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.BeansException;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;

public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping {
    private final Map<String, WebSocketHandler> webSocketHandlerMap = new ConcurrentHashMap<>();

    /**
     * Registers every bean annotated with the custom WebSocketMapping annotation in this handler mapping.
     *
     * @throws BeansException if initialization fails
     */
    @Override
    public void initApplicationContext() throws BeansException {
        Map<String, Object> beanMap = obtainApplicationContext().getBeansWithAnnotation(WebSocketMapping.class);
        beanMap.values().forEach(bean -> {
            // A class annotated with WebSocketMapping must implement WebSocketHandler; otherwise fail fast
            if (!(bean instanceof WebSocketHandler)) {
                throw new RuntimeException(String.format("Controller [%s] doesn't implement WebSocketHandler interface.",
                        bean.getClass().getName()));
            }
            WebSocketMapping annotation = AnnotationUtils.getAnnotation(bean.getClass(), WebSocketMapping.class);
            // Map the annotated path to the handler
            webSocketHandlerMap.put(annotation.value(), (WebSocketHandler) bean);
        });
        super.setOrder(Ordered.HIGHEST_PRECEDENCE);
        super.setUrlMap(webSocketHandlerMap);
        super.initApplicationContext();
    }
}
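
To make the lookup step easier to follow outside a Spring context, here is a minimal JDK-only sketch of the same idea: read a type-level annotation by reflection and build a path-to-handler map. PathMapping, Handler, HelloHandler, DefaultHandler and PathRegistry are hypothetical stand-ins invented for this illustration; they are not Spring types, and the real mapping class above delegates the actual URL matching to SimpleUrlHandlerMapping.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the WebSocketMapping annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface PathMapping {
    String value() default "/ws";
}

// Hypothetical stand-in for Spring's WebSocketHandler interface.
interface Handler {
    String handle(String message);
}

@PathMapping("/hello-ws")
class HelloHandler implements Handler {
    @Override
    public String handle(String message) {
        return "hello " + message;
    }
}

// No explicit value, so the annotation default "/ws" applies.
@PathMapping
class DefaultHandler implements Handler {
    @Override
    public String handle(String message) {
        return "default " + message;
    }
}

class PathRegistry {
    // Builds a path -> handler map from the given objects.
    // Objects without the annotation are skipped; annotated objects
    // that are not Handlers are rejected, as in the mapping class above.
    static Map<String, Handler> build(Object... beans) {
        Map<String, Handler> map = new LinkedHashMap<>();
        for (Object bean : beans) {
            PathMapping mapping = bean.getClass().getAnnotation(PathMapping.class);
            if (mapping == null) {
                continue;
            }
            if (!(bean instanceof Handler)) {
                throw new IllegalStateException(
                        bean.getClass().getName() + " does not implement Handler");
            }
            map.put(mapping.value(), (Handler) bean);
        }
        return map;
    }
}
```

Calling PathRegistry.build(new HelloHandler(), new DefaultHandler()) yields one entry for /hello-ws and one for /ws. The annotation must have RUNTIME retention, otherwise the reflective lookup finds nothing.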

Register the mapping and the adapter

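import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

@Configuration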
public class WebSocketConfiguration {

    @Bean
    public HandlerMapping webSocketMapping() {
        return new WebSocketHandlerMapping();
    }

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

}

That completes the configuration.

Write the handler

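import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;

@Component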
@WebSocketMapping("/hello-ws")
@Slf4j
public class HelloWordWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
        return session.receive().doOnSubscribe(s -> {
            log.info("Connection opened: {}", s);
            // The emitterProcessor could be handed to a class that sends messages to clients
            send(session, emitterProcessor);
            session.send(emitterProcessor.map(session::textMessage)).toProcessor();
        }).doOnTerminate(() -> {
            // Connection terminated; clean up the session and any related state here
            log.info("doOnTerminate");
        }).doOnComplete(() -> {
            // Processing completed
            log.info("doOnComplete");
        }).doOnCancel(() -> {
            log.info("doOnCancel");
        }).doOnNext(message -> {
            // Handle each incoming message
            if (message.getType().equals(WebSocketMessage.Type.BINARY)) {
                log.info("Received binary message");
            } else if (message.getType().equals(WebSocketMessage.Type.TEXT)) {
                String content = message.getPayloadAsText();
                log.info("Received text message: {}", content);
            } else if (message.getType().equals(WebSocketMessage.Type.PING)) {
                log.info("Received ping message");
            } else if (message.getType().equals(WebSocketMessage.Type.PONG)) {
                log.info("Received pong message");
            }
        }).doOnError(e -> log.error("doOnError", e)).doOnRequest(r -> log.info("doOnRequest,{}", r)).then();
    }


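    private void send(WebSocketSession session, EmitterProcessor<String> emitterProcessor) {
        // For demonstration only; do not write it like this, because the sleep blocks the calling thread
        try {
            for (int i = 0; i < 10; i++) {
                log.info("Sending message to client: {}", i);
                emitterProcessor.onNext(String.format("Push to client %s, message %d", session.getId(), i));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
            log.error("InterruptedException", e);
        }
    }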
}
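
The comment in the handler suggests handing the emitterProcessor to a class that sends messages to clients. One possible shape for such a class, sketched with JDK types only, is a registry that keeps one outbound sink per session ID. ClientMessageSender and its methods are hypothetical names invented for this sketch. The wiring is also an assumption: the handler would presumably register something like emitterProcessor::onNext under session.getId() when the connection opens, and unregister it in doOnTerminate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: keeps one outbound sink per connected session.
class ClientMessageSender {
    private final Map<String, Consumer<String>> sinks = new ConcurrentHashMap<>();

    // Called when a session connects; the sink forwards text to that client.
    void register(String sessionId, Consumer<String> sink) {
        sinks.put(sessionId, sink);
    }

    // Called when a session terminates, so no further messages are routed to it.
    void unregister(String sessionId) {
        sinks.remove(sessionId);
    }

    // Pushes a message to one session; returns false if the session is unknown.
    boolean sendTo(String sessionId, String message) {
        Consumer<String> sink = sinks.get(sessionId);
        if (sink == null) {
            return false;
        }
        sink.accept(message);
        return true;
    }

    // Pushes a message to every registered session; returns how many were reached.
    int broadcast(String message) {
        int reached = 0;
        for (Consumer<String> sink : sinks.values()) {
            sink.accept(message);
            reached++;
        }
        return reached;
    }
}
```

With a registry like this, other parts of the application can push to a client without holding a reference to the WebSocketSession itself. If several threads may push to the same session at once, the sink passed to register would need to serialize those calls.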

2. Client

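import java.net.URI;
import java.time.Duration;
import java.util.Date;

import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WSClient {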
    public static void main(final String[] args) {
        Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date())))
                .delayElements(Duration.ofSeconds(7));
        EmitterProcessor<String> output = EmitterProcessor.create();
        final WebSocketClient client = new ReactorNettyWebSocketClient();
        Mono<Void> sessionMono = client.execute(URI.create("ws://localhost:8005/hello-ws") , session ->
                session.send(input.map(session::textMessage))
                        .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
                        .then());
        output.doOnSubscribe(s-> sessionMono.subscribe()).doOnNext(System.out::println).blockLast(Duration.ofSeconds(20));
    }
}
