NodeJS大量數(shù)據(jù)導(dǎo)出導(dǎo)致系統(tǒng)內(nèi)存溢出的解決辦法

大量數(shù)據(jù)導(dǎo)出導(dǎo)致系統(tǒng)內(nèi)存溢出的解決辦法

在web開發(fā)中,我們經(jīng)??赡軙龅綄?dǎo)出報表等統(tǒng)計功能,常規(guī)做法就是會從數(shù)據(jù)庫中拉取大量數(shù)據(jù),甚至有可能還會統(tǒng)計所有數(shù)據(jù)的一個總量。當我們一次性讀取所有數(shù)據(jù)到內(nèi)存中時,就極可能導(dǎo)致系統(tǒng)OOM。因為我的后臺系統(tǒng)使用的是NodeJS的Nest框架,數(shù)據(jù)庫ORM使用的是Sequelize,下面就這種問題我總結(jié)一下處理的方法。

方法一、修改V8內(nèi)存大小

從《深入淺出NodeJS》中我們得知64位系統(tǒng)內(nèi)存限制約為1.4GB,所以我們一次性講數(shù)據(jù)加載進內(nèi)存中,就有可能超過這個限制,導(dǎo)致OOM。但是NodeJS提供了一個程序運行參數(shù) --max-old-space-size,可以通過該參數(shù)指定V8所占用的內(nèi)存空間,這樣可在一定程度上便面程序內(nèi)存溢出

方法二、使用非V8內(nèi)存

這也是在項目中采用的方法。Buffer是一個NodeJS的擴展對象,使用底層的系統(tǒng)內(nèi)存,不占用V8內(nèi)存空間。與之相關(guān)的文件系統(tǒng)fs和流Stream流操作,都不會占用V8內(nèi)存。下面我就流操作的解決方法,詳細梳理一遍。

MySQL數(shù)據(jù)庫有流式讀取方式,不是一次性讀取所有數(shù)據(jù)到內(nèi)存,而是根據(jù)使用者的需要,部分的讀取數(shù)據(jù)。但是Sequelize這個ORM模型又不支持流式讀取,所以在系統(tǒng)中我們額外引入了支持流式查詢的Knex查詢構(gòu)造器和Mysql2驅(qū)動程序。

1、構(gòu)建數(shù)據(jù)庫服務(wù)

import { Injectable } from "@nestjs/common";
import * as config from "config";
import { getLogger } from "log4js";
import * as Knex from "knex";

const logger = getLogger("Knex");
interface MysqlConfig {
  name: string;
  read: [
    {
      host: string,
      port: number,
      username: string,
      password: string
    }
  ];
}

const { name: database, read: [conf] } = config.get<MysqlConfig>("mysql");
const knex = Knex({
  client: "mysql2",
  pool: {
    min: 1,
    max: 10
  },
  connection: {
    database,
    port: conf.port,
    host: conf.host,
    user: conf.username,
    password: conf.password,
    decimalNumbers: true
  }
});

@Injectable()
export class ReadService {
  async* exec(sql, params, limit = 5000) {
    // 讀取limit條數(shù)據(jù),通過yield返回數(shù)據(jù)
    // xxx...
  }
}

2、創(chuàng)建Csv相關(guān)服務(wù)

import * as path from "path";
import * as convert from "iconv-lite";
import * as Achiver from "archiver";
import { getLogger } from "log4js";
import { createReadStream, promises as fs, writeFileSync } from "fs";

const logger = getLogger("ExportCsv");

export class Csv {
  private readonly name: string;
  private readonly path: string;
  // 是否壓縮
  private readonly compress: boolean;

  constructor(name: string, columns: string[], rows: any[] = [], compress: boolean = false) {
    this.compress = compress;
    this.name = path.basename(name, ".csv");
    this.path = path.format({
      ext: ".csv",
      root: process.cwd(),
      name: Date.now() + this.name
    });

    let txt = columns.join(",");
    if (Array.isArray(rows)) {
      txt += Csv.parse(rows);
    }
    // 同步創(chuàng)建一個只包含字段名的文件
    writeFileSync(this.path, convert.encode(txt, "GBK", { addBOM: true }));
  }

