TheSchwartzで関連するジョブをグルーピングしてまとめて実行する
TheSchwartz では、関連するジョブをグルーピングしてまとめて実行することができるとドキュメントに書いてあったので、試しにやってみました。(参考にしたのは、Movable Type のソースコード。)
グルーピングしない場合
まずは、普通に処理する場合。キューにジョブを追加する client.pl はこんな感じ。
#!/usr/bin/perl
use strict;
use warnings;
use TheSchwartz;
use DateTime;
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
$client->insert( 'SampleWorker1', { datetime => DateTime->now( time_zone => 'local') } );
print "added a job\n";
use strict;
use warnings;
use TheSchwartz;
use DateTime;
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
$client->insert( 'SampleWorker1', { datetime => DateTime->now( time_zone => 'local') } );
print "added a job\n";
続いて、溜まったジョブを実行する worker.pl はこんな感じ。
package SampleWorker1;
use strict;
use warnings;
use base qw( TheSchwartz::Worker );
use TheSchwartz::Job;
sub work {
my $class = shift;
my TheSchwartz::Job $job = shift;
printf("[%s] %s\n", $job->arg->{datetime}->hms(), $job->funcname);
$job->completed();
}
package main;
use strict;
use warnings;
use TheSchwartz;
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
$client->can_do('SampleWorker1');
$client->work();
use strict;
use warnings;
use base qw( TheSchwartz::Worker );
use TheSchwartz::Job;
sub work {
my $class = shift;
my TheSchwartz::Job $job = shift;
printf("[%s] %s\n", $job->arg->{datetime}->hms(), $job->funcname);
$job->completed();
}
package main;
use strict;
use warnings;
use TheSchwartz;
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
$client->can_do('SampleWorker1');
$client->work();
worker.pl を実行すると監視状態になるので、client.pl を何度か叩いてみます。すると、以下のような結果が返ってきます。
[18:43:48] SampleWorker1 [18:43:49] SampleWorker1 [18:43:51] SampleWorker1 [18:43:54] SampleWorker1 [18:43:53] SampleWorker1 [18:43:52] SampleWorker1 [18:43:56] SampleWorker1 [18:43:58] SampleWorker1 [18:43:57] SampleWorker1 [18:43:59] SampleWorker1
グルーピングする場合
では、本題。まず client.pl です。ジョブをグルーピングするには、TheSchwartz::Job のインスタンスを生成して、coalesce というプロパティに値をセットします。
#!/usr/bin/perl
use strict;
use warnings;
use TheSchwartz;
use DateTime;
my $database_info = [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
];
my $client = TheSchwartz->new( databases => $database_info );
### Group A ###
my $job = TheSchwartz::Job->new(
funcname => 'SampleWorker2',
coalesce => 'group_A',
arg => {
datetime => DateTime->now( time_zone => 'local' ),
},
);
$client->insert( $job );
print "added a job to Group A\n";
### Group B ###
$job = TheSchwartz::Job->new(
funcname => 'SampleWorker2',
coalesce => 'group_B',
arg => {
datetime => DateTime->now( time_zone => 'local' ),
},
);
$client->insert( $job );
print "added a job to Group B\n";
### Non grouped job ###
$client->insert('SampleWorker2', { datetime => DateTime->now( time_zone => 'local') });
print "added a job\n";
use strict;
use warnings;
use TheSchwartz;
use DateTime;
my $database_info = [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
];
my $client = TheSchwartz->new( databases => $database_info );
### Group A ###
my $job = TheSchwartz::Job->new(
funcname => 'SampleWorker2',
coalesce => 'group_A',
arg => {
datetime => DateTime->now( time_zone => 'local' ),
},
);
$client->insert( $job );
print "added a job to Group A\n";
### Group B ###
$job = TheSchwartz::Job->new(
funcname => 'SampleWorker2',
coalesce => 'group_B',
arg => {
datetime => DateTime->now( time_zone => 'local' ),
},
);
$client->insert( $job );
print "added a job to Group B\n";
### Non grouped job ###
$client->insert('SampleWorker2', { datetime => DateTime->now( time_zone => 'local') });
print "added a job\n";
続いて worker.pl ですが、こちらはちょっとだけ工夫が要ります。work() メソッドが受け取ったジョブに coalesce がセットされていれば、TheSchwartz の find_job_with_coalescing_value() メソッドを利用して、同じ値がセットされているジョブを見つけて、まとめて実行します。
package SampleWorker2;
use strict;
use base qw( TheSchwartz::Worker );
use TheSchwartz::Job;
sub work {
my $class = shift;
my TheSchwartz::Job $job = shift;
my @jobs = ();
push @jobs, $job;
if (my $key = $job->coalesce) {
print "work for ", $job->coalesce, "\n";
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
while (my $coalesced_job = $client->find_job_with_coalescing_value($class, $key)) {
push @jobs, $coalesced_job;
}
}
foreach my $j (@jobs) {
printf("[%s] %s %s\n", $j->arg->{datetime}->hms(), $j->funcname, $j->coalesce);
$j->completed();
}
print "---------\n";
}
package main;
use strict;
use warnings;
use TheSchwartz;
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
$client->can_do('SampleWorker2');
$client->work();
use strict;
use base qw( TheSchwartz::Worker );
use TheSchwartz::Job;
sub work {
my $class = shift;
my TheSchwartz::Job $job = shift;
my @jobs = ();
push @jobs, $job;
if (my $key = $job->coalesce) {
print "work for ", $job->coalesce, "\n";
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
while (my $coalesced_job = $client->find_job_with_coalescing_value($class, $key)) {
push @jobs, $coalesced_job;
}
}
foreach my $j (@jobs) {
printf("[%s] %s %s\n", $j->arg->{datetime}->hms(), $j->funcname, $j->coalesce);
$j->completed();
}
print "---------\n";
}
package main;
use strict;
use warnings;
use TheSchwartz;
my $client = TheSchwartz->new(
databases => [
{
dsn => 'dbi:mysql:TheSchwartz',
user => 'db_user',
pass => 'db_password',
}
],
);
$client->can_do('SampleWorker2');
$client->work();
client.pl を何度か叩いた後、worker.pl を実行すると、以下のような結果が得られました。group_A および group_B のジョブが一度の処理でまとめて実行されていることがわかります。
work for group_A [19:10:01] SampleWorker2 group_A [19:10:03] SampleWorker2 group_A [19:10:02] SampleWorker2 group_A --------- [19:10:01] SampleWorker2 --------- work for group_B [19:10:02] SampleWorker2 group_B [19:10:03] SampleWorker2 group_B [19:10:01] SampleWorker2 group_B --------- [19:10:02] SampleWorker2 --------- [19:10:03] SampleWorker2 ---------
以上!
About this entry







No Trackbacks
trackback uri:コメントやトラックバックは承認待ちになる場合がありますので、すぐに反映されない場合はしばらくお待ちください。