EDIT2: TL;DR Version
The following piece of code is causing a huge memory burden. Library: Parquet.NET
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
    string tableName = GetTableName(filePath);
    Table parquetTable = await reader.ReadAsTableAsync(); // Main culprit for huge RAM consumption
    DataTable dataTable = new DataTable();
    string sql = $"CREATE TABLE {tableName} (";
    foreach (Field field in parquetTable.Schema.Fields)
    {
        DataField? ptField = field as DataField;
        string columnName = ptField.Name;
        Type columnType = ptField.ClrType;
        dataTable.Columns.Add(columnName, columnType);
        sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
    }
    // ...
I am looking for anyone who can tell me how to read a parquet file without overburdening RAM, and then dump the parquet data to SQL.
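(For reference, Parquet.NET can read one row group at a time instead of materializing the whole file with ReadAsTableAsync(). Below is a minimal sketch of that direction, assuming the Parquet.Net v4 row-group API (ParquetRowGroupReader, ReadColumnAsync); SaveChunkToSql is a hypothetical stand-in for the SqlBulkCopy step.)
// Sketch only: stream the file row group by row group so only one chunk
// is in memory at a time. Assumes Parquet.Net v4; SaveChunkToSql is a
// hypothetical helper standing in for the SqlBulkCopy step.
using System;
using System.Data;
using System.Threading.Tasks;
using Parquet;
using Parquet.Schema;
using ParquetColumn = Parquet.Data.DataColumn; // avoid clash with System.Data.DataColumn

public static async Task StreamParquetToSqlAsync(string filePath, string tableName)
{
    using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
    {
        DataField[] fields = reader.Schema.GetDataFields();
        for (int g = 0; g < reader.RowGroupCount; g++)
        {
            using (ParquetRowGroupReader groupReader = reader.OpenRowGroupReader(g))
            {
                // Read only this row group's columns, not the whole file.
                ParquetColumn[] columns = new ParquetColumn[fields.Length];
                for (int c = 0; c < fields.Length; c++)
                    columns[c] = await groupReader.ReadColumnAsync(fields[c]);

                DataTable chunk = new DataTable();
                foreach (DataField f in fields)
                    chunk.Columns.Add(f.Name, f.ClrType);

                for (long r = 0; r < groupReader.RowCount; r++)
                {
                    DataRow row = chunk.NewRow();
                    for (int c = 0; c < columns.Length; c++)
                        row[c] = columns[c].Data.GetValue(r) ?? (object)DBNull.Value;
                    chunk.Rows.Add(row);
                }

                await SaveChunkToSql(chunk, tableName); // hypothetical bulk-copy call
            } // chunk and columns go out of scope here and can be collected
        }
    }
}

// Hypothetical placeholder for the bulk-copy step.
private static Task SaveChunkToSql(DataTable chunk, string tableName) => Task.CompletedTask;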
Complete Story
To give an overview of my project: I am writing a C# program that reads Parquet files, copies the data into a DataTable, then opens an SQL connection and uses SqlBulkCopy to dump the data into a SQL Server instance (SQL Server 2019 LocalDB, on the same machine).
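(For context, the SqlBulkCopy step on its own looks roughly like the sketch below; the connection string and table name are placeholders, and my real code wraps this in a SQLConnection helper class shown later.)
// Minimal SqlBulkCopy sketch; connectionString and destinationTable are placeholders.
using System.Data;
using System.Data.SqlClient; // or Microsoft.Data.SqlClient
using System.Threading.Tasks;

public static async Task BulkInsertAsync(DataTable table, string connectionString, string destinationTable)
{
    using (SqlConnection conn = new SqlConnection(connectionString))
    {
        await conn.OpenAsync();
        using (SqlBulkCopy bulk = new SqlBulkCopy(conn))
        {
            bulk.DestinationTableName = destinationTable;
            bulk.BatchSize = 100_000; // rows sent per round trip
            await bulk.WriteToServerAsync(table);
        }
    }
}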
I am using parallel processing, but I have to mention that I am new to C# as well as to parallel computing. Most of the code here was built with ChatGPT and Googling.
Now, my program reads a directory, gathers all files with the extension ".parquet", and stores them in a string array.
string[] fileList = GetParquetFiles(activeDirectory[0]);
These files are read in parallel, and I am using SemaphoreSlim to limit the number of concurrently active tasks.
public static async Task ProcessParquetFileAsync(string[] fileList, string databaseName)
{
    int numberOfConcurrentFiles = 2;
    using (SemaphoreSlim semaphore = new SemaphoreSlim(numberOfConcurrentFiles))
    {
        List<Task> tasks = new List<Task>();
        foreach (var file in fileList)
        {
            await semaphore.WaitAsync();
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await ReadAndDeployParquetFile(file, databaseName);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
}
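(Aside: on .NET 6 or newer, the same throttling can be expressed with the built-in Parallel.ForEachAsync instead of manual semaphore bookkeeping; a sketch, not a required change:)
// .NET 6+ alternative to the manual semaphore: built-in throttled iteration.
await Parallel.ForEachAsync(
    fileList,
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    async (file, cancellationToken) =>
    {
        await ReadAndDeployParquetFile(file, databaseName);
    });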
Let's follow the flow of one such task. Inside it, I read the whole Parquet file as a Table (using the Parquet.NET library).
In each task, I read the Parquet Table completely and copy just its schema to a DataTable (only the schema, no data).
Next, I calculate a batchSize to split the Parquet Table into "chunks". These chunks are again processed in parallel using SemaphoreSlim:
public static async Task ReadAndDeployParquetFile(string filePath, string databasename)
{
    using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
    {
        string tableName = GetTableName(filePath);
        Table parquetTable = await reader.ReadAsTableAsync();
        DataTable dataTable = new DataTable();
        string sql = $"CREATE TABLE {tableName} (";
        foreach (Field field in parquetTable.Schema.Fields)
        {
            DataField? ptField = field as DataField;
            string columnName = ptField.Name;
            Type columnType = ptField.ClrType;
            dataTable.Columns.Add(columnName, columnType);
            sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
        }
        sql = sql.Trim(',') + ')';
        SQLConnection conn = new SQLConnection();
        conn.ExecuteSqlCommand(sql, tableName, databasename);
        int rowCount = parquetTable.Count;
        int batchSize = 100000;
        decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize);
        SemaphoreSlim semaphore = new SemaphoreSlim(Environment.ProcessorCount);
        List<Task> tasks = new List<Task>();
        Console.WriteLine($"File {tableName} has total batch {(int)parts}");
        for (int i = 0; i < (int)parts; i++)
        {
            await semaphore.WaitAsync();
            int currentPart = i;
            tasks.Add(Task.Run(() =>
            {
                try
                {
                    ProcessBatch(parquetTable, dataTable.Clone(), currentPart, batchSize, tableName, databasename);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
}
Finally, the data is added row by row into a new DataTable called partTable that each sub-task is given (the schema of the main DataTable is cloned and passed across):
public static void ProcessBatch(Table parquetTable, DataTable partTable, int currentPart, int batchSize, string tableName, string databaseName)
{
    SQLConnection conn = new SQLConnection();
    int columnCount = parquetTable.Schema.Fields.Count;
    for (int i = currentPart * batchSize; (i < ((currentPart + 1) * batchSize)) && (i < parquetTable.Count); i++)
    {
        var row = parquetTable[i];
        var dataRow = partTable.NewRow();
        for (int j = 0; j < columnCount; j++)
        {
            // Null parquet values map to DBNull in the DataTable.
            dataRow[j] = row[j] ?? DBNull.Value;
        }
        partTable.Rows.Add(dataRow);
    }
    conn.InsertTable(tableName, partTable, databaseName, currentPart);
    partTable.Dispose();
}
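One observation on the code above: the CREATE TABLE statement only needs the schema, and the schema is available from the ParquetReader without calling ReadAsTableAsync() at all. A sketch of that, reusing the question's GetSqlDataType helper (assuming Parquet.Net exposes reader.Schema before any row data is read):
// Build the DataTable skeleton and the DDL from the schema alone; no rows are loaded.
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
    DataTable dataTable = new DataTable();
    string sql = $"CREATE TABLE {tableName} (";
    foreach (DataField field in reader.Schema.GetDataFields())
    {
        dataTable.Columns.Add(field.Name, field.ClrType);
        sql += $"[{field.Name}] {GetSqlDataType(field.ClrType, field)},";
    }
    sql = sql.TrimEnd(',') + ')';
}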
Now the issue is, there is a parquet file which has 2 million rows. The chunk size I have given is 100k, so it makes 20 batches and runs them in parallel, but keeps only 8 tasks active at a time (Environment.ProcessorCount is 8 on my PC) and starts the remaining 12 as slots free up (correct me if I am wrong here).
The file itself is 24 MB, but the RAM usage shoots up to 3 GB! How? My understanding is that when one sub-task is done, it should free up all its memory, but that does not appear to be happening.
I used dotMemory to check the memory usage, and the RAM consumption keeps going up and never comes down at any point.

Can anyone help me understand why the memory is not being released after a sub-task finishes, and also help me fix the code to reduce RAM usage? Again, I am very new to C# and even newer to parallel computing, so please go easy on the explanation.
EDIT: Fixed the batchSize value; it was wrongly given as 10k instead of 100k.
I used dotMemory
That's the only pertinent thing in your entire question: using a memory profiler to diagnose your problem. There's also one built into Visual Studio itself; you can find it under Debug > Performance Profiler > Memory Usage.
However, you didn't focus on the important part: what's rooting your objects. A root in .NET is a reference the garbage collector treats as always reachable, such as a static field or a local variable on a running thread. Whatever a root references, and whatever those objects reference, recursively, stays alive for as long as the root does.
So if you ever wonder what's keeping something in memory (which is your entire question), follow that instance's references up to its root and you'll find out exactly what's not getting cleaned up, and why.
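To make "rooting" concrete, here is a toy example (the names are made up, nothing to do with Parquet): the static list below is a GC root, so every table added to it stays alive for the lifetime of the process, even after the method that created it returns.
using System.Collections.Generic;
using System.Data;

public static class BatchLog
{
    // Static field = GC root: everything reachable from here is never collected.
    public static List<DataTable> CompletedBatches = new List<DataTable>();
}

public static class Processor
{
    public static void ProcessAndLog(DataTable batch)
    {
        // ... bulk copy 'batch' somewhere ...
        BatchLog.CompletedBatches.Add(batch); // roots the table: memory never drops
    }
}
A profiler's retention-path (paths-to-root) view shows exactly that chain: batch → CompletedBatches → static root.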