使用Stream完美地进行内容转换

echosoar 原创发表于 2018/11/20 09:09:02
#nodejs

背景

周末的时候在处理一些从05-13年的所有专利申请的公司、地址等数据,需要把其中的省市区县给匹配出来放到一行的最后; 最初时候的方式就是:使用 fs.readFile 来读取文件,然后对文本进行split,一个循环去处理每一行的数据,然后把结果再写回文件。
流程是没有问题的,也跑通了,就是随着年份的增加,专利申请数量在上升,电脑处理这些数据的时候轰轰轰的声音也就越来越大。大家都知道使用这种方式会将整个文件的内容读取进内存,文件越大,对内存的占用越大。大家也都知道使用stream流会在内部的缓冲器中存储数据,缓冲的大小为初始化时候传入的 highWaterMark,当调用 stream.push 或者 writable.write 的时候都会去检测可读缓冲和可写缓冲占用是否超过 highWaterMark,这样就保证了对于内存的占用是很小的。

改成Stream与遇到的坑

大家都知道一个可读流的pipe可以将数据从缓冲区流向一个可写流,也可以先流向一个转换流(Transform),再通过pipe的链式调用流向下一个可写流或者双工流。转换流也可以认为是一个双工流,可以会输入做一些计算或者处理然后输出,因此这就是处理文件数据最好的选择。
const fs = require('fs');
const path = require('path');
const stream = require('stream');
const execData = new stream.Transform();
execData. _transform = (chunk, encoding, cb) => {
  let str = chunk.toString();
  let dataList = str.split('\n');

  let dataRes = dataList.map(data => {
    let reg = /\*\s+(\d+)\s+.*?\*\s+(.*)/;
    if (reg.test(data)) {
      let execRes = reg.exec(data);
      let num = execRes[1];
      let name = execRes[2].replace( /(制作所|集团|技术|开发|产业|科技|电子|有限|责任|股份|株式会社|公司|\(.*?\))/ig,'');
      if (name) {
        return `${name} ${num} ${execRes[2].replace(/\s/ig, '')}`;
      }
    }
    return false;
  }).filter(v => v);

  execData.push(dataRes.join("\n"));
  cb();
}
exec.on('error', (err) => {
  console.log(err);
});

fs.createReadStream(
        path.resolve(__dirname, './from.txt')
    )
    .pipe(exec)
    .pipe(
        fs.createWriteStream(path.resolve(__dirname, './res.txt'))
    );
上面这代码看起来挺完美的,写完我还很得意的重重地在键盘上敲下了执行的命令,刚开始看结果数据的时候发现挺好,不过看着看着我发现有一个很严重的问题: 有些汉字在上一个chunk被阶段了,一个汉字有两个字节,在上一个chunk最后有一个字节,在下一个chunk最开始有一个字节,这样在读取的时候就会出现这两个字节的地方出现两个乱码。

如何解决

在node中有一个内置的模块,叫做 string_decoder 字符解码器,提供了一个实例方法StringDecoder来把 Buffer 对象解码成字符串。当一个 Buffer 实例被写入 StringDecoder 实例时,会使用一个内部的 buffer 来确保解码后的字符串不会包含残缺的多字节字符。 残缺的多字节字符会被保存在这个 buffer 中,直到下次调用 stringDecoder.write() 或直到 stringDecoder.end() 被调用。
另外还有一个问题是每次chunk都不一定是行的结尾,也不一定从行的开头开始,所以就会出现一行有一部分在上一个chunk中,然后根据 \n 分隔的时候就会让原本一行的内容变为两行,所以需要有一个中间变量,来存储上一个chunk的最后一行,在处理下一个chunk的时候将它添加到开头。
因此,把上面的代码改造如下:
const fs = require('fs');
const path = require('path');
const stream = require('stream');
const StringDecoder = require('string_decoder').StringDecoder;
const execData = new stream.Transform();
const decoder = new StringDecoder('utf8');

let preLastLine = '';

execData. _transform = (chunk, encoding, cb) => {
  let str = decoder.write(chunk);
  let dataList = (preLastLine + str).split('\n');

 if (str)  preLastLine = dataList.pop();

  let dataRes = dataList.map(data => {
    let reg = /\*\s+(\d+)\s+.*?\*\s+(.*)/;
    if (reg.test(data)) {
      let execRes = reg.exec(data);
      let num = execRes[1];
      let name = execRes[2].replace( /(制作所|集团|技术|开发|产业|科技|电子|有限|责任|股份|株式会社|公司|\(.*?\))/ig,'');
      if (name) {
        return `${name} ${num} ${execRes[2].replace(/\s/ig, '')}`;
      }
    }
    return false;
  }).filter(v => v);

  execData.push(dataRes.join("\n") + "\n");
  cb();
}
exec.on('error', (err) => {
  console.log(err);
});

fs.createReadStream(
        path.resolve(__dirname, './from.txt')
    )
    .pipe(exec)
    .pipe(
        fs.createWriteStream(path.resolve(__dirname, './res.txt'))
    );