The example shows a small application that should run multi-threaded to speed up
the data processing.

An input thread creates work and puts it into a queue created with Thread::Queue.

A pool of worker threads is then started. In this example, you give the number of workers as an argument to the script:

0 means no threads at all (fastest execution)
1 starts just one worker
2 starts two workers, and so on.

Why does this construct get slower with more workers instead of spreading the workload?


PERL
#!/usr/bin/perl
  
use strict;
use warnings;
use Time::HiRes qw(sleep time usleep);

use threads 1.39;
use threads::shared;
use Thread::Queue;

# Number of worker threads (default 1). A numeric argument overrides it: 0 means no threads, n >= 1 starts n workers
my $MAX_THREADS = 1;
if(@ARGV == 1) {
  $MAX_THREADS = shift @ARGV;
}
# How many work packages should get processed
my $WORK_COUNT=100;

my $IN_COUNT :shared = 0;
# OUT_COUNT is for getting output in the same order as the input
my $OUT_COUNT :shared = 0;

# With "0" as the argument, run without threads at all; this is incredibly fast
if($MAX_THREADS == 0) {
  my $cnt = 0;
  while($cnt < $WORK_COUNT) {
    my %work=(
      l => 10000,
      a => 100,
      b => 200,
    );
    my $result = compute(\%work);
    print "OUTPUT $cnt '$result'\n";
    $cnt++;
  }
  exit 0;
}

MAIN:
{
    my $work_iq = Thread::Queue->new();
    #$work_iq->limit = $MAX_THREADS*5;

    my $work_oq = Thread::Queue->new();

    # Create the thread pool
    for (1..$MAX_THREADS) {
        my $thr = threads->create('worker', $work_iq,$work_oq);
    }
    # Create one input thread
    my $ithr = threads->create('input', $work_iq);

    my $working_threads=$MAX_THREADS;
    # loop while we have working threads
    while($working_threads) {
      my $queue_entry = $work_oq->dequeue();
      if ($$queue_entry{'cnt'} >= 0)  {
        print "OUTPUT $$queue_entry{'cnt'} '$$queue_entry{'result'}'\n";
      } else {
        # got message from exiting thread
        $working_threads--;
      }
    }

    # first we wait for the input thread to exit
    $ithr->join();
    # Wait for all the remaining threads to finish
    foreach my $thr (threads->list()) {
      $thr->join();
    }
}

print("Done\n");
exit(0);

sub compute {
  my $work = shift;
  my $s = 0;
  while($$work{'l'} > 0) {
    my $c = $$work{'a'} * $$work{'b'};
    $s += $c;
    $$work{'a'} += 1;
    $$work{'b'} -= 1;
    $$work{'l'}--;
  }
  return $s;
}

# A worker thread handler
sub worker
{
    my ($work_iq,$work_oq) = @_;
    my $work=0;
    # This thread's ID
    my $tid = threads->tid();
    my $again=1;
    # Work loop while there is work
    do {
        # Wait for work from the queue
        my $queue_entry = $work_iq->dequeue();

        # do work while cnt is not negative
        if ($$queue_entry{'cnt'} >= 0)  {
          # compute something
          my $result = compute($$queue_entry{'work'});

          # put our result into the output queue
          my %result_entry=( 'cnt' => $$queue_entry{'cnt'}, 'result' => $result,);
          $work_oq->enqueue(\%result_entry);
       } else {
          $again=0;
          # signal that this worker has left the worker group
          my %queue_entry=( 'cnt' => -1, 'result' => -1,);
          $work_oq->enqueue(\%queue_entry);
       }
    } while ($again);

    printf("Finished -> %2d\n", $tid);
}

sub input {
    my ($work_iq) = @_;
    while ($IN_COUNT < $WORK_COUNT) {
        # generate some computing work
        my %work=(
            l => 10000,
            a => 100,
            b => 200,
        );

        # place the work into the queue
        my %queue_entry=( 'cnt' => $IN_COUNT, 'work' => \%work,);
        $work_iq->enqueue(\%queue_entry);
        {
            $IN_COUNT++;
        }
    }
    # send one END queue entry for each worker thread
    my %queue_entry=( 'cnt' => -1, 'work' => -1,);
    for (1..$MAX_THREADS) {
      $work_iq->enqueue(\%queue_entry)
    }
    print "input thread finished\n";
}
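
The script declares $OUT_COUNT for ordered output, but the main loop prints results in whatever order the workers deliver them. A minimal sketch of one way to restore the input order, assuming results arrive as hash references with 'cnt' and 'result' keys as in the code above. The @arrivals list, %pending, $next_out and @printed are made-up names for illustration; in the real script the entries would come from $work_oq->dequeue().

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical results, in the order they might arrive from the output queue.
my @arrivals = (
    { cnt => 1, result => 'b' },
    { cnt => 0, result => 'a' },
    { cnt => 3, result => 'd' },
    { cnt => 2, result => 'c' },
);

my %pending;         # results that arrived early, keyed by cnt
my $next_out = 0;    # next cnt to print (the role $OUT_COUNT was meant to play)
my @printed;         # order in which entries were actually printed

for my $entry (@arrivals) {
    # Park the result until all earlier ones have been printed.
    $pending{ $entry->{cnt} } = $entry->{result};

    # Flush every result that is now next in line.
    while (exists $pending{$next_out}) {
        my $result = delete $pending{$next_out};
        print "OUTPUT $next_out '$result'\n";
        push @printed, $next_out;
        $next_out++;
    }
}
```

The buffer only lives in the main thread, so it needs no shared variable.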


What I have tried:

At least it is reproducible in this small example code.
I already removed the use of shared variables, but the non-threaded version is still several times faster than the threaded version with one worker.
With every additional thread it gets slower, yet it consumes more CPU at the same time.
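
One thing that may be worth checking: Thread::Queue stores nested data as a shared clone, so the 'work' hash a worker dequeues is itself a shared hash, and compute() then does every read and write on shared data. The sketch below is only a way to measure that effect on your own machine, assuming a perl built with thread support. It runs in the main thread, reuses the compute() loop from above, and compares a dequeued (shared) hash against a private copy. The variable names are made up for the example.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw(time);

# Same loop as compute() in the question.
sub compute {
    my $work = shift;
    my $s = 0;
    while ($work->{l} > 0) {
        $s += $work->{a} * $work->{b};
        $work->{a} += 1;
        $work->{b} -= 1;
        $work->{l}--;
    }
    return $s;
}

my $q = Thread::Queue->new();
$q->enqueue({ cnt => $_, work => { l => 10000, a => 100, b => 200 } }) for 0 .. 1;

# First item: compute directly on the hash as it comes out of the queue.
my $shared_work = $q->dequeue()->{work};
my $was_shared  = is_shared(%$shared_work) ? 1 : 0;
my $t0          = time;
my $r_shared    = compute($shared_work);
my $t_shared    = time - $t0;

# Second item: copy the work into a private hash first, then compute.
my %private   = %{ $q->dequeue()->{work} };
$t0           = time;
my $r_private = compute(\%private);
my $t_private = time - $t0;

print "dequeued work hash is shared: ", ($was_shared ? "yes" : "no"), "\n";
printf "shared hash: %.4fs, private copy: %.4fs\n", $t_shared, $t_private;
```

If the private copy turns out to be much faster for you, copying the work hash inside worker() before calling compute() would be the corresponding change in the script.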
Updated 6-Jul-21 3:26am

1 solution

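Perl threads, at least in Perl 5, aren't lightweight threads in the usual sense. Each one runs on a real OS thread, but with its own copy of the interpreter: data is copied when a thread is created, and anything passed between threads (including items sent through Thread::Queue) has to be explicitly shared, which makes access to it slow. In general, you will get much better performance using fork rather than Perl threads.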

See: When Perl isn't fast enough

perlthrtut - tutorial on threads in Perl
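
As a rough illustration of the fork approach, here is a minimal sketch using only core Perl, assuming a Unix-like system. Each child computes every $WORKERS-th work package and writes "cnt result" lines to its own pipe; the parent collects them and prints in input order. $WORKERS, %result and the line format are choices made for this example, not something from the original script.

```perl
#!/usr/bin/perl
use strict;
use warnings;

my $WORKERS    = 2;      # number of child processes (illustrative value)
my $WORK_COUNT = 100;    # same number of work packages as in the question

# Same loop as compute() in the question.
sub compute {
    my $work = shift;
    my $s = 0;
    while ($work->{l} > 0) {
        $s += $work->{a} * $work->{b};
        $work->{a} += 1;
        $work->{b} -= 1;
        $work->{l}--;
    }
    return $s;
}

my @readers;
for my $w (0 .. $WORKERS - 1) {
    pipe(my $rd, my $wr) or die "pipe: $!";
    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: close the read ends it does not need, then do its share.
        close $rd;
        close $_ for @readers;
        for (my $cnt = $w; $cnt < $WORK_COUNT; $cnt += $WORKERS) {
            my %work = (l => 10000, a => 100, b => 200);
            print {$wr} "$cnt ", compute(\%work), "\n";
        }
        close $wr;
        exit 0;
    }

    # Parent: keep only the read end of this child's pipe.
    close $wr;
    push @readers, $rd;
}

# Collect the results from every child until each pipe reports EOF.
my %result;
for my $rd (@readers) {
    while (my $line = <$rd>) {
        chomp $line;
        my ($cnt, $value) = split ' ', $line;
        $result{$cnt} = $value;
    }
    close $rd;
}

# Reap the children, then print in input order.
1 while wait() != -1;
print "OUTPUT $_ '$result{$_}'\n" for sort { $a <=> $b } keys %result;
print "Done\n";
```

Each child works on ordinary private data, so nothing is shared or locked while computing. Only the short result lines cross the process boundary.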
 
This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


