 

Spark.read() multiple paths at once instead of one-by-one in a for loop

I am running the following code:

list_of_paths is a list of paths, each ending in an Avro file. For example:

['folder_1/folder_2/0/2020/05/15/10/41/08.avro', 'folder_1/folder_2/0/2020/05/15/11/41/08.avro', 'folder_1/folder_2/0/2020/05/15/12/41/08.avro']

Note: The paths above are stored in Azure Data Lake Storage, and the process below is executed in Databricks.

spark.conf.set("fs.azure.account.key.{0}.dfs.core.windows.net".format(storage_account_name), storage_account_key)
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
begin_time = time.time()

for path in list_of_paths:

    try:
      read_avro_data, avro_decoded = None, None

      # Read each path from Azure Data Lake ("abfss")
      read_avro_data = spark.read.format("avro").load(
          "abfss://{0}@{1}.dfs.core.windows.net/{2}".format(
              storage_container_name, storage_account_name, path))

    except Exception as e:
      custom_log(e)

Schema

read_avro_data.printSchema()

root
 |-- SequenceNumber: long (nullable = true)
 |-- Offset: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Body: binary (nullable = true) 
# this is the content of the AVRO file.

Number of rows and columns

print("ROWS: ", read_avro_data.count(), ", NUMBER OF COLUMNS: ", len(read_avro_data.columns))

ROWS:  2 , NUMBER OF COLUMNS:  6

What I want is not to read one AVRO file per iteration (2 rows of content per iteration). Instead, I want to read all the AVRO files at once, giving 2 × 3 = 6 rows of content in my final Spark DataFrame.

Is this feasible with spark.read()? Something like the following:

spark.read.format("avro").load("abfss://{0}@{1}.dfs.core.windows.net/folder_1/folder_2/0/2020/05/15/*")

[Update] Sorry for the misunderstanding about the wildcard (*). It implies that all AVRO files are in the same folder, but instead I have one folder per AVRO file: 3 AVRO files, 3 folders. In this case the wildcard won't work. The solution, as answered below, is to pass a list [] of path names.
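Putting this together, here is a minimal sketch of building the fully qualified abfss paths from the relative paths and passing them to a single load() call. The container and account names are placeholders (not from the question), and the Spark call is shown commented out since it needs a live Databricks session:

```python
storage_container_name = "my-container"  # placeholder, use your own
storage_account_name = "myaccount"       # placeholder, use your own

list_of_paths = [
    'folder_1/folder_2/0/2020/05/15/10/41/08.avro',
    'folder_1/folder_2/0/2020/05/15/11/41/08.avro',
    'folder_1/folder_2/0/2020/05/15/12/41/08.avro',
]

# Build one fully qualified abfss URI per relative path
full_paths = [
    "abfss://{0}@{1}.dfs.core.windows.net/{2}".format(
        storage_container_name, storage_account_name, p)
    for p in list_of_paths
]

# One load() call reads all files into a single DataFrame,
# replacing the per-file for loop:
# read_avro_data = spark.read.format("avro").load(full_paths)
```

This replaces N separate read jobs with one, and the resulting DataFrame concatenates the rows of all files.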

Thank you in advance for your help and advice.

asked Sep 01 '25 by NikSp

1 Answer

The method load(path=None, format=None, schema=None, **options) accepts either a single path or a list of paths.

For example, you can pass a list of paths directly:

spark.read.format("avro").load(["/tmp/dataa/userdata1.avro","/tmp/dataa/userdata2.avro"]).count()

1998

answered Sep 02 '25 by Srinivas