Spark Athena connector

I need to query Athena from Spark, but Spark calls prepareStatement when it reads through a JDBC driver, and the Athena driver throws the exception "com.amazonaws.athena.jdbc.NotImplementedException: Method Connection.prepareStatement is not yet implemented".

How can I connect to Athena from Spark?

Nipun asked Dec 05 '25 07:12


2 Answers

I don't know how you'd connect to Athena from Spark, but you don't need to: you can query the data that Athena contains (or, more correctly, "registers") directly from Spark.

Athena has two parts:

  1. A Hive Metastore (now called the Glue Data Catalog), which maps database and table names to the underlying files.
  2. The Presto query engine, which translates your SQL into data operations against those files.

When you start an EMR cluster (v5.8.0 or later), you can instruct it to connect to your Glue Data Catalog; this is a checkbox in the 'create cluster' dialog. With that option checked, your Spark SQLContext connects to the Glue Data Catalog, and you can see the same tables that Athena sees.

You can then query these tables as normal.

See https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html for more.

Kirk Broadhurst answered Dec 07 '25 17:12


You can use the Simba Athena JDBC driver (SimbaAthenaJDBC), which is available as a Maven dependency:

<dependency>
    <groupId>com.syncron.amazonaws</groupId>
    <artifactId>simba-athena-jdbc-driver</artifactId>
    <version>2.0.2</version>
</dependency>

To use it from Spark:

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("My Spark Example")
    .getOrCreate();

// Load the driver class so that it registers itself with DriverManager.
Class.forName("com.simba.athena.jdbc.Driver");

Properties connectionProperties = new Properties();
connectionProperties.put("User", "AWSAccessKey");
connectionProperties.put("Password", "AWSSecretAccessKey");
// Athena writes its query results to this S3 location.
connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
connectionProperties.put("AwsCredentialsProviderClass",
    "com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");

// Each predicate becomes the WHERE clause of one Spark partition.
String[] predicates = {
    "id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'"
};

Dataset<Row> data =
    spark.read()
        .jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
            "my_env.my_table", predicates, connectionProperties);
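
Spark creates one partition per entry in the predicates array, so the single predicate above reads the whole year in one task. As a rough sketch, assuming the same id and date columns as in the example, the range could be split into one predicate per month. The class and method names here are hypothetical and not part of Spark or the Simba driver:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

final class AthenaPredicates {

    // Hypothetical helper: returns one WHERE-clause predicate per calendar month
    // covering the half-open range [start, end). The column names "id" and "date"
    // are assumptions taken from the example query.
    // The id value is inserted verbatim, so only pass trusted values.
    static String[] monthlyPredicates(String id, LocalDate start, LocalDate end) {
        List<String> predicates = new ArrayList<>();
        LocalDate from = start;
        while (from.isBefore(end)) {
            // First day of the following month, clipped to the end of the range.
            LocalDate next = from.withDayOfMonth(1).plusMonths(1);
            LocalDate to = next.isBefore(end) ? next : end;
            predicates.add(String.format(
                "id = '%s' and date >= DATE'%s' and date < DATE'%s'", id, from, to));
            from = to;
        }
        return predicates.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] predicates = monthlyPredicates(
            "foo", LocalDate.of(2018, 1, 1), LocalDate.of(2019, 1, 1));
        // Prints 12 predicates, one per month of 2018.
        for (String predicate : predicates) {
            System.out.println(predicate);
        }
    }
}
```

The resulting array can be passed to spark.read().jdbc(...) in place of the single-element array. Whether finer partitions actually help depends on how the table is laid out and on how many concurrent queries Athena allows for the account.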

You can also use this driver in a Flink application:

// One type per selected column: id and val_col are both read as strings.
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
};

RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.simba.athena.jdbc.Driver")
    .setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
    .setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
    .setRowTypeInfo(rowTypeInfo)
    .finish();

// env is the job's ExecutionEnvironment (DataSet API), created elsewhere.
DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);
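
The connection URL used in both snippets is a list of key=value settings separated by semicolons after the jdbc:awsathena:// prefix. A minimal sketch of assembling it from a map, so that the region and output location are not hard-coded in one long string, might look like the following. The AthenaJdbcUrl class is a hypothetical helper and not part of the driver, and it assumes that no value contains a semicolon:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AthenaJdbcUrl {

    // Hypothetical helper: joins the settings into the semicolon-separated form
    // used by the example URLs. Values are appended as-is, without escaping.
    static String build(Map<String, String> settings) {
        StringBuilder url = new StringBuilder("jdbc:awsathena://");
        for (Map.Entry<String, String> setting : settings.entrySet()) {
            url.append(setting.getKey())
               .append('=')
               .append(setting.getValue())
               .append(';');
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the settings in insertion order.
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("AwsRegion", "us-east-1");
        settings.put("S3OutputLocation", "s3://my-bucket/tmp/");
        // Prints: jdbc:awsathena://AwsRegion=us-east-1;S3OutputLocation=s3://my-bucket/tmp/;
        System.out.println(build(settings));
    }
}
```

UID and PWD entries can be added to the map in the same way, ideally read from configuration rather than written into the source.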

Wil answered Dec 07 '25 17:12