I understand that Glue offers no direct UPSERT query against Redshift. Is it possible to implement the staging table approach within the Glue script itself?
What I expect is to create the staging table, merge it into the destination table, and finally drop it. Can this be achieved within the Glue script?
It is possible to implement an upsert into Redshift using a staging table in Glue by passing the 'postactions' option to the JDBC sink:
val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"
// Explicit column list so the INSERT does not depend on column order
val fields = datasetDf.toDF().columns.mkString(",")
// SQL that Redshift runs after the staging table has been loaded
val postActions =
  s"""
  DELETE FROM $destination USING $staging AS S
    WHERE $destinationTable.id = S.id
    AND $destinationTable.date = S.date;
  INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
  DROP TABLE IF EXISTS $staging
  """
// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "postactions" -> postActions
  )),
  redshiftTmpDir = s"$tempDir/redshift",
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
Make sure the user used for writing to Redshift has sufficient permissions to create/drop tables in the staging schema.
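To see what the delete-then-insert merge above does to the data, here is a minimal local sketch in Python. It uses the standard-library sqlite3 module as a stand-in for Redshift, which is an assumption made purely so the example can run anywhere. SQLite has no DELETE ... USING, so the key match is written with EXISTS instead, and the `value` column and the `upsert_via_staging` helper are hypothetical names that do not appear in the answer.

```python
import sqlite3


def upsert_via_staging(conn, rows):
    """Merge rows into upsert_test through a temporary staging table."""
    cur = conn.cursor()
    # Load the incoming batch into a fresh staging table.
    cur.execute("DROP TABLE IF EXISTS upsert_test_staging")
    cur.execute(
        "CREATE TABLE upsert_test_staging (id INTEGER, date TEXT, value TEXT)"
    )
    cur.executemany("INSERT INTO upsert_test_staging VALUES (?, ?, ?)", rows)
    # Remove destination rows whose (id, date) key also exists in staging.
    cur.execute(
        """
        DELETE FROM upsert_test
        WHERE EXISTS (
            SELECT 1 FROM upsert_test_staging AS s
            WHERE s.id = upsert_test.id AND s.date = upsert_test.date
        )
        """
    )
    # Insert every staged row, then drop the staging table.
    cur.execute(
        "INSERT INTO upsert_test SELECT id, date, value FROM upsert_test_staging"
    )
    cur.execute("DROP TABLE upsert_test_staging")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE upsert_test (id INTEGER, date TEXT, value TEXT)")
conn.execute("INSERT INTO upsert_test VALUES (1, '2020-01-01', 'old')")
conn.execute("INSERT INTO upsert_test VALUES (2, '2020-01-01', 'untouched')")
conn.commit()

upsert_via_staging(conn, [(1, "2020-01-01", "new"), (3, "2020-01-02", "added")])
result = conn.execute(
    "SELECT id, date, value FROM upsert_test ORDER BY id"
).fetchall()
print(result)
```

The row with key (1, '2020-01-01') is replaced, row 2 is left alone, and row 3 is appended, which is the behaviour the postactions SQL is meant to produce in Redshift.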
It turns out that the connection_options dictionary parameter of the glueContext.write_dynamic_frame.from_jdbc_conf function accepts two interesting keys, preactions and postactions, which hold SQL to run before and after the load:
target_table = "my_schema.my_table"
stage_table = "my_schema.#my_table_stage_table"
pre_query = """
drop table if exists {stage_table};
create table {stage_table} as select * from {target_table} LIMIT 0;""".format(stage_table=stage_table, target_table=target_table)
post_query = """
begin;
delete from {target_table} using {stage_table} where {stage_table}.id = {target_table}.id ;
insert into {target_table} select * from {stage_table};
drop table {stage_table};
end;""".format(stage_table=stage_table, target_table=target_table)
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    redshift_tmp_dir='s3://s3path',
    transformation_ctx="datasink4",
    connection_options={"preactions": pre_query, "postactions": post_query,
                        "dbtable": stage_table, "database": "redshiftdb"})
Based on https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
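If the match key spans more than one column (the Scala answer matches on both id and date), the two query strings can be generated rather than written by hand. The following is only a sketch: `build_upsert_queries` is a hypothetical helper, not part of Glue, and the table names are placeholders. It interpolates identifiers directly into SQL, so it assumes the names come from trusted configuration.

```python
def build_upsert_queries(target_table, stage_table, key_columns):
    """Return (pre_query, post_query) strings for a staging-table upsert."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    # One equality per key column, joined with "and".
    match = " and ".join(
        "{s}.{c} = {t}.{c}".format(s=stage_table, t=target_table, c=col)
        for col in key_columns
    )
    # Recreate an empty staging table with the target's columns.
    pre_query = (
        "drop table if exists {s}; "
        "create table {s} as select * from {t} limit 0;"
    ).format(s=stage_table, t=target_table)
    # Delete matching rows, insert the staged rows, drop staging.
    post_query = (
        "begin; "
        "delete from {t} using {s} where {m}; "
        "insert into {t} select * from {s}; "
        "drop table {s}; "
        "end;"
    ).format(s=stage_table, t=target_table, m=match)
    return pre_query, post_query


pre_query, post_query = build_upsert_queries(
    "my_schema.my_table", "my_schema.my_table_stage", ["id", "date"]
)
print(pre_query)
print(post_query)
```

The returned strings can then be passed as the "preactions" and "postactions" values in connection_options, with "dbtable" set to the staging table name.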