We need to merge two datasets that have different column names; there are no common columns across the datasets.
We have tried a couple of approaches, but neither of them produces a result. How can we combine the two datasets side by side using Apache Spark in Java?
Input dataset 1:
"405-048011-62815", "CRC Industries"
"630-0746", "Dixon value"
"4444-444", "3M INdustries"
"555-55", "Dixon coupling valve"
Input dataset 2:
"222-2222-5555", "Tata"
"7777-88886", "WestSide"
"22222-22224", "Reliance"
"33333-3333", "V industries"
Expected output:
    +----------------+--------------+-------------+---------+
    |          label1|     sentence1|       label2|sentence2|
    +----------------+--------------+-------------+---------+
    |405-048011-62815|CRC Industries|222-2222-5555|     Tata|
    |        630-0746|   Dixon value|   7777-88886| WestSide|
    +----------------+--------------+-------------+---------+
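The expected output places the i-th row of dataset 1 next to the i-th row of dataset 2, so the combination is positional rather than key-based. A minimal plain-Java sketch of that pairing (no Spark involved; the `PositionalPairing` class and its `pair` method are hypothetical names, and rows are modeled as `List<String>`):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Spark) of pairing two tables row by row.
// Each row is a List<String>; a result row is the left row followed by the right row.
final class PositionalPairing {
    // Pairs row i of `left` with row i of `right`.
    // Rows beyond the length of the shorter input are dropped.
    static List<List<String>> pair(List<List<String>> left, List<List<String>> right) {
        int n = Math.min(left.size(), right.size());
        List<List<String>> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            List<String> row = new ArrayList<>(left.get(i));
            row.addAll(right.get(i));
            out.add(row);
        }
        return out;
    }
}
```

Dropping the surplus rows of the longer input is an assumption; the question does not say what should happen when the inputs differ in length.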
    List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746","Dixon value"),
                    RowFactory.create("4444-444","3M INdustries"),
                    RowFactory.create("555-55","Dixon coupling valve"));
    StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
            new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
    List<String> listStrings = new ArrayList<String>();
    listStrings.add("405-048011-62815");
    listStrings.add("630-0746");
    Dataset<Row> matchFound1=sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
    matchFound1.show();
    listStrings.clear();
    listStrings.add("222-2222-5555");
    listStrings.add("7777-88886");
    List<Row> data2 = Arrays.asList(
            RowFactory.create("222-2222-5555", "Tata"),
            RowFactory.create("7777-88886","WestSide"),
            RowFactory.create("22222-22224","Reliance"),
            RowFactory.create("33333-3333","V industries"));
    StructType schema2 = new StructType(new StructField[] {new StructField("label2", DataTypes.StringType, false,Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) });
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
    Dataset<Row> matchFound2=sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
    matchFound2.show();
    //Approach 1
    Dataset<Row> matchFound3=matchFound1.select(matchFound1.col("label1"),matchFound1.col("sentence1"),matchFound2.col("label2"),
            matchFound2.col("sentence2"));
    System.out.println("After concat");
    matchFound3.show();
    //Approach 2
    Dataset<Row> matchFound4=matchFound1.filter(concat((col("label1")),matchFound1.col("sentence1"),matchFound2.col("label2"),
            matchFound2.col("sentence2")));
    System.out.println("After concat 2");
    matchFound4.show();
The errors for each approach are as follows:
Approach 1 error
----------
org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
!Project [label1#0, sentence1#1, label2#10, sentence2#11]
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]
----------
Approach 2 error
org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
!Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]
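Both errors share a root cause: `matchFound1.select(...)` and `matchFound1.filter(...)` can only resolve columns that belong to `matchFound1`'s own plan, so `label2` and `sentence2` from `matchFound2` are reported as missing; approach 2 additionally passes a string-valued `concat(...)` where `filter` expects a boolean condition. With no shared column, the two Datasets need a synthetic key to join on. The plain-Java sketch below models that idea (not Spark code; `IndexJoin`, `withIndex` and `joinOnIndex` are hypothetical names): number each row, then inner-join the two tables on that number.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spark): give each row a synthetic index column,
// then inner-join the two tables on that index with a hash map.
final class IndexJoin {
    // Prepends the row's 0-based position as a new first column.
    static List<List<String>> withIndex(List<List<String>> rows) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            List<String> row = new ArrayList<>();
            row.add(Integer.toString(i));
            row.addAll(rows.get(i));
            out.add(row);
        }
        return out;
    }

    // Inner equi-join on column 0 (the index). The index column is left out
    // of the result, like dropping the key columns after a join.
    static List<List<String>> joinOnIndex(List<List<String>> left, List<List<String>> right) {
        Map<String, List<String>> byKey = new HashMap<>();
        for (List<String> r : right) {
            byKey.put(r.get(0), r.subList(1, r.size()));
        }
        List<List<String>> out = new ArrayList<>();
        for (List<String> l : left) {
            List<String> match = byKey.get(l.get(0));
            if (match != null) {
                List<String> row = new ArrayList<>(l.subList(1, l.size()));
                row.addAll(match);
                out.add(row);
            }
        }
        return out;
    }
}
```

In Spark itself, one way to get such a key is `JavaRDD.zipWithIndex()`, which numbers rows consecutively. `functions.monotonically_increasing_id()` is unique and increasing but not consecutive, so its values from two different Datasets are not guaranteed to line up for a join.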
Here, the first dataframe (dataframe1) has the columns ['ID', 'NAME', 'Address'] and the second dataframe (dataframe2) has the columns ['ID', 'Age']. We now have to add the Age column to the first dataframe and the NAME and Address columns to the second dataframe, so that both have the same columns and can be combined with a union. We can do this with the lit() function, which is available in PySpark.
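In Java or Scala the same `lit()` function lives in `org.apache.spark.sql.functions`, and since Spark 3.1 `Dataset.unionByName(other, true)` can fill the missing columns with nulls without adding them by hand. A plain-Java model of the pad-then-union-by-name step (not Spark code; `PadAndUnion` is a hypothetical name, and rows are modeled as column-name-to-value maps):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model (no Spark) of "add the missing columns as null
// literals, then union by column name".
final class PadAndUnion {
    static List<Map<String, String>> unionByName(List<Map<String, String>> a,
                                                 List<Map<String, String>> b) {
        // Collect every column name from both sides, in first-seen order.
        Set<String> columns = new LinkedHashSet<>();
        for (Map<String, String> r : a) columns.addAll(r.keySet());
        for (Map<String, String> r : b) columns.addAll(r.keySet());

        List<Map<String, String>> out = new ArrayList<>();
        for (List<Map<String, String>> side : List.of(a, b)) {
            for (Map<String, String> r : side) {
                Map<String, String> padded = new LinkedHashMap<>();
                // A column the row lacks becomes null, like lit(null) in Spark.
                for (String c : columns) padded.put(c, r.get(c));
                out.add(padded);
            }
        }
        return out;
    }
}
```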
Using the concat() function to concatenate DataFrame columns: Spark SQL functions provide concat() to concatenate two or more DataFrame columns into a single column. It can also take columns of different data types and concatenate them into a single column; for example, it supports String, Int, Boolean and also arrays. Note that concat() produces a value column, not a boolean condition, which is why passing it to filter() fails in approach 2.
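One behavior worth knowing when concatenating columns: Spark SQL's `concat` returns null if any input is null, while `concat_ws(separator, ...)` skips null inputs. A plain-Java model of the two behaviors for string inputs (not Spark code; `ConcatModel` and its methods are hypothetical names):

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

// Plain-Java model (no Spark) of the null handling of Spark SQL's
// concat and concat_ws for string inputs.
final class ConcatModel {
    // Like concat: any null input makes the whole result null.
    static String concat(String... parts) {
        if (Arrays.stream(parts).anyMatch(Objects::isNull)) return null;
        return String.join("", parts);
    }

    // Like concat_ws: null inputs are skipped, the rest are joined with sep.
    static String concatWs(String sep, String... parts) {
        return Arrays.stream(parts)
                .filter(Objects::nonNull)
                .collect(Collectors.joining(sep));
    }
}
```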
Spark provides the union() method in the Dataset class to append one Dataset to another: call union() on the first Dataset and pass the second Dataset as the argument. Note: union can only be performed on Datasets with the same number of columns.
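Two details matter here. `union` stacks rows vertically, so on its own it cannot produce the side-by-side layout asked for in the question. It also matches columns by position, not by name (`unionByName` matches by name), keeps duplicate rows like SQL `UNION ALL`, and the result takes the first Dataset's column names. A plain-Java model of those rules (not Spark code; `PositionalUnion` is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Spark) of Dataset.union: rows are appended and
// matched by column position, not name, and duplicates are kept (UNION ALL).
final class PositionalUnion {
    static List<List<String>> union(List<String> leftHeader, List<List<String>> left,
                                    List<String> rightHeader, List<List<String>> right) {
        if (leftHeader.size() != rightHeader.size()) {
            throw new IllegalArgumentException("union needs the same number of columns");
        }
        // The result keeps leftHeader as its column names; right rows are appended
        // unchanged, so column i of `right` lands under leftHeader.get(i).
        List<List<String>> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }
}
```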
Hope this works for you.
DF
// Run in spark-shell, where sc and the toDF implicits are already in scope.
val pre: Array[String] = Array("CRC Industries", "Dixon value", "3M INdustries", "Dixon coupling valve")
val rea: Array[String] = Array("405048011-62815", "630-0746", "4444-444", "555-55")
// zip pairs the label and sentence arrays element by element.
val df1 = sc.parallelize(rea zip pre).toDF("label1", "sentence1")
val preasons2: Array[String] = Array("Tata", "WestSide", "Reliance", "V industries")
val reasonsI2: Array[String] = Array("222-2222-5555", "7777-88886", "22222-22224", "33333-3333")
val df2 = sc.parallelize(reasonsI2 zip preasons2).toDF("label2", "sentence2")
String Indexer
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
  .setInputCol("label1")
  .setOutputCol("label1Index")
val indexed = indexer.fit(df1).transform(df1)
indexed.show()
val indexer1 = new StringIndexer()
  .setInputCol("label2")
  .setOutputCol("label2Index")
val indexed1 = indexer1.fit(df2).transform(df2)
indexed1.show()
Join
val rnd_reslt12 = indexed
  .join(indexed1, indexed.col("label1Index") === indexed1.col("label2Index"))
  .drop(indexed.col("label1Index"))
  .drop(indexed1.col("label2Index"))
rnd_reslt12.show()
+---------------+--------------------+-------------+------------+
|         label1|           sentence1|       label2|   sentence2|
+---------------+--------------------+-------------+------------+
|       630-0746|         Dixon value|222-2222-5555|        Tata|
|       4444-444|       3M INdustries|  22222-22224|    Reliance|
|         555-55|Dixon coupling valve|   33333-3333|V industries|
|405048011-62815|      CRC Industries|   7777-88886|    WestSide|
+---------------+--------------------+-------------+------------+
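`StringIndexer` does not number rows by position: it orders labels by frequency (the default `stringOrderType` is `frequencyDesc`), and in recent Spark versions equally frequent labels are further sorted alphabetically. Because every label here is unique, the index join pairs the labels in sorted order rather than in input order, so the exact pairing can differ from the expected output and may vary between Spark versions. A plain-Java model of that ordering (not Spark code; `IndexerModel` is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spark) of StringIndexer's default "frequencyDesc"
// ordering with alphabetical tie-breaking: the most frequent label gets
// index 0.0, and equally frequent labels are ordered alphabetically.
final class IndexerModel {
    static Map<String, Double> fit(List<String> labels) {
        Map<String, Integer> counts = new HashMap<>();
        for (String s : labels) counts.merge(s, 1, Integer::sum);

        List<String> ordered = new ArrayList<>(counts.keySet());
        ordered.sort(Comparator.comparing((String s) -> counts.get(s)).reversed()
                .thenComparing(Comparator.<String>naturalOrder()));

        Map<String, Double> index = new LinkedHashMap<>();
        for (int i = 0; i < ordered.size(); i++) index.put(ordered.get(i), (double) i);
        return index;
    }
}
```

With the two labels kept in the Java program below, the sorted order ("405-..." before "630-...", "222-..." before "7777-...") happens to match the input order, which is why that program should yield the expected pairing.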
I have done the same with StringIndexer in Java; this works:
import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class StringIndexer11 {
    public static void main(String[] args) {
        // Windows only: Hadoop looks for bin\winutils.exe under this directory.
        System.setProperty("hadoop.home.dir", "D:\\AI matching\\winutil");
        // A single SparkSession is enough; no separate JavaSparkContext or SQLContext is needed.
        SparkSession spark = SparkSession.builder()
                .appName("JavaTokenizerExample")
                .master("local[*]")
                .getOrCreate();
        try {
            // First dataset: label1 / sentence1.
            List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746", "Dixon value"),
                    RowFactory.create("4444-444", "3M INdustries"),
                    RowFactory.create("555-55", "Dixon coupling valve"));
            StructType schema = new StructType(new StructField[] {
                    new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("sentence1", DataTypes.StringType, false, Metadata.empty()) });
            Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

            // Keep only the rows to be paired.
            List<String> listStrings = new ArrayList<>();
            listStrings.add("405-048011-62815");
            listStrings.add("630-0746");
            Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
            matchFound1.show();

            // Turn each label into a numeric index (0.0, 1.0, ...) to use as a join key.
            // The index follows label frequency/sort order, not row order.
            StringIndexer indexer = new StringIndexer()
                    .setInputCol("label1")
                    .setOutputCol("label1Index");
            Dataset<Row> dataset1 = indexer.fit(matchFound1).transform(matchFound1);

            // Second dataset: label2 / sentence2.
            List<Row> data2 = Arrays.asList(
                    RowFactory.create("222-2222-5555", "Tata"),
                    RowFactory.create("7777-88886", "WestSide"),
                    RowFactory.create("22222-22224", "Reliance"),
                    RowFactory.create("33333-3333", "V industries"));
            StructType schema2 = new StructType(new StructField[] {
                    new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) });
            Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

            listStrings.clear();
            listStrings.add("222-2222-5555");
            listStrings.add("7777-88886");
            Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
            matchFound2.show();

            StringIndexer indexer1 = new StringIndexer()
                    .setInputCol("label2")
                    .setOutputCol("label2Index");
            Dataset<Row> dataset2 = indexer1.fit(matchFound2).transform(matchFound2);

            // Join on the two indices, then drop the helper columns.
            Dataset<Row> finalResult = dataset1
                    .join(dataset2, dataset1.col("label1Index").equalTo(dataset2.col("label2Index")))
                    .drop(dataset1.col("label1Index"))
                    .drop(dataset2.col("label2Index"));
            finalResult.show();
        } finally {
            spark.stop();
        }
    }
}