Skip to content

WebSocket技术详解

WebSocket基础

什么是WebSocket

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它提供了在Web浏览器和服务器之间建立持久连接的标准方法,使得服务器可以主动向客户端推送数据。

与HTTP的区别

  1. 连接特点

    • HTTP是非持久的、单向的
    • WebSocket是持久的、双向的
  2. 数据格式

    • HTTP基于请求-响应模式
    • WebSocket支持双向实时数据传输
  3. 协议标识

    • HTTP: http://https://
    • WebSocket: ws://wss://

应用场景

  • 实时通讯(聊天室、即时消息)
  • 实时数据展示(股票行情、体育赛事)
  • 在线游戏
  • 协同编辑
  • 实时监控

WebSocket API

基础用法

js
// 创建WebSocket连接
const ws = new WebSocket('ws://example.com/socketserver')

// 连接建立时触发
ws.onopen = function () {
  console.log('连接已建立')
  // 发送数据
  ws.send('Hello Server!')
}

// 接收消息
ws.onmessage = function (event) {
  console.log('收到消息:', event.data)
}

// 连接关闭时触发
ws.onclose = function () {
  console.log('连接已关闭')
}

// 发生错误时触发
ws.onerror = function (error) {
  console.error('WebSocket错误:', error)
}

数据格式

WebSocket支持发送多种类型的数据:

js
// 发送文本
ws.send('Hello')

// 发送JSON对象
ws.send(JSON.stringify({ type: 'message', content: 'Hello' }))

// 发送二进制数据
const buffer = new ArrayBuffer(8)
ws.send(buffer)

// 发送Blob对象
const blob = new Blob(['Hello'], { type: 'text/plain' })
ws.send(blob)

WebSocket客户端封装

基础封装

js
// WebSocket 错误类型枚举
export enum WebSocketErrorType {
  MANUAL_CLOSE = 'MANUAL_CLOSE',
  HEARTBEAT_TIMEOUT = 'HEARTBEAT_TIMEOUT',
  CONNECT_ERROR = 'CONNECT_ERROR',
  SEND_ERROR = 'SEND_ERROR',
  MAX_RECONNECT = 'MAX_RECONNECT',
  UNKNOWN = 'UNKNOWN',
}

/**
 * WebSocket客户端配置选项
 * @interface WebSocketOptions
 * @property {number} [maxReconnectAttempts=5] - 最大重连尝试次数
 * @property {number} [reconnectInterval=3000] - 重连间隔时间(毫秒)
 * @property {number} [heartbeatInterval=30000] - 心跳包发送间隔(毫秒)
 * @property {number} [heartbeatTimeout=5000] - 心跳包超时时间(毫秒)
 * @property {number} [heartbeatTimeout=5000] - 心跳包超时时间(毫秒)
 * @property {() => void} [onOpen] - WebSocket连接成功回调
 * @property {() => void} [onClose] - WebSocket关闭回调
 * @property {(error: Event) => void} [onError] - WebSocket错误回调
 * @property {(data: any) => void} [onMessage] - WebSocket消息回调
 */
export interface WebSocketOptions {
  maxReconnectAttempts?: number;
  reconnectInterval?: number;
  heartbeatInterval?: number;
  heartbeatTimeout?: number;
  onOpen?: () => void;
  onClose?: (reason: { type: WebSocketErrorType; message: string }) => void;
  onError?: (error: { type: WebSocketErrorType; message: string; originalError?: any }) => void;
  onMessage?: (data: any) => void;
}

/**
 * WebSocket消息格式
 * @interface WebSocketMessage
 * @property {string} messageType - 消息类型
 * @property {string} content - 消息主体内容
 * @property {string} contentType - 消息主体内容具体类型
 * @property {any} [property: string] - 其他自定义字段
 */
export interface WebSocketMessage {
  content: string; // 消息主体 字符串或json字符串
  contentType: WebsocketContentType;
  messageType: WebsocketMessageType;
  sendStatus?: number; // 1成功  2.失败
  [property: string]: any;
}

// 消息主体内容具体类型
export enum WebsocketContentType {
  /** 文本 */
  Text = 'text',
  /** 图片 */
  Picture = 'picture',
  /** 未知类型 */
  Unknown = 'unknown',
	// ...其它类型主体 如视频/语音等
}

// 消息类型
export enum WebsocketMessageType {
  /** 心跳检测发送消息 */
  Ping = 'ping',
	/** 心跳检测响应消息 */
  Pong = 'pong',
  /** 用户发送消息 */
  User = 'user',
  /** 系统回复消息 */
  System = 'system',
  /** 系统验证消息 */
  Auth = 'auth',
  /** 未知类型 */
  Unknown = 'unknown',
	/** 其它类型的消息 对应角色 */
}

