星鉴网>技术干货>精通IPFS:保存内容之上篇

精通IPFS:保存内容之上篇

2019/7/4 10:35:32 3174人阅读

【导读】 接上一篇。


经过前面的分析,我们已经明白了 IPFS 启动过程,从今天起,我会分析一些常见的命令或动作,希望大家喜欢。


在开始真正分析这些命令/动作之前,先要对 pull-stream 类库进行简单介绍,如果不熟悉这个类库,接下来就没办法进行。

pull-stream 是一个新型的流库,数据被从源中拉取到目的中,它有两种基本类型的流:Source 源和 Sink 接收器。

除此之外,有两种复合类型的流:Through 通道流(比如转换)和 Duplex 双向流。

source 流,这类流返回一个匿名函数,这个匿名函数被称为 read 函数,它被后续的 sink 流函数或 through 流函数调用,从而读取 source 流中的内容。

sink 流,这类流最终都返回内部 drain.js 中的 sink 函数。

这类流主要是读取数据,并且对每一个读取到的数据进行处理,如果流已经结束,则调用用户指定结束函数进行处理。

through 流,这类流的函数会返回嵌套的匿名函数,第一层函数接收一个 source 流的 read 函数或其他 through 函数返回的第一层函数为参数。

第二层函数接收最终 sink 提供的写函数或其他 through 返回的第二层函数,第二层函数内部调用 read 函数,从而直接或间接从 source 中取得数据,获取数据后直接或间接调用 sink 函数,从而把数据写入到目的地址。

在 pull-streams 中,数据在流动之前,必须有一个完整的管道,这意味着一个源、零个或多个通道、一个接收器。但是仍然可以创建一个部分化的管道,这非常有用。

也就是说,可以创建一个完整的管道,比如 pull(source, sink) => undefined,也可以部分化的管道,比如 pull(through, sink) => sink,或者 pull(through1, through2) => through,我们在下面会大量遇到这种部分化的管道。

今天,我们看下第一个最常用的 add 命令/动作,我们使用 IPFS 就是为了把文件保存到 IPFS,自然少不了保存操作,add 命令就是干这个的,闲话少数,我们来看一段代码。


const {createNode} = require('ipfs')


const node = createNode({
  libp2p:{
    config:{
      dht:{
        enabled:true
      }
    }
  }
})

node.on('ready', async () => {

    const content = `我爱黑萤`;

    const filesAdded = await node.add({
      content: Buffer.from(content)
    },{
      chunkerOptions:{
        maxChunkSize:1000,
        avgChunkSize:1000
      }
    })


    console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)
})


这次我们没有完全使用默认配置,开启了 DHT,看过我文章的读者都知道 DHT 是什么东东,这里不详细解释。


在程序中,通过调用 IPFS 节点的 add 方法来上传内容,内容可以是文件,也可以是直接的内容,两者有稍微的区别,在讲到相关代码时,我们指出这种区别的,这里我们为了简单直接上传内容为例来说明。

add 方法位于 core/components/files-regular/add.js 文件中,在 《精通IPFS:系统启动之概览》 那篇文章中,我们说过,系统会把 core/components/files-regular 目录下的所有文件扩展到 IPFS 对象上面,这其中自然包括这里的 add.js 文件。

下面,我们直接看这个函数的执行流程。

这个函数返回了一个内部定义的函数,在这个内部定义的函数中对参数做了一些处理,然后就调用内部的 add 函数,后者才是主体,它的逻辑如下:

首先,检查选项对象是否为函数,如果是,则重新生成相关的变量。


if (typeof options === 'function') {
  callback = options
  options = {}
}

定义检测内容的工具函数来检测我们要上传的内容。


const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj)
const isContentObject = obj => {
  if (typeof obj !== 'object') return false
  if (obj.content) return isBufferOrStream(obj.content)
  return Boolean(obj.path) && typeof obj.path === 'string'
}


const isInput = obj => isBufferOrStream(obj) || isContentObject(obj)
const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))


if (!ok) {
  return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects'))
}


接下来,执行 pull-stream 类库提供的 pull 函数。


我们来看 pull 函数的主要内容。它的第一个参数是 pull.values 函数执行的结果,这个 values 函数就是一个 source 流,它返回一个称为 read 的函数来读取我们提供的数据。


这个 read 函数从数组中读取当前索引位置的值,以此值为参数,调用它之后的 through 函数第二层函数内部定义的回调函数或最终的 sink 函数内部定义的回调函数。如果数组已经读取完成,则直接以 true 为参数进行调用。

