• 前言

    平时开发中其实很少使用 Buffer 对象,因为在绝大多数场景下字符串和数组已经完全能满足需求了;不过博主最近在尝试做一些 RPC 协议的开发,使用 Buffer 来处理二进制数据就必不可少了,虽然之前有了解过一些基本的知识,但是也想借这个机会深入理解一下 Node.js 中 Buffer 的设计。(基于 Node.js v14.19.0

Buffer 结构

基本认识

在分析 Buffer 结构之前,我们首先需要对这个对象有一个基本的认识:

  • 简单来讲它可以被看作一个「存放字节的数组」,或者是和字符串相似的「字节串」。
  • 和数组/字符串不同的是,它的基本单位是二进制「字节」,不过它仍可以轻松地在各种 js 常用对象之间互相转换,并且在 IO 上有着无与伦比的性能。
  • 因此,它常被作为文件,网络 IO 的「缓冲区」,是连接数据流和 js 常用对象之间的桥梁。

JS 与 C++ 模块概览

  • Buffer 一共有三个核心文件,姑且算将它分为三个模块,由两个 js 模块和一个 C++ 模块组成,分别是 /lib/buffer.js /lib/internal/buffer.js /src/node_buffer.cc
  • 三个模块形成依次嵌套的关系
    • 第一个模块基本封装了 Node.js Buffer API 中的常用方法,如 Buffer.alloc() Buffer.from() Buffer.byteLength() 等,最后导出了 Buffer 对象
    • 第二个模块封装了 Buffer 原型链上的各种基本方法,包括各种读写,转换等
    • 最后 C++ 模块是对第二个模块中具体方法的实现,包括 C++ 层面的内存分配,赋值,初始化等操作,当然,也调用了其他的 js 方法,如 Uint8Array。

自顶向下分析

  • 虽然 Node.js 提供了相当多操作 Buffer 的 API ,但是这些 API 的操作始终会被抽象为三种类型:

    1. 读:从一个 Buffer 对象的某个偏移量开始,读出某些数据
    2. 写:从一个 Buffer 对象的某个偏移量开始,写入某些数据
    3. 内存分配:为一个 Buffer 对象分配内存
  • 首先,我们从内存分配说起。虽然很多文章喜欢用 Buffer.from() 这个经常被使用的 API 作为例子,但是其操作其实比较特殊,他并没有显式地被拆分为 分配 + 写入 两个操作,而是单独调用了一系列函数进行内存分配和赋值(在 C++ 中,单独实现了 CreateFromString 方法,专门为 Buffer.from() 提供 API,猜测是为了性能优化)。

    Buffer 的内存分配是由 C++ 直接申请的,通常来讲是通过 C++ 向系统申请内存,然后通过 js 来管理和使用内存。使用 slab 分配机制,每个块大小规定为 8 KB,也以此来区别「大对象」和「小对象」,createPool() 方法就可以创建一个 slab 块。

    在当前版本下,我列举了三个 API 可以直接为 Buffer 分配空间:

        /**
         * Creates a new filled Buffer instance.
         * alloc(size[, fill[, encoding]])
         */
        Buffer.alloc = function alloc(size, fill, encoding) {
          assertSize(size);
          if (fill !== undefined && fill !== 0 && size > 0) {
            const buf = createUnsafeBuffer(size);
            return _fill(buf, fill, 0, buf.length, encoding);
          }
          return new FastBuffer(size);
        };
    
        /**
         * Equivalent to Buffer(num), by default creates a non-zero-filled Buffer
         * instance. If `--zero-fill-buffers` is set, will zero-fill the buffer.
         */
        Buffer.allocUnsafe = function allocUnsafe(size) {
          assertSize(size);
          return allocate(size);
        };
    
        /**
         * Equivalent to SlowBuffer(num), by default creates a non-zero-filled
         * Buffer instance that is not allocated off the pre-initialized pool.
         * If `--zero-fill-buffers` is set, will zero-fill the buffer.
         */
        Buffer.allocUnsafeSlow = function allocUnsafeSlow(size) {
          assertSize(size);
          return createUnsafeBuffer(size);
        };
    

    看起来这三个函数都调用了不同的方法去分配内存:

    • 第一种和最后一种看起来比较接近,都用了 createUnsafeBuffer 这个方法
    • 第二种使用的是 allocate 方法
    • 而第一种中,还用到了 return new FastBuffer(size) 作为返回值,那么我们可以暂时理解为 FastBuffer 对象,就是 Buffer 的内存空间。

    在此之外,还有 SlowBuffer 方法可以为 Buffer 分配内存,不过它是直接绑定在原型链上的方法,直接继承了 Uint8Array 进行内存分配,是专门为「大对象」设计的。因此,它实际上由 C++ 直接定义,从模块上讲,略有不同于以上三种方法。

    分析以上内存分配的方法,他们都有共同参数 size, 意味着我们需要指定内存空间的大小(字节为单位)。那么让我们看看这几个方法的具体实现:

    // allocate 方法,poolSize 为常量,规定每8 KB 为一个slab块
    // Buffer.poolSize = 8 * 1024
    function allocate(size) {
    if (size <= 0) {
        return new FastBuffer()
    }
    if (size < Buffer.poolSize >>> 1) {
        if (size > poolSize - poolOffset) createPool()
        const b = new FastBuffer(allocPool, poolOffset, size)
        poolOffset += size
        alignPool()
        return b
    }
    return createUnsafeBuffer(size)
    }
    

    显然,allocate 方法依旧是调用了 createUnsafeBuffer 来进行内存分配,在某些特殊情况下,会通过返回 FastBuffer + 地址偏移量 来进行内存分配,那么让我们看看其中具体的逻辑:

    • if (size <= 0):size <= 0,分配空内存

    • if (size < Buffer.poolSize >>> 1):size < 4 KB

      • if (size > poolSize - poolOffset) 但是,当前块中剩下空间不足以装下当前大小的 Buffer,那么就新创建一块 slab ,创建新 slab 会自动把 offset 置为0。

      此时,已经保证当前 slab 块可以装下 Buffer,那么就为当前Buffer 分配空间,同时进行一下8 KB对齐。

    • else:size >= 4 KB,调用 createUnsafeBuffer 来分配内存。同理,size > 8 KB 的大对象,也是通过这个方法来实现分配内存

    // createUnsafeBuffer 方法,加入了一个置零的标识符
    function createUnsafeBuffer(size) {
      zeroFill[0] = 0
      try {
        return new FastBuffer(size)
      } finally {
        zeroFill[0] = 1
      }
    }
    

    createUnsafeBuffer 方法,实际上也是调用了 FastBuffer 来实现内存分配。

    综上所述,各种不同的内存分配方法实际上都是由 FastBuffer 来实现的。那么,让我们深入 FastBuffer 看看其中的逻辑

    // /lib/internal/buffer.js
    class FastBuffer extends Uint8Array {
      // Using an explicit constructor here is necessary to avoid relying on
      // `Array.prototype[Symbol.iterator]`, which can be mutated by users.
      // eslint-disable-next-line no-useless-constructor
      constructor(bufferOrLength, byteOffset, length) {
        super(bufferOrLength, byteOffset, length)
      }
    }
    

    看起来相当简单,只是在 Uint8Array 的构造器上添加了几个额外参数,最后作为模块导出,在 /lib/buffer.js 中被使用。
    那么,什么是 Uint8Array,它和 Buffer 有什么关系呢?

    • Uint8Array 在其他语言中叫做 ByteArray,其本质就是一个 8 位无符号整型的「二进制数组」
    • 其中每个单元都是 0 - 255 的数字
    • 它和 js 中的数组 Array 没有任何关系!
    • 因为每个单元格大小固定,为 8 位,因此可以把它理解为一片整齐连续的内存空间

    我们在打印 Buffer 的时候,通常会展示出 <Buffer ff ff ff ff 0f 00 00 00> 这样的结果,两个 16 进制数为一组,正好就对应 Uint8Array 中一个的「格子」,也就是 Buffer 对应的内存空间了。

    那么,C++ 是如何为 Uint8Array 分配内存的呢?

    // /src/node_buffer.cc
    Local<ArrayBuffer> CallbackInfo::CreateTrackedArrayBuffer(
        Environment* env,
        char* data,
        size_t length,
        FreeCallback callback,
        void* hint) {
      CHECK_NOT_NULL(callback);
      CHECK_IMPLIES(data == nullptr, length == 0);
    
      CallbackInfo* self = new CallbackInfo(env, callback, data, hint);
      // 创建 BackingStore,设置 GC 回调
      std::unique_ptr<BackingStore> bs =
          ArrayBuffer::NewBackingStore(data, length, [](void*, size_t, void* arg) {
            static_cast<CallbackInfo*>(arg)->OnBackingStoreFree();
          }, self);
      // 创建 ArrayBuffer
      Local<ArrayBuffer> ab = ArrayBuffer::New(env->isolate(), std::move(bs));
    
      // V8 simply ignores the BackingStore deleter callback if data == nullptr,
      // but our API contract requires it being called.
      if (data == nullptr) {
        ab->Detach();
        self->OnBackingStoreFree();  // This calls `callback` asynchronously.
      } else {
        // Store the ArrayBuffer so that we can detach it later.
        self->persistent_.Reset(env->isolate(), ab);
        self->persistent_.SetWeak();
      }
    
      return ab;
    }
    

    显然,C++ 通过 CreateTrackedArrayBuffer 创建了一个 ArrayBuffer,再通过 ArrayBuffer 创建了一个 Uint8Array。
    至于如何创建 Backing Store 以及 V8 如何管理 Buffer 的 GC,博主就不再深入介绍了(其实是我懒,我也没去看

  • 知道了 Buffer 的内存分配机制,接下来就可以了解一下如何操作 Buffer 了。博主在此以写入为例,简单分析对 Buffer 的写入操作,读取操作同理可得。

    写入与读取操作大同小异,在 Buffer 中都分为「大端」和「小端」,具体可以参考这里,在不同的平台,会根据判断的字节序,在导出时决定如何操作非整型的数据。

    // /lib/internel/buffer.js
    // Temporary buffers to convert numbers.
    const float32Array = new Float32Array(1);
    const uInt8Float32Array = new Uint8Array(float32Array.buffer);
    const float64Array = new Float64Array(1);
    const uInt8Float64Array = new Uint8Array(float64Array.buffer);
    
    // Check endianness.
    float32Array[0] = -1; // 0xBF800000
    // Either it is [0, 0, 128, 191] or [191, 128, 0, 0]. It is not possible to
    // check this with `os.endianness()` because that is determined at compile time.
    const bigEndian = uInt8Float32Array[3] === 0;
    

    那么,写入操作具体如何执行呢?以小端序写入 UInt64 为例:

    function writeBigU_Int64LE(buf, value, offset, min, max) {
      checkInt(value, min, max, buf, offset, 7)
    
      let lo = Number(value & 0xffffffffn)
      buf[offset++] = lo
      lo = lo >> 8
      buf[offset++] = lo
      lo = lo >> 8
      buf[offset++] = lo
      lo = lo >> 8
      buf[offset++] = lo
      let hi = Number((value >> 32n) & 0xffffffffn)
      buf[offset++] = hi
      hi = hi >> 8
      buf[offset++] = hi
      hi = hi >> 8
      buf[offset++] = hi
      hi = hi >> 8
      buf[offset++] = hi
      return offset
    }
    
    function writeBigUInt64LE(value, offset = 0) {
      return writeBigU_Int64LE(this, value, offset, 0n, 0xffffffffffffffffn)
    }
    
    

    与其他类型稍有不同,UInt64 在写入时分为了高 32 位和低 32 位,通过 offset 迭代,每次取出其中 8 位放入 Buffer 的一个「格子」。因为之前内存分配时说过,Uint8Array 每个「格子」正好是 8 位,因此每次赋值时,都只是把当前数据的前 8 位写入了,因此只需要不停地将当前数据右移 8 位,就能依此将整个 64 位的数据写入。

Buffer 的转换与拼接

转换

  • Buffer 可以和字符串直接相互转换,只要是 Node.js 支持的编码类型,都可以互相转换。
  • 甚至,在 Buffer 中,可以指定某一段序列按照某种编码转为字符串,如 buf.toString([encoding],[start],[end]) 方法。
  • 遇到无法转换的内容时,会降级处理,输出�
    const buf = Buffer.alloc(8);
    buf[0] = 0xC0;
    buf[1] = 0xFF;
    
    console.log(buf.toString('utf-8')) // output: "�"
    

拼接

在使用 Buffer 时,通常会和流一起配合使用,通过 Buffer 维护分段式缓冲区来提升性能。那么就不得不引入 Buffer 拼接时可能出现的问题:

  • 拼接存在宽字节
    const fs = require('fs')
    const rs = fs.createReadStream('./a.txt', { highWaterMark: 7 })
    let data = ''
    rs.on('data', (chunk) => {
      data += chunk
    })
    rs.on('end', () => {
      console.log(data)
    })
    // 我好���做��然小姐的���啊��可嘉然小���说��喜欢猫
    

    在执行 data += chunk 时,实际上对 chunk 做了隐式类型转换,将 Buffer 转为了 String。那么当编码中存在宽字节,如中文的 utf-8 编码(占用 3 个字节)时,则会出现无法转换导致降级。

那么如何正确拼接 Buffer 呢?

  • 既然问题是因为在分段处理时做了隐式类型转换,解决起来也并非难事。很容易想到先将每一段 Buffer 内容存起来,然后再统一拼接:
    const fs = require('fs')
    const rs = fs.createReadStream('./a.txt', { highWaterMark: 7 })
    const chunks = []
    let size = 0
    rs.on('data', (chunk) => {
        chunks.push(chunk)
        size += chunk.length
        }
    )
    rs.on('end', () => {
        console.log(Buffer.concat(chunks, size).toString())
    })
    // 我好想做嘉然小姐的狗啊,可嘉然小姐说她喜欢猫
    

以上~


What is broken can be reforged.