← 返回首页
文章
2026-04-01
WebSocket 和 SSE 实战教程
WebSocket 和 SSE 实战教程:Vue 3 实时通信全攻略 从零掌握两种实时通信技术,5个实战场景 + 完整代码,搞定实时聊天、推送通知、数据大屏! 一、核心概念 1.1 什么是实时通信? 传统网页是"你问我答...
WebSocket 和 SSE 实战教程:Vue 3 实时通信全攻略 #
从零掌握两种实时通信技术,5个实战场景 + 完整代码,搞定实时聊天、推送通知、数据大屏!
一、核心概念 #
1.1 什么是实时通信? #
传统网页是"你问我答"模式:用户点击按钮 → 发送请求 → 服务器返回结果。
但有些场景需要"服务器主动说话":
- 📱 微信收到新消息,不用刷新页面
- 📈 股票价格实时变动
- 🔔 有新通知自动弹出
- 🎮 多人在线游戏同步
这就是实时通信,常见技术有两种:WebSocket 和 SSE。
1.2 WebSocket vs SSE:选哪个? #
| 特性 | WebSocket | SSE (Server-Sent Events) |
|---|---|---|
| 通信方向 | 双向(客户端 ↔ 服务器) | 单向(服务器 → 客户端) |
| 协议 | ws:// 或 wss:// | HTTP/HTTPS |
| 数据格式 | 文本、二进制 | 仅文本(UTF-8) |
| 断线重连 | 需手动实现 | 浏览器自动重连 |
| 兼容性 | IE10+ | IE 不支持(需 polyfill) |
| 适用场景 | 聊天、游戏、协作编辑 | 推送通知、股票行情、日志流 |
一句话总结:
- 需要双向通信(客户端也要发消息)→ WebSocket
- 只需要服务器推送(客户端只接收)→ SSE 更简单
1.3 核心名词解释 #
| 名词 | 解释 | 类比 |
|---|---|---|
| WebSocket | 全双工通信协议,一次握手,持久连接 | 电话通话(双方都能说话) |
| SSE | 服务器向客户端推送事件流 | 收音机(只能听广播) |
| Handshake | WebSocket 连接建立过程 | 拨号建立通话 |
| Heartbeat | 心跳检测,保持连接活跃 | 定期说"喂,还在吗?" |
| Reconnect | 断线重连机制 | 信号不好自动重拨 |
二、基础使用 #
2.1 WebSocket 基础 #
原生 WebSocket API #
// 创建连接
const ws = new WebSocket('wss://example.com/ws')
// 连接成功
ws.onopen = () => {
console.log('连接成功')
ws.send('Hello Server!') // 发送消息
}
// 接收消息
ws.onmessage = (event) => {
console.log('收到消息:', event.data)
}
// 连接关闭
ws.onclose = (event) => {
console.log('连接关闭:', event.code, event.reason)
}
// 连接错误
ws.onerror = (error) => {
console.error('连接错误:', error)
}
// 主动关闭连接
ws.close(1000, '正常关闭')readyState 状态值 #
| 状态值 | 常量 | 说明 |
|---|---|---|
| 0 | CONNECTING | 连接中 |
| 1 | OPEN | 已连接,可通信 |
| 2 | CLOSING | 关闭中 |
| 3 | CLOSED | 已关闭 |
2.2 SSE 基础 #
原生 EventSource API #
// 创建连接(只能 GET 请求)
const eventSource = new EventSource('https://example.com/events')
// 连接成功
eventSource.onopen = () => {
console.log('SSE 连接成功')
}
// 接收默认消息
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data)
}
// 监听自定义事件
eventSource.addEventListener('notification', (event) => {
console.log('收到通知:', event.data)
})
// 监听股票更新事件
eventSource.addEventListener('stock', (event) => {
const data = JSON.parse(event.data)
console.log('股票价格:', data.price)
})
// 连接错误
eventSource.onerror = (error) => {
console.error('SSE 错误:', error)
// 浏览器会自动重连
}
// 关闭连接
eventSource.close()SSE 服务器响应格式 #
服务器返回的 Content-Type 必须是 text/event-stream:
Content-Type: text/event-stream
// 默认消息
data: Hello World
// 带事件名称
event: notification
data: {"title": "新消息", "content": "有人评论了你的文章"}
// 带 ID(用于断点续传)
id: 123
event: stock
data: {"symbol": "AAPL", "price": 178.50}
// 多行数据
data: 第一行
data: 第二行三、进阶用法 #
3.1 WebSocket 重连机制 #
原生 WebSocket 没有自动重连,需要自己实现:
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url
this.reconnectInterval = options.reconnectInterval || 1000
this.maxReconnectInterval = options.maxReconnectInterval || 30000
this.reconnectDecay = options.reconnectDecay || 1.5
this.timeoutInterval = options.timeoutInterval || 5000
this.maxReconnectAttempts = options.maxReconnectAttempts || 10
this.ws = null
this.reconnectAttempts = 0
this.forcedClose = false
this.timedOut = false
// 事件回调
this.onopen = null
this.onmessage = null
this.onclose = null
this.onerror = null
this.connect()
}
connect() {
this.ws = new WebSocket(this.url)
// 超时检测
const timeout = setTimeout(() => {
this.timedOut = true
this.ws.close()
this.timedOut = false
}, this.timeoutInterval)
this.ws.onopen = (event) => {
clearTimeout(timeout)
this.reconnectAttempts = 0
this.onopen?.(event)
}
this.ws.onmessage = (event) => {
this.onmessage?.(event)
}
this.ws.onclose = (event) => {
clearTimeout(timeout)
this.onclose?.(event)
// 非主动关闭,尝试重连
if (!this.forcedClose && this.reconnectAttempts < this.maxReconnectAttempts) {
const interval = Math.min(
this.reconnectInterval * Math.pow(this.reconnectDecay, this.reconnectAttempts),
this.maxReconnectInterval
)
setTimeout(() => {
this.reconnectAttempts++
this.connect()
}, interval)
}
}
this.ws.onerror = (error) => {
this.onerror?.(error)
}
}
send(data) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(data)
}
}
close() {
this.forcedClose = true
this.ws?.close()
}
get readyState() {
return this.ws?.readyState ?? WebSocket.CLOSED
}
}
// 使用示例
const ws = new ReconnectingWebSocket('wss://example.com/ws', {
reconnectInterval: 1000,
maxReconnectAttempts: 10
})
ws.onopen = () => console.log('连接成功')
ws.onmessage = (e) => console.log('消息:', e.data)
ws.onclose = () => console.log('连接关闭')3.2 心跳检测 #
保持连接活跃,检测僵尸连接:
class WebSocketWithHeartbeat {
constructor(url, options = {}) {
this.url = url
this.heartbeatInterval = options.heartbeatInterval || 30000 // 30秒
this.heartbeatTimeout = options.heartbeatTimeout || 5000 // 5秒
this.ws = null
this.heartbeatTimer = null
this.heartbeatTimeoutTimer = null
this.connect()
}
connect() {
this.ws = new WebSocket(this.url)
this.ws.onopen = () => {
console.log('连接成功')
this.startHeartbeat()
}
this.ws.onmessage = (event) => {
// 收到心跳响应,清除超时定时器
if (event.data === 'pong') {
this.clearHeartbeatTimeout()
return
}
console.log('业务消息:', event.data)
}
this.ws.onclose = () => {
this.stopHeartbeat()
// 重连逻辑...
}
}
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
// 发送心跳
this.ws.send('ping')
// 设置超时检测
this.heartbeatTimeoutTimer = setTimeout(() => {
console.warn('心跳超时,断开重连')
this.ws.close()
}, this.heartbeatTimeout)
}
}, this.heartbeatInterval)
}
stopHeartbeat() {
clearInterval(this.heartbeatTimer)
this.clearHeartbeatTimeout()
}
clearHeartbeatTimeout() {
clearTimeout(this.heartbeatTimeoutTimer)
this.heartbeatTimeoutTimer = null
}
}
// 使用示例
const ws = new WebSocketWithHeartbeat('wss://example.com/ws', {
heartbeatInterval: 30000, // 30秒发一次心跳
heartbeatTimeout: 5000 // 5秒没响应算超时
})3.3 Vue 3 组合式 API 封装 #
封装成可复用的 composable:
// composables/useWebSocket.js
import { ref, onMounted, onUnmounted } from 'vue'
export function useWebSocket(url, options = {}) {
const data = ref(null)
const status = ref('connecting') // connecting | open | closed | error
const error = ref(null)
let ws = null
let reconnectAttempts = 0
let reconnectTimer = null
const {
reconnect = true,
reconnectInterval = 1000,
maxReconnectAttempts = 5,
heartbeat = false,
heartbeatInterval = 30000,
onOpen,
onMessage,
onClose,
onError
} = options
const connect = () => {
status.value = 'connecting'
ws = new WebSocket(url)
ws.onopen = (event) => {
status.value = 'open'
error.value = null
reconnectAttempts = 0
onOpen?.(event)
if (heartbeat) {
startHeartbeat()
}
}
ws.onmessage = (event) => {
data.value = event.data
onMessage?.(event.data)
}
ws.onclose = (event) => {
status.value = 'closed'
onClose?.(event)
if (reconnect && reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++
reconnectTimer = setTimeout(connect, reconnectInterval * reconnectAttempts)
}
}
ws.onerror = (err) => {
status.value = 'error'
error.value = err
onError?.(err)
}
}
const send = (message) => {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(typeof message === 'string' ? message : JSON.stringify(message))
return true
}
return false
}
const close = () => {
if (reconnectTimer) {
clearTimeout(reconnectTimer)
}
stopHeartbeat()
ws?.close()
}
// 心跳逻辑
let heartbeatTimer = null
const startHeartbeat = () => {
heartbeatTimer = setInterval(() => {
send('ping')
}, heartbeatInterval)
}
const stopHeartbeat = () => {
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
}
onMounted(() => {
connect()
})
onUnmounted(() => {
close()
})
return {
data,
status,
error,
send,
close,
reconnect: connect
}
}3.4 SSE 组合式 API 封装 #
// composables/useSSE.js
import { ref, onMounted, onUnmounted } from 'vue'
export function useSSE(url, options = {}) {
const data = ref(null)
const status = ref('connecting') // connecting | open | closed | error
const error = ref(null)
const lastEventId = ref(null)
let eventSource = null
const {
withCredentials = false,
onOpen,
onMessage,
onError,
events = {} // 自定义事件监听
} = options
const connect = () => {
status.value = 'connecting'
// 断点续传:带上上次的事件 ID
const urlWithId = lastEventId.value
? `${url}?lastEventId=${lastEventId.value}`
: url
eventSource = new EventSource(urlWithId, { withCredentials })
eventSource.onopen = (event) => {
status.value = 'open'
error.value = null
onOpen?.(event)
}
eventSource.onmessage = (event) => {
lastEventId.value = event.lastEventId
data.value = event.data
onMessage?.(event.data)
}
// 注册自定义事件
Object.entries(events).forEach(([eventName, handler]) => {
eventSource.addEventListener(eventName, (event) => {
lastEventId.value = event.lastEventId
handler(JSON.parse(event.data))
})
})
eventSource.onerror = (err) => {
status.value = 'error'
error.value = err
onError?.(err)
// SSE 会自动重连
}
}
const close = () => {
eventSource?.close()
status.value = 'closed'
}
onMounted(() => {
connect()
})
onUnmounted(() => {
close()
})
return {
data,
status,
error,
lastEventId,
close,
reconnect: connect
}
}四、实战场景 #
场景 1:实时聊天室(WebSocket) #
需求:多人在线聊天,支持私聊、群聊、消息已读状态
Vue 3 聊天组件 #
<!-- components/ChatRoom.vue -->
<template>
<div class="chat-room">
<!-- 在线用户列表 -->
<div class="users-panel">
<h3>在线用户 ({{ onlineUsers.length }})</h3>
<ul>
<li v-for="user in onlineUsers" :key="user.id">
<span class="status-dot" :class="{ online: user.online }"></span>
{{ user.name }}
</li>
</ul>
</div>
<!-- 消息列表 -->
<div class="messages-panel" ref="messagesContainer">
<div
v-for="msg in messages"
:key="msg.id"
:class="['message', { 'my-message': msg.userId === currentUserId }]"
>
<div class="message-header">
<span class="username">{{ msg.username }}</span>
<span class="time">{{ formatTime(msg.timestamp) }}</span>
</div>
<div class="message-content">{{ msg.content }}</div>
<div v-if="msg.userId !== currentUserId" class="message-status">
{{ msg.read ? '已读' : '未读' }}
</div>
</div>
</div>
<!-- 输入框 -->
<div class="input-panel">
<input
v-model="inputMessage"
@keyup.enter="sendMessage"
placeholder="输入消息..."
:disabled="wsStatus !== 'open'"
/>
<button @click="sendMessage" :disabled="!inputMessage || wsStatus !== 'open'">
发送
</button>
</div>
<!-- 连接状态 -->
<div class="status-bar" :class="wsStatus">
{{ statusText }}
</div>
</div>
</template>
<script setup>
import { ref, computed, watch, nextTick, onMounted, onUnmounted } from 'vue'
// Props
const props = defineProps({
roomId: {
type: String,
required: true
},
currentUserId: {
type: String,
required: true
}
})
// 状态
const messages = ref([])
const onlineUsers = ref([])
const inputMessage = ref('')
const wsStatus = ref('connecting')
const messagesContainer = ref(null)
const error = ref(null)
let ws = null
let reconnectAttempts = 0
const maxReconnectAttempts = 10
// 计算属性
const statusText = computed(() => {
switch (wsStatus.value) {
case 'connecting': return '连接中...'
case 'open': return '已连接'
case 'closed': return '已断开'
case 'error': return `连接错误: ${error.value?.message || '未知错误'}`
default: return '未知状态'
}
})
// WebSocket 连接
const connect = () => {
const wsUrl = `${import.meta.env.VITE_WS_URL}/chat?roomId=${props.roomId}&userId=${props.currentUserId}`
ws = new WebSocket(wsUrl)
ws.onopen = () => {
wsStatus.value = 'open'
reconnectAttempts = 0
// 发送心跳
startHeartbeat()
}
ws.onmessage = (event) => {
const data = JSON.parse(event.data)
handleMessage(data)
}
ws.onclose = () => {
wsStatus.value = 'closed'
stopHeartbeat()
attemptReconnect()
}
ws.onerror = (err) => {
wsStatus.value = 'error'
error.value = err
}
}
// 处理消息
const handleMessage = (data) => {
switch (data.type) {
case 'message':
messages.value.push(data.payload)
scrollToBottom()
break
case 'userList':
onlineUsers.value = data.payload
break
case 'userJoin':
onlineUsers.value.push(data.payload)
break
case 'userLeave':
onlineUsers.value = onlineUsers.value.filter(u => u.id !== data.payload.userId)
break
case 'messageRead':
// 更新消息已读状态
const msg = messages.value.find(m => m.id === data.payload.messageId)
if (msg) msg.read = true
break
}
}
// 发送消息
const sendMessage = () => {
if (!inputMessage.value.trim() || wsStatus.value !== 'open') return
const message = {
type: 'message',
payload: {
content: inputMessage.value.trim(),
userId: props.currentUserId,
timestamp: Date.now()
}
}
ws.send(JSON.stringify(message))
inputMessage.value = ''
}
// 重连机制
const attemptReconnect = () => {
if (reconnectAttempts >= maxReconnectAttempts) {
error.value = new Error('重连失败,请刷新页面')
return
}
reconnectAttempts++
setTimeout(() => {
connect()
}, Math.min(1000 * reconnectAttempts, 30000))
}
// 心跳保活
let heartbeatTimer = null
const startHeartbeat = () => {
heartbeatTimer = setInterval(() => {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }))
}
}, 30000)
}
const stopHeartbeat = () => {
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
}
// 滚动到底部
const scrollToBottom = () => {
nextTick(() => {
if (messagesContainer.value) {
messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight
}
})
}
// 格式化时间
const formatTime = (timestamp) => {
return new Date(timestamp).toLocaleTimeString('zh-CN', {
hour: '2-digit',
minute: '2-digit'
})
}
// 监听房间变化
watch(() => props.roomId, () => {
messages.value = []
ws?.close()
connect()
})
// 生命周期
onMounted(() => {
connect()
})
onUnmounted(() => {
stopHeartbeat()
ws?.close()
})
</script>
<style scoped>
.chat-room {
display: grid;
grid-template-columns: 200px 1fr;
grid-template-rows: 1fr auto auto;
height: 100vh;
}
.users-panel {
grid-row: 1 / 3;
border-right: 1px solid #ddd;
padding: 16px;
overflow-y: auto;
}
.messages-panel {
padding: 16px;
overflow-y: auto;
}
.message {
margin-bottom: 12px;
padding: 8px 12px;
border-radius: 8px;
background: #f5f5f5;
}
.message.my-message {
background: #e3f2fd;
margin-left: auto;
max-width: 70%;
}
.input-panel {
display: flex;
gap: 8px;
padding: 16px;
border-top: 1px solid #ddd;
}
.input-panel input {
flex: 1;
padding: 8px 12px;
border: 1px solid #ddd;
border-radius: 4px;
}
.status-bar {
padding: 8px;
text-align: center;
font-size: 12px;
}
.status-bar.open { background: #c8e6c9; }
.status-bar.connecting { background: #fff9c4; }
.status-bar.closed, .status-bar.error { background: #ffcdd2; }
.status-dot {
display: inline-block;
width: 8px;
height: 8px;
border-radius: 50%;
background: #ccc;
margin-right: 8px;
}
.status-dot.online { background: #4caf50; }
</style>Node.js WebSocket 服务器(使用 ws 库) #
// server.js
const WebSocket = require('ws')
const http = require('http')
const server = http.createServer()
const wss = new WebSocket.Server({ server })
// 存储房间和用户信息
const rooms = new Map() // roomId -> Set<ws>
const userSockets = new Map() // userId -> ws
wss.on('connection', (ws, req) => {
// 解析 URL 参数
const url = new URL(req.url, `http://${req.headers.host}`)
const roomId = url.searchParams.get('roomId')
const userId = url.searchParams.get('userId')
// 绑定用户信息到 ws
ws.roomId = roomId
ws.userId = userId
ws.isAlive = true
// 加入房间
if (!rooms.has(roomId)) {
rooms.set(roomId, new Set())
}
rooms.get(roomId).add(ws)
userSockets.set(userId, ws)
// 通知房间内其他用户
broadcastToRoom(roomId, {
type: 'userJoin',
payload: { id: userId, name: `User${userId.slice(0, 4)}`, online: true }
}, ws)
// 发送当前在线用户列表
const users = Array.from(rooms.get(roomId))
.map(client => ({
id: client.userId,
name: `User${client.userId.slice(0, 4)}`,
online: true
}))
ws.send(JSON.stringify({ type: 'userList', payload: users }))
// 接收消息
ws.on('message', (data) => {
const message = JSON.parse(data.toString())
if (message.type === 'ping') {
ws.isAlive = true
ws.send(JSON.stringify({ type: 'pong' }))
return
}
if (message.type === 'message') {
const newMessage = {
id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
type: 'message',
payload: {
...message.payload,
id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
userId,
username: `User${userId.slice(0, 4)}`,
read: false
}
}
broadcastToRoom(roomId, newMessage)
}
})
// 断开连接
ws.on('close', () => {
rooms.get(roomId)?.delete(ws)
userSockets.delete(userId)
broadcastToRoom(roomId, {
type: 'userLeave',
payload: { userId }
})
})
})
// 广播到房间
function broadcastToRoom(roomId, message, excludeWs = null) {
const clients = rooms.get(roomId)
if (!clients) return
const data = JSON.stringify(message)
clients.forEach(client => {
if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
client.send(data)
}
})
}
// 心跳检测
setInterval(() => {
wss.clients.forEach(ws => {
if (!ws.isAlive) {
return ws.terminate()
}
ws.isAlive = false
ws.send(JSON.stringify({ type: 'ping' }))
})
}, 30000)
server.listen(8080, () => {
console.log('WebSocket server running on ws://localhost:8080')
})场景 2:实时通知推送(SSE) #
需求:系统通知实时推送到前端,支持未读计数、通知类型分类
Vue 3 通知组件 #
<!-- components/NotificationCenter.vue -->
<template>
<div class="notification-center">
<!-- 通知图标 -->
<div class="notification-icon" @click="togglePanel">
<span class="icon">🔔</span>
<span v-if="unreadCount > 0" class="badge">{{ unreadCount > 99 ? '99+' : unreadCount }}</span>
</div>
<!-- 通知面板 -->
<div v-if="showPanel" class="notification-panel">
<div class="panel-header">
<h3>通知</h3>
<button @click="markAllRead" :disabled="unreadCount === 0">全部已读</button>
</div>
<div class="panel-tabs">
<button
v-for="tab in tabs"
:key="tab.key"
:class="['tab', { active: activeTab === tab.key }]"
@click="activeTab = tab.key"
>
{{ tab.label }}
<span v-if="tab.count > 0" class="tab-count">{{ tab.count }}</span>
</button>
</div>
<div class="panel-content">
<div v-if="filteredNotifications.length === 0" class="empty">
暂无通知
</div>
<div
v-for="notification in filteredNotifications"
:key="notification.id"
:class="['notification-item', { unread: !notification.read }]"
@click="handleNotificationClick(notification)"
>
<div class="notification-icon-type" :class="notification.type">
{{ getNotificationIcon(notification.type) }}
</div>
<div class="notification-content">
<div class="notification-title">{{ notification.title }}</div>
<div class="notification-message">{{ notification.message }}</div>
<div class="notification-time">{{ formatTime(notification.createdAt) }}</div>
</div>
<button class="mark-read-btn" @click.stop="markRead(notification.id)">
{{ notification.read ? '已读' : '标记已读' }}
</button>
</div>
</div>
<div class="panel-footer">
<span :class="['status', sseStatus]">
{{ sseStatus === 'open' ? '实时更新中' : '连接断开' }}
</span>
</div>
</div>
</div>
</template>
<script setup>
import { ref, computed, onMounted, onUnmounted } from 'vue'
// Props
const props = defineProps({
userId: {
type: String,
required: true
}
})
// 状态
const notifications = ref([])
const showPanel = ref(false)
const activeTab = ref('all')
const sseStatus = ref('connecting')
let eventSource = null
// 通知类型
const tabs = computed(() => [
{
key: 'all',
label: '全部',
count: notifications.value.filter(n => !n.read).length
},
{
key: 'system',
label: '系统',
count: notifications.value.filter(n => n.type === 'system' && !n.read).length
},
{
key: 'message',
label: '消息',
count: notifications.value.filter(n => n.type === 'message' && !n.read).length
},
{
key: 'alert',
label: '警告',
count: notifications.value.filter(n => n.type === 'alert' && !n.read).length
}
])
// 未读数量
const unreadCount = computed(() => {
return notifications.value.filter(n => !n.read).length
})
// 过滤后的通知
const filteredNotifications = computed(() => {
if (activeTab.value === 'all') {
return notifications.value
}
return notifications.value.filter(n => n.type === activeTab.value)
})
// SSE 连接
const connectSSE = () => {
const sseUrl = `${import.meta.env.VITE_API_URL}/notifications/stream?userId=${props.userId}`
eventSource = new EventSource(sseUrl)
eventSource.onopen = () => {
sseStatus.value = 'open'
console.log('SSE 连接成功')
}
// 默认消息
eventSource.onmessage = (event) => {
const notification = JSON.parse(event.data)
addNotification(notification)
}
// 自定义事件:新通知
eventSource.addEventListener('notification', (event) => {
const notification = JSON.parse(event.data)
addNotification(notification)
})
// 自定义事件:通知已读
eventSource.addEventListener('read', (event) => {
const { notificationId } = JSON.parse(event.data)
markRead(notificationId)
})
// 自定义事件:全部已读
eventSource.addEventListener('readAll', () => {
notifications.value.forEach(n => n.read = true)
})
eventSource.onerror = () => {
sseStatus.value = 'error'
// SSE 会自动重连
}
}
// 添加通知
const addNotification = (notification) => {
// 避免重复
if (notifications.value.some(n => n.id === notification.id)) {
return
}
notifications.value.unshift({
...notification,
read: false,
createdAt: notification.createdAt || new Date().toISOString()
})
// 显示桌面通知
if (Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.message,
icon: '/favicon.ico'
})
}
}
// 标记已读
const markRead = async (notificationId) => {
const notification = notifications.value.find(n => n.id === notificationId)
if (notification) {
notification.read = true
// 通知服务器
await fetch(`${import.meta.env.VITE_API_URL}/notifications/${notificationId}/read`, {
method: 'POST'
})
}
}
// 全部标记已读
const markAllRead = async () => {
notifications.value.forEach(n => n.read = true)
await fetch(`${import.meta.env.VITE_API_URL}/notifications/read-all?userId=${props.userId}`, {
method: 'POST'
})
}
// 点击通知
const handleNotificationClick = async (notification) => {
await markRead(notification.id)
// 跳转到相关页面
if (notification.link) {
window.location.href = notification.link
}
}
// 切换面板
const togglePanel = () => {
showPanel.value = !showPanel.value
}
// 获取通知图标
const getNotificationIcon = (type) => {
const icons = {
system: '⚙️',
message: '💬',
alert: '⚠️',
success: '✅',
warning: '🔔'
}
return icons[type] || '📌'
}
// 格式化时间
const formatTime = (isoString) => {
const date = new Date(isoString)
const now = new Date()
const diff = now - date
if (diff < 60000) return '刚刚'
if (diff < 3600000) return `${Math.floor(diff / 60000)} 分钟前`
if (diff < 86400000) return `${Math.floor(diff / 3600000)} 小时前`
return date.toLocaleDateString('zh-CN')
}
// 请求桌面通知权限
const requestNotificationPermission = async () => {
if ('Notification' in window && Notification.permission === 'default') {
await Notification.requestPermission()
}
}
// 生命周期
onMounted(() => {
requestNotificationPermission()
connectSSE()
})
onUnmounted(() => {
eventSource?.close()
})
</script>
<style scoped>
.notification-center {
position: relative;
}
.notification-icon {
position: relative;
cursor: pointer;
}
.notification-icon .icon {
font-size: 24px;
}
.badge {
position: absolute;
top: -5px;
right: -5px;
background: #f44336;
color: white;
font-size: 10px;
padding: 2px 6px;
border-radius: 10px;
}
.notification-panel {
position: absolute;
top: 100%;
right: 0;
width: 360px;
max-height: 480px;
background: white;
border-radius: 8px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
z-index: 1000;
display: flex;
flex-direction: column;
}
.panel-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 12px 16px;
border-bottom: 1px solid #eee;
}
.panel-tabs {
display: flex;
border-bottom: 1px solid #eee;
}
.tab {
flex: 1;
padding: 8px;
background: none;
border: none;
cursor: pointer;
font-size: 13px;
}
.tab.active {
border-bottom: 2px solid #1976d2;
color: #1976d2;
}
.tab-count {
margin-left: 4px;
background: #eee;
padding: 2px 6px;
border-radius: 10px;
font-size: 11px;
}
.panel-content {
flex: 1;
overflow-y: auto;
}
.notification-item {
display: flex;
gap: 12px;
padding: 12px 16px;
border-bottom: 1px solid #f5f5f5;
cursor: pointer;
}
.notification-item:hover {
background: #fafafa;
}
.notification-item.unread {
background: #e3f2fd;
}
.notification-icon-type {
width: 36px;
height: 36px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-size: 18px;
}
.notification-icon-type.system { background: #e3f2fd; }
.notification-icon-type.message { background: #f3e5f5; }
.notification-icon-type.alert { background: #fff3e0; }
.notification-content {
flex: 1;
}
.notification-title {
font-weight: 500;
margin-bottom: 4px;
}
.notification-message {
font-size: 13px;
color: #666;
margin-bottom: 4px;
}
.notification-time {
font-size: 11px;
color: #999;
}
.mark-read-btn {
font-size: 11px;
padding: 4px 8px;
background: none;
border: 1px solid #ddd;
border-radius: 4px;
cursor: pointer;
}
.panel-footer {
padding: 8px 16px;
border-top: 1px solid #eee;
text-align: center;
}
.status {
font-size: 12px;
color: #999;
}
.status.open { color: #4caf50; }
.status.error { color: #f44336; }
.empty {
padding: 40px;
text-align: center;
color: #999;
}
</style>Node.js SSE 服务器(Express) #
// server.js
const express = require('express')
const cors = require('cors')
const app = express()
app.use(cors())
app.use(express.json())
// 存储客户端连接
const clients = new Map() // userId -> res
// SSE 端点
app.get('/notifications/stream', (req, res) => {
const userId = req.query.userId
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no') // Nginx 需要这个
// 保存连接
clients.set(userId, res)
console.log(`User ${userId} connected. Total clients: ${clients.size}`)
// 发送初始连接成功事件
res.write(`event: connected\ndata: ${JSON.stringify({ userId })}\n\n`)
// 定期发送心跳,保持连接
const heartbeat = setInterval(() => {
res.write(': heartbeat\n\n') // 注释行作为心跳
}, 15000)
// 客户端断开
req.on('close', () => {
clearInterval(heartbeat)
clients.delete(userId)
console.log(`User ${userId} disconnected. Total clients: ${clients.size}`)
})
})
// 发送通知 API
app.post('/notifications/send', (req, res) => {
const { userId, notification } = req.body
const client = clients.get(userId)
if (client) {
// 发送 SSE 事件
client.write(`event: notification\ndata: ${JSON.stringify(notification)}\n\n`)
res.json({ success: true, message: 'Notification sent' })
} else {
res.json({ success: false, message: 'User not connected' })
}
})
// 批量发送通知
app.post('/notifications/broadcast', (req, res) => {
const { notification, userIds } = req.body
let sentCount = 0
const targetUsers = userIds || Array.from(clients.keys())
targetUsers.forEach(userId => {
const client = clients.get(userId)
if (client) {
client.write(`event: notification\ndata: ${JSON.stringify(notification)}\n\n`)
sentCount++
}
})
res.json({ success: true, sentCount })
})
// 标记已读
app.post('/notifications/:id/read', (req, res) => {
// 这里应该更新数据库
// 通知客户端更新状态
res.json({ success: true })
})
// 全部已读
app.post('/notifications/read-all', (req, res) => {
const { userId } = req.query
const client = clients.get(userId)
if (client) {
client.write(`event: readAll\ndata: {}\n\n`)
}
res.json({ success: true })
})
// 模拟发送通知(测试用)
app.post('/notifications/test', (req, res) => {
const notification = {
id: `notif-${Date.now()}`,
type: ['system', 'message', 'alert'][Math.floor(Math.random() * 3)],
title: '测试通知',
message: '这是一条测试通知消息',
createdAt: new Date().toISOString()
}
// 广播给所有客户端
clients.forEach(client => {
client.write(`event: notification\ndata: ${JSON.stringify(notification)}\n\n`)
})
res.json({ success: true, clientCount: clients.size })
})
const PORT = process.env.PORT || 3000
app.listen(PORT, () => {
console.log(`SSE server running on http://localhost:${PORT}`)
})场景 3:实时数据大屏(SSE + Vue 3) #
需求:实时展示业务数据,支持多种图表、自动刷新
<!-- components/RealtimeDashboard.vue -->
<template>
<div class="dashboard">
<div class="dashboard-header">
<h1>实时数据大屏</h1>
<div class="status">
<span :class="['connection-status', sseStatus]">
{{ sseStatus === 'open' ? '● 实时' : '○ 离线' }}
</span>
<span class="update-time">更新于: {{ lastUpdateTime }}</span>
</div>
</div>
<div class="dashboard-grid">
<!-- 关键指标卡片 -->
<div v-for="metric in metrics" :key="metric.key" class="metric-card">
<div class="metric-label">{{ metric.label }}</div>
<div class="metric-value">
<span class="value">{{ formatNumber(metric.value) }}</span>
<span :class="['trend', metric.trend]">
{{ metric.trend === 'up' ? '↑' : metric.trend === 'down' ? '↓' : '-' }}
{{ metric.change }}%
</span>
</div>
</div>
<!-- 实时折线图 -->
<div class="chart-card">
<h3>实时访问量</h3>
<div ref="lineChartRef" class="chart"></div>
</div>
<!-- 实时柱状图 -->
<div class="chart-card">
<h3>地区分布</h3>
<div ref="barChartRef" class="chart"></div>
</div>
<!-- 实时数据流 -->
<div class="data-stream-card">
<h3>实时数据流</h3>
<div class="data-stream">
<div
v-for="(item, index) in dataStream"
:key="item.id"
class="stream-item"
:style="{ animationDelay: `${index * 0.1}s` }"
>
<span class="stream-time">{{ item.time }}</span>
<span class="stream-type">{{ item.type }}</span>
<span class="stream-content">{{ item.content }}</span>
</div>
</div>
</div>
</div>
</div>
</template>
<script setup>
import { ref, onMounted, onUnmounted, watch } from 'vue'
import * as echarts from 'echarts'
// 状态
const sseStatus = ref('connecting')
const lastUpdateTime = ref('-')
const metrics = ref([
{ key: 'totalUsers', label: '总用户数', value: 0, trend: 'up', change: 0 },
{ key: 'activeUsers', label: '在线用户', value: 0, trend: 'up', change: 0 },
{ key: 'pageViews', label: '页面浏览', value: 0, trend: 'up', change: 0 },
{ key: 'conversions', label: '转化数', value: 0, trend: 'down', change: 0 }
])
const dataStream = ref([])
const lineChartRef = ref(null)
const barChartRef = ref(null)
let eventSource = null
let lineChart = null
let barChart = null
let lineChartData = []
// SSE 连接
const connectSSE = () => {
const sseUrl = `${import.meta.env.VITE_API_URL}/dashboard/stream`
eventSource = new EventSource(sseUrl)
eventSource.onopen = () => {
sseStatus.value = 'open'
}
// 实时指标更新
eventSource.addEventListener('metrics', (event) => {
const data = JSON.parse(event.data)
updateMetrics(data)
lastUpdateTime.value = new Date().toLocaleTimeString('zh-CN')
})
// 折线图数据
eventSource.addEventListener('lineChart', (event) => {
const data = JSON.parse(event.data)
updateLineChart(data)
})
// 柱状图数据
eventSource.addEventListener('barChart', (event) => {
const data = JSON.parse(event.data)
updateBarChart(data)
})
// 数据流
eventSource.addEventListener('dataStream', (event) => {
const data = JSON.parse(event.data)
addDataStreamItem(data)
})
eventSource.onerror = () => {
sseStatus.value = 'error'
}
}
// 更新指标
const updateMetrics = (data) => {
metrics.value = metrics.value.map(metric => ({
...metric,
value: data[metric.key]?.value ?? metric.value,
trend: data[metric.key]?.trend ?? metric.trend,
change: data[metric.key]?.change ?? metric.change
}))
}
// 初始化折线图
const initLineChart = () => {
if (!lineChartRef.value) return
lineChart = echarts.init(lineChartRef.value)
lineChart.setOption({
tooltip: { trigger: 'axis' },
xAxis: {
type: 'category',
data: []
},
yAxis: { type: 'value' },
series: [{
type: 'line',
smooth: true,
data: [],
areaStyle: { opacity: 0.3 }
}]
})
}
// 更新折线图
const updateLineChart = (data) => {
lineChartData.push(data)
// 只保留最近 30 个数据点
if (lineChartData.length > 30) {
lineChartData.shift()
}
lineChart?.setOption({
xAxis: {
data: lineChartData.map(d => d.time)
},
series: [{
data: lineChartData.map(d => d.value)
}]
})
}
// 初始化柱状图
const initBarChart = () => {
if (!barChartRef.value) return
barChart = echarts.init(barChartRef.value)
barChart.setOption({
tooltip: { trigger: 'axis' },
xAxis: { type: 'category' },
yAxis: { type: 'value' },
series: [{
type: 'bar',
data: []
}]
})
}
// 更新柱状图
const updateBarChart = (data) => {
barChart?.setOption({
xAxis: { data: data.map(d => d.name) },
series: [{ data: data.map(d => d.value) }]
})
}
// 添加数据流项
const addDataStreamItem = (item) => {
dataStream.value.unshift({
id: Date.now(),
time: new Date().toLocaleTimeString('zh-CN'),
type: item.type,
content: item.content
})
// 只保留最近 50 条
if (dataStream.value.length > 50) {
dataStream.value.pop()
}
}
// 格式化数字
const formatNumber = (num) => {
if (num >= 1000000) return (num / 1000000).toFixed(1) + 'M'
if (num >= 1000) return (num / 1000).toFixed(1) + 'K'
return num.toLocaleString()
}
// 窗口大小变化时重绘图表
const handleResize = () => {
lineChart?.resize()
barChart?.resize()
}
// 生命周期
onMounted(() => {
initLineChart()
initBarChart()
connectSSE()
window.addEventListener('resize', handleResize)
})
onUnmounted(() => {
eventSource?.close()
lineChart?.dispose()
barChart?.dispose()
window.removeEventListener('resize', handleResize)
})
</script>
<style scoped>
.dashboard {
background: #1a1a2e;
min-height: 100vh;
padding: 20px;
color: white;
}
.dashboard-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
}
.dashboard-header h1 {
font-size: 24px;
font-weight: 500;
}
.connection-status {
font-size: 14px;
margin-right: 16px;
}
.connection-status.open { color: #4caf50; }
.connection-status.error { color: #f44336; }
.update-time {
font-size: 12px;
color: #888;
}
.dashboard-grid {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 20px;
}
.metric-card {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 20px;
border-radius: 12px;
}
.metric-label {
font-size: 14px;
opacity: 0.8;
margin-bottom: 8px;
}
.metric-value {
display: flex;
align-items: baseline;
gap: 12px;
}
.metric-value .value {
font-size: 32px;
font-weight: 600;
}
.trend {
font-size: 14px;
}
.trend.up { color: #4caf50; }
.trend.down { color: #f44336; }
.chart-card {
grid-column: span 2;
background: #16213e;
padding: 20px;
border-radius: 12px;
}
.chart-card h3 {
margin-bottom: 16px;
font-weight: 500;
}
.chart {
height: 300px;
}
.data-stream-card {
grid-column: span 4;
background: #16213e;
padding: 20px;
border-radius: 12px;
max-height: 300px;
overflow: hidden;
}
.data-stream {
overflow-y: auto;
max-height: 220px;
}
.stream-item {
display: flex;
gap: 16px;
padding: 8px 12px;
margin-bottom: 4px;
background: rgba(255, 255, 255, 0.05);
border-radius: 4px;
animation: fadeIn 0.3s ease;
}
@keyframes fadeIn {
from { opacity: 0; transform: translateY(-10px); }
to { opacity: 1; transform: translateY(0); }
}
.stream-time {
color: #888;
font-size: 12px;
}
.stream-type {
background: #667eea;
padding: 2px 8px;
border-radius: 4px;
font-size: 12px;
}
.stream-content {
flex: 1;
font-size: 14px;
}
</style>场景 4:协作编辑(WebSocket + OT) #
需求:多人同时编辑文档,实时同步,支持操作转换(OT)
<!-- components/CollaborativeEditor.vue -->
<template>
<div class="collaborative-editor">
<div class="editor-header">
<div class="document-info">
<h2>{{ documentTitle }}</h2>
<span class="last-saved">最后保存: {{ lastSaved }}</span>
</div>
<div class="collaborators">
<div
v-for="user in collaborators"
:key="user.id"
class="collaborator"
:style="{ borderColor: user.color }"
>
<span class="avatar" :style="{ background: user.color }">
{{ user.name[0] }}
</span>
<span class="cursor-indicator" v-if="user.cursor">
{{ user.name }} 正在编辑...
</span>
</div>
</div>
</div>
<div class="editor-container">
<textarea
ref="textareaRef"
v-model="content"
@input="handleInput"
@select="handleSelect"
@keydown="handleKeydown"
placeholder="开始编辑..."
></textarea>
<!-- 远程光标 -->
<div
v-for="user in collaborators.filter(u => u.cursor && u.id !== currentUserId)"
:key="user.id"
class="remote-cursor"
:style="{
top: user.cursor.line * 20 + 'px',
left: user.cursor.column * 8 + 'px',
borderColor: user.color
}"
>
<span class="cursor-name" :style="{ background: user.color }">
{{ user.name }}
</span>
</div>
</div>
<div class="editor-footer">
<span :class="['sync-status', syncStatus]">
{{ syncStatusText }}
</span>
<span class="word-count">{{ content.length }} 字符</span>
</div>
</div>
</template>
<script setup>
import { ref, computed, onMounted, onUnmounted, watch } from 'vue'
// Props
const props = defineProps({
documentId: {
type: String,
required: true
},
currentUserId: {
type: String,
required: true
},
userName: {
type: String,
default: 'Anonymous'
}
})
// 状态
const documentTitle = ref('未命名文档')
const content = ref('')
const lastSaved = ref('-')
const syncStatus = ref('synced') // synced | syncing | offline
const collaborators = ref([])
const textareaRef = ref(null)
let ws = null
let pendingOps = []
let revision = 0
// 计算属性
const syncStatusText = computed(() => {
switch (syncStatus.value) {
case 'synced': return '已同步'
case 'syncing': return '同步中...'
case 'offline': return '离线'
default: return '未知'
}
})
// WebSocket 连接
const connect = () => {
const wsUrl = `${import.meta.env.VITE_WS_URL}/collab?docId=${props.documentId}&userId=${props.currentUserId}&userName=${encodeURIComponent(props.userName)}`
ws = new WebSocket(wsUrl)
ws.onopen = () => {
syncStatus.value = 'synced'
// 发送待处理的操作
flushPendingOps()
}
ws.onmessage = (event) => {
const data = JSON.parse(event.data)
handleMessage(data)
}
ws.onclose = () => {
syncStatus.value = 'offline'
// 尝试重连
setTimeout(connect, 3000)
}
ws.onerror = () => {
syncStatus.value = 'offline'
}
}
// 处理消息
const handleMessage = (data) => {
switch (data.type) {
case 'init':
// 初始化文档
content.value = data.content
documentTitle.value = data.title
revision = data.revision
break
case 'op':
// 收到操作
applyOperation(data.operation, data.revision)
break
case 'ack':
// 操作确认
revision = data.revision
syncStatus.value = 'synced'
lastSaved.value = new Date().toLocaleTimeString('zh-CN')
break
case 'userJoin':
collaborators.value.push(data.user)
break
case 'userLeave':
collaborators.value = collaborators.value.filter(u => u.id !== data.userId)
break
case 'cursor':
// 更新远程光标
const user = collaborators.value.find(u => u.id === data.userId)
if (user) {
user.cursor = data.cursor
}
break
case 'users':
collaborators.value = data.users
break
}
}
// 处理输入
const handleInput = (event) => {
if (!textareaRef.value) return
const { selectionStart, selectionEnd } = textareaRef.value
const inputValue = event.target.value
// 检测变化
if (inputValue.length > content.value.length) {
// 插入操作
const inserted = inputValue.slice(selectionStart - (inputValue.length - content.value.length))
const op = {
type: 'insert',
position: selectionStart - inserted.length,
text: inserted
}
sendOperation(op)
} else if (inputValue.length < content.value.length) {
// 删除操作
const op = {
type: 'delete',
position: selectionEnd,
length: content.value.length - inputValue.length
}
sendOperation(op)
}
content.value = inputValue
syncStatus.value = 'syncing'
}
// 处理选择(光标位置)
const handleSelect = () => {
if (!textareaRef.value || !ws) return
const { selectionStart, selectionEnd } = textareaRef.value
const lines = content.value.substring(0, selectionStart).split('\n')
const cursor = {
line: lines.length - 1,
column: lines[lines.length - 1].length,
start: selectionStart,
end: selectionEnd
}
// 发送光标位置
ws.send(JSON.stringify({
type: 'cursor',
cursor
}))
}
// 发送操作
const sendOperation = (op) => {
const message = {
type: 'op',
operation: op,
revision
}
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message))
} else {
// 离线时缓存操作
pendingOps.push(message)
}
}
// 应用远程操作
const applyOperation = (op, remoteRevision) => {
// 简化的 OT 实现(生产环境应使用成熟的 OT 库如 ShareDB)
let newContent = content.value
if (op.type === 'insert') {
newContent = newContent.slice(0, op.position) + op.text + newContent.slice(op.position)
} else if (op.type === 'delete') {
newContent = newContent.slice(0, op.position) + newContent.slice(op.position + op.length)
}
content.value = newContent
revision = remoteRevision
}
// 发送待处理的操作
const flushPendingOps = () => {
pendingOps.forEach(op => {
ws.send(JSON.stringify(op))
})
pendingOps = []
}
// 生命周期
onMounted(() => {
connect()
})
onUnmounted(() => {
ws?.close()
})
</script>
<style scoped>
.collaborative-editor {
display: flex;
flex-direction: column;
height: 100vh;
background: #1e1e1e;
}
.editor-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 12px 20px;
background: #252526;
border-bottom: 1px solid #3c3c3c;
}
.document-info h2 {
margin: 0;
font-size: 16px;
color: #fff;
}
.last-saved {
font-size: 12px;
color: #888;
margin-left: 12px;
}
.collaborators {
display: flex;
gap: 8px;
}
.collaborator {
display: flex;
align-items: center;
padding: 4px 8px;
border: 2px solid transparent;
border-radius: 20px;
}
.avatar {
width: 24px;
height: 24px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
color: white;
font-size: 12px;
font-weight: 600;
}
.cursor-indicator {
font-size: 11px;
color: #888;
margin-left: 8px;
}
.editor-container {
flex: 1;
position: relative;
padding: 16px;
}
textarea {
width: 100%;
height: 100%;
background: transparent;
border: none;
color: #d4d4d4;
font-family: 'Monaco', 'Menlo', monospace;
font-size: 14px;
line-height: 20px;
resize: none;
outline: none;
}
.remote-cursor {
position: absolute;
width: 2px;
height: 20px;
pointer-events: none;
animation: blink 1s infinite;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
.cursor-name {
position: absolute;
top: -20px;
left: 0;
padding: 2px 6px;
border-radius: 3px;
font-size: 11px;
color: white;
white-space: nowrap;
}
.editor-footer {
display: flex;
justify-content: space-between;
padding: 8px 20px;
background: #252526;
border-top: 1px solid #3c3c3c;
font-size: 12px;
color: #888;
}
.sync-status.synced { color: #4caf50; }
.sync-status.syncing { color: #ff9800; }
.sync-status.offline { color: #f44336; }
</style>场景 5:文件上传进度(WebSocket) #
需求:大文件上传,实时显示进度、速度、剩余时间
<!-- components/FileUploader.vue -->
<template>
<div class="file-uploader">
<div class="upload-area" @drop.prevent="handleDrop" @dragover.prevent>
<input
ref="fileInputRef"
type="file"
multiple
@change="handleFileSelect"
style="display: none"
/>
<button @click="$refs.fileInputRef.click()">选择文件</button>
<span>或拖拽文件到这里</span>
</div>
<div class="upload-list">
<div
v-for="file in uploadFiles"
:key="file.id"
class="upload-item"
>
<div class="file-info">
<span class="file-icon">{{ getFileIcon(file.name) }}</span>
<div class="file-details">
<div class="file-name">{{ file.name }}</div>
<div class="file-size">{{ formatSize(file.size) }}</div>
</div>
</div>
<div class="upload-progress">
<div class="progress-bar">
<div
class="progress-fill"
:style="{ width: file.progress + '%' }"
:class="{ error: file.status === 'error' }"
></div>
</div>
<div class="progress-info">
<span class="progress-percent">{{ file.progress }}%</span>
<span class="upload-speed" v-if="file.status === 'uploading'">
{{ file.speed }}
</span>
<span class="remaining-time" v-if="file.status === 'uploading' && file.remainingTime">
剩余 {{ file.remainingTime }}
</span>
</div>
</div>
<div class="upload-status">
<span v-if="file.status === 'pending'" class="status-pending">等待中</span>
<span v-else-if="file.status === 'uploading'" class="status-uploading">上传中</span>
<span v-else-if="file.status === 'completed'" class="status-completed">✓ 完成</span>
<span v-else-if="file.status === 'error'" class="status-error">失败</span>
<button
v-if="file.status === 'error'"
@click="retryUpload(file)"
class="retry-btn"
>
重试
</button>
</div>
</div>
</div>
<div class="upload-summary" v-if="totalFiles > 0">
<span>总计: {{ completedFiles }}/{{ totalFiles }} 个文件</span>
<span>总大小: {{ formatSize(totalSize) }}</span>
</div>
</div>
</template>
<script setup>
import { ref, computed, onMounted, onUnmounted } from 'vue'
// 状态
const uploadFiles = ref([])
const fileInputRef = ref(null)
let ws = null
let fileId = 0
// 计算属性
const totalFiles = computed(() => uploadFiles.value.length)
const completedFiles = computed(() =>
uploadFiles.value.filter(f => f.status === 'completed').length
)
const totalSize = computed(() =>
uploadFiles.value.reduce((sum, f) => sum + f.size, 0)
)
// WebSocket 连接
const connectWebSocket = () => {
const wsUrl = `${import.meta.env.VITE_WS_URL}/upload`
ws = new WebSocket(wsUrl)
ws.onmessage = (event) => {
const data = JSON.parse(event.data)
handleUploadProgress(data)
}
ws.onclose = () => {
// 重连
setTimeout(connectWebSocket, 3000)
}
}
// 处理上传进度
const handleUploadProgress = (data) => {
const file = uploadFiles.value.find(f => f.id === data.fileId)
if (!file) return
file.progress = data.progress
file.speed = data.speed
file.remainingTime = data.remainingTime
if (data.status === 'completed') {
file.status = 'completed'
file.url = data.url
} else if (data.status === 'error') {
file.status = 'error'
file.error = data.error
}
}
// 处理文件选择
const handleFileSelect = (event) => {
const files = Array.from(event.target.files)
files.forEach(file => addFile(file))
}
// 处理拖拽
const handleDrop = (event) => {
const files = Array.from(event.dataTransfer.files)
files.forEach(file => addFile(file))
}
// 添加文件
const addFile = (file) => {
const uploadFile = {
id: `file-${++fileId}`,
name: file.name,
size: file.size,
type: file.type,
progress: 0,
speed: '',
remainingTime: '',
status: 'pending',
file: file
}
uploadFiles.value.push(uploadFile)
uploadFile(uploadFile)
}
// 上传文件(分片上传)
const uploadFile = async (uploadFile) => {
uploadFile.status = 'uploading'
const CHUNK_SIZE = 1024 * 1024 // 1MB
const totalChunks = Math.ceil(uploadFile.size / CHUNK_SIZE)
const chunks = []
for (let i = 0; i < totalChunks; i++) {
const start = i * CHUNK_SIZE
const end = Math.min(start + CHUNK_SIZE, uploadFile.size)
chunks.push(uploadFile.file.slice(start, end))
}
// 发送文件信息
ws.send(JSON.stringify({
type: 'init',
fileId: uploadFile.id,
fileName: uploadFile.name,
fileSize: uploadFile.size,
totalChunks
}))
// 分片上传
for (let i = 0; i < chunks.length; i++) {
const chunk = chunks[i]
const reader = new FileReader()
reader.onload = () => {
ws.send(JSON.stringify({
type: 'chunk',
fileId: uploadFile.id,
chunkIndex: i,
data: reader.result
}))
}
reader.readAsArrayBuffer(chunk)
// 等待服务器确认
await new Promise(resolve => {
const handler = (event) => {
const data = JSON.parse(event.data)
if (data.fileId === uploadFile.id && data.chunkIndex === i) {
ws.removeEventListener('message', handler)
resolve()
}
}
ws.addEventListener('message', handler)
})
}
}
// 重试上传
const retryUpload = (file) => {
file.progress = 0
file.status = 'pending'
uploadFile(file)
}
// 格式化文件大小
const formatSize = (bytes) => {
if (bytes < 1024) return bytes + ' B'
if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB'
if (bytes < 1024 * 1024 * 1024) return (bytes / (1024 * 1024)).toFixed(1) + ' MB'
return (bytes / (1024 * 1024 * 1024)).toFixed(1) + ' GB'
}
// 获取文件图标
const getFileIcon = (fileName) => {
const ext = fileName.split('.').pop().toLowerCase()
const icons = {
pdf: '📄',
doc: '📝', docx: '📝',
xls: '📊', xlsx: '📊',
ppt: '📽️', pptx: '📽️',
zip: '📦', rar: '📦', '7z': '📦',
mp3: '🎵', wav: '🎵',
mp4: '🎬', avi: '🎬', mkv: '🎬',
jpg: '🖼️', jpeg: '🖼️', png: '🖼️', gif: '🖼️',
js: '📜', ts: '📜',
default: '📁'
}
return icons[ext] || icons.default
}
// 生命周期
onMounted(() => {
connectWebSocket()
})
onUnmounted(() => {
ws?.close()
})
</script>
<style scoped>
.file-uploader {
max-width: 600px;
margin: 0 auto;
padding: 20px;
}
.upload-area {
border: 2px dashed #ccc;
border-radius: 8px;
padding: 40px;
text-align: center;
cursor: pointer;
transition: all 0.3s;
}
.upload-area:hover {
border-color: #1976d2;
background: #f5f5f5;
}
.upload-area button {
background: #1976d2;
color: white;
border: none;
padding: 10px 20px;
border-radius: 4px;
cursor: pointer;
margin-right: 12px;
}
.upload-list {
margin-top: 20px;
}
.upload-item {
display: flex;
align-items: center;
gap: 16px;
padding: 12px;
border: 1px solid #eee;
border-radius: 8px;
margin-bottom: 12px;
}
.file-info {
display: flex;
align-items: center;
gap: 12px;
min-width: 200px;
}
.file-icon {
font-size: 32px;
}
.file-name {
font-weight: 500;
margin-bottom: 4px;
}
.file-size {
font-size: 12px;
color: #888;
}
.upload-progress {
flex: 1;
}
.progress-bar {
height: 8px;
background: #eee;
border-radius: 4px;
overflow: hidden;
}
.progress-fill {
height: 100%;
background: #1976d2;
transition: width 0.3s;
}
.progress-fill.error {
background: #f44336;
}
.progress-info {
display: flex;
justify-content: space-between;
margin-top: 4px;
font-size: 12px;
color: #666;
}
.upload-status {
text-align: right;
min-width: 80px;
}
.status-pending { color: #888; }
.status-uploading { color: #1976d2; }
.status-completed { color: #4caf50; }
.status-error { color: #f44336; }
.retry-btn {
margin-top: 4px;
padding: 4px 8px;
font-size: 12px;
background: #f44336;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.upload-summary {
display: flex;
justify-content: space-between;
padding: 12px;
background: #f5f5f5;
border-radius: 8px;
font-size: 14px;
color: #666;
}
</style>五、常见问题 #
Q1: WebSocket 和 SSE 如何选择? #
选 WebSocket:
- 需要双向通信(聊天、游戏、协作编辑)
- 需要传输二进制数据
- 需要高频率的客户端→服务器消息
选 SSE:
- 只需要服务器推送(通知、股票行情、日志流)
- 想要更简单的实现
- 需要断点续传(通过 last-event-id)
- HTTP/HTTPS 协议,更容易穿透防火墙
Q2: WebSocket 连接经常断开怎么办? #
原因分析:
- 网络不稳定
- 服务器超时断开
- 代理/防火墙限制
解决方案:
// 1. 心跳保活
setInterval(() => ws.send('ping'), 30000)
// 2. 自动重连(指数退避)
const reconnect = (delay = 1000) => {
setTimeout(() => {
ws = new WebSocket(url)
ws.onclose = () => reconnect(Math.min(delay * 2, 30000))
}, delay)
}
// 3. 检测僵尸连接
ws.onmessage = (e) => {
if (e.data === 'pong') {
lastPongTime = Date.now()
}
}
setInterval(() => {
if (Date.now() - lastPongTime > 60000) {
ws.close() // 超过60秒没响应,主动断开重连
}
}, 30000)Q3: SSE 在 IE 浏览器兼容吗? #
原生 SSE 在 IE 不支持,需要 polyfill:
// 使用 eventsource-polyfill
import { EventSourcePolyfill } from 'eventsource-polyfill'
const eventSource = new EventSourcePolyfill(url, {
headers: {
'Authorization': 'Bearer token'
}
})Q4: 如何处理 WebSocket 消息顺序问题? #
WebSocket 本身是有序的,但在处理时可能出现乱序:
// 使用消息队列和序号
const messageQueue = []
let expectedSeq = 0
ws.onmessage = (event) => {
const message = JSON.parse(event.data)
messageQueue.push(message)
processQueue()
}
const processQueue = () => {
// 按序号处理
while (messageQueue.length > 0) {
const msg = messageQueue.find(m => m.seq === expectedSeq)
if (msg) {
handleMessage(msg)
messageQueue.splice(messageQueue.indexOf(msg), 1)
expectedSeq++
} else {
break // 等待缺失的消息
}
}
}Q5: 如何在生产环境部署 WebSocket? #
Nginx 配置:
upstream websocket {
server backend1:8080;
server backend2:8080;
}
server {
listen 80;
server_name example.com;
location /ws {
proxy_pass http://websocket;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 3600s; # 1小时超时
proxy_send_timeout 3600s;
}
}负载均衡注意:
- 使用 sticky session 或共享存储(Redis)
- 推荐使用共享存储,把消息发布到 Redis,所有节点订阅
Q6: 如何调试 WebSocket? #
浏览器 DevTools:
- 打开 Network 标签
- 筛选 WS(WebSocket)
- 点击连接查看 Frames(消息)
Chrome 扩展:
- WebSocket King Client
- WebSocket Test Client
Node.js 调试:
const WebSocket = require('ws')
const ws = new WebSocket('ws://localhost:8080')
ws.on('open', () => console.log('Connected'))
ws.on('message', data => console.log('Received:', data.toString()))
ws.on('error', err => console.error('Error:', err))
// 发送测试消息
ws.on('open', () => {
ws.send(JSON.stringify({ type: 'test', data: 'hello' }))
})六、总结速记 #
WebSocket 核心要点 #
| 类型 | 要点 |
|---|---|
| 协议 | ws:// 或 wss://,一次握手持久连接 |
| 状态 | 0=CONNECTING, 1=OPEN, 2=CLOSING, 3=CLOSED |
| 事件 | onopen, onmessage, onclose, onerror |
| 方法 | send(), close() |
| 保活 | 心跳检测(ping/pong) |
| 重连 | 需手动实现(指数退避) |
| 代理 | Nginx 需要 upgrade 头 |
SSE 核心要点 #
| 类型 | 要点 |
|---|---|
| 协议 | HTTP/HTTPS,Content-Type: text/event-stream |
| API | EventSource,自动重连 |
| 事件 | onopen, onmessage, onerror, addEventListener |
| 格式 | data: 消息内容\n\n |
| 断点续传 | 通过 last-event-id |
| 兼容性 | IE 需 polyfill |
Vue 3 组合式 API 封装 #
// useWebSocket
const { data, status, send, close } = useWebSocket(url, options)
// useSSE
const { data, status, lastEventId, close } = useSSE(url, options)最佳实践 #
| 场景 | 推荐 |
|---|---|
| 聊天/游戏 | WebSocket |
| 通知推送 | SSE |
| 协作编辑 | WebSocket + OT |
| 数据大屏 | SSE |
| 文件上传 | WebSocket(进度) |
| 断点续传 | SSE(last-event-id) |
| 高并发 | WebSocket + Redis Pub/Sub |
| 穿透防火墙 | SSE 更友好 |
附录 #
推荐库 #
| 库名 | 用途 | Stars |
|---|---|---|
| socket.io | WebSocket 封装,自动重连 | ⭐ 60k+ |
| ws | Node.js WebSocket 服务器 | ⭐ 21k+ |
| EventSource | 原生 SSE API | - |
| eventsource | Node.js SSE 客户端 | ⭐ 1k+ |
| reconnecting-websocket | 自动重连 WebSocket | ⭐ 1k+ |
官方文档 #
最后更新:2026-04-01
继续阅读
返回文章列表