I am working on a Spark cluster and I have two dataframes: one contains text, the other is a look-up table. Both tables are huge (M and N could each easily exceed 100,000 entries). What is the best way to match them?
Doing a cross-join and then filtering the results based on matches seems like a crazy idea, since I would almost certainly run out of memory.
My dataframes look something like this:
df1:
    text
0   i like apples
1   oranges are good
2   eating bananas is healthy
..  ...
M   tomatoes are red, bananas are yellow
df2:
    fruit_lookup
0   apples
1   oranges
2   bananas
..  ...
N   tomatoes
I am expecting an output dataframe to look something like this:
output_df:
    text                                    extracted_fruits
0   i like apples                           ['apples']
1   oranges are good                        ['oranges']
2   eating bananas is healthy               ['bananas']
..  ...                                     ...
M   tomatoes are red, bananas are yellow    ['tomatoes', 'bananas']
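For reference, the two dataframes can be reproduced with something like this (a sketch; assumes an active SparkSession named spark and only the rows shown above):

df1 = spark.createDataFrame([
    ('i like apples',),
    ('oranges are good',),
    ('eating bananas is healthy',),
    ('tomatoes are red, bananas are yellow',),
], ['text'])

df2 = spark.createDataFrame([
    ('apples',), ('oranges',), ('bananas',), ('tomatoes',),
], ['fruit_lookup'])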
One way is to use CountVectorizerModel, as 100K lookup words should be manageable for this model (the default vocabSize is 262144):

The basic idea is to create the CountVectorizerModel from a customized list built from df2 (the lookup table), split df1.text into an array column, and then transform that column into a SparseVector, which can then be mapped back into words:
Edit: in the split function, the regex was adjusted from \s+ to [\s\p{Punct}]+ so that all punctuation marks are removed. Change 'text' to lower(col('text')) if the lookup should be case-insensitive.
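For reference, this split-based pre-processing looks roughly as follows (a sketch; it is superseded by the N-gram version in Edit-2 below, and df1_arr is just an illustrative name):

from pyspark.sql.functions import split, lower, col

# split the lower-cased `text` on runs of whitespace/punctuation into an array of single words
df1_arr = df1.withColumn('words_arr', split(lower(col('text')), r'[\s\p{Punct}]+'))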
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import split, udf, regexp_replace, lower
df2.show()
+---+------------+
| id|fruit_lookup|
+---+------------+
| 0| apples|
| 1| oranges|
| 2| bananas|
| 3| tomatoes|
| 4|dragon fruit|
+---+------------+
Edit-2: Added the following df1 pre-processing step to create an array column including all N-gram combinations. For each string with L words, N=2 adds (L-1) more items to the array, and N=3 adds (L-1)+(L-2) more items.
# max number of words in a single entry of the lookup table df2
N = 2
# Pre-process the `text` field up to N-grams,
# example: ngram_str('oranges are good', 3)
# --> ['oranges', 'are', 'good', 'oranges are', 'are good', 'oranges are good']
def ngram_str(s_t_r, N):
    arr = s_t_r.split()
    L = len(arr)
    for i in range(2, N+1):
        if L - i < 0:
            break
        arr += [' '.join(arr[j:j+i]) for j in range(L-i+1)]
    return arr
udf_ngram_str = udf(lambda x: ngram_str(x, N), 'array<string>')
df1_processed = df1.withColumn('words_arr', udf_ngram_str(lower(regexp_replace('text', r'[\s\p{Punct}]+', ' '))))
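A quick local sanity check of the helper (plain Python on the driver, matching the example in the comments above):

print(ngram_str('oranges are good', 3))
# ['oranges', 'are', 'good', 'oranges are', 'are good', 'oranges are good']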
Apply the model to the processed df1:
lst = [ r.fruit_lookup for r in df2.collect() ]
model = CountVectorizerModel.from_vocabulary(lst, inputCol='words_arr', outputCol='fruits_vec')
df3 = model.transform(df1_processed)
df3.show(20,40)
#+----------------------------------------+----------------------------------------+-------------------+
#| text| words_arr| fruits_vec|
#+----------------------------------------+----------------------------------------+-------------------+
#| I like apples| [i, like, apples, i like, like apples]| (5,[0],[1.0])|
#| oranges are good|[oranges, are, good, oranges are, are...| (5,[1],[1.0])|
#| eating bananas is healthy|[eating, bananas, is, healthy, eating...| (5,[2],[1.0])|
#| tomatoes are red, bananas are yellow|[tomatoes, are, red, bananas, are, ye...|(5,[2,3],[1.0,1.0])|
#| test| [test]| (5,[],[])|
#|I have dragon fruit and apples in my bag|[i, have, dragon, fruit, and, apples,...|(5,[0,4],[1.0,1.0])|
#+----------------------------------------+----------------------------------------+-------------------+
Then you can map fruits_vec back to the fruit names using model.vocabulary:
vocabulary = model.vocabulary
#['apples', 'oranges', 'bananas', 'tomatoes', 'dragon fruit']
to_match = udf(lambda v: [ vocabulary[i] for i in v.indices ], 'array<string>')
df_new = df3.withColumn('extracted_fruits', to_match('fruits_vec')).drop('words_arr', 'fruits_vec')
df_new.show(truncate=False)
#+----------------------------------------+----------------------+
#|text |extracted_fruits |
#+----------------------------------------+----------------------+
#|I like apples |[apples] |
#|oranges are good |[oranges] |
#|eating bananas is healthy |[bananas] |
#|tomatoes are red, bananas are yellow |[bananas, tomatoes] |
#|test |[] |
#|I have dragon fruit and apples in my bag|[apples, dragon fruit]|
#+----------------------------------------+----------------------+
Method-2: As your datasets are not huge by Spark standards, the following might work; it also handles lookup values containing multiple words, as per your comment:
from pyspark.sql.functions import expr, collect_set
df1.alias('d1').join(
df2.alias('d2')
, expr('d1.text rlike concat("\\\\b", d2.fruit_lookup, "\\\\b")')
, 'left'
).groupby('text') \
.agg(collect_set('fruit_lookup').alias('extracted_fruits')) \
.show()
+--------------------+-------------------+
| text| extracted_fruits|
+--------------------+-------------------+
| oranges are good| [oranges]|
| I like apples| [apples]|
|tomatoes are red,...|[tomatoes, bananas]|
|eating bananas is...| [bananas]|
| test| []|
+--------------------+-------------------+
Where: "\\\\b
is word boundary so that the lookup values do not mess up with their contexts.
Note: you may need to clean up all punctuation marks and redundant spaces in both columns before the dataframe join.
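A minimal cleanup sketch (assuming the column names from the examples above; the _clean names are just for illustration):

from pyspark.sql.functions import lower, regexp_replace, trim

# lower-case, collapse runs of whitespace/punctuation into single spaces, trim the ends
df1_clean = df1.withColumn('text', trim(regexp_replace(lower('text'), r'[\s\p{Punct}]+', ' ')))
df2_clean = df2.withColumn('fruit_lookup', trim(regexp_replace(lower('fruit_lookup'), r'[\s\p{Punct}]+', ' ')))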