#!/usr/longitude/bin/perl -w

# YAPC::Europe 2001
# "Parallel Processing with Perl and PVM" - Benjamin Holzman: Longitude, Inc.

# Listing Two : "web"

use strict;
use Parallel::Pvm;

use sigtrap qw(die normal-signals);

use constant SPIDER_INIT      => 100;
use constant SPIDER_REQUEST   => 101;
use constant SPIDER_DIGEST    => 102;
use constant SPIDER_ERROR     => 103;
use constant SPIDER_DUPLICATE => 104;
use constant SPIDER_NEW       => 105;
use constant SPIDER_URIS      => 106;
use constant SPIDER_EXIT      => 107;
use constant SPIDER_TIMINGS   => 108;

use constant PVM_HOSTFILE   => "/var/br/pvm_hosts.dat";

use constant TASKS_PER_NODE   => 2;

my $base = $ARGV[0] || "http://127.0.0.1/";
my(%visited); # md5(content) -> URI

# make sure that pvmd is started.
my $info = Parallel::Pvm::start_pvmd(1, PVM_HOSTFILE);
die "start_pvmd: $info" if $info && $info != PvmDupHost;

($info, my @nodes) = Parallel::Pvm::config();
die "config: $info" if $info;

my $mytid = Parallel::Pvm::mytid();

# spawn TASKS_PER_NODE strand processes per node
my($ntask, @tids) = Parallel::Pvm::spawn("strand",TASKS_PER_NODE * @nodes);

print STDERR "started $ntask strand processes\n";

Parallel::Pvm::initsend();
Parallel::Pvm::pack($base);
Parallel::Pvm::mcast(@tids, SPIDER_INIT);

my @uris = ($base);
my %tid2uri;
my @tasks = @tids;

my $done = 0;
while (!$done) {

  # check if we have run out of tasks or work
  if (!@tasks || !@uris) {
    my %pending = ();
    my $bufid;
    while ($bufid = Parallel::Pvm::trecv(-1, -1, 0, 1000)) {
      my($status, $bytes, $tag, $tid) = Parallel::Pvm::bufinfo($bufid);

      my $uri = delete $tid2uri{$tid};
      push @tasks, $tid;

      if ($tag == SPIDER_DIGEST) {
	my $digest = Parallel::Pvm::unpack();

	Parallel::Pvm::initsend();
	if (exists $visited{$digest}) {
	  Parallel::Pvm::send($tid, SPIDER_DUPLICATE);
	} else {
	  $visited{$digest} = $uri;

	  Parallel::Pvm::send($tid, SPIDER_NEW);

	  Parallel::Pvm::recv($tid, SPIDER_URIS);
	  if (my $numURIs = Parallel::Pvm::unpack()) {
	    @pending{Parallel::Pvm::unpack()} = (1)x$numURIs;
	  }
	}
      } elsif ($tag == SPIDER_ERROR) {
	printf STDERR "[t%x]: error retrieving [%s]\n", $tid, $uri;
      }

      # handle any messages that are waiting
      if (Parallel::Pvm::probe(-1, -1)) {
	$bufid = Parallel::Pvm::recv(-1, -1);
	redo;
      }
    }
    $done = keys %tid2uri == 0 && keys %pending == 0 && @uris == 0;
    push @uris, keys %pending;
  }

  # now send as many messages as we can
  my $numMessages = @uris > @tasks ? 0+@tasks : 0+@uris;
  next unless $numMessages;

  for (my $i = 0; $i < $numMessages; $i++) {
    Parallel::Pvm::initsend();
    Parallel::Pvm::pack($uris[$i]);
    Parallel::Pvm::send($tasks[$i], SPIDER_REQUEST);
    $tid2uri{$tasks[$i]} = $uris[$i];
  }
  splice(@tasks, 0, $numMessages);
  splice(@uris, 0, $numMessages);
}

print "$_\n" for sort values %visited;

Parallel::Pvm::initsend();
Parallel::Pvm::mcast(@tids, SPIDER_EXIT);
for my $tid (@tids) {
  Parallel::Pvm::recv($tid, SPIDER_TIMINGS);
  printf "[t%x]: user = %.2f, system = %.2f\n",
	 $tid, Parallel::Pvm::unpack(), Parallel::Pvm::unpack();
}

END {
  Parallel::Pvm::kill($_) for @tids;
  Parallel::Pvm::exit();
}
