
Debezium embedded. Trying to see format of offset file. Not working

Tags:

mysql

debezium

I am trying to see the format of the offset file produced when connecting Debezium Embedded to a local MySQL container. This is my code:

package io.debezium.examples.kinesis;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.relational.history.MemoryDatabaseHistory;
import io.debezium.util.Clock;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.core.JsonProcessingException;

/**
 * Demo for using the Debezium Embedded API to send change events to Amazon Kinesis.
 */
public class ChangeDataSender implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeDataSender.class);

    private static final String APP_NAME = "kinesis";
    private static final String KINESIS_REGION_CONF_NAME = "kinesis.region";

    private final Configuration config;
    private final JsonConverter valueConverter;
    private final AmazonKinesis kinesisClient;

    private final ObjectMapper mapper;

    public ChangeDataSender() {
        config = Configuration.empty().withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
                // .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.postgresql.PostgresConnector")
                .with(EmbeddedEngine.ENGINE_NAME, APP_NAME)
                .with(MySqlConnectorConfig.SERVER_NAME, APP_NAME)
                .with(MySqlConnectorConfig.SERVER_ID, 8192)

                // for demo purposes let's store offsets and history only in memory
                .with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename",
                        "/offsetStoragePath/storage/offset.dat")
                .with("offset.flush.interval.ms", 60000)
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")//
                .with("database.history.file.filename", "/debezium/dbhistory.dat")//

                // Send JSON without schema
                // .with("schemas.enable", true)
                .with("schemas.enable", false)
                .with("database.dbname", "poc")
                .with("plugin.name", "wal2json")
                .build();

        valueConverter = new JsonConverter();
        valueConverter.configure(config.asMap(), false);

        final String regionName = "us-west-2";

        final AWSCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("something");

        kinesisClient = AmazonKinesisClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(regionName)
                .build();
        mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    }

    @Override
    public void run() {
        final EmbeddedEngine engine = EmbeddedEngine.create()
                .using(config)
                .using(this.getClass().getClassLoader())
                .using(Clock.SYSTEM)
                .notifying(this::sendRecord)
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOGGER.info("Requesting embedded engine to shut down");
            engine.stop();
        }));

        awaitTermination(executor);
    }

    private void awaitTermination(ExecutorService executor) {
        try {
            while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                LOGGER.info("Waiting another 10 seconds for the embedded engine to shut down");
            }
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    private void sendRecord(SourceRecord record){

        // LOGGER.info(record.toString());
        // LOGGER.info(record.valueSchema().fields().toString());
        // LOGGER.info(record.keySchema());
        // LOGGER.info(record.key());
        // LOGGER.info(record.value());
        // LOGGER.info(record.valueSchema());
        LOGGER.info(record.topic());
        // LOGGER.info(record.timestamp());
        // try {
        //     String json =  mapper.writeValueAsString(record);
        //     LOGGER.info(json);
        // } catch (JsonProcessingException e) {
        //     LOGGER.info("Cannot print record in json format");
        // }

        // We are interested only in data events not schema change events
        // if (record.topic().equals(APP_NAME)) {
        //     return;
        // }

        Schema schema = null;

        // if ( null == record.keySchema() ) {
        //     LOGGER.error("The keySchema is missing. Something is wrong.");
        //     return;
        // }

        // For deletes, the value node is null
        if ( null != record.valueSchema() ) {
            schema = SchemaBuilder.struct()
                    // .field("key", record.keySchema())
                    .field("value", record.valueSchema())
                    .build();
        }
        // else{
        //     schema = SchemaBuilder.struct()
        //             .field("key", record.keySchema())
        //             .build();
        // }

        Struct message = new Struct(schema);
        // message.put("key", record.key());

        if ( null != record.value() )
            message.put("value", record.value());

        // String partitionKey = String.valueOf(record.key() != null ? record.key().hashCode() : -1);
        String partitionKey = String.valueOf(record.key() != null ? record.key().hashCode() : -1);
        LOGGER.info(String.format("topic : %s", record.topic()));
        final byte[] payload = valueConverter.fromConnectData("dummy", schema, message);

        PutRecordRequest putRecord = new PutRecordRequest();

        // putRecord.setStreamName(streamNameMapper(record.topic()));
        // putRecord.setStreamName("kinesis.inventory.customers");
        putRecord.setStreamName("kinesis.cscetbon.psql");
        putRecord.setPartitionKey(partitionKey);
        putRecord.setData(ByteBuffer.wrap(payload));

        // print the JSON payload as readable text instead of the byte[] reference
        System.out.println(new String(payload, java.nio.charset.StandardCharsets.UTF_8));
    }

    private String streamNameMapper(String topic) {
        return topic;
    }

    public static void main(String[] args) {
        new ChangeDataSender().run();
    }
}

and the important lines are these:

                .with("offset.storage.file.filename",
                        "/offsetStoragePath/storage/offset.dat")
                .with("offset.flush.interval.ms", 60000)
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", "/debezium/dbhistory.dat")

I'm currently doing nothing with the log except printing it to stdout.

So I am running a MySQL Docker container with this command:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

The container starts up and logs "Initializing database".

and I'm running SQL commands against it with:

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

I can't get an offset file to generate. Any idea what I'm doing wrong?
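For reference, this is how I plan to inspect the file once it shows up. It's an untested sketch and rests on an assumption of mine: that Kafka Connect's FileOffsetBackingStore (the class configured above) serializes a java.util.Map<byte[], byte[]> of JSON blobs with ObjectOutputStream, so reading it back should just be the reverse:

import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

// Untested sketch: dump a FileOffsetBackingStore offset file as readable key/value pairs.
// Assumes the file is a Java-serialized Map<byte[], byte[]> whose keys and values are the
// JSON bytes produced by the embedded engine's offset converter.
public class OffsetFileDumper {

    public static void main(String[] args) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(
                Files.newInputStream(Paths.get("/offsetStoragePath/storage/offset.dat")))) {
            @SuppressWarnings("unchecked")
            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) in.readObject();
            raw.forEach((key, value) -> System.out.println(
                    new String(key, StandardCharsets.UTF_8) + " -> "
                    + (value == null ? "null" : new String(value, StandardCharsets.UTF_8))));
        }
    }
}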

asked Jan 25 '26 20:01 by Jwan622

1 Answer

Are any changes actually being processed? If yes, could you please try using io.debezium.embedded.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy?
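If they are, a minimal sketch of how that could be wired into the configuration in your constructor is below. I'm assuming the offset.commit.policy option (EmbeddedEngine.OFFSET_COMMIT_POLICY) of the 0.9.x embedded engine accepts the policy class name:

        // Sketch: replace the config block in the ChangeDataSender constructor so offsets
        // are committed after every batch instead of every offset.flush.interval.ms.
        // The offset file should then be written as soon as the first change event is handled.
        config = Configuration.empty().withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
                .with(EmbeddedEngine.ENGINE_NAME, APP_NAME)
                // ... keep the rest of the options from the question ...
                .with(EmbeddedEngine.OFFSET_COMMIT_POLICY,
                        io.debezium.embedded.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())
                .build();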

Also, are you sure that the user under whom you run the Debezium embedded engine can write to the given path?
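A quick way to check that with nothing but the JDK is something along these lines (the path is the one from your configuration):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sanity check: the directory holding the offset file must exist and be writable by the
// user that runs the embedded engine, otherwise nothing can be flushed to disk.
public class OffsetPathCheck {

    public static void main(String[] args) {
        Path offsetFile = Paths.get("/offsetStoragePath/storage/offset.dat");
        Path dir = offsetFile.getParent();
        System.out.println("directory exists:   " + Files.isDirectory(dir));
        System.out.println("directory writable: " + Files.isWritable(dir));
        System.out.println("offset file exists: " + Files.exists(offsetFile));
    }
}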

answered Jan 27 '26 11:01 by Jiri Pechanec


