Upsert from AWS Glue to Amazon Redshift

I understand that there is no direct UPSERT query one can run from Glue against Redshift. Is it possible to implement the staging-table concept within the Glue script itself?

So my expectation is to create the staging table, merge it with the destination table and finally delete it. Can this be achieved within the Glue script?

asked Oct 29 '25 by Arpit Singh


2 Answers

It is possible to implement an upsert into Redshift using a staging table in Glue by passing the 'postactions' option to the JDBC sink:

val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"

// Comma-separated column list shared by the destination and staging tables
val fields = datasetDf.toDF().columns.mkString(",")

// Merge staging into destination: delete matched rows, insert all staged rows, drop staging
val postActions =
  s"""
     DELETE FROM $destination USING $staging AS S
        WHERE $destinationTable.id = S.id
          AND $destinationTable.date = S.date;
     INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
     DROP TABLE IF EXISTS $staging
  """

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "postactions" -> postActions
  )),
  redshiftTmpDir = s"$tempDir/redshift",
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

Make sure the Redshift user that Glue writes with has sufficient permissions to create and drop tables in the staging schema.

answered Nov 01 '25 by Yuriy Bondaruk


It turns out the connection_options dictionary parameter of the glueContext.write_dynamic_frame.from_jdbc_conf function has two useful keys: preactions and postactions.

target_table = "my_schema.my_table"
# The '#' prefix tells Redshift to create the staging table as a temporary table
stage_table = "my_schema.#my_table_stage_table"

# Before the load: recreate an empty staging table with the same layout as the target
pre_query = """
    drop table if exists {stage_table};
    create table {stage_table} as select * from {target_table} LIMIT 0;""".format(stage_table=stage_table, target_table=target_table)

# After the load: merge staging into the target inside a transaction, then drop staging
post_query = """
    begin;
    delete from {target_table} using {stage_table} where {stage_table}.id = {target_table}.id;
    insert into {target_table} select * from {stage_table};
    drop table {stage_table};
    end;""".format(stage_table=stage_table, target_table=target_table)

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0, catalog_connection="test_red", redshift_tmp_dir="s3://s3path", transformation_ctx="datasink4",
    connection_options={"preactions": pre_query, "postactions": post_query,
                        "dbtable": stage_table, "database": "redshiftdb"})

Based on https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
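
For completeness, the snippet above assumes a Glue job in which glueContext and datasource0 already exist. A minimal sketch of that surrounding boilerplate might look like the following; the catalog database and table names (my_catalog_db, my_source_table) are placeholders, not values from the answer:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Standard Glue job setup
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Source data to upsert; catalog database/table names are placeholders
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="my_catalog_db",
    table_name="my_source_table",
    transformation_ctx="datasource0")

# ... the write_dynamic_frame.from_jdbc_conf call from the answer goes here ...

job.commit()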

answered Nov 01 '25 by Vzzarr