
PySpark UDF optimization challenge using a dictionary with regex's (Scala?)

I am trying to optimize the code below (PySpark UDF).

It gives me the desired result (based on my data set) but it's too slow on very large datasets (approx. 180M).

The results (accuracy) are better than available Python modules (e.g. geotext, hdx-python-country). So I'm not looking for another module.

DataFrame:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

regex.csv:

iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$

......<many, many more>

Create a Pandas DataFrame from regex.csv, group by iso2 and join the keywords with | (e.g. \bArizona\b|\bTexas\b|\bFlorida\b|\bUS$):

# use a separate name so the Spark DataFrame df above is not overwritten
df_csv = pd.read_csv('regex.csv', sep=';')
df_regex = df_csv.groupby('iso2').agg({'keywords': '|'.join}).reset_index()
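
For readers who want to check the grouping step without pandas, here is a minimal sketch using only the standard library. The SAMPLE_CSV text and the group_keywords helper are illustrative names, not part of the original code, and the rows are a small subset of regex.csv.

```python
import csv
import io
from collections import defaultdict

# A few rows in the same layout as regex.csv (semicolon-separated).
SAMPLE_CSV = r"""iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bUS$
NL;\bAmsterdam\b
NL;\bNetherlands\b
"""

def group_keywords(csv_text):
    """Return {iso2: 'kw1|kw2|...'} from semicolon-separated CSV text."""
    grouped = defaultdict(list)
    for row in csv.DictReader(io.StringIO(csv_text), delimiter=";"):
        grouped[row["iso2"]].append(row["keywords"])
    # One alternation per country, keywords kept in file order.
    return {iso2: "|".join(kws) for iso2, kws in grouped.items()}

patterns = group_keywords(SAMPLE_CSV)
print(patterns["US"])  # \bArizona\b|\bTexas\b|\bUS$
```

The result is one alternation pattern per country, which is the shape the function below iterates over.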

Function:

def get_iso2(x): 
 
    iso2={}
    
    for j, row in df_regex.iterrows():
 
        regex = re.compile(row['keywords'],re.I|re.M)         
        matches = re.finditer(regex, x)
        
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
            
    return [key for key, value in iso2.items() for _ in range(value)]

PySpark UDF:

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))

Create new column:

df_new = df.withColumn('iso2', get_iso2_udf('address'))

Expected sample output:

[US,US,NL]
[CA]
[BE,BE,AU]

Some places occur in more than one country (input is address column with city, province, state, country...)

Sample:

3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US]
Kalverstraat Amsterdam -> [US,NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]

Maybe using Scala UDFs in PySpark is an option, but I have no idea how.

Your optimisation recommendations are highly appreciated!

asked Sep 18 '25 05:09 by John Doe


1 Answer

IIUC, you can try the following steps without using a UDF:

from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam, Netherlands","Lex"],
  ["xvcv", "ddd"]
]).toDF("address","name")

Step-1: convert df_regex to a Spark dataframe df1 and add a unique id column to df.

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

# adjust keywords to uppercase except chars preceded with backslash:
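# regex=True is required in pandas 2.0+, where str.replace defaults to literal matching
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)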

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+

df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam, Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+

Step-2: left join df1 (the regex patterns) to df using rlike:

df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+

Step-3: count number of matched d2.keywords in d1.address by splitting d1.address by d2.keywords, and then reduce the size of the resulting Array by 1:

df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+
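
To see why "number of split pieces minus one" equals the number of matches, here is a small plain-Python sketch. It uses Python's re.split rather than Spark's Java-based split, so it only illustrates the idea; the two agree here because the pattern has no capturing groups and every match is non-empty. The helper names are illustrative.

```python
import re

def count_by_split(pattern, text):
    """Count matches the way Step-3 does: number of split pieces minus one."""
    return len(re.split(pattern, text)) - 1

def count_by_findall(pattern, text):
    """Count matches directly, for comparison."""
    return len(re.findall(pattern, text))

US_PATTERN = (r"(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b"
              r"|\bAMSTERDAM\b|\bPROSPER\b|\bUS$")
address = "3030 Whispering Pines Circle, Prosper Texas, US".upper()

print(count_by_split(US_PATTERN, address))    # 3
print(count_by_findall(US_PATTERN, address))  # 3
print(count_by_split(US_PATTERN, "XVCV"))     # 0 (no match leaves one piece)
```

In Spark the unmatched row shows -2 instead of 0 because keywords is null after the left join, so size() is applied to a null array.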

Step-4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+):

df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+

Step-5: groupby and do the aggregation:

df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+

Alternative: Step-3 can also be handled by a Pandas UDF:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re

@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+

Notes:

  1. Case-insensitive pattern matching is expensive, so all characters of the keywords (except anchors and escaped characters such as \b, \B, \A, \z) are converted to upper case and matched against upper(address).
  2. Patterns used in Spark SQL functions such as rlike and regexp_replace are Java regexes, while those in a pandas_udf are Python regexes. The two dialects differ slightly, which can matter when writing the patterns in regex.csv.
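
The uppercase conversion in note 1 can be tried in isolation with the standard re module. This is a sketch of the same substitution applied to single strings, assuming Python 3.7+ semantics for re.sub; upper_keep_escapes is an illustrative name, not something from the answer.

```python
import re

# Same pattern as in the answer: group 1 is either the start of the string
# or a backslash plus the character it escapes; group 2 is the run of
# characters up to the next backslash.
_ESCAPE_AWARE = re.compile(r"(^|\\.)([^\\]*)")

def upper_keep_escapes(keyword):
    """Uppercase a regex keyword but leave escapes such as \\b untouched."""
    return _ESCAPE_AWARE.sub(lambda m: m.group(1) + m.group(2).upper(), keyword)

print(upper_keep_escapes(r"\bNova Scotia\b"))  # \bNOVA SCOTIA\b
print(upper_keep_escapes(r"\bUS$"))            # \bUS$
```

A plain .upper() on the whole keyword would turn \b (word boundary) into \B (non-boundary) and change the meaning of the pattern, which is why the escapes are captured separately.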

Method-2: using pandas_udf

Because the join and groupby trigger data shuffling, the method above can be slow. Here is one more option for your testing:

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

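# regex=True is required in pandas 2.0+, where str.replace defaults to literal matching
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)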

df_ptn = spark.sparkContext.broadcast(
    df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}

# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
import re
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten, expr

def __get_iso2(addr, ptn):
    # the initial [] keeps reduce from failing if the pattern dict is empty
    return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()], []) for s in addr ])

get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+
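
The per-address logic inside the pandas_udf can be checked without Spark. Below is a minimal plain-Python sketch of that core, with the patterns precompiled once so the work per row does not depend on the re module's internal cache. PATTERNS copies the values printed above (with the duplicated NOVA SCOTIA entry dropped); COMPILED and iso2_hits are illustrative names.

```python
import re

# Patterns as printed for df_ptn.value above (duplicate entry removed).
PATTERNS = {
    "CA": r"(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$",
    "NL": r"(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$",
    "US": (r"(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b"
           r"|\bAMSTERDAM\b|\bPROSPER\b|\bUS$"),
}

# Compile once up front instead of once per row.
COMPILED = {iso2: re.compile(ptn) for iso2, ptn in PATTERNS.items()}

def iso2_hits(address):
    """Return each country code repeated once per match in the address."""
    text = address.upper()
    hits = []
    for iso2, regex in COMPILED.items():
        hits.extend([iso2] * len(regex.findall(text)))
    return hits

print(iso2_hits("Kalverstraat Amsterdam, Netherlands"))  # ['NL', 'NL', 'US']
```

The same function body could be applied per element inside a pandas_udf; whether precompiling gives a measurable gain on 180M rows is something to benchmark rather than assume.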

Or return an array of arrays in pandas_udf (w/o reduce and iconcat) and do flatten with Spark:

def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()

Update: to find unique countries, do the following:

def __get_iso2_3(addr, ptn):
  return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
|             address|name|    iso2|
+--------------------+----+--------+
|3030 Whispering P...|John|    [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
|                xvcv| ddd|      []|
+--------------------+----+--------+

Method-3: use a list comprehension:

Similar to @CronosNull's method: if the number of countries in regex.csv is manageable, you can build one column per country with a list comprehension:

from pyspark.sql.functions import size, split, upper, col, array, expr, flatten

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()

df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])

df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

answered Sep 19 '25 19:09 by jxc