I am following the instructions in this documentation to connect Apache Kafka to Eclipse Ditto:
https://www.eclipse.org/ditto/connectivity-protocol-bindings-kafka2.html
I am not sure about the following two settings:
1) ["ditto:outbound-auth-subject", "..."] under the Authorization context.
2) "address": "topic/key"
Could you explain what they mean? Thank you in advance!
Edit:
Here is the command I used to create the connection between Ditto and Kafka:
curl -X POST -i -u devops:foobar -H 'Content-Type: application/json' -d '{
"targetActorSelection": "/system/sharding/connection",
"headers": {
"aggregate": false
},
"piggybackCommand": {
"type": "connectivity.commands:createConnection",
"connection": {
"id": "MyKafkaConnection1",
"connectionType": "kafka",
"connectionStatus": "open",
"uri": "tcp://radsah:password@localhost:9092",
"specificConfig": {
"bootstrapServers": "10.196.2.218:9092",
"saslMechanism": "plain"
},
"failoverEnabled": true,
"targets": [
{
"address": "digital-twins",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject"]
}],
"mappingContext": {
"mappingEngine": "JavaScript",
"options": {
"incomingScript": "function mapToDittoProtocolMsg(\n headers,\n textPayload,\n bytePayload,\n contentType\n) {\n\n if (contentType !== \"application/json\") {\n return null;\n }\n\n var jsonData = JSON.parse(textPayload);\n var temperature = jsonData.temp;\n var humidity = jsonData.hum;\n \n var path;\n var value;\n if (temperature != null && humidity != null) {\n path = \"/features\";\n value = {\n temperature: {\n properties: {\n value: temperature\n }\n },\n humidity: {\n properties: {\n value: humidity\n }\n }\n };\n } else if (temperature != null) {\n path = \"/features/temperature/properties/value\";\n value = temperature;\n } else if (humidity != null) {\n path = \"/features/humidity/properties/value\";\n value = humidity;\n }\n \n if (!path || !value) {\n return null;\n }\n\n return Ditto.buildDittoProtocolMsg(\n \"org.eclipse.ditto\",\n headers[\"device_id\"],\n \"things\",\n \"twin\",\n \"commands\",\n \"modify\",\n path,\n headers,\n value\n );\n}"
}
}
}
}
}' http://localhost:8080/devops/piggyback/connectivity?timeout=8000
I have registered a device using Hono and I am sending its data to Ditto. Ditto receives the data successfully, but I also want to forward this received data to Kafka.
The connection between Kafka and Ditto is established successfully, but no messages arrive at the Kafka consumer for the topic "digital-twins". Am I missing something?
Edit: here is the command I used to create the policy:
curl -X PUT 'http://localhost:8080/api/2/policies/org.eclipse.ditto:5100' -u 'ditto:ditto' -H 'Content-Type: application/json' -d '{
"entries": {
"owner": {
"subjects": {
"nginx:ditto": {
"type": "nginx basic auth user"
}
},
"resources": {
"thing:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
},
"policy:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
},
"message:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
}
}
}
}
}'
Regarding the authorization context, have a look at the authorization section in our connections documentation. The authorization context has to contain a subject that is defined in the policy or ACL of your things.
As an example:
The policy of Thing "foo:bar" grants the subject "somePrefix:someValue" read access to the whole thing.
{
"policyId": "foo:bar",
"entries": {
... //Maybe more entries
"MyKafkaConnection": {
"subjects": {
"somePrefix:someValue": {
"type": "my description for this subject"
}
},
"resources": {
"thing:/": {
"grant": [
"READ"
],
"revoke": []
},
"message:/": {
"grant": [
"READ"
],
"revoke": []
}
}
}
}
}
In the sample you are referring to, with "somePrefix:someValue" in the authorizationContext of the target, the events related to "foo:bar" would then be published via the Kafka connection on the topic you specified in the address field.
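To make the rule above concrete, here is a minimal Python sketch that checks which subjects of a target's authorizationContext are granted READ on "thing:/" by a policy. It is only an illustration and not Ditto's actual enforcement logic: it ignores "revoke" lists and nested resource paths. The function name subjects_with_read and the entry label "kafka-connection" are hypothetical; the policy and the subject "ditto:outbound-auth-subject" are taken from the question.

```python
import copy


def subjects_with_read(policy, auth_context, resource="thing:/"):
    """Return the subjects from auth_context that a policy entry grants READ on resource.

    Simplified on purpose: "revoke" lists and sub-paths are not evaluated.
    """
    granted = set()
    for entry in policy.get("entries", {}).values():
        resources = entry.get("resources", {})
        if "READ" in resources.get(resource, {}).get("grant", []):
            granted.update(entry.get("subjects", {}).keys())
    return [subject for subject in auth_context if subject in granted]


# Policy as posted in the question: the only subject is "nginx:ditto".
question_policy = {
    "entries": {
        "owner": {
            "subjects": {"nginx:ditto": {"type": "nginx basic auth user"}},
            "resources": {
                "thing:/": {"grant": ["READ", "WRITE"], "revoke": []},
                "policy:/": {"grant": ["READ", "WRITE"], "revoke": []},
                "message:/": {"grant": ["READ", "WRITE"], "revoke": []},
            },
        }
    }
}

# authorizationContext of the Kafka target in the question.
target_auth_context = ["ditto:outbound-auth-subject"]

print(subjects_with_read(question_policy, target_auth_context))  # []

# Same policy plus a hypothetical entry that grants READ to the connection's subject.
fixed_policy = copy.deepcopy(question_policy)
fixed_policy["entries"]["kafka-connection"] = {
    "subjects": {"ditto:outbound-auth-subject": {"type": "kafka connection"}},
    "resources": {
        "thing:/": {"grant": ["READ"], "revoke": []},
        "message:/": {"grant": ["READ"], "revoke": []},
    },
}

print(subjects_with_read(fixed_policy, target_auth_context))
# ['ditto:outbound-auth-subject']
```

With the policy from the question the result is empty, which would be consistent with nothing arriving on the topic: no subject of the target is allowed to read the thing. Once the policy contains an entry for the connection's subject, as in the "foo:bar" example, the check returns that subject.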