/**
 * WebSocket客户端封装类
 * 提供自动重连、心跳检测、消息订阅等功能
 *
 * @example
 * ```typescript
 * // 创建WebSocket客户端实例
 * const ws = new WebSocketClient('ws://example.com', {
 *   maxReconnectAttempts: 5,
 *   reconnectInterval: 3000,
 *   heartbeatInterval: 30000,
 *   heartbeatTimeout: 5000,
 *   onOpen: () => console.log('连接成功'),
 *   onClose: () => console.log('连接关闭'),
 *   onError: (error) => console.error('连接错误:', error),
 *   onMessage: (data) => console.log('收到消息:', data)
 * });
 *
 * // 订阅特定类型的消息
 * ws.on('custom_event', (data) => {
 *   console.log('收到自定义消息:', data);
 * });
 *
 * // 发送消息
 * ws.send({ messageType: 'custom_event', data: 'hello' });
 *
 * // 关闭连接
 * ws.close();
 * ```
 */
export class WebSocketClient {
  private url: string;
  private options: WebSocketOptions;
  private ws: WebSocket | null;
  private status: 'CLOSED' | 'CLOSING' | 'CONNECTING' | 'OPEN';
  private messageCallbacks: Map<string, (data: any) => void>;
  private reconnectAttempts: number;
  private maxReconnectAttempts: number;
  private reconnectInterval: number;
  private heartbeatInterval: number;
  private heartbeatTimeout: number;
  private heartbeatTimer: NodeJS.Timeout | null;
  private heartbeatTimeoutTimer: NodeJS.Timeout | null;

  constructor(url: string, options: WebSocketOptions = {}) {
    this.url = url;
    this.options = options;
    this.ws = null;
    this.status = 'CLOSED';
    this.messageCallbacks = new Map();
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
    this.reconnectInterval = options.reconnectInterval || 3000;
    this.heartbeatInterval = options.heartbeatInterval || 30000;
    this.heartbeatTimeout = options.heartbeatTimeout || 5000;
    this.heartbeatTimer = null;
    this.heartbeatTimeoutTimer = null;
    this.init();
  }

  private init(): void {
    try {
      this.ws = new WebSocket(this.url);
      this.status = 'CONNECTING';
      this.bindEvents();
    } catch (error) {
      console.error('WebSocket initialization failed:', error);
      this.handleError(error, WebSocketErrorType.CONNECT_ERROR);
    }
  }

  private bindEvents(): void {
    if (!this.ws) return;

    this.ws.onopen = () => {
      this.status = 'OPEN';
      this.reconnectAttempts = 0;
      this.startHeartbeat();
      this.options.onOpen?.();
    };

    this.ws.onclose = () => {
      this.stopHeartbeat();
      this.cleanupWebSocket();
      this.options.onClose?.({
        type: this.status === 'CLOSING' ? WebSocketErrorType.MANUAL_CLOSE : WebSocketErrorType.UNKNOWN,
        message: this.status === 'CLOSING' ? '用户主动关闭连接' : '连接意外关闭',
      });
      // 只在非主动关闭时触发重连
      if (this.status !== 'CLOSING') {
        this.reconnect();
      }
      this.status = 'CLOSED';
    };

    this.ws.onerror = (error: Event) => {
      this.handleError(error);
    };

    this.ws.onmessage = (event: MessageEvent) => {
      try {
        const data = this.parseMessage(event.data);
        if (data?.messageType === 'pong') {
          this.handlePong();
        } else {
          this.handleMessage(data);
        }
      } catch (error) {
        console.error('Message handling error:', error);
        this.handleError(error);
      }
    };
  }

  private parseMessage(message: string): WebSocketMessage {
    try {
      return typeof message === 'string' ? JSON.parse(message) : message;
    } catch (e) {
      console.warn('Message parsing failed:', e);
      return { messageType: WebsocketMessageType.Unknown, content: message, contentType: WebsocketContentType.Unknown };
    }
  }

  /**
   * 发送消息到WebSocket服务器
   * @param {string | object} data - 要发送的消息,可以是字符串或对象
   * @throws {Error} 当WebSocket未连接时抛出错误
   */
  public send(data: string | object): void {
    if (this.status !== 'OPEN') {
      this.handleError(new Error('WebSocket is not connected'), WebSocketErrorType.SEND_ERROR);
    }

    try {
      const message = typeof data === 'string' ? data : JSON.stringify(data);
      this.ws?.send(message);
    } catch (error) {
      this.handleError(error, WebSocketErrorType.SEND_ERROR);
    }
  }

