
Limiting processes with Parallel::ForkManager

Tags: fork, perl

I am trying to use Parallel::ForkManager to control some child processes. I would like to limit the number of processes running concurrently to 10. In total I need to run 20.

I know I could set the limit to 10 when constructing the object, but I am also using the same $pm object to run other child processes that do something different. The function in question is much more memory-intensive, so it needs to be limited on its own.

My current code does not work: the run_on_finish callback is never called, so the remaining 10 children never get forked. I don't understand why. I'd have thought each child would still trigger the finish code on exit and decrement the count, but the "if" statement seems to prevent this. Could someone explain why?

Thanks for any help!

# Parallel declarations
my $pm = Parallel::ForkManager->new(30);

# Declared before the callback so the closure and the loop share the same variables.
my $total_jobs  = 0;
my $active_jobs = 0;

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_str_ref) = @_;
    --$active_jobs;
});

while ($total_jobs < 20) {
    # Throttle this kind of job to 10 concurrent children.
    sleep 300 and next if $active_jobs >= 10;

    my $pid = $pm->start and ++$active_jobs and ++$total_jobs and next;

    my $return = module::function(%args);

    $pm->finish(0, { index => $total_jobs, return => $return });
}

print STDERR "Submitted all jobs, now waiting for children to exit.\n";
$pm->wait_all_children();
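On the "why": Parallel::ForkManager calls run_on_finish in the parent process, and only when the parent reaps a finished child, which happens inside P::FM methods such as start, wait_one_child, reap_finished_children and wait_all_children. The child does not run the callback itself. Once the loop above hits its limit it only sleeps and calls next, so it never calls start again, nothing gets reaped, and $active_jobs never goes down. The following is a minimal sketch of that mechanism using core Perl fork and waitpid rather than P::FM; the limit of 3, the 6 jobs and the 0.2 s child workload are made-up illustration values. In P::FM terms, calling $pm->reap_finished_children (available in recent versions) in the throttling branch should play the role of reap_finished() here.

```perl
use strict;
use warnings;
use POSIX       qw( WNOHANG );
use Time::HiRes qw( sleep );

# Illustrative parameters: at most 3 children at once, 6 in total.
my $max_active = 3;
my $total      = 6;

my $active   = 0;    # children forked but not yet reaped
my $started  = 0;
my $finished = 0;
my $peak     = 0;    # highest value $active reached

# The counter only goes down here, in the parent, when waitpid reaps a child.
# This is the role run_on_finish plays in Parallel::ForkManager.
sub reap_finished {
    while (waitpid(-1, WNOHANG) > 0) {
        --$active;
        ++$finished;
    }
}

while ($started < $total) {
    reap_finished();
    if ($active >= $max_active) {
        # Sleeping alone never changes $active; the reap on the next pass does.
        sleep 0.05;
        next;
    }
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        sleep 0.2;    # stand-in for module::function(%args)
        exit 0;
    }
    ++$active;
    ++$started;
    $peak = $active if $active > $peak;
}

# Equivalent of wait_all_children: block until every child is reaped.
while (waitpid(-1, 0) > 0) {
    --$active;
    ++$finished;
}

print "started=$started finished=$finished peak=$peak\n";
```

It should report started=6 and finished=6, with a peak of at most 3. Removing the reap_finished() call from the loop reproduces the question's hang.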
Asked Jan 25 '26 at 01:01 by distracted-biologist



1 Answer

I'll use "type 2" for the jobs that are limited to 10 at a time.

This is how I'd do it with P::FM:

use strict;
use warnings;

use List::Util            qw( shuffle );
use Parallel::ForkManager qw( );
use POSIX                 qw( WNOHANG );
use Time::HiRes           qw( sleep );

use constant MAX_WORKERS       => 30;
use constant MAX_TYPE2_WORKERS => 10;

sub is_type2_job { $_[0]{type} == 2 }

my @jobs = shuffle(
   ( map { { type => 1, data => $_ } } 0..19 ),
   ( map { { type => 2, data => $_ } } 0..19 ),
);

my $pm = Parallel::ForkManager->new(MAX_WORKERS);

my $type2_count = 0;
$pm->run_on_finish(sub {
   my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $job) = @_;
   --$type2_count if is_type2_job($job);
   print "Finished: $pid, $job->{type}, $job->{data}, $job->{result}\n";
});

my @postponed_jobs;
while (@postponed_jobs || @jobs) {
   my $job;
   if (@postponed_jobs && $type2_count < MAX_TYPE2_WORKERS) {
      $job = shift(@postponed_jobs);
   }
   elsif (@jobs) {
      $job = shift(@jobs);
      if ($type2_count >= MAX_TYPE2_WORKERS && is_type2_job($job)) {
         push @postponed_jobs, $job;
         redo;
      }
   }
   # elsif (@postponed_jobs) {
   #     # Already max type 2 jobs being processed,
   #     # but there are idle workers.
   #     $job = shift(@postponed_jobs);
   # }
   else {
      local $SIG{CHLD} = sub { };
      select(undef, undef, undef, 0.300);
      $pm->wait_one_child(WNOHANG);
      redo;
   }

   ++$type2_count if is_type2_job($job);

   my $pid = $pm->start and next;
   $job->{result} = $job->{data} + 100;  # Or whatever.
   $pm->finish(0, $job);
}

$pm->wait_all_children();

But this is broken. The code that picks the next job should run in the middle of start (i.e. after it waits for children to finish, but before it forks), not before start. Since start can block and reap children after the job has already been chosen, the choice is based on a stale $type2_count, which can cause jobs to run out of order. This isn't the first time I've wished P::FM had a pre-fork callback. Maybe you could ask the maintainer for one.
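The "pick the job inside start" idea can be illustrated without P::FM. This is a minimal sketch using core fork and waitpid, not P::FM's API: each pass first waits for a free worker slot (reaping finished children and running the finish logic), and only then chooses the next job, skipping type 2 jobs while MAX_TYPE2_WORKERS of them are running. The limits are scaled down from the answer's 30/10 to keep the process count small, the job list and 0.05 s workload are illustrative, and unlike P::FM it does not pass data back from the children.

```perl
use strict;
use warnings;
use POSIX       qw( WNOHANG );
use Time::HiRes qw( sleep );

# Scaled-down, illustrative limits (the answer uses 30 and 10).
use constant MAX_WORKERS       => 6;
use constant MAX_TYPE2_WORKERS => 2;

my @jobs = (
    ( map { { type => 1, data => $_ } } 0..9 ),
    ( map { { type => 2, data => $_ } } 0..9 ),
);

my %running;              # pid => job
my $type2_count = 0;
my $type2_peak  = 0;
my $done        = 0;

# Plays the role of run_on_finish: runs in the parent for each reaped child.
sub on_finish {
    my ($pid) = @_;
    my $job = delete $running{$pid} or return;
    --$type2_count if $job->{type} == 2;
    ++$done;
}

sub reap { while ((my $pid = waitpid(-1, WNOHANG)) > 0) { on_finish($pid) } }

while (@jobs) {
    # 1. Wait for a free worker slot (the first half of start).
    reap();
    if (keys(%running) >= MAX_WORKERS) { sleep 0.01; next; }

    # 2. Only now choose the job: the missing "pre-fork callback" step.
    #    Take the first job in order that is allowed to run right now.
    my ($i) = grep { $jobs[$_]{type} != 2 || $type2_count < MAX_TYPE2_WORKERS } 0..$#jobs;
    if (!defined $i) { sleep 0.01; next; }    # only type 2 left, type 2 slots full
    my $job = splice(@jobs, $i, 1);

    # 3. Fork (the second half of start).
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) { sleep 0.05; exit 0; }    # stand-in for the real work
    $running{$pid} = $job;
    if ($job->{type} == 2) {
        ++$type2_count;
        $type2_peak = $type2_count if $type2_count > $type2_peak;
    }
}

# Like wait_all_children.
while ((my $pid = waitpid(-1, 0)) > 0) { on_finish($pid) }
print "done=$done type2_peak=$type2_peak\n";
```

Because the job is chosen after reaping, a type 2 job is started as soon as a type 2 slot has actually been freed, and otherwise jobs keep their original order.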

Answered Jan 27 '26 at 17:01 by ikegami



