CodeL
Frontend as the wings, AI as the brain, heading toward full stack
2026-04-01

WebSocket and SSE Hands-On Tutorial


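WebSocket and SSE Hands-On Tutorial: A Complete Guide to Real-Time Communication in Vue 3 #

Master two real-time communication technologies from scratch: 5 practical scenarios with complete code, covering real-time chat, push notifications, and data dashboards.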


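Part 1: Core Concepts #

1.1 What Is Real-Time Communication? #

Traditional web pages follow a request-response model: the user clicks a button → a request is sent → the server returns a result.

Some scenarios, however, need the server to speak first:

  • 📱 A new WeChat message arrives without a page refresh
  • 📈 Stock prices update in real time
  • 🔔 New notifications pop up automatically
  • 🎮 Multiplayer online games stay in sync

This is real-time communication. Two technologies are commonly used for it: WebSocket and SSE.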

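1.2 WebSocket vs SSE: Which One to Choose? #

| Feature | WebSocket | SSE (Server-Sent Events) |
| --- | --- | --- |
| Direction | Bidirectional (client ↔ server) | One-way (server → client) |
| Protocol | ws:// or wss:// | HTTP/HTTPS |
| Data format | Text and binary | Text only (UTF-8) |
| Reconnection | Must be implemented manually | Automatic, handled by the browser |
| Compatibility | IE10+ | Not supported in IE (needs a polyfill) |
| Typical use cases | Chat, games, collaborative editing | Push notifications, stock quotes, log streams |

In one sentence:

  • Need bidirectional communication (the client also sends messages) → WebSocket
  • Only need server push (the client only receives) → SSE is simpler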

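1.3 Key Terms #

| Term | Explanation | Analogy |
| --- | --- | --- |
| WebSocket | Full-duplex protocol: one handshake, then a persistent connection | A phone call (both sides can talk) |
| SSE | The server pushes a stream of events to the client | A radio (you can only listen) |
| Handshake | The process that establishes a WebSocket connection | Dialing to set up the call |
| Heartbeat | A periodic check that keeps the connection alive | Saying "hello, still there?" every so often |
| Reconnect | The mechanism for reconnecting after a drop | Auto-redial on a bad signal |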

Part 2: Basic Usage #

2.1 WebSocket Basics #

Native WebSocket API #

// Create the connection
const ws = new WebSocket('wss://example.com/ws')

// Connection opened
ws.onopen = () => {
  console.log('Connected')
  ws.send('Hello Server!') // send a message
}

// Message received
ws.onmessage = (event) => {
  console.log('Message received:', event.data)
}

// Connection closed
ws.onclose = (event) => {
  console.log('Connection closed:', event.code, event.reason)
}

// Connection error
ws.onerror = (error) => {
  console.error('Connection error:', error)
}

// Close the connection explicitly (when you are done with it)
ws.close(1000, 'Normal closure')

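readyState Values #

| Value | Constant | Meaning |
| --- | --- | --- |
| 0 | CONNECTING | Connecting |
| 1 | OPEN | Connected, ready to communicate |
| 2 | CLOSING | Closing |
| 3 | CLOSED | Closed |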
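The readyState values matter most when sending: calling send() while the socket is still CONNECTING throws an InvalidStateError. Below is a minimal sketch of one way to handle that. The helper name createQueuedSender and its methods are hypothetical (they are not part of the WebSocket API), and the only thing assumed of `socket` is a `readyState` property and a `send` method.

```javascript
// Numeric value of WebSocket.OPEN, hard-coded so the sketch also runs outside a browser.
const OPEN = 1

// Hypothetical helper: buffers messages until the socket is open.
function createQueuedSender(socket) {
  const queue = []

  // Sends queued messages in order for as long as the socket stays open.
  function flush() {
    while (queue.length > 0 && socket.readyState === OPEN) {
      socket.send(queue.shift())
    }
  }

  // Queues the message, tries to send, and reports whether the queue is now empty.
  function send(message) {
    queue.push(message)
    flush()
    return queue.length === 0
  }

  return { send, flush, pending: () => queue.length }
}
```

With a real socket, calling `sender.flush()` from `ws.onopen` would deliver anything queued while the connection was being established.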

2.2 SSE Basics #

Native EventSource API #

// Create the connection (EventSource only issues GET requests)
const eventSource = new EventSource('https://example.com/events')

// Connection opened
eventSource.onopen = () => {
  console.log('SSE connected')
}

// Default (unnamed) messages
eventSource.onmessage = (event) => {
  console.log('Message received:', event.data)
}

// Listen for a custom event
eventSource.addEventListener('notification', (event) => {
  console.log('Notification received:', event.data)
})

// Listen for stock update events
eventSource.addEventListener('stock', (event) => {
  const data = JSON.parse(event.data)
  console.log('Stock price:', data.price)
})

// Connection error
eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  // The browser reconnects automatically
}

// Close the connection
eventSource.close()

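SSE Server Response Format #

The server must respond with Content-Type: text/event-stream. In the response body, each event is a group of "field: value" lines terminated by a blank line, and lines that start with a colon are comments:

Content-Type: text/event-stream

: default message (dispatched to onmessage)
data: Hello World

: with an event name
event: notification
data: {"title": "New message", "content": "Someone commented on your article"}

: with an ID (the browser sends it back as Last-Event-ID when it reconnects)
id: 123
event: stock
data: {"symbol": "AAPL", "price": 178.50}

: multi-line data (delivered as one message with a newline between the lines)
data: Line 1
data: Line 2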
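To make the wire format concrete, here is a minimal sketch of a serializer that turns one event into text/event-stream frames. The function name formatSSE and its parameter shape are assumptions made for illustration; they do not come from any library.

```javascript
// Hypothetical helper: serializes one event into the text/event-stream wire format.
function formatSSE({ event, id, data = '' }) {
  const lines = []
  if (id !== undefined) lines.push(`id: ${id}`)
  if (event !== undefined) lines.push(`event: ${event}`)

  // Objects are sent as JSON; strings are sent as-is.
  const text = typeof data === 'string' ? data : JSON.stringify(data)

  // Each line of the payload needs its own "data:" field.
  for (const line of text.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`)
  }

  // A blank line terminates the event.
  return lines.join('\n') + '\n\n'
}
```

On a Node.js server the result would typically be passed straight to `res.write(...)`.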

Part 3: Advanced Usage #

3.1 WebSocket Reconnection #

The native WebSocket has no automatic reconnection, so you need to implement it yourself:

class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url
    this.reconnectInterval = options.reconnectInterval || 1000
    this.maxReconnectInterval = options.maxReconnectInterval || 30000
    this.reconnectDecay = options.reconnectDecay || 1.5
    this.timeoutInterval = options.timeoutInterval || 5000
    this.maxReconnectAttempts = options.maxReconnectAttempts || 10
    
    this.ws = null
    this.reconnectAttempts = 0
    this.forcedClose = false
    this.timedOut = false
    
    // Event callbacks
    this.onopen = null
    this.onmessage = null
    this.onclose = null
    this.onerror = null
    
    this.connect()
  }
  
  connect() {
    this.ws = new WebSocket(this.url)
    
    // Connection timeout: close the socket if it has not opened in time
    const timeout = setTimeout(() => {
      this.timedOut = true
      this.ws.close()
      this.timedOut = false
    }, this.timeoutInterval)
    
    this.ws.onopen = (event) => {
      clearTimeout(timeout)
      this.reconnectAttempts = 0
      this.onopen?.(event)
    }
    
    this.ws.onmessage = (event) => {
      this.onmessage?.(event)
    }
    
    this.ws.onclose = (event) => {
      clearTimeout(timeout)
      this.onclose?.(event)
      
      // Not closed on purpose: try to reconnect with exponential backoff
      if (!this.forcedClose && this.reconnectAttempts < this.maxReconnectAttempts) {
        const interval = Math.min(
          this.reconnectInterval * Math.pow(this.reconnectDecay, this.reconnectAttempts),
          this.maxReconnectInterval
        )
        
        setTimeout(() => {
          // close() may have been called while this timer was pending
          if (this.forcedClose) return
          this.reconnectAttempts++
          this.connect()
        }, interval)
      }
    }
    
    this.ws.onerror = (error) => {
      this.onerror?.(error)
    }
  }
  
  send(data) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(data)
    }
  }
  
  close() {
    this.forcedClose = true
    this.ws?.close()
  }
  
  get readyState() {
    return this.ws?.readyState ?? WebSocket.CLOSED
  }
}
 
// Usage example
const ws = new ReconnectingWebSocket('wss://example.com/ws', {
  reconnectInterval: 1000,
  maxReconnectAttempts: 10
})
 
ws.onopen = () => console.log('Connected')
ws.onmessage = (e) => console.log('Message:', e.data)
ws.onclose = () => console.log('Connection closed')
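The delay between reconnect attempts grows exponentially and is capped at a maximum. The sketch below isolates that calculation so the resulting schedule is easy to inspect. The function names backoffDelay and withJitter are hypothetical, and the random jitter is an optional addition that the class above does not implement; it is sometimes used so that many clients do not all reconnect at the same instant.

```javascript
// Delay before reconnect attempt number `attempt` (0-based), using the same
// formula and defaults as the ReconnectingWebSocket class: base * factor^attempt, capped at max.
function backoffDelay(attempt, { base = 1000, factor = 1.5, max = 30000 } = {}) {
  return Math.min(base * Math.pow(factor, attempt), max)
}

// Optional extra (an assumption, not in the original class): scales the delay
// to somewhere between 50% and 100% of its value.
function withJitter(delay, random = Math.random) {
  return delay * (0.5 + 0.5 * random())
}

// First four delays with the default options: 1000, 1500, 2250, 3375 (milliseconds)
const firstDelays = [0, 1, 2, 3].map((attempt) => backoffDelay(attempt))
```

With these defaults the cap of 30000 ms is reached on the tenth attempt (attempt index 9).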

3.2 Heartbeat Detection #

Heartbeats keep the connection alive and detect zombie connections:

class WebSocketWithHeartbeat {
  constructor(url, options = {}) {
    this.url = url
    this.heartbeatInterval = options.heartbeatInterval || 30000 // 30 seconds
    this.heartbeatTimeout = options.heartbeatTimeout || 5000 // 5 seconds
    
    this.ws = null
    this.heartbeatTimer = null
    this.heartbeatTimeoutTimer = null
    
    this.connect()
  }
  
  connect() {
    this.ws = new WebSocket(this.url)
    
    this.ws.onopen = () => {
      console.log('Connected')
      this.startHeartbeat()
    }
    
    this.ws.onmessage = (event) => {
      // Heartbeat reply received: clear the timeout timer
      if (event.data === 'pong') {
        this.clearHeartbeatTimeout()
        return
      }
      
      console.log('Application message:', event.data)
    }
    
    this.ws.onclose = () => {
      this.stopHeartbeat()
      // Reconnect logic...
    }
  }
  
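  startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        // Send a heartbeat
        this.ws.send('ping')
        
        // Start the timeout: if no pong arrives in time, drop the connection
        this.clearHeartbeatTimeout()
        this.heartbeatTimeoutTimer = setTimeout(() => {
          console.warn('Heartbeat timed out, closing so the reconnect logic can take over')
          this.ws.close()
        }, this.heartbeatTimeout)
      }
    }, this.heartbeatInterval)
  }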
  
  stopHeartbeat() {
    clearInterval(this.heartbeatTimer)
    this.clearHeartbeatTimeout()
  }
  
  clearHeartbeatTimeout() {
    clearTimeout(this.heartbeatTimeoutTimer)
    this.heartbeatTimeoutTimer = null
  }
}
 
// Usage example
const ws = new WebSocketWithHeartbeat('wss://example.com/ws', {
  heartbeatInterval: 30000, // send a heartbeat every 30 seconds
  heartbeatTimeout: 5000   // no reply within 5 seconds counts as a timeout
})
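The timeout rule itself (a ping is outstanding and no pong has arrived within the allowed time) can be expressed without any timers, which makes it easy to unit test. This is a minimal sketch under that assumption; createHeartbeatTracker and the injectable `now` clock are hypothetical names introduced for illustration.

```javascript
// Hypothetical tracker: records when a ping was sent and reports whether the pong is overdue.
function createHeartbeatTracker({ timeout = 5000, now = Date.now } = {}) {
  let pingSentAt = null // null means no ping is waiting for a reply

  return {
    // Call when a ping is sent; keeps the oldest unanswered ping as the reference point.
    ping() {
      if (pingSentAt === null) pingSentAt = now()
    },
    // Call when a pong arrives.
    pong() {
      pingSentAt = null
    },
    // True when a ping has been waiting for at least `timeout` milliseconds.
    isExpired() {
      return pingSentAt !== null && now() - pingSentAt >= timeout
    }
  }
}
```

Injecting the clock keeps the rule deterministic in tests; in production the default `Date.now` would be used.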

3.3 Wrapping It with the Vue 3 Composition API #

Wrap the logic in a reusable composable:

// composables/useWebSocket.js
import { ref, onMounted, onUnmounted } from 'vue'
 
export function useWebSocket(url, options = {}) {
  const data = ref(null)
  const status = ref('connecting') // connecting | open | closed | error
  const error = ref(null)
  
  let ws = null
  let reconnectAttempts = 0
  let reconnectTimer = null
  
  const {
    reconnect = true,
    reconnectInterval = 1000,
    maxReconnectAttempts = 5,
    heartbeat = false,
    heartbeatInterval = 30000,
    onOpen,
    onMessage,
    onClose,
    onError
  } = options
  
  // Set by close() so that the onclose handler does not schedule another reconnect
  let manuallyClosed = false
  
  const connect = () => {
    manuallyClosed = false
    status.value = 'connecting'
    ws = new WebSocket(url)
    
    ws.onopen = (event) => {
      status.value = 'open'
      error.value = null
      reconnectAttempts = 0
      onOpen?.(event)
      
      if (heartbeat) {
        startHeartbeat()
      }
    }
    
    ws.onmessage = (event) => {
      data.value = event.data
      onMessage?.(event.data)
    }
    
    ws.onclose = (event) => {
      status.value = 'closed'
      stopHeartbeat() // avoid stacking one interval per reconnect
      onClose?.(event)
      
      if (reconnect && !manuallyClosed && reconnectAttempts < maxReconnectAttempts) {
        reconnectAttempts++
        reconnectTimer = setTimeout(connect, reconnectInterval * reconnectAttempts)
      }
    }
    
    ws.onerror = (err) => {
      status.value = 'error'
      error.value = err
      onError?.(err)
    }
  }
  
  // Sends a string as-is and anything else as JSON; returns false if the socket is not open
  const send = (message) => {
    if (ws?.readyState === WebSocket.OPEN) {
      ws.send(typeof message === 'string' ? message : JSON.stringify(message))
      return true
    }
    return false
  }
  
  const close = () => {
    manuallyClosed = true
    if (reconnectTimer) {
      clearTimeout(reconnectTimer)
      reconnectTimer = null
    }
    stopHeartbeat()
    ws?.close()
  }
  
  // Heartbeat logic
  let heartbeatTimer = null
  const startHeartbeat = () => {
    heartbeatTimer = setInterval(() => {
      send('ping')
    }, heartbeatInterval)
  }
  
  const stopHeartbeat = () => {
    if (heartbeatTimer) {
      clearInterval(heartbeatTimer)
      heartbeatTimer = null
    }
  }
  
  onMounted(() => {
    connect()
  })
  
  onUnmounted(() => {
    close()
  })
  
  return {
    data,
    status,
    error,
    send,
    close,
    reconnect: connect
  }
}

3.4 Wrapping SSE with the Composition API #

// composables/useSSE.js
import { ref, onMounted, onUnmounted } from 'vue'
 
export function useSSE(url, options = {}) {
  const data = ref(null)
  const status = ref('connecting') // connecting | open | closed | error
  const error = ref(null)
  const lastEventId = ref(null)
  
  let eventSource = null
  
  const {
    withCredentials = false,
    onOpen,
    onMessage,
    onError,
    events = {} // custom event listeners: { eventName: handler }
  } = options
  
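  const connect = () => {
    // Close any previous connection so a manual reconnect does not leave two streams open
    eventSource?.close()
    status.value = 'connecting'
    
    // Resume support: pass the last seen event ID on a manual reconnect
    // (on automatic reconnects the browser sends the Last-Event-ID header itself)
    const separator = url.includes('?') ? '&' : '?'
    const urlWithId = lastEventId.value 
      ? `${url}${separator}lastEventId=${encodeURIComponent(lastEventId.value)}` 
      : url
    
    eventSource = new EventSource(urlWithId, { withCredentials })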
    
    eventSource.onopen = (event) => {
      status.value = 'open'
      error.value = null
      onOpen?.(event)
    }
    
    eventSource.onmessage = (event) => {
      lastEventId.value = event.lastEventId
      data.value = event.data
      onMessage?.(event.data)
    }
    
    // Register custom event listeners
    Object.entries(events).forEach(([eventName, handler]) => {
      eventSource.addEventListener(eventName, (event) => {
        lastEventId.value = event.lastEventId
        handler(JSON.parse(event.data))
      })
    })
    
    eventSource.onerror = (err) => {
      status.value = 'error'
      error.value = err
      onError?.(err)
      // The browser reconnects SSE automatically
    }
  }
  
  const close = () => {
    eventSource?.close()
    status.value = 'closed'
  }
  
  onMounted(() => {
    connect()
  })
  
  onUnmounted(() => {
    close()
  })
  
  return {
    data,
    status,
    error,
    lastEventId,
    close,
    reconnect: connect
  }
}
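Resuming only works if the server can replay what the client missed. When the browser reconnects automatically it sends the last received ID in the Last-Event-ID request header, and the composable above passes it as a `lastEventId` query parameter on a manual reconnect. The sketch below shows one possible server-side replay rule; the function name missedEvents and the shape of the `history` buffer are assumptions made for illustration.

```javascript
// Hypothetical server-side helper: returns the events a reconnecting client has not seen yet.
// Assumes `history` is ordered oldest-first and every entry has a unique string `id`.
function missedEvents(history, lastEventId) {
  // First connection: nothing to replay.
  if (lastEventId === undefined || lastEventId === null || lastEventId === '') return []

  const index = history.findIndex((entry) => entry.id === String(lastEventId))

  // Unknown ID (for example, already evicted from the buffer): replay everything still held.
  return index === -1 ? history.slice() : history.slice(index + 1)
}
```

Replaying the whole buffer for an unknown ID is one design choice among several; a server could also signal the client to reload its state instead.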

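Part 4: Practical Scenarios #

Scenario 1: Real-Time Chat Room (WebSocket) #

Requirements: multi-user online chat with private chat, group chat, and message read status

Vue 3 Chat Component #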

<!-- components/ChatRoom.vue -->
<template>
  <div class="chat-room">
    <!-- Online user list -->
    <div class="users-panel">
      <h3>Online users ({{ onlineUsers.length }})</h3>
      <ul>
        <li v-for="user in onlineUsers" :key="user.id">
          <span class="status-dot" :class="{ online: user.online }"></span>
          {{ user.name }}
        </li>
      </ul>
    </div>
    
    <!-- Message list -->
    <div class="messages-panel" ref="messagesContainer">
      <div 
        v-for="msg in messages" 
        :key="msg.id"
        :class="['message', { 'my-message': msg.userId === currentUserId }]"
      >
        <div class="message-header">
          <span class="username">{{ msg.username }}</span>
          <span class="time">{{ formatTime(msg.timestamp) }}</span>
        </div>
        <div class="message-content">{{ msg.content }}</div>
        <div v-if="msg.userId === currentUserId" class="message-status">
          {{ msg.read ? 'Read' : 'Unread' }}
        </div>
      </div>
    </div>
    
    <!-- Input box -->
    <div class="input-panel">
      <input
        v-model="inputMessage"
        @keyup.enter="sendMessage"
        placeholder="Type a message..."
        :disabled="wsStatus !== 'open'"
      />
      <button @click="sendMessage" :disabled="!inputMessage || wsStatus !== 'open'">
        Send
      </button>
    </div>
    
    <!-- Connection status -->
    <div class="status-bar" :class="wsStatus">
      {{ statusText }}
    </div>
  </div>
</template>
 
<script setup>
import { ref, computed, watch, nextTick, onMounted, onUnmounted } from 'vue'
 
// Props
const props = defineProps({
  roomId: {
    type: String,
    required: true
  },
  currentUserId: {
    type: String,
    required: true
  }
})
 
// State
const messages = ref([])
const onlineUsers = ref([])
const inputMessage = ref('')
const wsStatus = ref('connecting')
const messagesContainer = ref(null)
const error = ref(null)
 
let ws = null
let reconnectAttempts = 0
const maxReconnectAttempts = 10
 
// Computed properties
const statusText = computed(() => {
  switch (wsStatus.value) {
    case 'connecting': return 'Connecting...'
    case 'open': return 'Connected'
    case 'closed': return 'Disconnected'
    case 'error': return `Connection error: ${error.value?.message || 'unknown error'}`
    default: return 'Unknown status'
  }
})
 
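// WebSocket connection
const connect = () => {
  // Encode the parameters so special characters cannot break the query string
  const query = `roomId=${encodeURIComponent(props.roomId)}&userId=${encodeURIComponent(props.currentUserId)}`
  const wsUrl = `${import.meta.env.VITE_WS_URL}/chat?${query}`
  wsStatus.value = 'connecting'
  ws = new WebSocket(wsUrl)
  
  ws.onopen = () => {
    wsStatus.value = 'open'
    reconnectAttempts = 0
    // Start sending heartbeats
    startHeartbeat()
  }
  
  ws.onmessage = (event) => {
    let data
    try {
      data = JSON.parse(event.data)
    } catch (err) {
      console.warn('Ignoring malformed message:', err)
      return
    }
    handleMessage(data)
  }
  
  ws.onclose = () => {
    wsStatus.value = 'closed'
    stopHeartbeat()
    attemptReconnect()
  }
  
  ws.onerror = (err) => {
    wsStatus.value = 'error'
    error.value = err
  }
}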
 
// Handle incoming messages
const handleMessage = (data) => {
  switch (data.type) {
    case 'message':
      messages.value.push(data.payload)
      scrollToBottom()
      break
    case 'userList':
      onlineUsers.value = data.payload
      break
    case 'userJoin':
      onlineUsers.value.push(data.payload)
      break
    case 'userLeave':
      onlineUsers.value = onlineUsers.value.filter(u => u.id !== data.payload.userId)
      break
    case 'messageRead': {
      // Update the read status of the message
      const msg = messages.value.find(m => m.id === data.payload.messageId)
      if (msg) msg.read = true
      break
    }
  }
}
 
// Send a message
const sendMessage = () => {
  if (!inputMessage.value.trim() || wsStatus.value !== 'open') return
  
  const message = {
    type: 'message',
    payload: {
      content: inputMessage.value.trim(),
      userId: props.currentUserId,
      timestamp: Date.now()
    }
  }
  
  ws.send(JSON.stringify(message))
  inputMessage.value = ''
}
 
// Reconnection
let reconnectTimer = null
const attemptReconnect = () => {
  if (reconnectAttempts >= maxReconnectAttempts) {
    error.value = new Error('Reconnection failed, please refresh the page')
    return
  }
  
  reconnectAttempts++
  reconnectTimer = setTimeout(() => {
    connect()
  }, Math.min(1000 * reconnectAttempts, 30000))
}
 
// Heartbeat keep-alive
let heartbeatTimer = null
const startHeartbeat = () => {
  heartbeatTimer = setInterval(() => {
    if (ws?.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: 'ping' }))
    }
  }, 30000)
}
 
const stopHeartbeat = () => {
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer)
    heartbeatTimer = null
  }
}
 
// Scroll to the bottom
const scrollToBottom = () => {
  nextTick(() => {
    if (messagesContainer.value) {
      messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight
    }
  })
}
 
// Format the time
const formatTime = (timestamp) => {
  return new Date(timestamp).toLocaleTimeString('zh-CN', {
    hour: '2-digit',
    minute: '2-digit'
  })
}
 
// Close the socket on purpose without triggering the reconnect logic
const disconnect = () => {
  clearTimeout(reconnectTimer)
  stopHeartbeat()
  if (ws) {
    ws.onclose = null // detach first so attemptReconnect() is not called
    ws.close()
    ws = null
  }
}
 
// Reconnect when the room changes
watch(() => props.roomId, () => {
  messages.value = []
  onlineUsers.value = []
  reconnectAttempts = 0
  disconnect()
  connect()
})
 
// Lifecycle
onMounted(() => {
  connect()
})
 
onUnmounted(() => {
  disconnect()
})
</script>
 
<style scoped>
.chat-room {
  display: grid;
  grid-template-columns: 200px 1fr;
  grid-template-rows: 1fr auto auto;
  height: 100vh;
}
 
.users-panel {
  grid-row: 1 / 3;
  border-right: 1px solid #ddd;
  padding: 16px;
  overflow-y: auto;
}
 
.messages-panel {
  padding: 16px;
  overflow-y: auto;
}
 
.message {
  margin-bottom: 12px;
  padding: 8px 12px;
  border-radius: 8px;
  background: #f5f5f5;
}
 
.message.my-message {
  background: #e3f2fd;
  margin-left: auto;
  max-width: 70%;
}
 
.input-panel {
  display: flex;
  gap: 8px;
  padding: 16px;
  border-top: 1px solid #ddd;
}
 
.input-panel input {
  flex: 1;
  padding: 8px 12px;
  border: 1px solid #ddd;
  border-radius: 4px;
}
 
.status-bar {
  padding: 8px;
  text-align: center;
  font-size: 12px;
}
 
.status-bar.open { background: #c8e6c9; }
.status-bar.connecting { background: #fff9c4; }
.status-bar.closed, .status-bar.error { background: #ffcdd2; }
 
.status-dot {
  display: inline-block;
  width: 8px;
  height: 8px;
  border-radius: 50%;
  background: #ccc;
  margin-right: 8px;
}
 
.status-dot.online { background: #4caf50; }
</style>
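The message handling in this component is a small state machine: each incoming event type changes the message list or the user list in one specific way. A minimal sketch of the same rules as a pure function, which can be tested without a WebSocket or Vue. The name applyChatEvent is hypothetical, and skipping duplicate userJoin events is an addition that the component above does not do.

```javascript
// Hypothetical pure version of the component's handleMessage(): returns the next state.
function applyChatEvent(state, event) {
  const { messages, onlineUsers } = state

  switch (event.type) {
    case 'message':
      return { messages: [...messages, event.payload], onlineUsers }
    case 'userList':
      return { messages, onlineUsers: event.payload }
    case 'userJoin':
      // Addition: skip users already in the list, e.g. after a reconnect.
      return onlineUsers.some((u) => u.id === event.payload.id)
        ? state
        : { messages, onlineUsers: [...onlineUsers, event.payload] }
    case 'userLeave':
      return { messages, onlineUsers: onlineUsers.filter((u) => u.id !== event.payload.userId) }
    case 'messageRead':
      return {
        messages: messages.map((m) => (m.id === event.payload.messageId ? { ...m, read: true } : m)),
        onlineUsers
      }
    default:
      return state // unknown types (such as pong) leave the state untouched
  }
}
```

Keeping the rules in a pure function like this is a design choice: the component would only need to assign the returned arrays to its refs.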

Node.js WebSocket Server (Using the ws Library) #

// server.js
const WebSocket = require('ws')
const http = require('http')
 
const server = http.createServer()
const wss = new WebSocket.Server({ server })
 
// Rooms and user sockets
const rooms = new Map() // roomId -> Set<ws>
const userSockets = new Map() // userId -> ws
 
wss.on('connection', (ws, req) => {
  // Parse the URL parameters
  const url = new URL(req.url, `http://${req.headers.host}`)
  const roomId = url.searchParams.get('roomId')
  const userId = url.searchParams.get('userId')
  
  // Reject connections that lack the required parameters (1008 = policy violation)
  if (!roomId || !userId) {
    ws.close(1008, 'roomId and userId are required')
    return
  }
  
  // Attach user info to the socket
  ws.roomId = roomId
  ws.userId = userId
  ws.isAlive = true
  
  // Join the room
  if (!rooms.has(roomId)) {
    rooms.set(roomId, new Set())
  }
  rooms.get(roomId).add(ws)
  userSockets.set(userId, ws)
  
  // Notify the other users in the room
  broadcastToRoom(roomId, {
    type: 'userJoin',
    payload: { id: userId, name: `User${userId.slice(0, 4)}`, online: true }
  }, ws)
  
  // Send the current online user list
  const users = Array.from(rooms.get(roomId))
    .map(client => ({
      id: client.userId,
      name: `User${client.userId.slice(0, 4)}`,
      online: true
    }))
  ws.send(JSON.stringify({ type: 'userList', payload: users }))
  
  // Receive messages
  ws.on('message', (data) => {
    let message
    try {
      message = JSON.parse(data.toString())
    } catch {
      return // ignore malformed JSON instead of crashing the server
    }
    if (!message || typeof message !== 'object') return
    
    if (message.type === 'ping') {
      ws.isAlive = true
      ws.send(JSON.stringify({ type: 'pong' }))
      return
    }
    
    if (message.type === 'message') {
      const newMessage = {
        id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
        type: 'message',
        payload: {
          ...message.payload,
          id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
          userId,
          username: `User${userId.slice(0, 4)}`,
          read: false
        }
      }
      broadcastToRoom(roomId, newMessage)
    }
  })
  
  // Connection closed
  ws.on('close', () => {
    rooms.get(roomId)?.delete(ws)
    userSockets.delete(userId)
    
    broadcastToRoom(roomId, {
      type: 'userLeave',
      payload: { userId }
    })
  })
})
 
// Broadcast to a room
function broadcastToRoom(roomId, message, excludeWs = null) {
  const clients = rooms.get(roomId)
  if (!clients) return
  
  const data = JSON.stringify(message)
  clients.forEach(client => {
    if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
      client.send(data)
    }
  })
}
 
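// Heartbeat detection using protocol-level ping frames, which browsers answer automatically
wss.on('connection', (ws) => {
  ws.on('pong', () => {
    ws.isAlive = true
  })
})
 
setInterval(() => {
  wss.clients.forEach(ws => {
    if (!ws.isAlive) {
      return ws.terminate()
    }
    ws.isAlive = false
    ws.ping()
  })
}, 30000)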
 
server.listen(8080, () => {
  console.log('WebSocket server running on ws://localhost:8080')
})
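Anything a client sends over the socket is untrusted input, so the server benefits from validating messages before acting on them. A minimal sketch of such a check for the two message types this server handles. The name parseClientMessage, the result shape, and the 2000-character limit are all assumptions made for illustration.

```javascript
// Message types the chat server above understands.
const ALLOWED_TYPES = new Set(['ping', 'message'])
// Assumed limit for illustration; the original server sets no limit.
const MAX_CONTENT_LENGTH = 2000

// Hypothetical validator: returns { ok: true, message } or { ok: false, reason }.
function parseClientMessage(raw) {
  let parsed
  try {
    // String() also handles the Buffer that the ws library passes to 'message' handlers.
    parsed = JSON.parse(String(raw))
  } catch {
    return { ok: false, reason: 'invalid JSON' }
  }

  if (parsed === null || typeof parsed !== 'object' || !ALLOWED_TYPES.has(parsed.type)) {
    return { ok: false, reason: 'unknown type' }
  }

  if (parsed.type === 'message') {
    const content = parsed.payload?.content
    if (typeof content !== 'string' || content.trim() === '' || content.length > MAX_CONTENT_LENGTH) {
      return { ok: false, reason: 'invalid content' }
    }
  }

  return { ok: true, message: parsed }
}
```

Inside the `ws.on('message', ...)` handler, the server could call this first and simply return when `ok` is false.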

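Scenario 2: Real-Time Notification Push (SSE) #

Requirements: push system notifications to the frontend in real time, with an unread count and notification categories

Vue 3 Notification Component #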

<!-- components/NotificationCenter.vue -->
<template>
  <div class="notification-center">
    <!-- Notification icon -->
    <div class="notification-icon" @click="togglePanel">
      <span class="icon">🔔</span>
      <span v-if="unreadCount > 0" class="badge">{{ unreadCount > 99 ? '99+' : unreadCount }}</span>
    </div>
    
    <!-- Notification panel -->
    <div v-if="showPanel" class="notification-panel">
      <div class="panel-header">
        <h3>Notifications</h3>
        <button @click="markAllRead" :disabled="unreadCount === 0">Mark all as read</button>
      </div>
      
      <div class="panel-tabs">
        <button 
          v-for="tab in tabs" 
          :key="tab.key"
          :class="['tab', { active: activeTab === tab.key }]"
          @click="activeTab = tab.key"
        >
          {{ tab.label }}
          <span v-if="tab.count > 0" class="tab-count">{{ tab.count }}</span>
        </button>
      </div>
      
      <div class="panel-content">
        <div v-if="filteredNotifications.length === 0" class="empty">
          No notifications
        </div>
        <div 
          v-for="notification in filteredNotifications" 
          :key="notification.id"
          :class="['notification-item', { unread: !notification.read }]"
          @click="handleNotificationClick(notification)"
        >
          <div class="notification-icon-type" :class="notification.type">
            {{ getNotificationIcon(notification.type) }}
          </div>
          <div class="notification-content">
            <div class="notification-title">{{ notification.title }}</div>
            <div class="notification-message">{{ notification.message }}</div>
            <div class="notification-time">{{ formatTime(notification.createdAt) }}</div>
          </div>
          <button class="mark-read-btn" @click.stop="markRead(notification.id)">
            {{ notification.read ? 'Read' : 'Mark as read' }}
          </button>
        </div>
      </div>
      
      <div class="panel-footer">
        <span :class="['status', sseStatus]">
          {{ sseStatus === 'open' ? 'Live updates on' : 'Disconnected' }}
        </span>
      </div>
    </div>
  </div>
</template>
 
<script setup>
import { ref, computed, onMounted, onUnmounted } from 'vue'
 
// Props
const props = defineProps({
  userId: {
    type: String,
    required: true
  }
})
 
// State
const notifications = ref([])
const showPanel = ref(false)
const activeTab = ref('all')
const sseStatus = ref('connecting')
 
let eventSource = null
 
// Notification tabs
const tabs = computed(() => [
  { 
    key: 'all', 
    label: 'All', 
    count: notifications.value.filter(n => !n.read).length 
  },
  { 
    key: 'system', 
    label: 'System', 
    count: notifications.value.filter(n => n.type === 'system' && !n.read).length 
  },
  { 
    key: 'message', 
    label: 'Messages', 
    count: notifications.value.filter(n => n.type === 'message' && !n.read).length 
  },
  { 
    key: 'alert', 
    label: 'Alerts', 
    count: notifications.value.filter(n => n.type === 'alert' && !n.read).length 
  }
])
 
// Unread count
const unreadCount = computed(() => {
  return notifications.value.filter(n => !n.read).length
})
 
// Filtered notifications
const filteredNotifications = computed(() => {
  if (activeTab.value === 'all') {
    return notifications.value
  }
  return notifications.value.filter(n => n.type === activeTab.value)
})
 
// SSE connection
const connectSSE = () => {
  const sseUrl = `${import.meta.env.VITE_API_URL}/notifications/stream?userId=${encodeURIComponent(props.userId)}`
  
  eventSource = new EventSource(sseUrl)
  
  eventSource.onopen = () => {
    sseStatus.value = 'open'
    console.log('SSE connected')
  }
  
  // Default (unnamed) messages
  eventSource.onmessage = (event) => {
    const notification = JSON.parse(event.data)
    addNotification(notification)
  }
  
  // Custom event: new notification
  eventSource.addEventListener('notification', (event) => {
    const notification = JSON.parse(event.data)
    addNotification(notification)
  })
  
  // Custom event: notification read.
  // Only update local state here; calling markRead() would POST back to the server.
  eventSource.addEventListener('read', (event) => {
    const { notificationId } = JSON.parse(event.data)
    const notification = notifications.value.find(n => n.id === notificationId)
    if (notification) notification.read = true
  })
  
  // Custom event: all read
  eventSource.addEventListener('readAll', () => {
    notifications.value.forEach(n => n.read = true)
  })
  
  eventSource.onerror = () => {
    sseStatus.value = 'error'
    // The browser reconnects SSE automatically
  }
}
 
// Add a notification
const addNotification = (notification) => {
  // Skip duplicates
  if (notifications.value.some(n => n.id === notification.id)) {
    return
  }
  
  notifications.value.unshift({
    ...notification,
    read: false,
    createdAt: notification.createdAt || new Date().toISOString()
  })
  
  // Show a desktop notification (guarded, since not every browser has the Notification API)
  if ('Notification' in window && Notification.permission === 'granted') {
    new Notification(notification.title, {
      body: notification.message,
      icon: '/favicon.ico'
    })
  }
}
 
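// Mark one notification as read
const markRead = async (notificationId) => {
  const notification = notifications.value.find(n => n.id === notificationId)
  // Nothing to do for unknown or already-read notifications
  if (!notification || notification.read) return
  
  notification.read = true
  
  // Tell the server; roll back the optimistic update if the request fails
  try {
    const response = await fetch(`${import.meta.env.VITE_API_URL}/notifications/${encodeURIComponent(notificationId)}/read`, {
      method: 'POST'
    })
    if (!response.ok) throw new Error(`HTTP ${response.status}`)
  } catch (err) {
    notification.read = false
    console.error('Failed to mark notification as read:', err)
  }
}
 
// Mark all as read
const markAllRead = async () => {
  notifications.value.forEach(n => n.read = true)
  
  await fetch(`${import.meta.env.VITE_API_URL}/notifications/read-all?userId=${encodeURIComponent(props.userId)}`, {
    method: 'POST'
  })
}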
 
// Handle a click on a notification
const handleNotificationClick = async (notification) => {
  await markRead(notification.id)
  
  // Navigate to the related page
  if (notification.link) {
    window.location.href = notification.link
  }
}
 
// Toggle the panel
const togglePanel = () => {
  showPanel.value = !showPanel.value
}
 
// Get the icon for a notification type
const getNotificationIcon = (type) => {
  const icons = {
    system: '⚙️',
    message: '💬',
    alert: '⚠️',
    success: '✅',
    warning: '🔔'
  }
  return icons[type] || '📌'
}
 
// Format a timestamp as relative time
const formatTime = (isoString) => {
  const date = new Date(isoString)
  const now = new Date()
  const diff = now - date
  
  if (diff < 60000) return 'just now'
  if (diff < 3600000) return `${Math.floor(diff / 60000)} min ago`
  if (diff < 86400000) return `${Math.floor(diff / 3600000)} h ago`
  return date.toLocaleDateString('zh-CN')
}
 
// Request desktop notification permission
const requestNotificationPermission = async () => {
  if ('Notification' in window && Notification.permission === 'default') {
    await Notification.requestPermission()
  }
}
 
// Lifecycle
onMounted(() => {
  requestNotificationPermission()
  connectSSE()
})
 
onUnmounted(() => {
  eventSource?.close()
})
</script>
 
<style scoped>
.notification-center {
  position: relative;
}
 
.notification-icon {
  position: relative;
  cursor: pointer;
}
 
.notification-icon .icon {
  font-size: 24px;
}
 
.badge {
  position: absolute;
  top: -5px;
  right: -5px;
  background: #f44336;
  color: white;
  font-size: 10px;
  padding: 2px 6px;
  border-radius: 10px;
}
 
.notification-panel {
  position: absolute;
  top: 100%;
  right: 0;
  width: 360px;
  max-height: 480px;
  background: white;
  border-radius: 8px;
  box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
  z-index: 1000;
  display: flex;
  flex-direction: column;
}
 
.panel-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 12px 16px;
  border-bottom: 1px solid #eee;
}
 
.panel-tabs {
  display: flex;
  border-bottom: 1px solid #eee;
}
 
.tab {
  flex: 1;
  padding: 8px;
  background: none;
  border: none;
  cursor: pointer;
  font-size: 13px;
}
 
.tab.active {
  border-bottom: 2px solid #1976d2;
  color: #1976d2;
}
 
.tab-count {
  margin-left: 4px;
  background: #eee;
  padding: 2px 6px;
  border-radius: 10px;
  font-size: 11px;
}
 
.panel-content {
  flex: 1;
  overflow-y: auto;
}
 
.notification-item {
  display: flex;
  gap: 12px;
  padding: 12px 16px;
  border-bottom: 1px solid #f5f5f5;
  cursor: pointer;
}
 
.notification-item:hover {
  background: #fafafa;
}
 
.notification-item.unread {
  background: #e3f2fd;
}
 
.notification-icon-type {
  width: 36px;
  height: 36px;
  border-radius: 50%;
  display: flex;
  align-items: center;
  justify-content: center;
  font-size: 18px;
}
 
.notification-icon-type.system { background: #e3f2fd; }
.notification-icon-type.message { background: #f3e5f5; }
.notification-icon-type.alert { background: #fff3e0; }
 
.notification-content {
  flex: 1;
}
 
.notification-title {
  font-weight: 500;
  margin-bottom: 4px;
}
 
.notification-message {
  font-size: 13px;
  color: #666;
  margin-bottom: 4px;
}
 
.notification-time {
  font-size: 11px;
  color: #999;
}
 
.mark-read-btn {
  font-size: 11px;
  padding: 4px 8px;
  background: none;
  border: 1px solid #ddd;
  border-radius: 4px;
  cursor: pointer;
}
 
.panel-footer {
  padding: 8px 16px;
  border-top: 1px solid #eee;
  text-align: center;
}
 
.status {
  font-size: 12px;
  color: #999;
}
 
.status.open { color: #4caf50; }
.status.error { color: #f44336; }
 
.empty {
  padding: 40px;
  text-align: center;
  color: #999;
}
</style>
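The `tabs` computed property in this component filters the notification list once per tab. For long lists, the same unread counts can be gathered in a single pass. A minimal sketch, assuming a hypothetical helper named countUnreadByType and that no notification uses the literal type "all":

```javascript
// Hypothetical helper: counts unread notifications overall and per type in one pass.
function countUnreadByType(notifications) {
  const counts = { all: 0 }
  for (const n of notifications) {
    if (n.read) continue
    counts.all += 1
    counts[n.type] = (counts[n.type] ?? 0) + 1
  }
  return counts
}
```

Types with no unread notifications are simply absent from the result, so a caller would read them as `counts.alert ?? 0`.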

Node.js SSE Server (Express) #

// server.js
const express = require('express')
const cors = require('cors')
 
const app = express()
app.use(cors())
app.use(express.json())
 
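// Connected clients
const clients = new Map() // userId -> res
 
// SSE endpoint
app.get('/notifications/stream', (req, res) => {
  const userId = req.query.userId
  if (!userId) {
    return res.status(400).json({ error: 'userId is required' })
  }
  
  // SSE response headers
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  res.setHeader('X-Accel-Buffering', 'no') // disables response buffering behind Nginx
  
  // Store the connection (a newer connection for the same user replaces the older one)
  clients.set(userId, res)
  console.log(`User ${userId} connected. Total clients: ${clients.size}`)
  
  // Send an initial "connected" event
  res.write(`event: connected\ndata: ${JSON.stringify({ userId })}\n\n`)
  
  // Send a periodic heartbeat to keep the connection open
  const heartbeat = setInterval(() => {
    res.write(': heartbeat\n\n') // a comment line works as a heartbeat
  }, 15000)
  
  // Client disconnected
  req.on('close', () => {
    clearInterval(heartbeat)
    // Only remove the entry if it still points at this connection
    if (clients.get(userId) === res) {
      clients.delete(userId)
    }
    console.log(`User ${userId} disconnected. Total clients: ${clients.size}`)
  })
})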
 
// Send-notification API
app.post('/notifications/send', (req, res) => {
  const { userId, notification } = req.body
  
  const client = clients.get(userId)
  if (client) {
    // Send the SSE event
    client.write(`event: notification\ndata: ${JSON.stringify(notification)}\n\n`)
    res.json({ success: true, message: 'Notification sent' })
  } else {
    res.json({ success: false, message: 'User not connected' })
  }
})
 
// Send a notification to multiple users
app.post('/notifications/broadcast', (req, res) => {
  const { notification, userIds } = req.body
  
  let sentCount = 0
  const targetUsers = userIds || Array.from(clients.keys())
  
  targetUsers.forEach(userId => {
    const client = clients.get(userId)
    if (client) {
      client.write(`event: notification\ndata: ${JSON.stringify(notification)}\n\n`)
      sentCount++
    }
  })
  
  res.json({ success: true, sentCount })
})
 
// Mark as read
app.post('/notifications/:id/read', (req, res) => {
  // The database should be updated here,
  // and the client notified so it can update its state
  res.json({ success: true })
})
 
// Mark all as read
app.post('/notifications/read-all', (req, res) => {
  const { userId } = req.query
  const client = clients.get(userId)
  
  if (client) {
    client.write(`event: readAll\ndata: {}\n\n`)
  }
  
  res.json({ success: true })
})
 
// Send a simulated notification (for testing)
app.post('/notifications/test', (req, res) => {
  const notification = {
    id: `notif-${Date.now()}`,
    type: ['system', 'message', 'alert'][Math.floor(Math.random() * 3)],
    title: 'Test notification',
    message: 'This is a test notification message',
    createdAt: new Date().toISOString()
  }
  
  // Broadcast to all clients
  clients.forEach(client => {
    client.write(`event: notification\ndata: ${JSON.stringify(notification)}\n\n`)
  })
  
  res.json({ success: true, clientCount: clients.size })
})
 
const PORT = process.env.PORT || 3000
app.listen(PORT, () => {
  console.log(`SSE server running on http://localhost:${PORT}`)
})
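The server above keeps one response object per user, so a second browser tab from the same user replaces the first one in the map. A minimal sketch of a registry that keeps every open connection per user instead. The name createClientRegistry and its methods are hypothetical, and the only thing assumed of each connection is a `write` method, as on an Express response.

```javascript
// Hypothetical registry: tracks every open SSE connection of each user.
function createClientRegistry() {
  const connections = new Map() // userId -> Set of response-like objects

  return {
    add(userId, res) {
      if (!connections.has(userId)) connections.set(userId, new Set())
      connections.get(userId).add(res)
    },

    // Removes one connection and drops the user entry once no connection is left.
    remove(userId, res) {
      const set = connections.get(userId)
      if (!set) return
      set.delete(res)
      if (set.size === 0) connections.delete(userId)
    },

    // Writes one named event to every connection of the user; returns how many were written.
    send(userId, event, data) {
      const set = connections.get(userId)
      if (!set) return 0
      const frame = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
      set.forEach((res) => res.write(frame))
      return set.size
    },

    userCount: () => connections.size
  }
}
```

In the stream endpoint, `registry.add(userId, res)` would replace `clients.set(userId, res)`, and `registry.remove(userId, res)` would go in the `req.on('close', ...)` handler.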

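Scenario 3: Real-Time Data Dashboard (SSE + Vue 3) #

Requirements: display business data in real time, with multiple chart types and automatic refresh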

<!-- components/RealtimeDashboard.vue -->
<template>
  <div class="dashboard">
    <div class="dashboard-header">
      <h1>Real-Time Data Dashboard</h1>
      <div class="status">
        <span :class="['connection-status', sseStatus]">
          {{ sseStatus === 'open' ? '● Live' : '○ Offline' }}
        </span>
        <span class="update-time">Updated at: {{ lastUpdateTime }}</span>
      </div>
    </div>
    
    <div class="dashboard-grid">
      <!-- Key metric cards -->
      <div v-for="metric in metrics" :key="metric.key" class="metric-card">
        <div class="metric-label">{{ metric.label }}</div>
        <div class="metric-value">
          <span class="value">{{ formatNumber(metric.value) }}</span>
          <span :class="['trend', metric.trend]">
            {{ metric.trend === 'up' ? '↑' : metric.trend === 'down' ? '↓' : '-' }}
            {{ metric.change }}%
          </span>
        </div>
      </div>
      
      <!-- Real-time line chart -->
      <div class="chart-card">
        <h3>Real-Time Visits</h3>
        <div ref="lineChartRef" class="chart"></div>
      </div>
      
      <!-- Real-time bar chart -->
      <div class="chart-card">
        <h3>Regional Distribution</h3>
        <div ref="barChartRef" class="chart"></div>
      </div>
      
      <!-- Real-time data stream -->
      <div class="data-stream-card">
        <h3>Real-Time Data Stream</h3>
        <div class="data-stream">
          <div 
            v-for="(item, index) in dataStream" 
            :key="item.id"
            class="stream-item"
            :style="{ animationDelay: `${index * 0.1}s` }"
          >
            <span class="stream-time">{{ item.time }}</span>
            <span class="stream-type">{{ item.type }}</span>
            <span class="stream-content">{{ item.content }}</span>
          </div>
        </div>
      </div>
    </div>
  </div>
</template>
 
<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import * as echarts from 'echarts'
 
// State
const sseStatus = ref('connecting')
const lastUpdateTime = ref('-')
const metrics = ref([
  { key: 'totalUsers', label: 'Total users', value: 0, trend: 'up', change: 0 },
  { key: 'activeUsers', label: 'Users online', value: 0, trend: 'up', change: 0 },
  { key: 'pageViews', label: 'Page views', value: 0, trend: 'up', change: 0 },
  { key: 'conversions', label: 'Conversions', value: 0, trend: 'down', change: 0 }
])
const dataStream = ref([])
const lineChartRef = ref(null)
const barChartRef = ref(null)
 
let eventSource = null
let lineChart = null
let barChart = null
let lineChartData = []
 
// SSE connection
const connectSSE = () => {
  const sseUrl = `${import.meta.env.VITE_API_URL}/dashboard/stream`
  
  eventSource = new EventSource(sseUrl)
  
  eventSource.onopen = () => {
    sseStatus.value = 'open'
  }
  
  // Live metric updates
  eventSource.addEventListener('metrics', (event) => {
    const data = JSON.parse(event.data)
    updateMetrics(data)
    lastUpdateTime.value = new Date().toLocaleTimeString('zh-CN')
  })
  
  // Line chart data
  eventSource.addEventListener('lineChart', (event) => {
    const data = JSON.parse(event.data)
    updateLineChart(data)
  })
  
  // Bar chart data
  eventSource.addEventListener('barChart', (event) => {
    const data = JSON.parse(event.data)
    updateBarChart(data)
  })
  
  // Data stream
  eventSource.addEventListener('dataStream', (event) => {
    const data = JSON.parse(event.data)
    addDataStreamItem(data)
  })
  
  // EventSource retries on its own; onopen restores the status after a successful reconnect
  eventSource.onerror = () => {
    sseStatus.value = 'error'
  }
}
 
// Update the metrics
const updateMetrics = (data) => {
  metrics.value = metrics.value.map(metric => ({
    ...metric,
    value: data[metric.key]?.value ?? metric.value,
    trend: data[metric.key]?.trend ?? metric.trend,
    change: data[metric.key]?.change ?? metric.change
  }))
}
 
// Initialize the line chart
const initLineChart = () => {
  if (!lineChartRef.value) return
  
  lineChart = echarts.init(lineChartRef.value)
  lineChart.setOption({
    tooltip: { trigger: 'axis' },
    xAxis: {
      type: 'category',
      data: []
    },
    yAxis: { type: 'value' },
    series: [{
      type: 'line',
      smooth: true,
      data: [],
      areaStyle: { opacity: 0.3 }
    }]
  })
}
 
// Update the line chart
const updateLineChart = (data) => {
  lineChartData.push(data)
  
  // Keep only the 30 most recent data points
  if (lineChartData.length > 30) {
    lineChartData.shift()
  }
  
  lineChart?.setOption({
    xAxis: {
      data: lineChartData.map(d => d.time)
    },
    series: [{
      data: lineChartData.map(d => d.value)
    }]
  })
}
 
// Initialize the bar chart
const initBarChart = () => {
  if (!barChartRef.value) return
  
  barChart = echarts.init(barChartRef.value)
  barChart.setOption({
    tooltip: { trigger: 'axis' },
    xAxis: { type: 'category' },
    yAxis: { type: 'value' },
    series: [{
      type: 'bar',
      data: []
    }]
  })
}
 
// Update the bar chart
const updateBarChart = (data) => {
  barChart?.setOption({
    xAxis: { data: data.map(d => d.name) },
    series: [{ data: data.map(d => d.value) }]
  })
}
 
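// Add an item to the data stream
let streamItemSeq = 0
const addDataStreamItem = (item) => {
  dataStream.value.unshift({
    // Date.now() alone can repeat within one millisecond, so append a counter to keep keys unique
    id: `${Date.now()}-${++streamItemSeq}`,
    time: new Date().toLocaleTimeString('zh-CN'),
    type: item.type,
    content: item.content
  })
  
  // Keep only the 50 most recent items
  if (dataStream.value.length > 50) {
    dataStream.value.pop()
  }
}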
 
// Format a number
const formatNumber = (num) => {
  if (num >= 1000000) return (num / 1000000).toFixed(1) + 'M'
  if (num >= 1000) return (num / 1000).toFixed(1) + 'K'
  return num.toLocaleString()
}
 
// Resize the charts when the window size changes
const handleResize = () => {
  lineChart?.resize()
  barChart?.resize()
}
 
// Lifecycle
onMounted(() => {
  initLineChart()
  initBarChart()
  connectSSE()
  window.addEventListener('resize', handleResize)
})
 
onUnmounted(() => {
  eventSource?.close()
  lineChart?.dispose()
  barChart?.dispose()
  window.removeEventListener('resize', handleResize)
})
</script>
 
<style scoped>
.dashboard {
  background: #1a1a2e;
  min-height: 100vh;
  padding: 20px;
  color: white;
}
 
.dashboard-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  margin-bottom: 20px;
}
 
.dashboard-header h1 {
  font-size: 24px;
  font-weight: 500;
}
 
.connection-status {
  font-size: 14px;
  margin-right: 16px;
}
 
.connection-status.open { color: #4caf50; }
.connection-status.error { color: #f44336; }
 
.update-time {
  font-size: 12px;
  color: #888;
}
 
.dashboard-grid {
  display: grid;
  grid-template-columns: repeat(4, 1fr);
  gap: 20px;
}
 
.metric-card {
  background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
  padding: 20px;
  border-radius: 12px;
}
 
.metric-label {
  font-size: 14px;
  opacity: 0.8;
  margin-bottom: 8px;
}
 
.metric-value {
  display: flex;
  align-items: baseline;
  gap: 12px;
}
 
.metric-value .value {
  font-size: 32px;
  font-weight: 600;
}
 
.trend {
  font-size: 14px;
}
 
.trend.up { color: #4caf50; }
.trend.down { color: #f44336; }
 
.chart-card {
  grid-column: span 2;
  background: #16213e;
  padding: 20px;
  border-radius: 12px;
}
 
.chart-card h3 {
  margin-bottom: 16px;
  font-weight: 500;
}
 
.chart {
  height: 300px;
}
 
.data-stream-card {
  grid-column: span 4;
  background: #16213e;
  padding: 20px;
  border-radius: 12px;
  max-height: 300px;
  overflow: hidden;
}
 
.data-stream {
  overflow-y: auto;
  max-height: 220px;
}
 
.stream-item {
  display: flex;
  gap: 16px;
  padding: 8px 12px;
  margin-bottom: 4px;
  background: rgba(255, 255, 255, 0.05);
  border-radius: 4px;
  animation: fadeIn 0.3s ease;
}
 
@keyframes fadeIn {
  from { opacity: 0; transform: translateY(-10px); }
  to { opacity: 1; transform: translateY(0); }
}
 
.stream-time {
  color: #888;
  font-size: 12px;
}
 
.stream-type {
  background: #667eea;
  padding: 2px 8px;
  border-radius: 4px;
  font-size: 12px;
}
 
.stream-content {
  flex: 1;
  font-size: 14px;
}
</style>
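
The component above listens for four named events (`metrics`, `lineChart`, `barChart`, `dataStream`) on `/dashboard/stream`, but the server side is not shown. The sketch below is one possible way to emit them. The payload shapes are inferred from `updateMetrics`, `updateLineChart`, `updateBarChart`, and `addDataStreamItem`; the function names, region names, and random values are placeholders invented for illustration.

```javascript
// Sketch only: event names match the listeners in RealtimeDashboard.vue,
// but every value below is a random placeholder, not real business data.
const METRIC_KEYS = ['totalUsers', 'activeUsers', 'pageViews', 'conversions']

function buildDashboardEvents(now = new Date()) {
  const metrics = {}
  for (const key of METRIC_KEYS) {
    const change = Number((Math.random() * 10 - 5).toFixed(1))
    metrics[key] = {
      value: Math.floor(Math.random() * 10000),
      trend: change > 0 ? 'up' : change < 0 ? 'down' : 'flat',
      change: Math.abs(change)
    }
  }
  return [
    { event: 'metrics', data: metrics },
    {
      event: 'lineChart',
      data: { time: now.toLocaleTimeString('zh-CN'), value: Math.floor(Math.random() * 500) }
    },
    {
      event: 'barChart',
      data: ['North', 'South', 'East', 'West'].map(name => ({
        name,
        value: Math.floor(Math.random() * 100)
      }))
    },
    {
      event: 'dataStream',
      data: { type: 'order', content: `Order #${Math.floor(Math.random() * 1e6)}` }
    }
  ]
}

// Writes one frame per event; `res` only needs a write(string) method
function pushDashboardEvents(res, now) {
  for (const { event, data } of buildDashboardEvents(now)) {
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
  }
}
```

In an Express handler, this would typically run on a `setInterval` after setting `Content-Type: text/event-stream`, with the interval cleared when the request emits `close`.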

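Scenario 4: Collaborative Editing (WebSocket + OT)

Requirement: several users edit one document at the same time, with real-time sync and support for operational transformation (OT).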

<!-- components/CollaborativeEditor.vue -->
<template>
  <div class="collaborative-editor">
    <div class="editor-header">
      <div class="document-info">
        <h2>{{ documentTitle }}</h2>
        <span class="last-saved">Last saved: {{ lastSaved }}</span>
      </div>
      <div class="collaborators">
        <div 
          v-for="user in collaborators" 
          :key="user.id"
          class="collaborator"
          :style="{ borderColor: user.color }"
        >
          <span class="avatar" :style="{ background: user.color }">
            {{ user.name[0] }}
          </span>
          <span class="cursor-indicator" v-if="user.cursor">
            {{ user.name }} is editing...
          </span>
        </div>
      </div>
    </div>
    
    <div class="editor-container">
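      <!-- :value + @input instead of v-model, so handleInput can compare the new text with the previous content -->
      <textarea
        ref="textareaRef"
        :value="content"
        @input="handleInput"
        @select="handleSelect"
        placeholder="Start editing..."
      ></textarea>
      
      <!-- Remote cursors -->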
      <div 
        v-for="user in collaborators.filter(u => u.cursor && u.id !== currentUserId)"
        :key="user.id"
        class="remote-cursor"
        :style="{
          top: user.cursor.line * 20 + 'px',
          left: user.cursor.column * 8 + 'px',
          borderColor: user.color
        }"
      >
        <span class="cursor-name" :style="{ background: user.color }">
          {{ user.name }}
        </span>
      </div>
    </div>
    
    <div class="editor-footer">
      <span :class="['sync-status', syncStatus]">
        {{ syncStatusText }}
      </span>
      <span class="word-count">{{ content.length }} characters</span>
    </div>
  </div>
</template>
 
<script setup>
import { ref, computed, onMounted, onUnmounted } from 'vue'
 
// Props
const props = defineProps({
  documentId: {
    type: String,
    required: true
  },
  currentUserId: {
    type: String,
    required: true
  },
  userName: {
    type: String,
    default: 'Anonymous'
  }
})
 
// State
const documentTitle = ref('Untitled document')
const content = ref('')
const lastSaved = ref('-')
const syncStatus = ref('synced') // synced | syncing | offline
const collaborators = ref([])
const textareaRef = ref(null)
 
let ws = null
let pendingOps = []
let revision = 0
 
// Computed properties
const syncStatusText = computed(() => {
  switch (syncStatus.value) {
    case 'synced': return 'Synced'
    case 'syncing': return 'Syncing...'
    case 'offline': return 'Offline'
    default: return 'Unknown'
  }
})
 
// WebSocket connection
const connect = () => {
  const wsUrl = `${import.meta.env.VITE_WS_URL}/collab?docId=${props.documentId}&userId=${props.currentUserId}&userName=${encodeURIComponent(props.userName)}`
  ws = new WebSocket(wsUrl)
  
  ws.onopen = () => {
    syncStatus.value = 'synced'
    // Send any operations queued while offline
    flushPendingOps()
  }
  
  ws.onmessage = (event) => {
    const data = JSON.parse(event.data)
    handleMessage(data)
  }
  
  ws.onclose = () => {
    syncStatus.value = 'offline'
    // Try to reconnect after 3 seconds
    setTimeout(connect, 3000)
  }
  
  ws.onerror = () => {
    syncStatus.value = 'offline'
  }
}
 
// Handle incoming messages
const handleMessage = (data) => {
  switch (data.type) {
    case 'init':
      // Initialize the document
      content.value = data.content
      documentTitle.value = data.title
      revision = data.revision
      break
      
    case 'op':
      // A remote operation arrived
      applyOperation(data.operation, data.revision)
      break
      
    case 'ack':
      // The server acknowledged our operation
      revision = data.revision
      syncStatus.value = 'synced'
      lastSaved.value = new Date().toLocaleTimeString('zh-CN')
      break
      
    case 'userJoin':
      collaborators.value.push(data.user)
      break
      
    case 'userLeave':
      collaborators.value = collaborators.value.filter(u => u.id !== data.userId)
      break
      
    case 'cursor': {
      // Update the remote cursor (braces scope the const to this case)
      const user = collaborators.value.find(u => u.id === data.userId)
      if (user) {
        user.cursor = data.cursor
      }
      break
    }
      
    case 'users':
      collaborators.value = data.users
      break
  }
}
 
// Handle input
const handleInput = (event) => {
  if (!textareaRef.value) return
  
  const { selectionStart } = textareaRef.value
  const inputValue = event.target.value
  const delta = inputValue.length - content.value.length
  
  // Detect the change. This assumes content.value still holds the previous text
  // and that the edit is a plain insert or delete at the caret.
  if (delta > 0) {
    // Insert: the new text is the `delta` characters that end at the caret
    const op = {
      type: 'insert',
      position: selectionStart - delta,
      text: inputValue.slice(selectionStart - delta, selectionStart)
    }
    sendOperation(op)
  } else if (delta < 0) {
    // Delete: after a deletion the caret sits where the removed text began
    const op = {
      type: 'delete',
      position: selectionStart,
      length: -delta
    }
    sendOperation(op)
  }
  
  content.value = inputValue
  syncStatus.value = 'syncing'
}
 
// Handle selection changes (cursor position)
const handleSelect = () => {
  // send() throws while the socket is still connecting, so require an open connection
  if (!textareaRef.value || ws?.readyState !== WebSocket.OPEN) return
  
  const { selectionStart, selectionEnd } = textareaRef.value
  const lines = content.value.substring(0, selectionStart).split('\n')
  
  const cursor = {
    line: lines.length - 1,
    column: lines[lines.length - 1].length,
    start: selectionStart,
    end: selectionEnd
  }
  
  // Send the cursor position
  ws.send(JSON.stringify({
    type: 'cursor',
    cursor
  }))
}
 
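// Send an operation
const sendOperation = (op) => {
  const message = {
    type: 'op',
    operation: op,
    revision
  }
  
  if (ws?.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify(message))
  } else {
    // Queue the operation while offline
    pendingOps.push(message)
  }
}
 
// Apply a remote operation
const applyOperation = (op, remoteRevision) => {
  // Simplified: applies the operation as received, without transforming it against
  // pending local edits (in production, use a mature OT library such as ShareDB)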
  let newContent = content.value
  
  if (op.type === 'insert') {
    newContent = newContent.slice(0, op.position) + op.text + newContent.slice(op.position)
  } else if (op.type === 'delete') {
    newContent = newContent.slice(0, op.position) + newContent.slice(op.position + op.length)
  }
  
  content.value = newContent
  revision = remoteRevision
}
 
// Send the queued operations
const flushPendingOps = () => {
  pendingOps.forEach(op => {
    ws.send(JSON.stringify(op))
  })
  pendingOps = []
}
 
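// Lifecycle
onMounted(() => {
  connect()
})
 
onUnmounted(() => {
  if (ws) {
    ws.onclose = null // otherwise onclose schedules a reconnect after the component is gone
    ws.close()
  }
})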
</script>
 
<style scoped>
.collaborative-editor {
  display: flex;
  flex-direction: column;
  height: 100vh;
  background: #1e1e1e;
}
 
.editor-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 12px 20px;
  background: #252526;
  border-bottom: 1px solid #3c3c3c;
}
 
.document-info h2 {
  margin: 0;
  font-size: 16px;
  color: #fff;
}
 
.last-saved {
  font-size: 12px;
  color: #888;
  margin-left: 12px;
}
 
.collaborators {
  display: flex;
  gap: 8px;
}
 
.collaborator {
  display: flex;
  align-items: center;
  padding: 4px 8px;
  border: 2px solid transparent;
  border-radius: 20px;
}
 
.avatar {
  width: 24px;
  height: 24px;
  border-radius: 50%;
  display: flex;
  align-items: center;
  justify-content: center;
  color: white;
  font-size: 12px;
  font-weight: 600;
}
 
.cursor-indicator {
  font-size: 11px;
  color: #888;
  margin-left: 8px;
}
 
.editor-container {
  flex: 1;
  position: relative;
  padding: 16px;
}
 
textarea {
  width: 100%;
  height: 100%;
  background: transparent;
  border: none;
  color: #d4d4d4;
  font-family: 'Monaco', 'Menlo', monospace;
  font-size: 14px;
  line-height: 20px;
  resize: none;
  outline: none;
}
 
.remote-cursor {
  position: absolute;
  width: 2px;
  height: 20px;
  pointer-events: none;
  animation: blink 1s infinite;
}
 
@keyframes blink {
  0%, 50% { opacity: 1; }
  51%, 100% { opacity: 0; }
}
 
.cursor-name {
  position: absolute;
  top: -20px;
  left: 0;
  padding: 2px 6px;
  border-radius: 3px;
  font-size: 11px;
  color: white;
  white-space: nowrap;
}
 
.editor-footer {
  display: flex;
  justify-content: space-between;
  padding: 8px 20px;
  background: #252526;
  border-top: 1px solid #3c3c3c;
  font-size: 12px;
  color: #888;
}
 
.sync-status.synced { color: #4caf50; }
.sync-status.syncing { color: #ff9800; }
.sync-status.offline { color: #f44336; }
</style>
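
The component's `applyOperation` applies remote operations as they arrive, so two concurrent edits can leave clients with different text. The sketch below shows the core idea of operational transformation for the same `{ type, position, text | length }` operation shape: rewrite an operation so that it can be applied after a concurrent one. The names `applyOp`, `transform`, and `opHasPriority` are illustrative, and this is a teaching sketch under those assumptions rather than a complete OT implementation.

```javascript
// Apply one operation to a string
function applyOp(doc, op) {
  if (op.type === 'insert') {
    return doc.slice(0, op.position) + op.text + doc.slice(op.position)
  }
  return doc.slice(0, op.position) + doc.slice(op.position + op.length)
}

// Rewrite `op` so it can be applied after `against` has already been applied.
// Both operations must have been created against the same document state.
// Returns an array because a delete can be split in two by a concurrent insert.
// `opHasPriority` breaks ties between inserts at the same position; the two
// sides must pass opposite values for the same pair of operations.
function transform(op, against, opHasPriority = false) {
  const q = op.position
  const p = against.position

  if (against.type === 'insert') {
    const n = against.text.length
    if (op.type === 'insert') {
      const keep = q < p || (q === p && opHasPriority)
      return [keep ? op : { ...op, position: q + n }]
    }
    const end = q + op.length
    if (end <= p) return [op]
    if (q >= p) return [{ ...op, position: q + n }]
    // The insert landed inside the deleted range: delete the tail first, then the head
    return [
      { type: 'delete', position: p + n, length: end - p },
      { type: 'delete', position: q, length: p - q }
    ]
  }

  // `against` deleted the range [p, p + n)
  const n = against.length
  if (op.type === 'insert') {
    if (q <= p) return [op]
    if (q >= p + n) return [{ ...op, position: q - n }]
    return [{ ...op, position: p }]
  }
  // Both are deletes: drop the part that `against` already removed
  const overlap = Math.max(0, Math.min(q + op.length, p + n) - Math.max(q, p))
  const remaining = op.length - overlap
  if (remaining === 0) return []
  const position = q <= p ? q : q >= p + n ? q - n : p
  return [{ type: 'delete', position, length: remaining }]
}
```

For example, with the document `hello world`, one client inserting `,` at position 5 and another deleting 5 characters at position 6 both end up with the same text once each side applies its own operation first and then the transformed remote one.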

Scenario 5: File Upload Progress (WebSocket)

Requirement: upload large files and show progress, speed, and remaining time in real time.

<!-- components/FileUploader.vue -->
<template>
  <div class="file-uploader">
    <div class="upload-area" @drop.prevent="handleDrop" @dragover.prevent>
      <input 
        ref="fileInputRef" 
        type="file" 
        multiple 
        @change="handleFileSelect"
        style="display: none"
      />
      <button @click="fileInputRef?.click()">Choose files</button>
      <span>or drag files here</span>
    </div>
    
    <div class="upload-list">
      <div 
        v-for="file in uploadFiles" 
        :key="file.id"
        class="upload-item"
      >
        <div class="file-info">
          <span class="file-icon">{{ getFileIcon(file.name) }}</span>
          <div class="file-details">
            <div class="file-name">{{ file.name }}</div>
            <div class="file-size">{{ formatSize(file.size) }}</div>
          </div>
        </div>
        
        <div class="upload-progress">
          <div class="progress-bar">
            <div 
              class="progress-fill" 
              :style="{ width: file.progress + '%' }"
              :class="{ error: file.status === 'error' }"
            ></div>
          </div>
          <div class="progress-info">
            <span class="progress-percent">{{ file.progress }}%</span>
            <span class="upload-speed" v-if="file.status === 'uploading'">
              {{ file.speed }}
            </span>
            <span class="remaining-time" v-if="file.status === 'uploading' && file.remainingTime">
              {{ file.remainingTime }} remaining
            </span>
          </div>
        </div>
        
        <div class="upload-status">
          <span v-if="file.status === 'pending'" class="status-pending">Pending</span>
          <span v-else-if="file.status === 'uploading'" class="status-uploading">Uploading</span>
          <span v-else-if="file.status === 'completed'" class="status-completed">✓ Done</span>
          <span v-else-if="file.status === 'error'" class="status-error">Failed</span>
          <button 
            v-if="file.status === 'error'" 
            @click="retryUpload(file)"
            class="retry-btn"
          >
            Retry
          </button>
        </div>
      </div>
    </div>
    
    <div class="upload-summary" v-if="totalFiles > 0">
      <span>Total: {{ completedFiles }}/{{ totalFiles }} files</span>
      <span>Total size: {{ formatSize(totalSize) }}</span>
    </div>
  </div>
</template>
 
<script setup>
import { ref, computed, onMounted, onUnmounted } from 'vue'
 
// State
const uploadFiles = ref([])
const fileInputRef = ref(null)
 
let ws = null
let fileId = 0
 
// Computed properties
const totalFiles = computed(() => uploadFiles.value.length)
const completedFiles = computed(() => 
  uploadFiles.value.filter(f => f.status === 'completed').length
)
const totalSize = computed(() => 
  uploadFiles.value.reduce((sum, f) => sum + f.size, 0)
)
 
// WebSocket connection
const connectWebSocket = () => {
  const wsUrl = `${import.meta.env.VITE_WS_URL}/upload`
  ws = new WebSocket(wsUrl)
  
  ws.onmessage = (event) => {
    const data = JSON.parse(event.data)
    handleUploadProgress(data)
  }
  
  ws.onclose = () => {
    // Reconnect after 3 seconds
    setTimeout(connectWebSocket, 3000)
  }
}
 
// Handle upload progress messages
const handleUploadProgress = (data) => {
  const file = uploadFiles.value.find(f => f.id === data.fileId)
  if (!file) return
  
  file.progress = data.progress
  file.speed = data.speed
  file.remainingTime = data.remainingTime
  
  if (data.status === 'completed') {
    file.status = 'completed'
    file.url = data.url
  } else if (data.status === 'error') {
    file.status = 'error'
    file.error = data.error
  }
}
 
// Handle files chosen in the file picker
const handleFileSelect = (event) => {
  const files = Array.from(event.target.files)
  files.forEach(file => addFile(file))
}
 
// Handle dropped files
const handleDrop = (event) => {
  const files = Array.from(event.dataTransfer.files)
  files.forEach(file => addFile(file))
}
 
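// Add a file to the upload list
const addFile = (file) => {
  uploadFiles.value.push({
    id: `file-${++fileId}`,
    name: file.name,
    size: file.size,
    type: file.type,
    progress: 0,
    speed: '',
    remainingTime: '',
    status: 'pending',
    file
  })
  
  // Read the entry back from the ref so later changes go through Vue's reactive proxy.
  // (A local variable named uploadFile would also shadow the uploadFile function.)
  uploadFile(uploadFiles.value[uploadFiles.value.length - 1])
}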
 
// Upload a file in chunks
const uploadFile = async (item) => {
  if (ws?.readyState !== WebSocket.OPEN) {
    item.status = 'error'
    item.error = 'WebSocket is not connected'
    return
  }
  item.status = 'uploading'
  
  const CHUNK_SIZE = 1024 * 1024 // 1MB
  const totalChunks = Math.ceil(item.size / CHUNK_SIZE)
  
  // Send the file metadata
  ws.send(JSON.stringify({
    type: 'init',
    fileId: item.id,
    fileName: item.name,
    fileSize: item.size,
    totalChunks
  }))
  
  // Upload chunk by chunk
  for (let i = 0; i < totalChunks; i++) {
    const start = i * CHUNK_SIZE
    const chunk = item.file.slice(start, Math.min(start + CHUNK_SIZE, item.size))
    
    // JSON.stringify turns an ArrayBuffer into "{}", so read the chunk as a data URL
    // and send only its Base64 part (assumes the server decodes Base64)
    const base64 = await new Promise((resolve, reject) => {
      const reader = new FileReader()
      reader.onload = () => resolve(reader.result.split(',')[1] ?? '')
      reader.onerror = () => reject(reader.error)
      reader.readAsDataURL(chunk)
    })
    
    // Register the acknowledgement listener before sending the chunk
    const acknowledged = new Promise(resolve => {
      const handler = (event) => {
        const data = JSON.parse(event.data)
        if (data.fileId === item.id && data.chunkIndex === i) {
          ws.removeEventListener('message', handler)
          resolve()
        }
      }
      ws.addEventListener('message', handler)
    })
    
    ws.send(JSON.stringify({
      type: 'chunk',
      fileId: item.id,
      chunkIndex: i,
      data: base64
    }))
    
    // Wait for the server to confirm this chunk
    await acknowledged
  }
}
 
// Retry an upload
const retryUpload = (file) => {
  file.progress = 0
  file.status = 'pending'
  uploadFile(file)
}
 
// Format a file size
const formatSize = (bytes) => {
  if (bytes < 1024) return bytes + ' B'
  if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB'
  if (bytes < 1024 * 1024 * 1024) return (bytes / (1024 * 1024)).toFixed(1) + ' MB'
  return (bytes / (1024 * 1024 * 1024)).toFixed(1) + ' GB'
}
 
// Pick an icon for a file
const getFileIcon = (fileName) => {
  const ext = fileName.split('.').pop().toLowerCase()
  const icons = {
    pdf: '📄',
    doc: '📝', docx: '📝',
    xls: '📊', xlsx: '📊',
    ppt: '📽️', pptx: '📽️',
    zip: '📦', rar: '📦', '7z': '📦',
    mp3: '🎵', wav: '🎵',
    mp4: '🎬', avi: '🎬', mkv: '🎬',
    jpg: '🖼️', jpeg: '🖼️', png: '🖼️', gif: '🖼️',
    js: '📜', ts: '📜',
    default: '📁'
  }
  return icons[ext] || icons.default
}
 
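// Lifecycle
onMounted(() => {
  connectWebSocket()
})
 
onUnmounted(() => {
  if (ws) {
    ws.onclose = null // otherwise onclose schedules a reconnect after the component is gone
    ws.close()
  }
})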
</script>
 
<style scoped>
.file-uploader {
  max-width: 600px;
  margin: 0 auto;
  padding: 20px;
}
 
.upload-area {
  border: 2px dashed #ccc;
  border-radius: 8px;
  padding: 40px;
  text-align: center;
  cursor: pointer;
  transition: all 0.3s;
}
 
.upload-area:hover {
  border-color: #1976d2;
  background: #f5f5f5;
}
 
.upload-area button {
  background: #1976d2;
  color: white;
  border: none;
  padding: 10px 20px;
  border-radius: 4px;
  cursor: pointer;
  margin-right: 12px;
}
 
.upload-list {
  margin-top: 20px;
}
 
.upload-item {
  display: flex;
  align-items: center;
  gap: 16px;
  padding: 12px;
  border: 1px solid #eee;
  border-radius: 8px;
  margin-bottom: 12px;
}
 
.file-info {
  display: flex;
  align-items: center;
  gap: 12px;
  min-width: 200px;
}
 
.file-icon {
  font-size: 32px;
}
 
.file-name {
  font-weight: 500;
  margin-bottom: 4px;
}
 
.file-size {
  font-size: 12px;
  color: #888;
}
 
.upload-progress {
  flex: 1;
}
 
.progress-bar {
  height: 8px;
  background: #eee;
  border-radius: 4px;
  overflow: hidden;
}
 
.progress-fill {
  height: 100%;
  background: #1976d2;
  transition: width 0.3s;
}
 
.progress-fill.error {
  background: #f44336;
}
 
.progress-info {
  display: flex;
  justify-content: space-between;
  margin-top: 4px;
  font-size: 12px;
  color: #666;
}
 
.upload-status {
  text-align: right;
  min-width: 80px;
}
 
.status-pending { color: #888; }
.status-uploading { color: #1976d2; }
.status-completed { color: #4caf50; }
.status-error { color: #f44336; }
 
.retry-btn {
  margin-top: 4px;
  padding: 4px 8px;
  font-size: 12px;
  background: #f44336;
  color: white;
  border: none;
  border-radius: 4px;
  cursor: pointer;
}
 
.upload-summary {
  display: flex;
  justify-content: space-between;
  padding: 12px;
  background: #f5f5f5;
  border-radius: 8px;
  font-size: 14px;
  color: #666;
}
</style>
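
The uploader displays `progress`, `speed`, and `remainingTime` exactly as the server sends them, but how those values are derived is not shown. A minimal sketch of one way to compute them from the byte counts follows. The function names and the output formats (`MB/s`, `1m 5s`) are assumptions made for illustration.

```javascript
// Hypothetical helper: derives the fields that handleUploadProgress reads.
function estimateUpload(bytesSent, totalBytes, elapsedMs) {
  const progress = totalBytes > 0 ? Math.floor((bytesSent / totalBytes) * 100) : 100
  const bytesPerSecond = elapsedMs > 0 ? bytesSent / (elapsedMs / 1000) : 0
  // Remaining time is unknown until at least some data has been transferred
  const remainingSeconds = bytesPerSecond > 0
    ? Math.ceil((totalBytes - bytesSent) / bytesPerSecond)
    : null
  return {
    progress,
    speed: formatSpeed(bytesPerSecond),
    remainingTime: remainingSeconds === null ? '' : formatDuration(remainingSeconds)
  }
}

function formatSpeed(bytesPerSecond) {
  if (bytesPerSecond >= 1024 * 1024) {
    return (bytesPerSecond / (1024 * 1024)).toFixed(1) + ' MB/s'
  }
  if (bytesPerSecond >= 1024) return (bytesPerSecond / 1024).toFixed(1) + ' KB/s'
  return Math.round(bytesPerSecond) + ' B/s'
}

function formatDuration(seconds) {
  const m = Math.floor(seconds / 60)
  const s = seconds % 60
  return m > 0 ? `${m}m ${s}s` : `${s}s`
}
```

An average over the whole upload is the simplest estimate; a moving average over the last few chunks would react faster to changes in network speed.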

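5. FAQ

Q1: How do I choose between WebSocket and SSE?

Choose WebSocket when:

  • You need two-way communication (chat, games, collaborative editing)
  • You need to send binary data
  • The client sends messages to the server at high frequency

Choose SSE when:

  • You only need server push (notifications, stock quotes, log streams)
  • You want a simpler implementation
  • You need to resume the stream after a disconnect (via Last-Event-ID)
  • You want plain HTTP/HTTPS, which passes through firewalls more easily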
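
Resuming via Last-Event-ID works because the browser's EventSource remembers the last `id:` field it received and sends it back in the `Last-Event-ID` request header when it reconnects. The server then has to replay what the client missed. The sketch below shows that replay step with an in-memory buffer. `createReplayBuffer` and its methods are hypothetical names, and a real server would likely persist events rather than keep them in memory.

```javascript
// Hypothetical in-memory replay buffer for SSE events with numeric ids.
function createReplayBuffer(limit = 100) {
  const events = []
  let nextId = 1
  return {
    // Store an event and return it together with its assigned id
    add(event, data) {
      const entry = { id: nextId++, event, data }
      events.push(entry)
      // Drop the oldest entry once the buffer is full
      if (events.length > limit) events.shift()
      return entry
    },
    // Events the client missed, given the Last-Event-ID header value.
    // A missing or non-numeric header means nothing is replayed.
    since(lastEventId) {
      if (lastEventId === undefined || lastEventId === null || lastEventId === '') return []
      const last = Number(lastEventId)
      if (!Number.isFinite(last)) return []
      return events.filter(e => e.id > last)
    }
  }
}

// Possible use in a request handler (req and res are assumed to come from the HTTP framework):
// buffer.since(req.headers['last-event-id']).forEach(e =>
//   res.write(`id: ${e.id}\nevent: ${e.event}\ndata: ${JSON.stringify(e.data)}\n\n`))
```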

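Q2: What if the WebSocket connection keeps dropping?

Likely causes:

  • The network is unstable
  • The server closes idle connections after a timeout
  • A proxy or firewall restricts the connection

Solutions: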

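// Assumes `url` holds the WebSocket endpoint and the server answers 'ping' with 'pong'
let ws = null
let lastPongTime = Date.now()
 
// 1. Automatic reconnect (exponential backoff)
const connect = (delay = 1000) => {
  ws = new WebSocket(url)
  ws.onopen = () => {
    delay = 1000 // reset the backoff once a connection succeeds
    lastPongTime = Date.now()
  }
  // 2. Record every pong so that dead connections can be detected
  ws.onmessage = (e) => {
    if (e.data === 'pong') {
      lastPongTime = Date.now()
    }
  }
  ws.onclose = () => {
    setTimeout(() => connect(Math.min(delay * 2, 30000)), delay)
  }
}
connect()
 
// 3. Heartbeat: ping every 30 seconds and drop connections that stopped answering
setInterval(() => {
  if (ws?.readyState !== WebSocket.OPEN) return
  if (Date.now() - lastPongTime > 60000) {
    ws.close() // no pong for more than 60 seconds: close and let onclose reconnect
    return
  }
  ws.send('ping')
}, 30000)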

Q3: Does SSE work in Internet Explorer?

IE has no native SSE support, so a polyfill is required:

// Uses the event-source-polyfill package, which exports EventSourcePolyfill
import { EventSourcePolyfill } from 'event-source-polyfill'
 
const eventSource = new EventSourcePolyfill(url, {
  headers: {
    'Authorization': 'Bearer token'
  }
})

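Q4: How do I handle WebSocket message ordering?

A single WebSocket connection delivers messages in order, but the application can still end up processing them out of order, for example when handlers do asynchronous work:

// Use a message queue and sequence numbers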
const messageQueue = []
let expectedSeq = 0
 
ws.onmessage = (event) => {
  const message = JSON.parse(event.data)
  messageQueue.push(message)
  processQueue()
}
 
const processQueue = () => {
  // Process messages in sequence order
  while (messageQueue.length > 0) {
    const msg = messageQueue.find(m => m.seq === expectedSeq)
    if (msg) {
      handleMessage(msg)
      messageQueue.splice(messageQueue.indexOf(msg), 1)
      expectedSeq++
    } else {
      break // wait for the missing message
    }
  }
}

Q5: How do I deploy WebSocket in production?

Nginx configuration:

upstream websocket {
  server backend1:8080;
  server backend2:8080;
}
 
server {
  listen 80;
  server_name example.com;
 
  location /ws {
    proxy_pass http://websocket;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_read_timeout 3600s; # 1-hour timeout
    proxy_send_timeout 3600s;
  }
}

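Load-balancing notes:

  • Use sticky sessions or shared storage (Redis)
  • Shared storage is the recommended option: publish each message to Redis and have every node subscribe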
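
The publish/subscribe pattern recommended above can be sketched without a real Redis instance. In the example below, `createBus` is an in-process stand-in for a Redis channel, and `createNode` represents one backend server that only knows its own WebSocket clients. Both names are hypothetical; in a real deployment the bus would be replaced by a Redis client's publish and subscribe calls.

```javascript
// In-process stand-in for a Redis channel: publish() calls every subscriber.
function createBus() {
  const subscribers = new Set()
  return {
    subscribe(fn) {
      subscribers.add(fn)
      return () => subscribers.delete(fn) // call to unsubscribe
    },
    publish(message) {
      subscribers.forEach(fn => fn(message))
    }
  }
}

// One backend node: keeps its own client sockets and relays bus messages to them.
function createNode(bus) {
  const clients = new Set()
  bus.subscribe(message => {
    clients.forEach(client => client.send(message))
  })
  return {
    addClient(client) { clients.add(client) },
    removeClient(client) { clients.delete(client) },
    // Publish instead of sending directly, so clients on other nodes also receive it
    broadcast(message) { bus.publish(message) }
  }
}
```

Because every node subscribes to the same channel, a message received on one node reaches clients connected to any node, so sticky sessions are no longer needed for delivering broadcasts.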

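Q6: How do I debug WebSocket?

Browser DevTools:

  1. Open the Network tab
  2. Filter by WS (WebSocket)
  3. Click the connection to inspect its messages (the Messages tab; older Chrome versions label it Frames)

Chrome extensions:

  • WebSocket King Client
  • WebSocket Test Client

Debugging with Node.js: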

const WebSocket = require('ws')
const ws = new WebSocket('ws://localhost:8080')
 
ws.on('open', () => {
  console.log('Connected')
  // Send a test message
  ws.send(JSON.stringify({ type: 'test', data: 'hello' }))
})
ws.on('message', data => console.log('Received:', data.toString()))
ws.on('error', err => console.error('Error:', err))

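6. Summary Cheat Sheet

WebSocket key points

| Topic | Key points |
| --- | --- |
| Protocol | ws:// or wss://; one handshake, then a persistent connection |
| States | 0=CONNECTING, 1=OPEN, 2=CLOSING, 3=CLOSED |
| Events | onopen, onmessage, onclose, onerror |
| Methods | send(), close() |
| Keep-alive | Heartbeat (ping/pong) |
| Reconnect | Must be implemented manually (exponential backoff) |
| Proxy | Nginx must forward the Upgrade headers |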

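SSE key points

| Topic | Key points |
| --- | --- |
| Protocol | HTTP/HTTPS, Content-Type: text/event-stream |
| API | EventSource, reconnects automatically |
| Events | onopen, onmessage, onerror, addEventListener |
| Format | data: message content\n\n |
| Resuming after a disconnect | Via Last-Event-ID |
| Compatibility | IE needs a polyfill |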

Vue 3 Composition API wrappers

// useWebSocket
const { data, status, send, close } = useWebSocket(url, options)
 
// useSSE
const { data, status, lastEventId, close } = useSSE(url, options)

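Best practices

| Scenario | Recommendation |
| --- | --- |
| Chat / games | WebSocket |
| Notification push | SSE |
| Collaborative editing | WebSocket + OT |
| Data dashboard | SSE |
| File upload | WebSocket (progress) |
| Resuming after a disconnect | SSE (Last-Event-ID) |
| High concurrency | WebSocket + Redis Pub/Sub |
| Passing through firewalls | SSE is friendlier |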

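Appendix

Recommended libraries

| Library | Purpose | Stars |
| --- | --- | --- |
| socket.io | WebSocket wrapper with automatic reconnect | ⭐ 60k+ |
| ws | WebSocket server for Node.js | ⭐ 21k+ |
| EventSource | Native SSE API | - |
| eventsource | SSE client for Node.js | ⭐ 1k+ |
| reconnecting-websocket | WebSocket with automatic reconnect | ⭐ 1k+ |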

Official documentation

Last updated: 2026-04-01