How to stream JSON to BigQuery table using createWriteStream?

I have a large JSON file that I want to slightly transform and send to Google BigQuery as a new table. I've used streams in node.js to good effect in the past, and I think they're a decent fit for this problem. I'm using the official Google node.js BigQuery API, and I'm able to create a table with the correct schema without issue. I think I have a working solution, and the program completes fine, but no data ends up landing in my BigQuery table.
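For context, the table has a two-column schema matching the records I want to load. A minimal sketch of that setup (the table name is a placeholder, and `dataset` is assumed to be a Dataset object from the same BigQuery client):

// Hypothetical setup: `dataset` is a @google-cloud/bigquery Dataset object
dataset.createTable('users', {
    schema: 'user:string, company:string'   // two STRING columns matching the transformed records
}).then((data) => {
    const table = data[0];                  // this is the `table` used in the stream code below
    console.log('Created table', table.id);
}).catch((e) => {
    console.error('Table creation failed', e);
});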

The relevant code follows.

My node.js streams code:

const fs = require('fs');

// `table` is a BigQuery Table object created earlier via the official client
fs.createReadStream('testfile.json')
        .pipe(require('split')())                      // emit the file one line at a time
        .pipe(require('event-stream').mapSync((data) => {
            if (data.length > 1) {                     // skip the bare `{`, `}` and empty lines
                let obj;
                try {
                    // each line looks like `"key": [ ... ],` so strip any trailing comma and wrap in braces
                    obj = JSON.parse('{' + (data[data.length - 1] === ',' ? data.slice(0, data.length - 1) : data) + '}');
                } catch (e) {
                    console.error('error parsing!', e, data);
                }
                let user = Object.keys(obj)[0];
                let company = obj[user][0];
                let item = {
                    user: user,
                    company: company
                };
                console.log(item);
                return JSON.stringify(item);
            }
        }))
        .pipe(table.createWriteStream('json'))         // stream the records into BigQuery
        .on('error', (e) => {
            console.error('Error!', e);
        })
        .on('complete', (job) => {
            console.log('All done!', job);
        });

testfile.json looks like this:

{
  "a":["a company", "1234567"],
  "b":["b company", "1234214"],
  "c":["c company", "12332231"]
}

And when I run the program the output looks like:

{ user: 'a', company: 'a company' }
{ user: 'b', company: 'b company' }
{ user: 'c', company: 'c company' }
All done! Job {
  metadata:
   { kind: 'bigquery#job',
   /* lots more data here */

The docs for createWriteStream aren't very detailed about what format the data pumped into the write stream should be in, so I feel like I'm kind of flying blind.

1 Answer

Found out what I needed to do to a) make the import work and b) get more visibility into what's going on.

Fixing the Import

  1. Specify that you will provide a newline-delimited JSON file to createWriteStream:

    let firehose = table.createWriteStream({
        sourceFormat: 'NEWLINE_DELIMITED_JSON'
    });
    

and

  2. Ensure the JSON transformer returns newline-delimited JSON:

    return JSON.stringify(item) + '\n';
    

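Putting the two changes together, the corrected pipeline from the question looks roughly like this (a sketch rather than a drop-in; it reuses the question's `table`, `split`, and `event-stream` setup):

const fs = require('fs');

// One write stream for the whole load job, configured for newline-delimited JSON
let firehose = table.createWriteStream({
    sourceFormat: 'NEWLINE_DELIMITED_JSON'
});

fs.createReadStream('testfile.json')
    .pipe(require('split')())
    .pipe(require('event-stream').mapSync((data) => {
        if (data.length > 1) {
            let obj;
            try {
                obj = JSON.parse('{' + (data[data.length - 1] === ',' ? data.slice(0, data.length - 1) : data) + '}');
            } catch (e) {
                console.error('error parsing!', e, data);
            }
            let user = Object.keys(obj)[0];
            // one JSON record per line, terminated by '\n'
            return JSON.stringify({ user: user, company: obj[user][0] }) + '\n';
        }
    }))
    .pipe(firehose);

The firehose's events are then wired up as shown in the next section.
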
Visibility Into Stream and Job State

The firehose writeStream has error and complete events you can subscribe to, and the complete event passes a Job object as an argument; that Job in turn emits its own events you can subscribe to for more insight into the load.

let moment = require('moment');

firehose.on('error', (e) => {
    console.error('firehose error!', e);
});

firehose.on('complete', (job) => {
    // The write stream is drained: BigQuery has accepted the load Job, but may still be running it
    console.log('Firehose into BigQuery emptied! BigQuery Job details:',
        job.metadata.status.state, job.metadata.jobReference.jobId);
    console.log('Now we wait for the Job to finish...');

    // The Job object emits its own events once BigQuery finishes processing the load
    job.on('complete', (job) => {
        console.log('BigQuery Job loaded',
            job.statistics.load.inputFileBytes, 'bytes yielding',
            job.statistics.load.outputRows, 'rows and',
            job.statistics.load.badRecords, 'bad records in',
            moment(parseInt(job.statistics.endTime)).from(moment(parseInt(job.statistics.startTime)), true));
    });
    job.on('error', (e) => { console.error('Job error', e); });
});
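
And once the Job reports success, a quick count query is an easy sanity check that rows actually landed (a sketch; `bigquery` is the client instance, and the dataset/table names are placeholders):

// Hypothetical check: count the rows in the freshly loaded table
bigquery.query('SELECT COUNT(*) AS row_count FROM my_dataset.users')
    .then(([rows]) => {
        console.log('Rows in table:', rows[0].row_count);
    })
    .catch((e) => console.error('Query failed', e));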