WebSocket技术详解
WebSocket基础
什么是WebSocket
WebSocket是一种在单个TCP连接上进行全双工通信的协议。它提供了在Web浏览器和服务器之间建立持久连接的标准方法,使得服务器可以主动向客户端推送数据。
与HTTP的区别
连接特点
- HTTP是非持久的、单向的
- WebSocket是持久的、双向的
数据格式
- HTTP基于请求-响应模式
- WebSocket支持双向实时数据传输
协议标识
- HTTP:
http://
、https://
- WebSocket:
ws://
、wss://
- HTTP:
应用场景
- 实时通讯(聊天室、即时消息)
- 实时数据展示(股票行情、体育赛事)
- 在线游戏
- 协同编辑
- 实时监控
WebSocket API
基础用法
js
// 创建WebSocket连接
const ws = new WebSocket('ws://example.com/socketserver')
// 连接建立时触发
ws.onopen = function () {
console.log('连接已建立')
// 发送数据
ws.send('Hello Server!')
}
// 接收消息
ws.onmessage = function (event) {
console.log('收到消息:', event.data)
}
// 连接关闭时触发
ws.onclose = function () {
console.log('连接已关闭')
}
// 发生错误时触发
ws.onerror = function (error) {
console.error('WebSocket错误:', error)
}
数据格式
WebSocket支持发送多种类型的数据:
js
// 发送文本
ws.send('Hello')
// 发送JSON对象
ws.send(JSON.stringify({ type: 'message', content: 'Hello' }))
// 发送二进制数据
const buffer = new ArrayBuffer(8)
ws.send(buffer)
// 发送Blob对象
const blob = new Blob(['Hello'], { type: 'text/plain' })
ws.send(blob)
WebSocket客户端封装
基础封装
js
// WebSocket 错误类型枚举
export enum WebSocketErrorType {
MANUAL_CLOSE = 'MANUAL_CLOSE',
HEARTBEAT_TIMEOUT = 'HEARTBEAT_TIMEOUT',
CONNECT_ERROR = 'CONNECT_ERROR',
SEND_ERROR = 'SEND_ERROR',
MAX_RECONNECT = 'MAX_RECONNECT',
UNKNOWN = 'UNKNOWN',
}
/**
* WebSocket客户端配置选项
* @interface WebSocketOptions
* @property {number} [maxReconnectAttempts=5] - 最大重连尝试次数
* @property {number} [reconnectInterval=3000] - 重连间隔时间(毫秒)
* @property {number} [heartbeatInterval=30000] - 心跳包发送间隔(毫秒)
* @property {number} [heartbeatTimeout=5000] - 心跳包超时时间(毫秒)
* @property {number} [heartbeatTimeout=5000] - 心跳包超时时间(毫秒)
* @property {() => void} [onOpen] - WebSocket连接成功回调
* @property {() => void} [onClose] - WebSocket关闭回调
* @property {(error: Event) => void} [onError] - WebSocket错误回调
* @property {(data: any) => void} [onMessage] - WebSocket消息回调
*/
export interface WebSocketOptions {
maxReconnectAttempts?: number;
reconnectInterval?: number;
heartbeatInterval?: number;
heartbeatTimeout?: number;
onOpen?: () => void;
onClose?: (reason: { type: WebSocketErrorType; message: string }) => void;
onError?: (error: { type: WebSocketErrorType; message: string; originalError?: any }) => void;
onMessage?: (data: any) => void;
}
/**
* WebSocket消息格式
* @interface WebSocketMessage
* @property {string} messageType - 消息类型
* @property {string} content - 消息主体内容
* @property {string} contentType - 消息主体内容具体类型
* @property {any} [property: string] - 其他自定义字段
*/
export interface WebSocketMessage {
content: string; // 消息主体 字符串或json字符串
contentType: WebsocketContentType;
messageType: WebsocketMessageType;
sendStatus?: number; // 1成功 2.失败
[property: string]: any;
}
// 消息主体内容具体类型
export enum WebsocketContentType {
/** 文本 */
Text = 'text',
/** 图片 */
Picture = 'picture',
/** 未知类型 */
Unknown = 'unknown',
// ...其它类型主体 如视频/语音等
}
// 消息类型
export enum WebsocketMessageType {
/** 心跳检测发送消息 */
Ping = 'ping',
/** 心跳检测响应消息 */
Pong = 'pong',
/** 用户发送消息 */
User = 'user',
/** 系统回复消息 */
System = 'system',
/** 系统验证消息 */
Auth = 'auth',
/** 未知类型 */
Unknown = 'unknown',
/** 其它类型的消息 对应角色 */
}
/**
* WebSocket客户端封装类
* 提供自动重连、心跳检测、消息订阅等功能
*
* @example
* ```typescript
* // 创建WebSocket客户端实例
* const ws = new WebSocketClient('ws://example.com', {
* maxReconnectAttempts: 5,
* reconnectInterval: 3000,
* heartbeatInterval: 30000,
* heartbeatTimeout: 5000,
* onOpen: () => console.log('连接成功'),
* onClose: () => console.log('连接关闭'),
* onError: (error) => console.error('连接错误:', error),
* onMessage: (data) => console.log('收到消息:', data)
* });
*
* // 订阅特定类型的消息
* ws.on('custom_event', (data) => {
* console.log('收到自定义消息:', data);
* });
*
* // 发送消息
* ws.send({ messageType: 'custom_event', data: 'hello' });
*
* // 关闭连接
* ws.close();
* ```
*/
export class WebSocketClient {
private url: string;
private options: WebSocketOptions;
private ws: WebSocket | null;
private status: 'CLOSED' | 'CLOSING' | 'CONNECTING' | 'OPEN';
private messageCallbacks: Map<string, (data: any) => void>;
private reconnectAttempts: number;
private maxReconnectAttempts: number;
private reconnectInterval: number;
private heartbeatInterval: number;
private heartbeatTimeout: number;
private heartbeatTimer: NodeJS.Timeout | null;
private heartbeatTimeoutTimer: NodeJS.Timeout | null;
constructor(url: string, options: WebSocketOptions = {}) {
this.url = url;
this.options = options;
this.ws = null;
this.status = 'CLOSED';
this.messageCallbacks = new Map();
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectInterval = options.reconnectInterval || 3000;
this.heartbeatInterval = options.heartbeatInterval || 30000;
this.heartbeatTimeout = options.heartbeatTimeout || 5000;
this.heartbeatTimer = null;
this.heartbeatTimeoutTimer = null;
this.init();
}
private init(): void {
try {
this.ws = new WebSocket(this.url);
this.status = 'CONNECTING';
this.bindEvents();
} catch (error) {
console.error('WebSocket initialization failed:', error);
this.handleError(error, WebSocketErrorType.CONNECT_ERROR);
}
}
private bindEvents(): void {
if (!this.ws) return;
this.ws.onopen = () => {
this.status = 'OPEN';
this.reconnectAttempts = 0;
this.startHeartbeat();
this.options.onOpen?.();
};
this.ws.onclose = () => {
this.stopHeartbeat();
this.cleanupWebSocket();
this.options.onClose?.({
type: this.status === 'CLOSING' ? WebSocketErrorType.MANUAL_CLOSE : WebSocketErrorType.UNKNOWN,
message: this.status === 'CLOSING' ? '用户主动关闭连接' : '连接意外关闭',
});
// 只在非主动关闭时触发重连
if (this.status !== 'CLOSING') {
this.reconnect();
}
this.status = 'CLOSED';
};
this.ws.onerror = (error: Event) => {
this.handleError(error);
};
this.ws.onmessage = (event: MessageEvent) => {
try {
const data = this.parseMessage(event.data);
if (data?.messageType === 'pong') {
this.handlePong();
} else {
this.handleMessage(data);
}
} catch (error) {
console.error('Message handling error:', error);
this.handleError(error);
}
};
}
private parseMessage(message: string): WebSocketMessage {
try {
return typeof message === 'string' ? JSON.parse(message) : message;
} catch (e) {
console.warn('Message parsing failed:', e);
return { messageType: WebsocketMessageType.Unknown, content: message, contentType: WebsocketContentType.Unknown };
}
}
/**
* 发送消息到WebSocket服务器
* @param {string | object} data - 要发送的消息,可以是字符串或对象
* @throws {Error} 当WebSocket未连接时抛出错误
*/
public send(data: string | object): void {
if (this.status !== 'OPEN') {
this.handleError(new Error('WebSocket is not connected'), WebSocketErrorType.SEND_ERROR);
}
try {
const message = typeof data === 'string' ? data : JSON.stringify(data);
this.ws?.send(message);
} catch (error) {
this.handleError(error, WebSocketErrorType.SEND_ERROR);
}
}
/**
* 关闭WebSocket连接
* 停止心跳检测并清理相关资源
*/
public close(): void {
this.status = 'CLOSING';
this.stopHeartbeat();
this.ws?.close();
}
private reconnect(): void {
if (this.status === 'CLOSING') return;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.handleError(new Error('Max reconnection attempts reached'), WebSocketErrorType.MAX_RECONNECT);
return;
}
this.reconnectAttempts++;
console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);
setTimeout(() => {
this.init();
}, this.reconnectInterval);
}
/**
* 订阅指定类型的消息
* @param {string} event - 消息类型
* @param {(data: any) => void} callback - 消息处理回调函数
*/
public on(event: string, callback: (data: any) => void): void {
this.messageCallbacks.set(event, callback);
}
/**
* 取消订阅指定类型的消息
* @param {string} event - 消息类型
*/
public off(event: string): void {
this.messageCallbacks.delete(event);
}
private handleMessage(data: WebSocketMessage): void {
if (data.messageType && this.messageCallbacks.has(data.messageType)) {
this.messageCallbacks.get(data.messageType)?.(data);
}
this.options.onMessage?.(data);
}
private startHeartbeat(): void {
this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => {
if (this.status === 'OPEN') {
try {
this.send({ messageType: 'ping', content: `${Date.now()}`, contentType: WebsocketContentType.Text });
this.waitForPong();
} catch (error) {
console.error('Heartbeat send failed:', error);
this.handleError(error);
}
}
}, this.heartbeatInterval);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}
private waitForPong(): void {
this.heartbeatTimeoutTimer = setTimeout(() => {
console.error('Heartbeat timeout');
this.handleError(new Error('Heartbeat timeout'), WebSocketErrorType.HEARTBEAT_TIMEOUT);
this.ws?.close();
}, this.heartbeatTimeout);
}
private handlePong(): void {
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}
private handleError(error: any, type: WebSocketErrorType = WebSocketErrorType.UNKNOWN): void {
console.error('WebSocket error:', error);
this.options.onError?.({
type,
message: this.getErrorMessage(type),
originalError: error,
});
}
private cleanupWebSocket(): void {
if (this.ws) {
this.ws.onopen = null;
this.ws.onclose = null;
this.ws.onerror = null;
this.ws.onmessage = null;
}
}
private getErrorMessage(type: WebSocketErrorType): string {
const errorMessages = {
[WebSocketErrorType.MANUAL_CLOSE]: '用户主动关闭连接',
[WebSocketErrorType.HEARTBEAT_TIMEOUT]: '心跳包超时',
[WebSocketErrorType.CONNECT_ERROR]: '连接建立失败',
[WebSocketErrorType.SEND_ERROR]: '消息发送失败',
[WebSocketErrorType.MAX_RECONNECT]: '达到最大重连次数',
[WebSocketErrorType.UNKNOWN]: '未知错误',
};
return errorMessages[type] || '未知错误';
}
}
最佳实践
错误处理
- 实现完善的错误处理机制
- 记录错误日志
- 提供用户友好的错误提示
重连策略
- 使用指数退避算法
- 设置最大重连次数
- 在适当时机重置重连计数
心跳机制
- 合理设置心跳间隔
- 实现超时检测
- 在连接不稳定时主动重连
消息处理
- 实现消息队列
- 处理消息乱序
- 支持消息重发
安全性
- 使用wss安全连接
- 实现消息加密
- 添加身份验证
注意事项
连接管理
- 及时关闭不需要的连接
- 处理页面卸载时的连接关闭
- 避免创建过多的连接实例
性能优化
- 控制心跳频率
- 合理设置重连间隔
- 优化消息格式
兼容性
- 处理不同浏览器的实现差异
- 提供降级方案
- 考虑移动端的特殊情况
调试与监控
- 添加日志记录
- 实现性能监控
- 提供调试工具