I'm having a hard time understanding the idea behind fibers/coroutines and how they are implemented in Crystal.
I hope this is the right place to ask; I'll totally accept a "not here" answer :)
This is my usual way of handling multi-threading in Ruby:
threads = []
max_threads = 10
loop do
  begin
    threads << Thread.new do
      helper_method(1,2,3,4)
    end
  rescue Exception => e
    puts "Error Starting thread"
  end
  begin
    threads = threads.select { |t| t.alive? ? true : (t.join; false) }
    while threads.size >= max_threads
      puts 'Got Maximum threads'
      sleep 1
      threads = threads.select { |t| t.alive? ? true : (t.join; false) }
    end
  rescue Exception => e
    puts e
  end
end
This way I open a new Thread, usually for an incoming connection or some other task, add it to the threads array, and then check that I don't have more threads than I wanted.
What would be a good way to implement something similar in Crystal using spawn, channels, fibers, etc.?
Something like this:
require "socket"
ch = Channel(TCPSocket).new
10.times do
  spawn do
    loop do
      socket = ch.receive
      socket.puts "Hi!"
      socket.close
    end
  end
end
server = TCPServer.new(1234)
loop do
  socket = server.accept
  ch.send socket
end
This code pre-spawns 10 fibers to serve the requests. The channel is unbuffered, so connections do not queue up in it: ch.send blocks the accept loop until one of the fibers is free to receive.
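To see the same pattern without a network, here is a minimal sketch that swaps the sockets for plain integers. The run_pool method and its parameters are hypothetical names made up for this illustration; only Channel and spawn come from Crystal itself.

```crystal
# Hypothetical helper: a fixed pool of worker fibers fed through an
# unbuffered channel. Jobs are integers instead of TCPSocket instances.
def run_pool(worker_count : Int32, job_count : Int32) : Array(Int32)
  jobs = Channel(Int32).new    # unbuffered: send blocks until a fiber receives
  results = Channel(Int32).new # workers hand their results back here

  # Pre-spawn the workers; each one loops forever waiting for a job.
  worker_count.times do
    spawn do
      loop do
        job = jobs.receive
        results.send(job * 2)
      end
    end
  end

  # Feed the jobs from a separate fiber so this fiber can collect results.
  spawn do
    job_count.times { |i| jobs.send(i) }
  end

  collected = [] of Int32
  job_count.times { collected << results.receive }
  collected.sort
end

p run_pool(3, 6) # => [0, 2, 4, 6, 8, 10]
```

Passing a capacity, as in Channel(Int32).new(32), would make the channel buffered, so up to 32 jobs could wait in it before send starts blocking.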
You can't replicate the way it works with threads: spawn gives you nothing you can join on, and there is no way to join coroutines.
We can, however, open a channel to communicate between the coroutines and the pool manager. This manager may run within its own coroutine or be the main coroutine, which will prevent the process from exiting.
Here is a working example. The worker(&block) method spawns a coroutine and opens a channel that reports how it ended (with an exception, or normally). The pool(size, &block) method keeps a pool of such workers, reads from their result channels to learn when one has stopped, and spawns a replacement.
def worker(&block)
  # One channel per worker: it carries the exception, or nil on a clean exit.
  result = Channel(Exception?).new
  ::spawn do
    begin
      block.call
    rescue ex
      result.send(ex)
    else
      result.send(nil)
    end
  end
  result
end
def pool(size, &block)
  counter = 0
  results = [] of Channel(Exception?)
  loop do
    # Top the pool back up to `size` workers.
    while counter < size
      counter += 1
      puts "spawning worker"
      results << worker(&block)
    end
    # Block until any worker reports, then drop its channel from the pool.
    index, ex = Channel.select(results.map(&.receive_select_action))
    counter -= 1
    results.delete_at(index)
    if ex
      puts "ERROR: #{ex.message}"
    else
      puts "worker terminated"
    end
  end
end
pool(5) do
  loop { helper_method(1, 2, 3, 4) }
end
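The same channel idea also covers the simpler case of waiting for a fixed batch of coroutines to finish, which is roughly what Thread#join does in the Ruby version. The following is only a sketch: run_and_wait is a hypothetical helper name, and it assumes each job is a block that takes its index.

```crystal
# Hypothetical helper: start `count` fibers and block until every one of
# them has reported back, collecting any exceptions they raised.
def run_and_wait(count : Int32, &block : Int32 ->) : Array(Exception)
  done = Channel(Exception?).new

  count.times do |i|
    spawn do
      begin
        block.call(i)
      rescue ex
        done.send(ex)  # the fiber failed
      else
        done.send(nil) # the fiber finished normally
      end
    end
  end

  # Receiving `count` times is the "join": each receive waits for one fiber.
  errors = [] of Exception
  count.times do
    if ex = done.receive
      errors << ex
    end
  end
  errors
end

errors = run_and_wait(4) do |i|
  raise "job #{i} failed" if i == 2
  sleep 10.milliseconds
end
errors.each { |ex| puts "ERROR: #{ex.message}" }
puts "all fibers finished"
```

A single shared channel is enough here because the caller only needs to know how many fibers have finished, not which one.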