 

Use of compaction for Parquet bulk format

Since version 1.15 of Apache Flink you can use the compaction feature to merge several files into one. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction

How can we use compaction with the bulk Parquet format? The existing implementations of RecordWiseFileCompactor.Reader (DecoderBasedReader and InputFormatBasedReader) do not seem suitable for Parquet.

Furthermore, we cannot find any examples of compacting Parquet or other bulk formats.

asked Sep 11 '25 by Jan

1 Answer

There are two types of file compactors mentioned in Flink's documentation.

OutputStreamBasedFileCompactor : The users can write the compacted results into an output stream. This is useful when the users don’t want to or can’t read records from the input files.

RecordWiseFileCompactor : The compactor can read records one-by-one from the input files and write into the result file similar to the FileWriter.
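
To make the difference concrete, here is a minimal sketch (not part of my actual setup, and not usable for Parquet) of how an OutputStreamBasedFileCompactor such as Flink's built-in ConcatFileCompactor can be plugged into a plain row-format sink, where simply concatenating the input files is good enough:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

public class RowFormatCompactionSketch {

    public static FileSink<String> buildSink(String outputPath) {
        // ConcatFileCompactor concatenates the input files into the result file,
        // which is fine for line-based formats but would corrupt Parquet files.
        return FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>())
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .enableCompactionOnCheckpoint(10)
                                .build(),
                        new ConcatFileCompactor())
                .build();
    }
}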

If I remember correctly, Parquet stores its metadata at the end of the file. So obviously we need to use RecordWiseFileCompactor: we have to read the whole Parquet file to get the metadata at its end, and can then use that metadata (number of row groups, schema) to parse the file.

From the Java API, to construct a RecordWiseFileCompactor we need an instance of RecordWiseFileCompactor.Reader.Factory.

There are two implementations of the interface RecordWiseFileCompactor.Reader.Factory: DecoderBasedReader.Factory and InputFormatBasedReader.Factory.

DecoderBasedReader.Factory creates a DecoderBasedReader instance, which reads the whole file content from an InputStream. We would have to load the bytes into a buffer and parse the file from the byte buffer ourselves, which is obviously painful, so we don't use this implementation.
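
For reference, here is how a DecoderBasedReader is meant to be used, a sketch following the pattern in the Flink docs for simple string records (SimpleStringDecoder ships with the file connector; I'm not using this in my job). It only works when each record can be decoded straight from the raw InputStream, which is not the case for Parquet:

import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.connector.file.sink.compactor.SimpleStringDecoder;

public class DecoderBasedCompactorSketch {

    public static RecordWiseFileCompactor<String> build() {
        // Each record is decoded directly from the raw InputStream, which suits
        // newline-delimited strings but not a footer-based format like Parquet.
        return new RecordWiseFileCompactor<>(
                new DecoderBasedReader.Factory<>(SimpleStringDecoder::new));
    }
}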

InputFormatBasedReader.Factory creates an InputFormatBasedReader, which reads the whole file content using the FileInputFormat supplier we pass to the InputFormatBasedReader.Factory constructor.

The InputFormatBasedReader instance uses the FileInputFormat to read the file record by record and passes the records to the writer we supplied in the forBulkFormat call, until the end of the file.

The writer receives all the records and compacts them into one file.

So the question becomes: what is FileInputFormat, and how do we implement it?

Though the FileInputFormat class has many methods and fields, we can see from the InputFormatBasedReader source code mentioned above that only four of them are called by InputFormatBasedReader (roughly in the way sketched after the list):

  • open(FileInputSplit fileSplit), which opens the file
  • reachedEnd(), which checks whether we have hit the end of the file
  • nextRecord(), which reads the next record from the opened file
  • close(), which cleans up resources
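
Conceptually, the compaction reader exercises these four methods roughly like this (a simplified sketch of the contract, not the actual InputFormatBasedReader source; the writer parameter just stands in for the compacting writer):

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.IOException;
import java.util.function.Consumer;

public class CompactionLoopSketch {

    static <T> void compactOneFile(FileInputFormat<T> format,
                                   FileInputSplit split,
                                   Consumer<T> writer) throws IOException {
        format.open(split);                 // open the file behind the split
        try {
            while (!format.reachedEnd()) {  // until we hit end of file
                T record = format.nextRecord(null);
                if (record != null) {
                    writer.accept(record);  // hand each record to the compacting writer
                }
            }
        } finally {
            format.close();                 // clean up resources
        }
    }
}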

Luckily, there's an AvroParquetReader in the package org.apache.parquet.avro that we can use. It already implements open/read/close, so we can wrap it inside a FileInputFormat and let AvroParquetReader do all the dirty work.

Here's an example code snippet:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

import java.io.IOException;

public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {

    private ParquetReader<GenericRecord> parquetReader;
    private GenericRecord readRecord;


    @Override
    public void open(FileInputSplit split) throws IOException {
        Configuration config = new Configuration();
        // Set Hadoop config here.
        // For example, if you are using GCS, set fs.gs.impl here.
        // I haven't tried core-site.xml, but I believe that would work as well.
        InputFile inputFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
        parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
        readRecord = parquetReader.read();
    }

    @Override
    public void close() throws IOException {
        parquetReader.close();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return readRecord == null;
    }

    @Override
    public GenericRecord nextRecord(GenericRecord genericRecord) throws IOException {
        GenericRecord r = readRecord;
        readRecord = parquetReader.read();
        return r;
    }
}

Then you can use the ExampleFileInputFormat like below:

FileSink<GenericRecord> sink = FileSink.forBulkFormat(
                new Path(path),
                AvroParquetWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                        .enableCompactionOnCheckpoint(10)
                        .build(),
                new RecordWiseFileCompactor<>(
                        new InputFormatBasedReader.Factory<>(new SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException>() {
                            @Override
                            public FileInputFormat<GenericRecord> get() throws IOException {
                                FileInputFormat<GenericRecord> format = new ExampleFileInputFormat();
                                return format;
                            }
                        })
                ))
        .build();
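
For completeness, here's a rough sketch of wiring the sink into a job (buildRecordStream, the checkpoint interval, and the job name are placeholders, not code from my deployment). Compaction is triggered on checkpoints, so checkpointing must be enabled:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // compaction runs on checkpoints

DataStream<GenericRecord> records = buildRecordStream(env); // placeholder source of GenericRecords
records.sinkTo(sink); // 'sink' is the FileSink built above

env.execute("parquet-compaction-example");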

I have successfully deployed this to Flink on Kubernetes and compacted files on GCS. Here are some notes for deploying:

  • You need to download the Flink shaded Hadoop jar from https://flink.apache.org/downloads.html (search for "Pre-bundled Hadoop" on the page) and put the jar into $FLINK_HOME/lib/.
  • If you are writing files to some object storage, for example GCS, you need to follow the plugin instructions. Remember to put the plugin jar into the plugins folder, not the lib folder.
  • If you are writing files to some object storage, you also need to download the connector jar from the cloud service provider. For example, I'm using GCS and downloaded the gcs-connector jar following the GCP instructions. Put the jar into some folder other than $FLINK_HOME/lib or $FLINK_HOME/plugins. I put the connector jar into a newly made folder, $FLINK_HOME/hadoop-lib.
  • Set the environment variable HADOOP_CLASSPATH=$FLINK_HOME/lib/YOUR_SHADED_HADOOP_JAR:$FLINK_HOME/hadoop-lib/YOUR_CONNECTOR_JAR

After all these steps, you can start your job and you're good to go.

answered Sep 14 '25 by Juiceyang