第二个参数是 IPFS 对象的 addPullStream 方法,这个方法也是在启动时候使用同样的方法扩展到 IPFS 对象,它的主体是当前目录的 add-pull-stream.js 文件中的函数。接下来,我们会详细看这个函数,现在我们只需要知道这个函数返回了一个部分化的管道。

第三个参数是 pull-sort 中定义的函数,这是一个依赖于 pull-stream 的库,根据一定规则来排序,这个函数我们不用管。

最后一个参数是 pull.collect 函数执行的结果,这个 collect 函数就是一个 sink 流。它把最终的结果放入一个数组中,然后调用回调函数。我们在前面代码中看到的 filesAdded 之所以是一个数组就是拜这个函数所赐。

上面逻辑的代码如下:

pull(
  pull.values([data]),
  self.addPullStream(options),
  sort((a, b) => {
    if (a.path < b.path) return 1
    if (a.path > b.path) return -1
    return 0
  }),
  pull.collect(callback)
)

在上面的代码中,我们把要保存的内容构成一个数组,具体原因下面解释。


现在,我们来看 addPullStream 方法,这个方法是保存内容的主体,add 方法是只开胃小菜。addPullStream 方法执行逻辑如下:


调用 parseChunkerString 函数,处理内容分块相关的选项。这个函数位于相同目录下的 utils.js 文件中,它检查用户指定的分块算法。


如果用户没有指定,则使用固定分块算法,大小为系统默认的 262144;如果指定了大小,则使用固定分块算法,但大小为用户指定大小;如果指定为 rabin 类分割法,即变长分割法,则调用内部函数来生成对应的分割选项。上面逻辑代码如下:


parseChunkerString = (chunker) => {
  if (!chunker) {
    return {
      chunker: 'fixed'
    }
  } else if (chunker.startsWith('size-')) {
    const sizeStr = chunker.split('-')[1]
    const size = parseInt(sizeStr)
    if (isNaN(size)) {
      throw new Error('Chunker parameter size must be an integer')
    }
    return {
      chunker: 'fixed',
      chunkerOptions: {
        maxChunkSize: size
      }
    }
  } else if (chunker.startsWith('rabin')) {
    return {
      chunker: 'rabin',
      chunkerOptions: parseRabinString(chunker)
    }
  } else {
    throw new Error(Unrecognized chunker option: ${chunker})
  }
}

注意:我们也可以通过重写这个函数来增加自己的分割算法。


合并整理选项变量。

const opts = Object.assign({}, {
  shardSplitThreshold: self._options.EXPERIMENTAL.sharding
    ? 1000
    : Infinity
}, options, chunkerOptions)

设置默认的 CID 版本号。如果指定了 Hash 算法,但是 CID 版本又不是 1,则强制设为 1。


CID 是分布式系统的自描述内容寻址标识符,目前有两个版本 0 和 1,版本 0 是一个向后兼容的版本,只支持 sha256 哈希算法,并且不能指定。


if (opts.hashAlg && opts.cidVersion !== 1) {
  opts.cidVersion = 1
}

设置进度处理函数,默认空实现。


const prog = opts.progress || noop
const progress = (bytes) => {
  total += bytes
  prog(total)
}



opts.progress = progress

用 pull 函数返回一个部分化的 pull-stream 流。这个部分化的 pull-stream 流是处理文件/内容保存的关键,我们仔细研究下。


使用已经生成文件的 multihash 内容生成 CID 对象。


 作者介绍:  


乔疯,区块链狂热爱好者,熟悉比特币、EOS、以太坊源码及合约的开发,有着数年区块链开发经验,坚信技术是第一生产力,区块链改变整个人类,开设巴比特专栏以来已经获得 100多万次的阅读量。


参与湖南天河国云 Ulord 公链的开发和面向区块链行业的风险监控平台,后者在近期成功入选由工信部评选的 101 个网络安全技术应用试点示范项目


在爱健康金融金融有限公司参与组建彗星信息科技有限公司,并担任第一任技术部负责人,开发出了彗星播报等深受大家喜爱的区块链产品。


具有良好的协调沟通能力和团队协作精神!熟悉Scrum、XP、看板等敏捷项目管理,拥有PMP证书!


熟悉JAVA、Python、NodeJS、C/C++、Linux下的开发,熟悉分布式架构设计!熟悉互联网金融行业,具有丰富的互联网金融产品开发经验,对互联金融有着深入的了解。

26

参与讨论

登录后参加评论......

全部评论 0

作者

返回顶部