The example shows a small application which should run multi-threaded to speed up
the data processing.

There is an input thread that creates work and puts it into a queue created with Thread::Queue.

A pool of worker threads is then started; in this example, you provide the number of workers as an argument to the script.

0 means no threads at all, fastest execution
1 starts just one worker
2 starts two workers, and so on.

Why does this construct get slower as workers are added, instead of spreading the workload?


PERL
#!/usr/bin/perl
  
use strict;
use warnings;
use Time::HiRes qw(sleep time usleep);

use threads 1.39;
use threads::shared;
use Thread::Queue;

# Maximum working threads, by default 1, can be changed by giving a numeric argument, 0 means no threads, 1 and higher gives n-Threads
my $MAX_THREADS = 1;
if(@ARGV == 1) {
  $MAX_THREADS = shift @ARGV;
}
# How many work packages should get processed
my $WORK_COUNT=100;

my $IN_COUNT :shared = 0;
# OUT_COUNT is for getting output in the same order as the input
my $OUT_COUNT :shared = 0;

# without threads at all if you give "0" as the argument, runs incredibly fast
if($MAX_THREADS == 0) {
  my $cnt = 0;
  while($cnt < $WORK_COUNT) {
    my %work=(
      l => 10000,
      a => 100,
      b => 200,
    );
    my $result = compute(\%work);
    print "OUTPUT $cnt '$result'\n";
    $cnt++;
  }
  exit 0;
}

MAIN:
{
    my $work_iq = Thread::Queue->new();
    #$work_iq->limit = $MAX_THREADS*5;

    my $work_oq = Thread::Queue->new();

    # Create the thread pool
    for (1..$MAX_THREADS) {
        my $thr = threads->create('worker', $work_iq,$work_oq);
    }
    # Create one input thread
    my $ithr = threads->create('input', $work_iq);

    my $working_threads=$MAX_THREADS;
    # loop while we have working threads
    while($working_threads) {
      my $queue_entry = $work_oq->dequeue();
      if ($$queue_entry{'cnt'} >= 0)  {
        print "OUTPUT $$queue_entry{'cnt'} '$$queue_entry{'result'}'\n";
      } else {
        # got message from exiting thread
        $working_threads--;
      }
    }

    # first we wait for the input thread to exit
    $ithr->join();
    # Wait for all the remaining threads to finish
    foreach my $thr (threads->list()) {
      $thr->join();
    }
}

print("Done\n");
exit(0);

sub compute {
  my $work = shift;
  my $s = 0;
  while($$work{'l'} > 0) {
    my $c = $$work{'a'} * $$work{'b'};
    $s += $c;
    $$work{'a'} += 1;
    $$work{'b'} -= 1;
    $$work{'l'}--;
  }
  return $s;
}

# A worker thread handler
sub worker
{
    my ($work_iq,$work_oq) = @_;
    my $work=0;
    # This thread's ID
    my $tid = threads->tid();
    my $again=1;
    # Work loop while there is work
    do {
        # Wait for work from the queue
        my $queue_entry = $work_iq->dequeue();

        # do work while cnt is not negative
        if ($$queue_entry{'cnt'} >= 0)  {
          # compute something
          my $result = compute($$queue_entry{'work'});

          # put our result into the output queue
          my %result_entry=( 'cnt' => $$queue_entry{'cnt'}, 'result' => $result,);
          $work_oq->enqueue(\%result_entry);
       } else {
          $again=0;
          # signal that this worker has left the worker group
          my %queue_entry=( 'cnt' => -1, 'result' => -1,);
          $work_oq->enqueue(\%queue_entry);
       }
    } while ($again);

    printf("Finished -> %2d\n", $tid);
}

sub input {
    my ($work_iq) = @_;
    while ($IN_COUNT < $WORK_COUNT) {
      # generate some computing work
      my %work=(
      l => 10000,
      a => 100,
      b => 200,
      );

        # place the work into the queue
        my %queue_entry=( 'cnt' => $IN_COUNT, 'work' => \%work,);
        $work_iq->enqueue(\%queue_entry);
        {
          $IN_COUNT++;
        }
    }
    # send for each worker thread an END queue entry
    my %queue_entry=( 'cnt' => -1, 'work' => -1,);
    for (1..$MAX_THREADS) {
      $work_iq->enqueue(\%queue_entry)
    }
    print "input thread finished\n";
}


What I have tried:

At least it is reproducible in this small example code.
I already removed the use of shared variables, but the non-threaded version is still several times faster than the threaded version with one thread.
With every additional thread it gets slower, yet consumes more CPU at the same time.
Updated 6-Jul-21 3:26am

1 solution

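Perl threads, at least in Perl 5, aren't lightweight threads in the usual sense. Each thread runs its own copy of the interpreter, and data passed through a Thread::Queue is cloned into shared storage where every access is synchronized. That overhead can easily outweigh a small computation like the one here. In general, you will get much better performance using fork rather than Perl threads.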
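As a rough illustration of the fork approach, here is a minimal sketch that splits the same work across child processes and collects the results over pipes. The helper name run_forked, the round-robin split, and the worker count of 4 are assumptions made for this example, not something taken from the question; compute is the same loop as in the question.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Unbuffered STDOUT so forked children do not re-emit pending output.
$| = 1;

# Same computation as in the question.
sub compute {
    my $work = shift;
    my $s = 0;
    while ($work->{l} > 0) {
        $s += $work->{a} * $work->{b};
        $work->{a} += 1;
        $work->{b} -= 1;
        $work->{l}--;
    }
    return $s;
}

# Hypothetical helper: spread $work_count packages over $workers child
# processes and return a hash ref mapping package number to result.
sub run_forked {
    my ($workers, $work_count) = @_;
    my @readers;
    for my $w (0 .. $workers - 1) {
        pipe(my $rd, my $wr) or die "pipe failed: $!";
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # Child: handle every $workers-th package, starting at $w,
            # and write one "cnt result" line per package.
            close $rd;
            for (my $cnt = $w; $cnt < $work_count; $cnt += $workers) {
                my %work = (l => 10000, a => 100, b => 200);
                print {$wr} "$cnt ", compute(\%work), "\n";
            }
            close $wr;
            exit 0;
        }
        # Parent: keep only the read end of this child's pipe.
        close $wr;
        push @readers, [$pid, $rd];
    }

    my %result;
    for my $r (@readers) {
        my ($pid, $rd) = @$r;
        while (my $line = <$rd>) {
            my ($cnt, $res) = split ' ', $line;
            $result{$cnt} = $res;
        }
        close $rd;
        waitpid($pid, 0);
    }
    return \%result;
}

my $results = run_forked(4, 100);
# Print in input order, like the non-threaded branch of the question.
print "OUTPUT $_ '$results->{$_}'\n" for sort { $a <=> $b } keys %$results;
```

Each child works on ordinary private data, so nothing is locked or cloned while computing; only the short result lines cross the process boundary.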

See: When Perl isn't fast enough

perlthrtut - tutorial on threads in Perl
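If the threaded design has to stay, one thing worth trying is to copy each dequeued work item into a private hash before computing. Thread::Queue hands complex items to the worker as shared clones, so the compute loop in the question reads and writes shared hash elements on every iteration, which is a likely contributor to the slowdown. The sketch below assumes that is the bottleneck; the undef end marker and the worker count of 2 are choices made for this example, not part of the original script.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Same computation as in the question.
sub compute {
    my $work = shift;
    my $s = 0;
    while ($work->{l} > 0) {
        $s += $work->{a} * $work->{b};
        $work->{a} += 1;
        $work->{b} -= 1;
        $work->{l}--;
    }
    return $s;
}

sub worker {
    my ($in_q, $out_q) = @_;
    # An undef entry is this sketch's end marker.
    while (defined(my $entry = $in_q->dequeue())) {
        # The dequeued structure is a shared clone. Copy the work hash
        # into a private one so the hot loop touches no shared data.
        my %private = %{ $entry->{work} };
        my %out = (cnt => $entry->{cnt}, result => compute(\%private));
        $out_q->enqueue(\%out);
    }
}

my $workers = 2;    # assumed worker count for the example
my $in_q  = Thread::Queue->new();
my $out_q = Thread::Queue->new();
my @pool  = map { threads->create(\&worker, $in_q, $out_q) } 1 .. $workers;

for my $cnt (0 .. 99) {
    $in_q->enqueue({ cnt => $cnt, work => { l => 10000, a => 100, b => 200 } });
}
$in_q->enqueue(undef) for 1 .. $workers;    # one end marker per worker
$_->join() for @pool;

# All workers have finished, so drain the output queue without blocking.
my %result;
while (defined(my $entry = $out_q->dequeue_nb())) {
    $result{ $entry->{cnt} } = $entry->{result};
}
print "OUTPUT $_ '$result{$_}'\n" for sort { $a <=> $b } keys %result;
```

Even with this change, every item is still cloned twice on its way through the queues, so larger work packages per queue entry should pay off more than many tiny ones.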
   

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)