  /**
   * 关闭WebSocket连接
   * 停止心跳检测并清理相关资源
   */
  public close(): void {
    this.status = 'CLOSING';
    this.stopHeartbeat();
    this.ws?.close();
  }

  private reconnect(): void {
    if (this.status === 'CLOSING') return;

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      this.handleError(new Error('Max reconnection attempts reached'), WebSocketErrorType.MAX_RECONNECT);
      return;
    }

    this.reconnectAttempts++;
    console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

    setTimeout(() => {
      this.init();
    }, this.reconnectInterval);
  }

  /**
   * 订阅指定类型的消息
   * @param {string} event - 消息类型
   * @param {(data: any) => void} callback - 消息处理回调函数
   */
  public on(event: string, callback: (data: any) => void): void {
    this.messageCallbacks.set(event, callback);
  }

  /**
   * 取消订阅指定类型的消息
   * @param {string} event - 消息类型
   */
  public off(event: string): void {
    this.messageCallbacks.delete(event);
  }

  private handleMessage(data: WebSocketMessage): void {
    if (data.messageType && this.messageCallbacks.has(data.messageType)) {
      this.messageCallbacks.get(data.messageType)?.(data);
    }
    this.options.onMessage?.(data);
  }

  private startHeartbeat(): void {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.status === 'OPEN') {
        try {
          this.send({ messageType: 'ping', content: `${Date.now()}`, contentType: WebsocketContentType.Text });
          this.waitForPong();
        } catch (error) {
          console.error('Heartbeat send failed:', error);
          this.handleError(error);
        }
      }
    }, this.heartbeatInterval);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
    if (this.heartbeatTimeoutTimer) {
      clearTimeout(this.heartbeatTimeoutTimer);
      this.heartbeatTimeoutTimer = null;
    }
  }

  private waitForPong(): void {
    this.heartbeatTimeoutTimer = setTimeout(() => {
      console.error('Heartbeat timeout');
      this.handleError(new Error('Heartbeat timeout'), WebSocketErrorType.HEARTBEAT_TIMEOUT);
      this.ws?.close();
    }, this.heartbeatTimeout);
  }

  private handlePong(): void {
    if (this.heartbeatTimeoutTimer) {
      clearTimeout(this.heartbeatTimeoutTimer);
      this.heartbeatTimeoutTimer = null;
    }
  }

  private handleError(error: any, type: WebSocketErrorType = WebSocketErrorType.UNKNOWN): void {
    console.error('WebSocket error:', error);
    this.options.onError?.({
      type,
      message: this.getErrorMessage(type),
      originalError: error,
    });
  }

  private cleanupWebSocket(): void {
    if (this.ws) {
      this.ws.onopen = null;
      this.ws.onclose = null;
      this.ws.onerror = null;
      this.ws.onmessage = null;
    }
  }

  private getErrorMessage(type: WebSocketErrorType): string {
    const errorMessages = {
      [WebSocketErrorType.MANUAL_CLOSE]: '用户主动关闭连接',
      [WebSocketErrorType.HEARTBEAT_TIMEOUT]: '心跳包超时',
      [WebSocketErrorType.CONNECT_ERROR]: '连接建立失败',
      [WebSocketErrorType.SEND_ERROR]: '消息发送失败',
      [WebSocketErrorType.MAX_RECONNECT]: '达到最大重连次数',
      [WebSocketErrorType.UNKNOWN]: '未知错误',
    };
    return errorMessages[type] || '未知错误';
  }
}

最佳实践

  1. 错误处理

    • 实现完善的错误处理机制
    • 记录错误日志
    • 提供用户友好的错误提示
  2. 重连策略

    • 使用指数退避算法
    • 设置最大重连次数
    • 在适当时机重置重连计数
  3. 心跳机制

    • 合理设置心跳间隔
    • 实现超时检测
    • 在连接不稳定时主动重连
  4. 消息处理

    • 实现消息队列
    • 处理消息乱序
    • 支持消息重发
  5. 安全性

    • 使用wss安全连接
    • 实现消息加密
    • 添加身份验证

注意事项

  1. 连接管理

    • 及时关闭不需要的连接
    • 处理页面卸载时的连接关闭
    • 避免创建过多的连接实例
  2. 性能优化

    • 控制心跳频率
    • 合理设置重连间隔
    • 优化消息格式
  3. 兼容性

    • 处理不同浏览器的实现差异
    • 提供降级方案
    • 考虑移动端的特殊情况
  4. 调试与监控

    • 添加日志记录
    • 实现性能监控
    • 提供调试工具
既来之,则安之。