Since Apache Flink 1.15, you can use the FileSink compaction feature to merge several files into one. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction
How can we use compaction with the bulk Parquet format? The existing implementations of RecordWiseFileCompactor.Reader (DecoderBasedReader and InputFormatBasedReader) do not seem suitable for Parquet.
Furthermore, we cannot find any example of compacting Parquet or other bulk formats.
Flink's documentation mentions two types of file compactor:
OutputStreamBasedFileCompactor : The users can write the compacted results into an output stream. This is useful when the users don’t want to or can’t read records from the input files.
RecordWiseFileCompactor : The compactor can read records one-by-one from the input files and write into the result file similar to the FileWriter.
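If I remember correctly, Parquet stores its metadata (the schema and the locations of the row groups) in a footer at the end of each file. An OutputStreamBasedFileCompactor only sees raw byte streams, and simply concatenating the bytes of several Parquet files does not produce a valid Parquet file. So we need RecordWiseFileCompactor: each input file has to be read through to its footer, and the metadata there (number of row groups, schema) is then used to parse the records so they can be written into the new file.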
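To see why byte-level concatenation fails, it helps to look at where the metadata lives. Per the Parquet format specification, a file starts with the magic bytes PAR1 and ends with the serialized file metadata, a 4-byte little-endian length of that metadata, and PAR1 again. The stdlib-only sketch below is an illustration under that assumption: ParquetFooterLocator and its toyFile helper are hypothetical names, and the "footer" bytes are arbitrary placeholders rather than real Thrift-encoded metadata. Because the footer is located from the tail, a concatenation of two files only exposes the second file's footer.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Sketch: locate the footer of a Parquet-framed byte array.
 * Framing: "PAR1" | data ... | footer | 4-byte little-endian footer length | "PAR1"
 */
final class ParquetFooterLocator {
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    /** Returns the offset at which the footer (file metadata) starts. */
    static int footerStart(byte[] file) {
        int n = file.length;
        // Smallest framing: leading magic + length field + trailing magic.
        if (n < 12) throw new IllegalArgumentException("too short to be Parquet");
        if (!Arrays.equals(Arrays.copyOfRange(file, 0, 4), MAGIC)
                || !Arrays.equals(Arrays.copyOfRange(file, n - 4, n), MAGIC)) {
            throw new IllegalArgumentException("missing PAR1 magic");
        }
        // The footer length sits just before the trailing magic.
        int footerLen = ByteBuffer.wrap(file, n - 8, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        int start = n - 8 - footerLen;
        if (footerLen < 0 || start < 4) throw new IllegalArgumentException("bad footer length");
        return start;
    }

    /** Hypothetical helper: builds the framing around arbitrary data and footer bytes. */
    static byte[] toyFile(byte[] data, byte[] footer) {
        ByteBuffer buf = ByteBuffer.allocate(4 + data.length + footer.length + 8)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(MAGIC).put(data).put(footer).putInt(footer.length).put(MAGIC);
        return buf.array();
    }
}
```

Reading from the tail is what makes the format self-describing, and it is also why the first file's metadata ends up buried, unreachable, in the middle of a concatenated file.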
From the Java API, we can see that constructing a RecordWiseFileCompactor requires an instance of RecordWiseFileCompactor.Reader.Factory.
There are two implementations of the RecordWiseFileCompactor.Reader.Factory interface: DecoderBasedReader.Factory and InputFormatBasedReader.Factory.
DecoderBasedReader.Factory creates a DecoderBasedReader, which reads the file content from an InputStream. Because a Parquet reader has to reach the footer before it can decode anything, we would have to load all the bytes into a buffer and parse the file from that buffer ourselves, which is obviously painful. So we don't use this implementation.
InputFormatBasedReader.Factory creates an InputFormatBasedReader, which reads the file content using the FileInputFormat supplier we pass to the InputFormatBasedReader.Factory constructor.
The InputFormatBasedReader uses the FileInputFormat to read the file record by record until the end, and each record is passed to the writer we supplied in the forBulkFormat call.
The writer receives all the records and compacts them into one file.
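The flow just described can be modelled in plain Java. This is a simplified sketch, not Flink code: ToyReader is a hypothetical stand-in for RecordWiseFileCompactor.Reader, assumed (as I understand that interface) to return null at the end of a file, and in-memory lists play the role of the input files and of the single output writer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class RecordWiseCompactionSketch {

    /** Hypothetical stand-in for RecordWiseFileCompactor.Reader: read() returns null at end of file. */
    interface ToyReader<T> extends AutoCloseable {
        T read() throws Exception;
    }

    /** Reads every record of every input file, in order, and appends it to one output. */
    static <F, T> List<T> compact(List<F> inputFiles, Function<F, ToyReader<T>> readerFactory)
            throws Exception {
        List<T> output = new ArrayList<>(); // plays the role of the single output writer
        for (F file : inputFiles) {
            // One reader per input file, closed before moving on to the next file.
            try (ToyReader<T> reader = readerFactory.apply(file)) {
                T record;
                while ((record = reader.read()) != null) {
                    output.add(record);
                }
            }
        }
        return output;
    }

    /** Toy reader over an in-memory "file" represented as a list of records. */
    static <T> ToyReader<T> listReader(List<T> file) {
        Iterator<T> it = file.iterator();
        return new ToyReader<T>() {
            @Override
            public T read() {
                return it.hasNext() ? it.next() : null;
            }

            @Override
            public void close() {
                // nothing to release for an in-memory list
            }
        };
    }
}
```

Records are never buffered per file; each one goes straight from its reader to the output, which is what lets a record-wise compactor handle formats whose bytes cannot simply be concatenated.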
So the question becomes: what is FileInputFormat, and how do we implement it?
Although FileInputFormat has many methods and fields, the InputFormatBasedReader source code shows that only four of them are called: open, reachedEnd, nextRecord and close.
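The order in which InputFormatBasedReader calls open, reachedEnd, nextRecord and close can be sketched in plain Java. MiniFormat and drain are hypothetical names for illustration, and the path is a plain String instead of a FileInputSplit. As I read the source, the real reader opens the format once per file, checks reachedEnd before every nextRecord, and passes null as the reuse argument, so an implementation must not rely on receiving a reuse object.

```java
import java.util.ArrayList;
import java.util.List;

final class InputFormatCallSequence {

    /** Hypothetical mini-interface with only the four FileInputFormat methods the reader uses. */
    interface MiniFormat<T> {
        void open(String path) throws Exception;
        boolean reachedEnd() throws Exception;
        T nextRecord(T reuse) throws Exception;
        void close() throws Exception;
    }

    /** Drives a format in the same order as InputFormatBasedReader (as I read its source). */
    static <T> List<T> drain(MiniFormat<T> format, String path) throws Exception {
        List<T> records = new ArrayList<>();
        format.open(path); // opened once per input file
        try {
            // reachedEnd() is consulted before every nextRecord() call
            while (!format.reachedEnd()) {
                // null reuse object: the format must return a fresh record each time
                records.add(format.nextRecord(null));
            }
        } finally {
            format.close();
        }
        return records;
    }
}
```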
Luckily, there's AvroParquetReader in the org.apache.parquet.avro package that we can use. It already handles opening a file (through its builder), reading records one by one and closing the file, so we can wrap it inside a FileInputFormat and let AvroParquetReader do all the dirty work.
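One detail of that wrapping deserves a closer look. ParquetReader.read() returns null once the file is exhausted, but reachedEnd() has to answer before nextRecord() is called, so the wrapper keeps a one-record look-ahead: it reads the first record when the file is opened, reports the end when the buffered record is null, and refills the buffer on every nextRecord(). The plain-Java sketch below isolates that pattern; LookAheadAdapter and NullTerminatedSource are hypothetical names, with NullTerminatedSource standing in for ParquetReader.

```java
import java.io.IOException;

/** Adapts a reader that signals end-of-input with null to the reachedEnd()/nextRecord() protocol. */
final class LookAheadAdapter<T> {

    /** Hypothetical stand-in for ParquetReader: read() returns null when the input is exhausted. */
    interface NullTerminatedSource<R> {
        R read() throws IOException;
    }

    private final NullTerminatedSource<T> source;
    private T buffered; // next record to hand out; null once the source is exhausted

    LookAheadAdapter(NullTerminatedSource<T> source) throws IOException {
        this.source = source;
        this.buffered = source.read(); // prime the buffer, like open() in the wrapper
    }

    boolean reachedEnd() {
        return buffered == null;
    }

    T nextRecord() throws IOException {
        T current = buffered;
        buffered = source.read(); // read ahead so the next reachedEnd() call is accurate
        return current;
    }
}
```

An empty file is handled for free: the priming read returns null, so reachedEnd() is true before any record is requested.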
Here's an example code snippet:
```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

import java.io.IOException;

/**
 * Exposes a Parquet file as a FileInputFormat so that InputFormatBasedReader can
 * read it record by record. AvroParquetReader does the actual decoding.
 */
public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {

    // Created per file at runtime; not part of the serialized format.
    private transient ParquetReader<GenericRecord> parquetReader;
    // One-record look-ahead: the next record to return, or null at end of file.
    private transient GenericRecord readRecord;

    @Override
    public void open(FileInputSplit split) throws IOException {
        Configuration config = new Configuration();
        // Set Hadoop config here,
        // for example fs.gs.impl if you are using GCS.
        // I haven't tried using core-site.xml, but I believe this is feasible.
        InputFile inputFile = HadoopInputFile.fromPath(
                new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
        parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
        // Prime the look-ahead so reachedEnd() is correct before the first nextRecord().
        readRecord = parquetReader.read();
    }

    @Override
    public void close() throws IOException {
        // Guard against open() having failed before the reader was created.
        if (parquetReader != null) {
            parquetReader.close();
            parquetReader = null;
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return readRecord == null;
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuse) throws IOException {
        // The reuse object is ignored; each call returns the record read ahead.
        GenericRecord current = readRecord;
        readRecord = parquetReader.read();
        return current;
    }
}
```
Then you can use ExampleFileInputFormat as follows:
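```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.InputFormatBasedReader;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// `path` (output directory) and `schema` (Avro schema of the records) are defined elsewhere.
FileSink<GenericRecord> sink = FileSink.forBulkFormat(
                new Path(path),
                AvroParquetWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                        // trigger compaction once 10 checkpoints have passed
                        .enableCompactionOnCheckpoint(10)
                        .build(),
                new RecordWiseFileCompactor<>(
                        // A constructor reference (serializable via SerializableSupplierWithException)
                        // avoids an anonymous inner class capturing a non-serializable enclosing instance.
                        new InputFormatBasedReader.Factory<GenericRecord>(ExampleFileInputFormat::new)))
        .build();
```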
I have successfully deployed this to Flink on Kubernetes and compacted files on GCS. There are some notes on deployment.
After all these steps, you can start your job and you're good to go.