概述
最近在做的iot平台,对接的设备算力有限,所以需要延时下发消息。并且,延时的时间不同的设备可能不一样,所以可以由设备动态对通道进行配置。加了个延时配置。
代码
这是远程下发的调用
/**
* 默认发送间隔(毫秒),用于保护低算力客户端。
*/
private static final long DEFAULT_SEND_INTERVAL_MS = 100L;
private final ScheduledExecutorService dispatcher = Executors.newScheduledThreadPool(4);
private final Map<String, ClientSender> senderMap = new ConcurrentHashMap<>();
private final Map<String, Long> clientDelayMap = new ConcurrentHashMap<>();
@PostMapping("/remote/ws-hub/msg")
public void sendMsg(@RequestParam("clientId") String clientId, @RequestBody byte[] msg) {
// 进入队列,后台异步按顺序、固定间隔发送
ClientSender sender = senderMap.computeIfAbsent(clientId, k -> new ClientSender());
sender.queue.add(msg.clone()); // 防止调用方复用/修改数组
startWorker(clientId, sender);
}
消息来了就开始run,不会阻塞线程,所以不会一直消耗资源
private void startWorker(String clientId, ClientSender sender) {
// 保证同一 client 只有一个 worker 在跑
if (!sender.running.compareAndSet(false, true)) {
return;
}
dispatcher.execute(() -> processQueue(clientId, sender));
}
因为前面已经到集合里了
处理消息
private void processQueue(String clientId, ClientSender sender) {
try {
while (true) {
byte[] data = sender.queue.poll();
if (data == null) {
break;
}
WebSocketSession session = AuthHandler.sessionHashMap.get(clientId);
if (session != null && session.isOpen()) {
session.sendMessage(new BinaryMessage(data));
} else {
log.warn("session not found or closed for client {}", clientId);
}
// 间隔发送,保护低算力客户端;每次读取最新配置,动态生效
TimeUnit.MILLISECONDS.sleep(getDelayForClient(clientId));
}
} catch (Exception e) {
log.error("send msg error {}", clientId, e);
} finally {
sender.running.set(false);
// 若有新消息在退出前入队,重新启动 worker
if (!sender.queue.isEmpty()) {
startWorker(clientId, sender);
}
}
}
客户端的获取
private long getDelayForClient(String clientId) {
return clientDelayMap.getOrDefault(clientId, DEFAULT_SEND_INTERVAL_MS);
}
/**
* 按 clientId 维护独立队列与执行状态,保证有序发送。
*/
private static class ClientSender {
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean running = new AtomicBoolean(false);
}
如果客户端要更新的时候
/**
* 业务侧更新通道属性时调用,按设备(clientId)动态调整下发间隔。
*/
@PostMapping("/remote/ws-hub/msg/delay")
public void updateDelay(@RequestParam("clientId") String clientId,
@RequestParam("delayMs") long delayMs) {
long safeDelay = Math.max(0, delayMs); // 不允许负值
clientDelayMap.put(clientId, safeDelay);
log.info("updated send delay for client {} to {} ms", clientId, safeDelay);
}
