I have two RDDs in PySpark:
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]
Both RDDs have the same number of rows. What I want to do is take all the columns in each row from RDD1 (converted from unicode to normal strings) and the 2nd column from each row in RDD2 (converted from a unicode string to a float) and form a new RDD from them. So the new RDD will look like this:
RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd', 42.0),('2013-01-31 00:00:00', 'a', 'ab', 'abc', 'g', 98.0),.....]
Once that is done, I want to aggregate the last value in each row (the float value) of this new RDD3 by the date value in the 1st column. That means that for all the rows where the date is 2013-01-31 00:00:00, the last numeric values should be added together.
How can I do this in PySpark?
You need to call zipWithIndex on both RDDs. This method pairs each element with a value representing its index, so you can then join the two RDDs on that index.
Your approach should be similar to the following (I bet there are more efficient ways):
rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(xrange(5))
# zipWithIndex yields (value, index); swap to (index, value) so the index is the join key
zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v))
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v))
print zdd1.join(zdd2).collect()
The output will be:
[(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]
After this, only a map is required to drop the index and recompose the data, e.g.:
combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v)
print combinedRDD.collect()
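# Alternatively, use the .zip method directly (both RDDs must have the same
# number of partitions and the same number of elements in each partition):
# combinedRDD = rdd1.zip(rdd2)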
The output will be:
[(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
As for the data type conversion, I have had that problem before, and I solve it with this snippet:
import unicodedata
convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1)
.encode('ascii','ignore'), v2)
combinedRDD = combinedRDD.map(convert)
print combinedRDD.collect()
Will output: [('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]
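The snippets above stop at the combined RDD and do not show the aggregation by date that the question asks for. A minimal sketch of the remaining steps could look like the code below. It assumes the row layout from the question (date in the first column of RDD1, numeric string in the second column of RDD2) and ASCII-only text. The helper names merge_row, date_and_value, build_rdd3 and totals_by_date are made up for illustration, and the sketch uses zip rather than the zipWithIndex/join approach shown earlier.

```python
from operator import add


def merge_row(pair):
    """Combine one RDD1 row with the matching RDD2 row into an RDD3 row."""
    row1, row2 = pair
    # Assumption: all text is ASCII, so str() is safe on Python 2 as well;
    # on Python 3 the fields are already plain str.
    fields = tuple(str(field) for field in row1)
    # Take the 2nd column of the RDD2 row and convert it to a float.
    return fields + (float(row2[1]),)


def date_and_value(row):
    """Key a merged row by its date (first field) with the float (last field) as value."""
    return (row[0], row[-1])


def build_rdd3(rdd1, rdd2):
    # zip pairs elements by position. It requires both RDDs to have the same
    # number of partitions and the same number of elements per partition;
    # if that does not hold, fall back to zipWithIndex plus join.
    return rdd1.zip(rdd2).map(merge_row)


def totals_by_date(rdd3):
    # Sum the float values of all rows that share the same date.
    return rdd3.map(date_and_value).reduceByKey(add)
```

With the RDDs from the question this would be called as rdd3 = build_rdd3(RDD1, RDD2) followed by totals_by_date(rdd3).collect(), which returns a list of (date, sum) pairs. reduceByKey is used instead of groupByKey so that partial sums are combined within each partition before the shuffle.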