TheSchwartzで関連するジョブをグルーピングしてまとめて実行する

TheSchwartz では、関連するジョブをグルーピングしてまとめて実行することができるとドキュメントに書いてあったので、試しにやってみました。(参考にしたのは、Movable Type のソースコード。)

グルーピングしない場合

まずは、普通に処理する場合。キューにジョブを追加する client.pl はこんな感じ。

#!/usr/bin/perl
use strict;
use warnings;
use TheSchwartz;
use DateTime;

my $client = TheSchwartz->new(
    databases => [
        {   
            dsn => 'dbi:mysql:TheSchwartz',
            user => 'db_user',
            pass => 'db_password',
        }   
    ],  
);
$client->insert( 'SampleWorker1', { datetime => DateTime->now( time_zone => 'local') } );
print "added a  job\n";

続いて、溜まったジョブを実行する worker.pl はこんな感じ。

package SampleWorker1;
use strict;
use warnings;
use base qw( TheSchwartz::Worker );
use TheSchwartz::Job;

sub work {
    my $class = shift;
    my TheSchwartz::Job $job = shift;
    printf("[%s] %s\n", $job->arg->{datetime}->hms(), $job->funcname); 
    $job->completed();
}

package main;
use strict;
use warnings;
use TheSchwartz;

my $client = TheSchwartz->new(
    databases => [
        {   
            dsn => 'dbi:mysql:TheSchwartz',
            user => 'db_user',
            pass => 'db_password',
        }   
    ],  
);
$client->can_do('SampleWorker1');
$client->work();

worker.pl を実行すると監視状態になるので、client.pl を何度か叩いてみます。すると、以下のような結果が返ってきます。

[18:43:48] SampleWorker1
[18:43:49] SampleWorker1
[18:43:51] SampleWorker1
[18:43:54] SampleWorker1
[18:43:53] SampleWorker1
[18:43:52] SampleWorker1
[18:43:56] SampleWorker1
[18:43:58] SampleWorker1
[18:43:57] SampleWorker1
[18:43:59] SampleWorker1

グルーピングする場合

では、本題。まず client.pl です。ジョブをグルーピングするには、TheSchwartz::Job のインスタンスを生成して、coalesce というプロパティに値をセットします。

#!/usr/bin/perl
use strict;
use warnings;
use TheSchwartz;
use DateTime;

my $database_info = [ 
    {   
        dsn => 'dbi:mysql:TheSchwartz',
        user => 'db_user',
        pass => 'db_password',
    }   
];

my $client = TheSchwartz->new( databases => $database_info );

### Group A ###
my $job = TheSchwartz::Job->new(
    funcname => 'SampleWorker2',
    coalesce => 'group_A',
    arg      => {
        datetime => DateTime->now( time_zone => 'local' ),
    },
);
$client->insert( $job );
print "added a job to Group A\n";

### Group B ###
$job = TheSchwartz::Job->new(
    funcname => 'SampleWorker2',
    coalesce => 'group_B',
    arg      => {
        datetime => DateTime->now( time_zone => 'local' ),
    },
);
$client->insert( $job );
print "added a job to Group B\n";

### Non grouped job ###
$client->insert('SampleWorker2', { datetime => DateTime->now( time_zone => 'local') }); 
print "added a job\n";

続いて worker.pl ですが、こちらはちょっとだけ工夫が要ります。work() メソッドが受け取ったジョブに coalesce がセットされていれば、TheSchwartz の find_job_with_coalescing_value() メソッドを利用して、同じ値がセットされているジョブを見つけて、まとめて実行します。

package SampleWorker2;
use strict;
use base qw( TheSchwartz::Worker );
use TheSchwartz::Job;

sub work {
    my $class = shift;
    my TheSchwartz::Job $job = shift;    
    my @jobs = (); 
    push @jobs, $job;

    if (my $key = $job->coalesce) {
        print "work for ", $job->coalesce, "\n";
        my $client = TheSchwartz->new(
            databases => [
                {   
                    dsn => 'dbi:mysql:TheSchwartz',
                    user => 'db_user',
                    pass => 'db_password',
                }
            ],
        );
        while (my $coalesced_job = $client->find_job_with_coalescing_value($class, $key)) {
            push @jobs, $coalesced_job;
        }   
    }   
    
    foreach my $j (@jobs) {
        printf("[%s] %s %s\n", $j->arg->{datetime}->hms(), $j->funcname, $j->coalesce); 
        $j->completed();
    }   
    print "---------\n";
}

package main;
use strict;
use warnings;
use TheSchwartz;

my $client = TheSchwartz->new(
    databases => [
        {   
            dsn => 'dbi:mysql:TheSchwartz',
            user => 'db_user',
            pass => 'db_password',
        }
    ],
);
$client->can_do('SampleWorker2');
$client->work();

client.pl を何度か叩いた後、worker.pl を実行すると、以下のような結果が得られました。group_A および group_B のジョブが一度の処理でまとめて実行されていることがわかります。

work for group_A
[19:10:01] SampleWorker2 group_A
[19:10:03] SampleWorker2 group_A
[19:10:02] SampleWorker2 group_A
---------
[19:10:01] SampleWorker2 
---------
work for group_B
[19:10:02] SampleWorker2 group_B
[19:10:03] SampleWorker2 group_B
[19:10:01] SampleWorker2 group_B
---------
[19:10:02] SampleWorker2 
---------
[19:10:03] SampleWorker2 
---------

以上!

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

次のHTML タグと属性が使えます: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>