I've written a python script with aiokafka to produce and consume from a Kafka cluster in AWS MSK, I'm running the script from a EC2 instance that is in the same VPC as my cluster and when I try to connect my script to a cluster it refuse to accept the connection:
The script
from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys
async def consume():
bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
topic = os.environ.get('TOPIC', 'demo')
group = os.environ.get('GROUP_ID', 'demo-group')
consumer = AIOKafkaConsumer(
topic, bootstrap_servers=bootstrap_server, group_id=group
)
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
def main():
try:
asyncio.run(consume())
except KeyboardInterrupt:
print("Bye!")
sys.exit(0)
if __name__ == "__main__":
print("Welcome to Kafka test script. ctrl + c to exit")
main()
The exception
Unable to request metadata from "boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098": KafkaConnectionError: Connection at boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 closed
Traceback (most recent call last):
File "producer.py", line 33, in <module>
main()
File "producer.py", line 25, in main
asyncio.run(produce_message(message))
File "/usr/lib64/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib64/python3.7/asyncio/base_events.py", line 587, in run_until_complete
return future.result()
File "producer.py", line 12, in produce_message
await producer.start()
File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/producer/producer.py", line 296, in start
await self.client.bootstrap()
File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/client.py", line 250, in bootstrap
f'Unable to bootstrap from {self.hosts}')
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('boot-zm5x2eaw.c3.kafka-serverless.us-east-1.amazonaws.com', 9098, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f76d123a510>
I've already tested the connection with the kafka shell scripts and it worked fine:
./kafka-console-producer.sh --bootstrap-server boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 --producer.config client.properties --topic myTopic
But whenever I try with python it just don't work, I've investigated a little and found that it might be the authentication protocol, my KMS Cluster is protected with IAM role-based authentication but no matter how much I search there is no documentation on how to authenticate with IAM in the python kafka libraries: aiokafka, python-kafka, faust, etc.
Does anyone have an example on how to successfully connect to a KMS serverless cluster with IAM role-based authentication using Python?
AWS officially released aws-msk-iam-sasl-signer-python. Here an example code using aiokafka and aws-msk-iam-sasl-signer-python
aiokafka==0.10.0
aws-msk-iam-sasl-signer-python==1.0.1
import asyncio
import os, sys
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import ssl
def create_ssl_context():
_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
_ssl_context.options |= ssl.OP_NO_SSLv2
_ssl_context.options |= ssl.OP_NO_SSLv3
_ssl_context.check_hostname = False
_ssl_context.verify_mode = ssl.CERT_NONE
_ssl_context.load_default_certs()
return _ssl_context
class AWSTokenProvider(AbstractTokenProvider):
async def token(self):
return await asyncio.get_running_loop().run_in_executor(None, self._token)
def _token(self):
AWS_REGION = os.getenv('AWS_REGION')
token, _ = MSKAuthTokenProvider.generate_auth_token(AWS_REGION)
return token
async def consume():
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', 'demo')
KAFKA_GROUP_ID = os.getenv('KAFKA_GROUP_ID', 'demo-group')
KAFKA_SERVER = os.getenv('KAFKA_SERVER', 'b-1.mykafka.abcdef.c5.kafka.us-west-2.amazonaws.com:9098')
consumer = AIOKafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_SERVER,
group_id=KAFKA_GROUP_ID,
security_protocol='SASL_SSL',
ssl_context=create_ssl_context(),
sasl_mechanism="OAUTHBEARER",
sasl_oauth_token_provider=AWSTokenProvider()
)
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
def main():
try:
asyncio.run(consume())
except KeyboardInterrupt:
print("Bye!")
sys.exit(0)
if __name__ == "__main__":
main()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With