10 changed files with 302 additions and 9 deletions
@ -0,0 +1,38 @@
|
||||
package com.logpm.aftersales.config; |
||||
|
||||
import com.logpm.aftersales.launcher.OpcSessionHandler; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.web.socket.config.annotation.EnableWebSocket; |
||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer; |
||||
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; |
||||
import org.springframework.web.socket.server.standard.ServerEndpointExporter; |
||||
|
||||
import javax.annotation.Resource; |
||||
|
||||
/** |
||||
* 开启WebSocket支持 |
||||
* @author 12702 |
||||
*/ |
||||
@Slf4j |
||||
@Configuration |
||||
@EnableWebSocket |
||||
@ConditionalOnProperty(name = "system.websocket.isOpen",havingValue = "true") |
||||
//@ConditionalOnProperty(name = "spring.profiles.active",havingValue = "dev")
|
||||
public class WebSocketConfig implements WebSocketConfigurer { |
||||
|
||||
@Resource |
||||
private OpcSessionHandler opcSessionHandler; |
||||
|
||||
// @Bean
|
||||
// public ServerEndpointExporter serverEndpointExporter() {
|
||||
// return new ServerEndpointExporter();
|
||||
// }
|
||||
|
||||
@Override |
||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { |
||||
registry.addHandler(opcSessionHandler, "/ws/automate").setAllowedOrigins("*"); |
||||
} |
||||
} |
@ -0,0 +1,78 @@
|
||||
package com.logpm.aftersales.launcher; |
||||
|
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.lang.NonNull; |
||||
import org.springframework.stereotype.Component; |
||||
import org.springframework.web.socket.*; |
||||
import java.io.IOException; |
||||
import java.util.concurrent.CopyOnWriteArraySet; |
||||
|
||||
/** |
||||
* @author tarzan |
||||
*/ |
||||
@Component |
||||
@Slf4j |
||||
public class OpcSessionHandler implements WebSocketHandler { |
||||
|
||||
|
||||
|
||||
private static final CopyOnWriteArraySet<WebSocketSession> SESSIONS=new CopyOnWriteArraySet<>(); |
||||
|
||||
|
||||
private final static String ALERT="alert"; |
||||
private final static String CONTROL="control"; |
||||
|
||||
@Override |
||||
public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception { |
||||
SESSIONS.add(session); |
||||
log.info(session.getId()+" OpcSessionHandler当前在线人数:"+SESSIONS.size()); |
||||
} |
||||
|
||||
@Override |
||||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { |
||||
String msg = message.getPayload().toString(); |
||||
log.info("接收消息"+session.getId()+":"+msg); |
||||
|
||||
} |
||||
|
||||
|
||||
|
||||
@Override |
||||
public void handleTransportError(WebSocketSession session,@NonNull Throwable exception) throws Exception { |
||||
log.error("OpcSessionHandler连接出错"+session.getId()); |
||||
SESSIONS.remove(session); |
||||
if (!session.isOpen()) { |
||||
session.close(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void afterConnectionClosed(WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception { |
||||
log.info("OpcSessionHandler关闭连接"+session.getId()); |
||||
SESSIONS.remove(session); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public boolean supportsPartialMessages() { |
||||
return false; |
||||
} |
||||
|
||||
|
||||
public void sendMessageToUser(WebSocketSession session, String contents) { |
||||
if (session != null && session.isOpen()) { |
||||
TextMessage message = new TextMessage(contents); |
||||
try { |
||||
session.sendMessage(message); |
||||
} catch (IOException e) { |
||||
log.error(e.getMessage()); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public void sendMessageToAllUsers(String contents) { |
||||
SESSIONS.forEach(session->sendMessageToUser(session, contents)); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,20 @@
|
||||
package com.logpm.aftersales.resp; |
||||
|
||||
import io.swagger.annotations.ApiModel; |
||||
import io.swagger.annotations.ApiModelProperty; |
||||
import lombok.Data; |
||||
|
||||
/** |
||||
* @author 12702 |
||||
*/ |
||||
@Data |
||||
@ApiModel("ws通知返回对象") |
||||
public class NoticeWebsocketResp<T> { |
||||
|
||||
@ApiModelProperty(value = "通知类型") |
||||
private String noticeType; |
||||
|
||||
@ApiModelProperty(value = "通知内容") |
||||
private T noticeInfo; |
||||
|
||||
} |
@ -0,0 +1,105 @@
|
||||
package com.logpm.aftersales.service; |
||||
|
||||
|
||||
import com.alibaba.fastjson.JSONObject; |
||||
import com.logpm.aftersales.config.WebSocketConfig; |
||||
import lombok.EqualsAndHashCode; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.core.tool.utils.CollectionUtil; |
||||
import org.springblade.core.tool.utils.Func; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import javax.websocket.*; |
||||
import javax.websocket.server.ServerEndpoint; |
||||
import java.util.List; |
||||
import java.util.concurrent.CopyOnWriteArraySet; |
||||
|
||||
/** |
||||
* @author tarzan |
||||
*/ |
||||
@ConditionalOnClass(value = WebSocketConfig.class) |
||||
@ServerEndpoint("/ws/automate") |
||||
@Slf4j |
||||
@EqualsAndHashCode |
||||
public class WebSocketServer { |
||||
|
||||
/** 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 */ |
||||
private static final CopyOnWriteArraySet<WebSocketServer> WEBSOCKET_CLIENTS =new CopyOnWriteArraySet<>(); |
||||
/** 当前session */ |
||||
private Session session; |
||||
/** 当前session订阅的话题 */ |
||||
private List<String> topics; |
||||
|
||||
public WebSocketServer() { |
||||
} |
||||
|
||||
@OnOpen |
||||
public void onOpen(Session session) { |
||||
this.session = session; |
||||
WEBSOCKET_CLIENTS.add(this); |
||||
log.info("WebSocketServer有新客户端连接加入:{},当前在线人数为:{}", session.getId(),WEBSOCKET_CLIENTS.size()); |
||||
} |
||||
|
||||
@OnClose |
||||
public void onClose() { |
||||
WEBSOCKET_CLIENTS.remove(this); |
||||
log.info("有一连接关闭:{},当前在线人数为:{}", this.session.getId(), WEBSOCKET_CLIENTS.size()); |
||||
} |
||||
|
||||
@OnMessage |
||||
public void onMessage(String message, Session session) { |
||||
this.topics= Func.toStrList(message); |
||||
log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); |
||||
} |
||||
|
||||
@OnError |
||||
public void onError(Session session, Throwable error) { |
||||
log.error("发生错误:"+error.getMessage()); |
||||
WEBSOCKET_CLIENTS.remove(this); |
||||
log.info("有一连接异常:{},当前在线人数为:{}", session.getId(), WEBSOCKET_CLIENTS.size()); |
||||
} |
||||
|
||||
public <T> void sendData(T data, String topicName) { |
||||
JSONObject result=new JSONObject(); |
||||
result.put("topic",topicName); |
||||
result.put("data",data); |
||||
//显示值为null的字段
|
||||
// String jsonString = JSON.toJSONString(result, SerializerFeature.WriteMapNullValue);
|
||||
sendMessage(result.toString(),topicName); |
||||
} |
||||
|
||||
/** |
||||
* 发送消息广播 |
||||
* |
||||
* @param message 消息文本 |
||||
* @author tarzan |
||||
* @date 2022年12月08日 09:24:34 |
||||
*/ |
||||
public void sendMessage(String message,String topic){ |
||||
try { |
||||
WEBSOCKET_CLIENTS.forEach(client ->{ |
||||
if (client.session.isOpen()&&CollectionUtil.isNotEmpty(client.topics)&&client.topics.contains(topic)) { |
||||
client.session.getAsyncRemote().sendText(message); |
||||
} |
||||
}); |
||||
} catch (Exception e) { |
||||
log.error(e.getMessage()); |
||||
} |
||||
} |
||||
|
||||
public static void sendMessageToAllUsers(String message){ |
||||
try { |
||||
WEBSOCKET_CLIENTS.forEach(client -> { |
||||
if (client.session.isOpen()) { |
||||
client.session.getAsyncRemote().sendText(message); |
||||
} |
||||
}); |
||||
} catch (Exception e) { |
||||
log.error(e.getMessage()); |
||||
} |
||||
} |
||||
|
||||
|
||||
} |
Loading…
Reference in new issue