
NoBrokersAvailable: NoBrokersAvailable Error in Kafka

I ran into a 'NoBrokersAvailable: NoBrokersAvailable' error in our Jupyter notebook when running this code:

import json
import logging
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
INTERVAL = 10  # seconds between polls
while True:
    # get_realtime_stock is a helper that is not shown here
    data_points = get_realtime_stock('AAPL')
    data = {
        'updated_on': data_points['updated_on'],
        'ticker': data_points['security']['ticker'],
        'last_price': data_points['last_price'],
    }
    producer.send('data1', value=data).add_callback(on_send_success).add_errback(on_send_error)
    time.sleep(INTERVAL)

Here is the resulting error:

---------------------------------------------------------------------------
NoBrokersAvailable                        Traceback (most recent call last)
<ipython-input-8-cab724428b84> in <module>
     11     # handle exception
     12 
---> 13 producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
     14 INTERVAL =10
     15 while True:

~/anaconda3/lib/python3.7/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
    379         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
    380                              wakeup_timeout_ms=self.config['max_block_ms'],
--> 381                              **self.config)
    382 
    383         # Get auto-discovered version from client if necessary

~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in __init__(self, **configs)
    237         if self.config['api_version'] is None:
    238             check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 239             self.config['api_version'] = self.check_version(timeout=check_timeout)
    240 
    241     def _can_bootstrap(self):

~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
    890         else:
    891             self._lock.release()
--> 892             raise Errors.NoBrokersAvailable()
    893 
    894     def wakeup(self):

NoBrokersAvailable: NoBrokersAvailable

The code worked fine until it suddenly stopped, and I don't know why. Does anyone know what the problem might be?

asked Oct 23 '25 15:10 by Louis Koch


1 Answer

I had the same error and solved it by passing api_version to the KafkaProducer constructor. Here is a sample from my code.

If the error persists, please post the version of your kafka-python library.

producer = KafkaProducer(
    bootstrap_servers=#####,
    client_id=######,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

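For api_version, pass your Kafka broker version as a tuple, e.g. (0, 10, 1) for Kafka 0.10.1. With api_version set, the client skips the automatic version check (check_version in the traceback above) that raises NoBrokersAvailable.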
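Pinning api_version does not by itself show that a broker is listening, so it can help to check the port before building the producer arguments. Below is a minimal sketch using only the standard library. The helper names broker_reachable, version_tuple and producer_kwargs are hypothetical and not part of kafka-python. The host and port are taken from the question, and the version string from the sample above.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


def version_tuple(version):
    """Convert a broker version string such as '0.10.1' to (0, 10, 1)."""
    return tuple(int(part) for part in version.strip().split("."))


def producer_kwargs(host, port, broker_version):
    """Build keyword arguments intended for kafka-python's KafkaProducer."""
    return {
        "bootstrap_servers": [f"{host}:{port}"],
        "api_version": version_tuple(broker_version),
    }


if __name__ == "__main__":
    # Assumed values: adjust to your own broker address and version.
    host, port = "localhost", 9092
    if broker_reachable(host, port):
        print("Broker port is open; KafkaProducer arguments:")
        print(producer_kwargs(host, port, "0.10.1"))
    else:
        print(f"Nothing is listening on {host}:{port}; start the broker first.")
```

If the port is open, the resulting dictionary can be unpacked into the constructor as KafkaProducer(**producer_kwargs(...)), together with your own value_serializer. If it is not, the broker is probably stopped or listening on a different address, and setting api_version alone is unlikely to help.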

answered Oct 26 '25 11:10 by Skander HR


