Asignación de trabajo a threads en un pipe

package Threads::Pipe;
use 5.008004;
use strict;
use warnings;
use threads;
use Thread::Queue;
use Carp;
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = ( 'pipe' );
our $VERSION = '0.01';

# Virtualización de un pipe.
# Se supone que el número de procesos virtuales requeridos
# es conocido de antemano.

sub pipe { # crea el anillo que virtualiza el pipe de tamaño N
  my $nt = shift(); # número de threads
  my $N = shift(); # número de procesadores virtuales 
  my $task = shift(); # subrutina a ejecutar

  my @t; # array de tareas
  my @channel; # array de colas
  # El resto de argumentos son argumentos de $task

  my $id; # identificador de procesador físico

  # creamos canales ...
  for($id=0; $id < $nt; $id++) {
    $channel[$id] = new Thread::Queue; # canal comunicando procesos $id y ($id+1)%$nt
  }

  # creamos threads ...
  for($id=0; $id < $nt; $id++) {
    my $wrap = sub { # clausura que envuelve a la función de usuario $task
      my $i; # identificador de proceso virtual
      my @results;

      for($i = $id; $i < $N; $i += $nt) {
        my $result_i = $task->($i,  $channel[($id+$nt-1)%$nt], $channel[$id], @_);
        push @results, $result_i;
      }
      return \@results;
    }; # end wrap

    $t[$id] = threads->new($wrap, @_); # contexto escalar: retorna escalar
  }
  # La thread 0 no trabaja, sólo espera  ...
  my @result;
  for($id=0; $id < $nt; $id++) {
    $result[$id] = $t[$id]->join(); # join debe ser ejecutado por el padre
  }
  # shuffle 
  my @R;
  for($id=0; $id < $nt; $id++) {
    my @aux = @{$result[$id]};
    for my $i (0..$#aux) {
      $R[$id+$nt*$i] = $aux[$i];
    }
  }
  return @R;
}

1;
Programa:
$ cat -n pipe3.pl
 1  #!/usr/bin/perl -w -I../lib
 2  # File: pipe3.pl
 3
 4  use strict;
 5  use threads;
 6  use Thread::Queue;
 7  use Threads::Pipe qw(pipe);
 8
 9  our $numthreads = (shift || 2); # número de threads "físicas"
10  our $numvirtual = (shift || 8); # número de threads "virtuales"
11  our $nummessages = (shift || 4); # número de mensajes por etapa
12
13  ### main ###
14  &pipe($numthreads, $numvirtual, \&job, $nummessages);
15
16  sub job {
17    my $vid = shift;
18    my $from_left = shift;
19    my $to_right = shift;
20    ##############
21    my $nummessages = shift;
22    my $id = threads->self->tid(); # identificado de ithread Perl
23
24    my ($i, $num);
25    if ($vid) {
26      for $i (1..$nummessages) {
27        $num = $from_left->dequeue;
28        # procesar número ...
29        my $processed = $num*$vid;
30        print "id=$id vid=$vid: num=$num processed=$processed\n";
31        $to_right->enqueue($num);
32      }
33    }
34    else {
35      for $i (1..$nummessages) {
36        print "id=$id vid=$vid: num=$i\n";
37        $to_right->enqueue($i);
38      }
39    }
40  }
Ejecución:
$ pipe3.pl
id=1 vid=0: num=1
id=1 vid=0: num=2
id=1 vid=0: num=3
id=1 vid=0: num=4
id=2 vid=1: num=1 processed=1
id=1 vid=2: num=1 processed=2
id=2 vid=1: num=2 processed=2
id=1 vid=2: num=2 processed=4
id=2 vid=1: num=3 processed=3
id=1 vid=2: num=3 processed=6
id=2 vid=1: num=4 processed=4
id=1 vid=2: num=4 processed=8
id=2 vid=3: num=1 processed=3
id=1 vid=4: num=1 processed=4
id=2 vid=3: num=2 processed=6
id=2 vid=3: num=3 processed=9
id=1 vid=4: num=2 processed=8
id=2 vid=3: num=4 processed=12
id=1 vid=4: num=3 processed=12
id=2 vid=5: num=1 processed=5
id=1 vid=4: num=4 processed=16
id=2 vid=5: num=2 processed=10
id=1 vid=6: num=1 processed=6
id=1 vid=6: num=2 processed=12
id=2 vid=5: num=3 processed=15
id=1 vid=6: num=3 processed=18
id=2 vid=5: num=4 processed=20
id=1 vid=6: num=4 processed=24
id=2 vid=7: num=1 processed=7
id=2 vid=7: num=2 processed=14
id=2 vid=7: num=3 processed=21
id=2 vid=7: num=4 processed=28

Casiano Rodríguez León
2012-02-29