#!/usr/longitude/bin/perl -w

# YAPC::Europe 2001
# "Parallel Processing with Perl and PVM" - Benjamin Holzman: Longitude, Inc.

# Listing Three : "strand"

use strict;
require LWP::UserAgent;
require URI;
require HTML::Parse;
use Digest::MD5 qw(md5_base64);
use Parallel::Pvm;

# PVM message tags exchanged between this worker and its parent task.
# The notes describe how each tag is used in this file.
use constant {
  SPIDER_INIT      => 100,  # received once at startup; payload is the base URI
  SPIDER_REQUEST   => 101,  # received; payload is a URI to fetch
  SPIDER_DIGEST    => 102,  # sent; payload is the MD5 digest of a fetched body
  SPIDER_ERROR     => 103,  # sent with no payload when a fetch is unsuccessful
  SPIDER_DUPLICATE => 104,  # not referenced in this file -- presumably the
                            # parent's "already seen" reply to a digest
  SPIDER_NEW       => 105,  # received after a digest; triggers link extraction
  SPIDER_URIS      => 106,  # sent; payload is a count followed by the URIs
  SPIDER_EXIT      => 107,  # received; worker reports timings and exits
  SPIDER_TIMINGS   => 108,  # sent; payload is user and system CPU time
};

# Task ids for this worker and for the master task that spawned it.
# NOTE(review): $mytid is not used below; the call is kept because the first
# PVM call presumably enrolls this process with the local pvmd -- confirm
# before removing it.
my $mytid = Parallel::Pvm::mytid;
my $ptid  = Parallel::Pvm::parent;

my $ua = LWP::UserAgent->new();

# The first message from the parent must be SPIDER_INIT.  Its payload is the
# prefix a URI has to start with in order to be reported back.  PVM signals a
# failed receive with a negative buffer id; without a valid init message there
# is no base to filter on, so give up instead of continuing with garbage.
my $bufid = Parallel::Pvm::recv($ptid, SPIDER_INIT);
die "strand: no SPIDER_INIT received from parent (recv returned $bufid)\n"
  if $bufid < 0;
my $base = Parallel::Pvm::unpack();

# Compile the prefix test once.  \Q...\E makes the base match literally, so
# characters such as "." or "?" in it are not treated as regex operators.
my $base_prefix = qr/^\Q$base\E/;

# Serve the parent until it sends SPIDER_EXIT.  A negative (error) buffer id
# is a true value in Perl, so the result is compared against zero explicitly;
# a plain truth test would keep the loop spinning after a failed receive.
while ((my $msgid = Parallel::Pvm::recv($ptid, -1)) > 0) {
  # bufinfo returns a four-element list; the message tag is the third element.
  my $tag = (Parallel::Pvm::bufinfo($msgid))[2];

  if ($tag == SPIDER_REQUEST) {
    my $reqURI   = Parallel::Pvm::unpack();
    my $request  = HTTP::Request->new('GET', $reqURI);
    my $response = $ua->request($request);

    unless ($response->is_success) {
      # Empty message: the tag alone tells the parent that the fetch failed.
      Parallel::Pvm::initsend();
      Parallel::Pvm::send($ptid, SPIDER_ERROR);
      next;
    }

    # Send a digest of the body and wait for the parent's reply to it.
    Parallel::Pvm::initsend();
    Parallel::Pvm::pack(md5_base64($response->content));
    Parallel::Pvm::send($ptid, SPIDER_DIGEST);

    my $replyid = Parallel::Pvm::recv($ptid, -1);
    last if $replyid <= 0;    # receive failed; stop serving
    my $verdict = (Parallel::Pvm::bufinfo($replyid))[2];

    # Only a SPIDER_NEW reply asks for the page's links; for any other reply
    # nothing more is sent for this request.
    next unless $verdict == SPIDER_NEW;

    # Hash keys collapse repeated links so each URI is reported once.
    my %pending;
    for my $uri (extract_uris($response->content_ref, $reqURI)) {
      $pending{$uri} = 1 if $uri =~ $base_prefix;
    }
    my @found = keys %pending;

    # Message layout: the number of URIs, then the URIs themselves (the
    # second pack is skipped entirely when there are none).
    Parallel::Pvm::initsend();
    Parallel::Pvm::pack(scalar @found);
    Parallel::Pvm::pack(@found) if @found;
    Parallel::Pvm::send($ptid, SPIDER_URIS);
  } elsif ($tag == SPIDER_EXIT) {
    # Report this process's user and system CPU time, then leave PVM and stop.
    my($user, $system) = (times())[0,1];
    Parallel::Pvm::initsend();
    Parallel::Pvm::pack($user);
    Parallel::Pvm::pack($system);
    Parallel::Pvm::send($ptid, SPIDER_TIMINGS);
    Parallel::Pvm::exit();
    exit;
  }
  # Messages with any other tag are ignored.
}

# extract_uris(\$html, $base_uri)
#
# Parses the HTML text referenced by the first argument and returns a list
# with one entry per link found, each resolved against $base_uri and rebuilt
# as "scheme:opaque".  Per the URI documentation the opaque part stops before
# any "#fragment", so fragments are not included.  Repeated links are
# returned repeatedly; the caller is responsible for de-duplication.
sub extract_uris {
  my($content, $base) = @_;
  my $tree = HTML::Parse::parse_html($$content);
  my @uris;
  for my $found (@{ $tree->extract_links }) {
    # Each entry pairs the link text with the element it came from; only the
    # link text is needed here.
    my $link = $found->[0];
    my $uri  = URI->new($link)->abs($base);
    push @uris, join(":", $uri->scheme, $uri->opaque);
  }
  # HTML::Element trees hold parent<->child reference cycles, so they are not
  # reclaimed when $tree goes out of scope.  Destroy the tree explicitly;
  # otherwise this long-running worker leaks one parse tree per fetched page.
  $tree->delete;
  return @uris;
}
