Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to stop OR change delay of Spring Integration Poller

I am using Spring Integration to read files from a directory using following configuration. However I am looking to stop poller once I found any file until service not restarted again. Is there any way I can change poller delay at runtime OR start/stop Poller at runtime?

@Bean
public MessageChannel fileInputChannel() {
    return new DirectChannel();
}

@Bean
@InboundChannelAdapter(channel = "fileInputChannel", poller = @Poller(cron = "0 0/10 19-23,0-6 ? * *", maxMessagesPerPoll = "1"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    File directory = new File(localFtpDirectory);
    if (clearLocalDir && directory.isDirectory() && directory.exists()) {
        LOG.info("Clear directory {} on startup of service", directory);
        Arrays.stream(directory.listFiles()).forEach(File::delete);
    }
    source.setDirectory(directory);
    source.setFilter(new LastModifiedFileFilter(remoteFileFilter));
    return source;
}

@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public MessageHandler fileHandler() {
    return new MessageHandlerService();
}
like image 316
Sushil Avatar asked Sep 01 '25 10:09

Sushil


1 Answers

I had similar requirement, but was not satisfied with the accepted answer. The start/stop functionality mentioned here is defined by the Lifecycle/SmartLifecycle contract, and the typical scenario I can see is application shutdown or application context refresh. Simply, when I naively tried to use this I indeed got into problems very quickly.

My requirement was rather to pause the poller and resume it later on. This can be easily achieved by the advice and is very easy to implement. However, getting this knowledge was not easy and I ended up with framework source code investigation (indeed I might have overlooked something in the doc).

    @Bean
    public MessagePollingControlAdvice messagePollingControlAdvice() {
        return new MessagePollingControlAdvice();
    }

    //call this method for your DSL bridge configuration etc..
    Consumer<GenericEndpointSpec<BridgeHandler>> pollerConfiguration() {
        return b -> b.poller(pollerFactory -> pollerFactory.advice(messagePollingControlAdvice()));
    }
   

   public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
    private volatile boolean pollingActive = false;

    @Override
    public boolean beforeReceive(Object source) {
        return pollingActive;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, Object source) {
        return result;
    }

    public boolean isPollingActive() {
        return pollingActive;
    }

    //call this method from whatever place in your code to activate/deactivate poller
    public void setPollingActive(boolean pollingActive) {
        this.pollingActive = pollingActive;
    }
   }
like image 114
jan b Avatar answered Sep 13 '25 02:09

jan b