  // 轉(zhuǎn)換數(shù)據(jù)中一些特殊符號
  private static parse(rows): string {
    return "\r\n" + rows.map(r => {
        return r
          .map(x => {
            if (typeof x === "string") {
              x = x.replace(/,/g, ",").replace(/\n/g, "-");
            }

            return x;
          })
          .join(",");
      }
    ).join("\r\n");
  }

  // 在同一個文件中,異步新增數(shù)據(jù)
  async append(rows: any[]) {
    return fs.appendFile(this.path, convert.encode(Csv.parse(rows), "GBK", { addBOM: true }));
  }

  // 根據(jù)傳入的compress參數(shù)判斷,是否壓縮文件
  // res也是一個流對象,所有可以進行流的相關(guān)操作
  async send(res) {
    if (this.compress) {
      // 返回壓縮后的文件
    } else {
      // 返回普通文件
    }
  }

  // 監(jiān)控當文件下載完畢后,則刪除服務(wù)器生成的文件
  private observe(res) {
    return new Promise((resolve, reject) => {
      res.on("end", () => {
        resolve();
        fs.unlink(this.path).catch(logger.error.bind(logger));
      });
      res.on("error", err => reject(err));
    });
  }
}

3、在控制器函數(shù)調(diào)用服務(wù)

// 用于統(tǒng)計總量的數(shù)據(jù)
const total = {
      money: 0,
      deposit: 0
    };
// 查詢大量數(shù)據(jù)的SQL
const sql = `xxx`;
// 實例化Csv對象
const csv = new Csv(`訂單結(jié)算數(shù)據(jù)${start}至${end}.csv`, title, null, true);
// 執(zhí)行SQL,一次性讀取10000條數(shù)據(jù),返回的是一個可遍歷的生成器
const reader = this.reader.exec(sql, params, 10000);
// 遍歷生成器,不停的向文件中寫數(shù)據(jù)
// 如果我們直接導(dǎo)出數(shù)據(jù),那么在csv.append方法中,直接返回[[1, 22, 33], [2, 44, 55]]
// 如果我們要根據(jù)所有數(shù)據(jù),進行一個統(tǒng)計計算,那么我們可以新寫一個parseRows方法來進行統(tǒng)計
for await (const x of reader) {
    await csv.append(await this.parseRows(x, { ems, sites, total }));
}
// 總量數(shù)據(jù)
const row = [
      _.round(total.money, 2),
      _.round(total.deposit, 2),
    ];
// 寫入文件
await csv.append([row]);
// 返回csv對象,在攔截器中處理
return csv;

4、攔截器中處理返回形式

import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from "@nestjs/common";
import { map } from "rxjs/operators";
import { Csv } from "@shared/commons";

export interface Response<T> {
  code: number;
  data: T;
}

@Injectable()
export class FormatInterceptor<T> implements NestInterceptor<T, Response<T>> {
  intercept(context: ExecutionContext, next: CallHandler) {
    return next.handle().pipe(map(data => {
      if (data instanceof Csv) {
        // 處理數(shù)據(jù)導(dǎo)出數(shù)據(jù)格式
        const res = context.switchToHttp().getResponse();
        // send方法就是csv服務(wù)中的導(dǎo)出方法
        return data.send(res);
      }

      // 處理常規(guī)API數(shù)據(jù)格式
      const result: any = { code: 0, data: "success" };
      if (data) {
        if (data.rows) {
          data.meta = data.rows;
          data.total = data.count;
          delete data.rows;
          delete data.count;
        }

        result.data = data;
      }

      return result;
    }));
  }
}

自此整個流程:流式讀取數(shù)據(jù)->異步寫入文件->壓縮文件->返回文件前端下載->刪除生成的文件就結(jié)束了。這樣我們分步驟處理數(shù)據(jù),就不會因為數(shù)據(jù)量過大導(dǎo)致內(nèi)存溢出了。部分方法中的代碼我沒有寫上來,如果有需要可以互相探討一下。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容