
Read, Manipulate and Insert Data Efficiently Using pg-promise & pg-query-stream

I'm looking to do the following.

  1. Query a large table with a group by query to perform summarization of values.
  2. Run those records through a routine to add in some additional data
  3. Insert those into the DB efficiently.
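
To make the shape of the problem concrete, steps 1 and 2 look roughly like this (table and column names are illustrative, not my real schema):

// Step 1: a group by query that summarizes values over a large table
const sql = `
  SELECT customer_id, SUM(amount) AS total_amount
  FROM orders
  GROUP BY customer_id`

// Step 2: a routine that adds additional data to each summarized record
const recordMapper = (record) => ({
  ...record,
  calculated_at: new Date()
})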

I've tried to do this using pg-query-stream to read the data out as a stream, counting the records into batches (e.g. 1,000 at a time), and once the batch limit is reached, using pg-promise's pgp.helpers.insert to insert the data.
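
For anyone unfamiliar, pgp.helpers.insert takes an array of objects plus a ColumnSet and generates a single multi-row INSERT statement, roughly like this (illustrative table and columns):

const cs = new pgp.helpers.ColumnSet(['id', 'total'], { table: 'summaries' })
const query = pgp.helpers.insert([{ id: 1, total: 10 }, { id: 2, total: 20 }], cs)
// => INSERT INTO "summaries"("id","total") VALUES(1,10),(2,20)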

The problem I have is that I can't quite figure out how to get the stream to pause properly so that each insert completes before the stream continues, especially in the on('end') handler.

The code I've tried is below:

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
  stream.pause()
  const t0 = performance.now()
  let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)

  if (options.onConflictExpression) {
    query += options.onConflictExpression
  }

  tenant.db.none(query)
    .then(() => {
      let t1 = performance.now()
      console.log('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
      stream.resume()
    })
    .catch(error => {
      throw error
    })
}

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
  try {
    return new Promise((resolve, reject) => {
      const query = new QueryStream(sql)

      // Set options as required
      options.batchSize = parseInt(options.batchSize) || 1000
      options.onConflictExpression = options.onConflictExpression || null

      let records = []
      let batchNumber = 1
      let recordCount = 0

      let t0 = performance.now()
      tenant.db.stream(query, (stream) => {
        stream.on('data', (record) => {
          const mappedRecord = recordMapper(record)
          records.push(mappedRecord)
          recordCount++

          if (records.length === options.batchSize) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.log(`Batch ${batchNumber} done`)
            batchNumber++
          }
        })
        stream.on('end', () => {
        // If any records are left that are not part of a batch insert here.
          if (records.length !== 0) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.log(`Batch ${batchNumber} done`)
            batchNumber++
            console.log('Total Records: ' + recordCount)
            let t1 = performance.now()
            console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
          } else {
            console.log('Total Records: ' + recordCount)
            let t1 = performance.now()
            console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
          }
        })
        stream.on('error', (error) => {
          throw error
        })
      })
        .then(data => {
          resolve()
        })
        .catch(error => {
          console.log('ERROR:', error)
          reject(error)
        })
    })
  } catch (err) {
    throw err
  }
}
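
For reference, the module is invoked along these lines, using the sql and recordMapper sketched above (the ColumnSet, table, and require path are illustrative):

const pgp = require('pg-promise')()
const batchInsert = require('./batch-insert') // this module

const columnSet = new pgp.helpers.ColumnSet(
  ['customer_id', 'total_amount', 'calculated_at'],
  { table: 'customer_summaries' }
)

batchInsert(tenant, sql, columnSet, recordMapper, {
  batchSize: 1000,
  onConflictExpression: ' ON CONFLICT (customer_id) DO NOTHING'
})
  .then(() => console.log('Done'))
  .catch(error => console.error(error))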

I'm not sure if the approach I'm trying is the best one. I've tried a few different things based on the documentation I can find around pg-promise and streams, but have had no joy.

Any help/advice is greatly appreciated.

Thanks

Paul

Attempt 2

Below is my second attempt, using getNextData and sequence as described on the data imports page. I'm struggling to work out how to hook the stream into this so that it pulls only one batch of data at a time before inserting.
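
My understanding of the generic pattern from that page is roughly the following (paraphrased from memory; cs is a pre-built ColumnSet):

// getNextData resolves with the next batch of rows,
// or with null/undefined once the source is exhausted
function getNextData(t, pageIndex) {
  // fetch page number pageIndex of the source data here
}

db.tx('massive-insert', t => {
  return t.sequence(index => {
    return getNextData(t, index)
      .then(data => {
        if (data) {
          const insert = pgp.helpers.insert(data, cs)
          return t.none(insert)
        }
      })
  })
})

Here is my attempt at wiring the stream into that pattern: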

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {

  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new QueryStream(sql)

    function getNextData(transaction, index) {
      return new Promise(async (resolve, reject) => {
        if (index < options.batchSize) {
          let count = 1
          await transaction.stream(query, async (stream) => {
            let records = []
            await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {  
              stream.resume()
              count++
              console.log(count, streamIndex, streamData)        

              records.push(streamData[0])

              if (records.length === options.batchSize) {
                stream.pause()
                resolve(records)
              }
            }, {readChunks: true})

          })  
        }
        resolve(null)
      })
    }

    return tenant.db.tx('massive-insert', (transaction) => {
      return transaction.sequence((index) => {          
        return getNextData(transaction, index)
          .then((records) => {
            if (records && records.length > 0) {
              let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)

              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }

              const i0 = performance.now()
              return transaction.none(query)
                .then(() => {
                  let i1 = performance.now()
                  console.log('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
                })
            }
          })
      })
    })
  } catch (err) {
    throw err
  }
}

1 Answer

I've got this working using a slightly different approach, focused more on using streams directly while still using pg-promise to handle the DB side.

const BatchStream = require('batched-stream')
const { performance } = require('perf_hooks')
const { Writable } = require('stream')

module.exports = async (tenant, sql, columnSet, recordMapper, options = {}) => {

  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new tenant.lib.QueryStream(sql)

    const stream = tenant.db.client.query(query)

    return new Promise((resolve, reject) => {
      // We want to process this in batches
      const batch = new BatchStream({size : options.batchSize, objectMode: true, strictMode: false})

      // We use a write stream to insert the batch into the database
      const insertDatabase = new Writable({
        objectMode: true,
        write(records, encoding, callback) {
          (async () => {

            try {
              /*
                If we have a record mapper, apply it here prior to inserting the records.
                Doing it here is much quicker than doing it in a separate Transform
                stream (by about 10 seconds for 100,000 records).
              */
              if (recordMapper) {
                records = records.map(record => recordMapper(record))
              }

              let query = tenant.lib.pgp.helpers.insert(records, columnSet)

              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }

              const i0 = performance.now()
              await tenant.db.none(query)
              const i1 = performance.now()
              console.log('Inserted ' + records.length + ' records in ' + ((i1 - i0) / 1000) + ' (seconds).')

            } catch(e) {
              return callback(e)
            }

            callback()
          })()
        }
      })

      // Process the stream
      const t0 = performance.now()
      stream
        // Break it down into batches
        .pipe(batch)
        // Insert those batches into the database
        .pipe(insertDatabase)
        // Once we get here we are done :)
        .on('finish', () => {
          const t1 = performance.now()
          console.log('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
          resolve()
        })
        .on('error', (error) => {
          reject(error)
        })

    })
  } catch (err) {
    throw err
  }
}
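
A note on the error handling: with a plain .pipe() chain, the .on('error') handler at the end is only attached to the last stream, so an error from the query stream or the batching stage would not reject the promise. If you're on Node 10+, stream.pipeline() propagates an error from any stage and tears the whole chain down. A sketch of the same wiring inside the promise, reusing stream, batch, insertDatabase, t0, resolve, and reject from above:

const { pipeline } = require('stream')

pipeline(
  stream,          // the query stream reader
  batch,           // the batching stage
  insertDatabase,  // the Writable that inserts each batch
  (error) => {
    if (error) {
      return reject(error)
    }
    const t1 = performance.now()
    console.log('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
    resolve()
  }
)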