I've created two RDDs in PySpark with data extracted from HBase. I want to gather items with the same row keys, store them, and then search through the values associated with each item. Ideally I'd store the results in a pyspark.sql DataFrame, since I want to apply the Levenshtein distance to their content.
Details:
In HBase I have location data, where the row key is the geohash of a given area and the columns hold multiple venues in that area, each with further details on the location (JSON with a description and other text data). I have two HBase tables, and the same locations can appear in both of them. I want to search the data in those two RDDs, check for matching geohashes, and store the results in a new data structure.
I don't want to reinvent the wheel, and I've only just started learning Spark, so I'm wondering: what's the best way to do such a task? Is the built-in rdd.intersection function a good solution?
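For context on the last point: once rows with matching geohashes are paired up, the string comparison itself can be done with pyspark.sql.functions.levenshtein. Here is a minimal sketch, assuming two DataFrames built from the two tables, with illustrative column names geohash and description (not the real schema):

#Hypothetical example: join on the geohash key, then compute the edit distance
from pyspark.sql import functions as F

joined = df1.join(df2, on="geohash").select(
    "geohash",
    df1["description"].alias("desc1"),
    df2["description"].alias("desc2"))
scored = joined.withColumn("lev_dist", F.levenshtein("desc1", "desc2"))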
Edit: thanks to @Aneel's comments I was able to correct some of my mistakes.
There is a join call on RDDs that gives the same result as a JOIN in Spark SQL (the join is done on the first element of each pair, and the values become a tuple of the remaining elements from both RDDs). This is preferable to the cogroup I pointed to previously, since, as @Aneel pointed out, cogroup squashes all key-value pairs under one single key.
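To make the difference concrete, here is a toy example (the keys and values are invented for illustration, and the output order may vary):

rdd_a = sc.parallelize([("gh1", "venue A"), ("gh2", "venue B")])
rdd_b = sc.parallelize([("gh1", "venue C"), ("gh1", "venue D")])

#join: one record per matching pair of values, keyed on the first element
rdd_a.join(rdd_b).collect()
#[('gh1', ('venue A', 'venue C')), ('gh1', ('venue A', 'venue D'))]

#cogroup: one record per key, with all values from each RDD grouped into iterables
[(k, (list(a), list(b))) for k, (a, b) in rdd_a.cogroup(rdd_b).collect()]
#[('gh1', (['venue A'], ['venue C', 'venue D'])), ('gh2', (['venue B'], []))]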
Now, on a different note, I tried @Aneel's method and the gist above and benchmarked them a little, using Databricks' Community Edition (a very small cluster: 6 GB of memory, 1 core, Spark 2.1); here is the link. (The code is also at the end of the post.)
Here are the results:
It looks like for small datasets RDDs are faster than DataFrames, but once you reach a threshold (around 250k records), the DataFrame join starts to be faster.
As @Aneel suggested, bear in mind that this is a pretty simple example, and you might want to do some testing on your own data and environment (I did not go beyond 10M lines in my two lists because initialization already took 2.6 min).
Initialization code:
#Init code
NUM_TESTS = 100
from random import randint
import timeit

l1 = []
l2 = []
for i in xrange(0, 10000000):
    t = (randint(0, 2000), randint(0, 2000))
    v = randint(0, 2000)
    l1.append((t, v))
    if (randint(0, 100) > 25): #at least 25% of the keys should be similar
        t = (randint(0, 2000), randint(0, 2000))
        v = randint(0, 2000)
    l2.append((t, v))

rdd1 = sc.parallelize(l1)
rdd2 = sc.parallelize(l2)
Spark SQL test:
#Test Spark SQL
def callable_ssql_timeit():
    df1 = spark.createDataFrame(rdd1).toDF("id", "val")
    df1.createOrReplaceTempView("table1")
    df2 = spark.createDataFrame(rdd2).toDF("id", "val")
    df2.createOrReplaceTempView("table2")
    query = "SELECT * FROM table1 JOIN table2 ON table1.id = table2.id"
    spark.sql(query).count()
print(str(timeit.timeit(callable_ssql_timeit, number=NUM_TESTS)/float(NUM_TESTS)) + "s")
RDD join test:
#Test RDD join
def callable_rdd_timeit():
    rdd1.join(rdd2).count()
print(str(timeit.timeit(callable_rdd_timeit, number=NUM_TESTS)/float(NUM_TESTS)) + "s")