When using a Java Lambda function for a Kinesis Data Firehose transformation, I get the error below. My transformed JSON looks like this:
{
  "records": [
    {
      "recordId": "49586022990098427206724983301551059982279766660054253570000000",
      "result": "Ok",
      "data": "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJc0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0="
    }
  ]
}
The error in the Kinesis console is:
Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed
Does anyone have an idea what causes this? I could not find any example code in Java for a Kinesis data transformation.
https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
This document describes the required output structure.
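For reference, the output shape can be sketched in plain Java with no AWS dependencies. This is only an illustration: `FirehoseOutputSketch`, `encodeOnce`, and `toJson` are hypothetical names, not AWS classes, and the JSON is built by hand without escaping. It shows the three fields per record, the three status names with their exact capitalization, and the payload being Base64-encoded a single time.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical stand-in for one transformed record; not an AWS class.
final class FirehoseOutputSketch {

    // The three statuses named in the error message, spelled exactly as listed there.
    enum Result { Ok, Dropped, ProcessingFailed }

    // Encodes the transformed payload to Base64 exactly once.
    static String encodeOnce(String payload) {
        return Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
    }

    // Renders one output record in the shape shown in the question.
    // Hand-built JSON for illustration only; no escaping is performed.
    static String toJson(String recordId, Result result, String payload) {
        return "{\"recordId\":\"" + recordId + "\","
             + "\"result\":\"" + result.name() + "\","
             + "\"data\":\"" + encodeOnce(payload) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("record-1", Result.Ok, "{\"price\":84.51}"));
    }
}
```

As a side observation, decoding the `data` value from the question once gives text that starts with `eyJ`, which itself looks like Base64, so the payload may have been encoded twice. That may or may not be related to the error.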
I just got done struggling through this in Scala (Java compatible). The key is to use the return type com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse:
import java.nio.ByteBuffer
import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse._
import com.amazonaws.services.lambda.runtime.events.{KinesisAnalyticsInputPreprocessingResponse, KinesisFirehoseEvent}
import com.amazonaws.services.lambda.runtime.{Context, LambdaLogger, RequestHandler}
// JavaConverters (explicit asScala/asJava) replaces the deprecated JavaConversions
import scala.collection.JavaConverters._

class Handler extends RequestHandler[KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse] {
  override def handleRequest(in: KinesisFirehoseEvent, context: Context): KinesisAnalyticsInputPreprocessingResponse = {
    val logger: LambdaLogger = context.getLogger
    // Produce exactly one output record per input record, keeping its recordId
    val transformed = in.getRecords.asScala.map { record =>
      try {
        val changed = record.getData.array()
        // do some sort of transform
        new Record(record.getRecordId, Result.Ok, ByteBuffer.wrap(changed))
      } catch {
        case e: Exception =>
          logger.log(e.toString)
          // on failure, return the original data marked as Dropped
          new Record(record.getRecordId, Result.Dropped, record.getData)
      }
    }
    val response = new KinesisAnalyticsInputPreprocessingResponse()
    response.setRecords(transformed.asJava)
    response
  }
}
A Java example:
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse;
import com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Log4j2
public class FirehoseHandler implements RequestHandler<KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse> {

    // Initialized inline so the class keeps the no-argument constructor Lambda uses to create the handler
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(KinesisFirehoseEvent kinesisFirehoseEvent, Context context) {
        // Transform every record, collect the results, and wrap them in the response
        return Flux.fromIterable(kinesisFirehoseEvent.getRecords())
                .flatMap(this::transformRecord)
                .collectList()
                .map(KinesisAnalyticsInputPreprocessingResponse::new)
                .block();
    }

    private Mono<KinesisAnalyticsInputPreprocessingResponse.Record> transformRecord(KinesisFirehoseEvent.Record record) {
        return Mono.just(record.getData())
                .map(StandardCharsets.UTF_8::decode)
                .flatMap(data -> Mono.fromCallable(() -> doYourOwnThing(data)))
                .map(StandardCharsets.UTF_8::encode)
                .map(data -> new KinesisAnalyticsInputPreprocessingResponse.Record(record.getRecordId(), KinesisAnalyticsInputPreprocessingResponse.Result.Ok, data))
                // If the transformation throws, return the original data marked as ProcessingFailed
                .onErrorResume(e -> Mono.just(new KinesisAnalyticsInputPreprocessingResponse.Record(record.getRecordId(), KinesisAnalyticsInputPreprocessingResponse.Result.ProcessingFailed, record.getData())));
    }

    // Placeholder for your own transformation; here it returns the decoded text unchanged
    private String doYourOwnThing(CharBuffer data) {
        return data.toString();
    }
}
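The decode, transform, encode step in the example above can also be tried on its own, without Reactor or the AWS libraries. The following is a minimal sketch under assumptions: `TransformSketch` is a hypothetical class, and the body of `doYourOwnThing` (trim the text and append a newline) is only an illustrative choice, not something the answer specifies.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, dependency-free version of the per-record transformation step.
final class TransformSketch {

    // Illustrative transformation (an assumption): trim whitespace and end the record with a newline.
    static String doYourOwnThing(String text) {
        return text.trim() + "\n";
    }

    // Decodes the record bytes as UTF-8, applies the transformation, and re-encodes the result.
    static ByteBuffer transform(ByteBuffer data) {
        // duplicate() shares the bytes but leaves the caller's buffer position untouched
        String text = StandardCharsets.UTF_8.decode(data.duplicate()).toString();
        return StandardCharsets.UTF_8.encode(doYourOwnThing(text));
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.wrap("  {\"price\":84.51}  ".getBytes(StandardCharsets.UTF_8));
        System.out.print(StandardCharsets.UTF_8.decode(transform(in)));
    }
}
```

The resulting ByteBuffer is what would be passed as the data argument of the response Record. Decoding from a duplicate matters if the original buffer is reused afterwards, for example when a failed record is returned with its original data.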