Allow immediate message delivery on queue addition.

This facility should allow the email_template function in Template to be
deprecated and removed.
This commit is contained in:
Chris 2013-01-29 15:50:20 +00:00
parent 9d6a2df4cc
commit c07ce87a71

View File

@ -17,8 +17,14 @@
# along with this program. If not, see http://www.gnu.org/licenses/. # along with this program. If not, see http://www.gnu.org/licenses/.
## @class ## @class
# This class allows messages to be added to the message queue, or retrieved from # This class encapsulates the queuing and dispatching of messages. Messages
# it in a format suitable for passing to Message::Sender. # can be added to the queue for later delivery (primarily via the queue_message()
# function) and sent by invoking the deliver_queue() or deliver_message() functions.
#
# Note that, in order to use the queue correctly, deliver_queue() needs to be called
# periodically to deliver messages to users. A SendMessages.pl script suitable for
# use as a cron-based message dispatcher is provided in the webperl supportfiles
# directory.
package Webperl::Message::Queue; package Webperl::Message::Queue;
use strict; use strict;
@ -67,26 +73,34 @@ sub new {
# ready to be sent at a later time by Message::Sender. The supported arguments are # ready to be sent at a later time by Message::Sender. The supported arguments are
# as follows: # as follows:
# #
# - subject (required) The email subject. # - `subject` (required) The email subject.
# - message (required) The body content to show in the email. # - `message` (required) The body content to show in the email.
# - recipients (required) A reference to an array of userids. Each user will recieve # - `recipients` (required) A reference to an array of userids. Each user will recieve
# a copy of the message. If unique_recip is not set, the recipients may be # a copy of the message. If unique_recip is not set, the recipients may be
# visible to each other under some transports. # visible to each other under some transports.
# - unique_recip (optional) If set, copy of the message is made for each recipient, # - `unique_recip` (optional) If set, copy of the message is made for each recipient,
# with one recipient per message (defaults to false). # with one recipient per message (defaults to false).
# - ident (optional) Allow a user-definable identifier string to be attached to # - `ident` (optional) Allow a user-definable identifier string to be attached to
# the message in the queue (if unique_recip is set, and more than one recipient # the message in the queue (if unique_recip is set, and more than one recipient
# is specified, the ident is set in each copy of the message). # is specified, the ident is set in each copy of the message).
# - userid (optional) Contains the ID of the user adding this message. If this is # - `userid` (optional) Contains the ID of the user adding this message. If this is
# undef, the message is recorded as a system-generated one. Note that the # undef, the message is recorded as a system-generated one. Note that the
# interpretation of this field is controlled by Message::Sender - it may # interpretation of this field is controlled by Message::Sender - it may
# be used to determine the From: address, or it may be ignored. # be used to determine the From: address, or it may be ignored.
# - send_after (optional) Specify the unix timestamp at which the message should be # - `send_after` (optional) Specify the unix timestamp at which the message should be
# sent. If this is not specified, the creation time is used. # sent. If this is not specified, the creation time is used.
# - delay (optional) If specified, this introduces a delay, specified in seconds, # - `delay` (optional) If specified, this introduces a delay, specified in seconds,
# between the message beng added and the first point at which it may be # between the message beng added and the first point at which it may be
# sent. Note that, if both this and send_after are specified, the delay is # sent. Note that, if both this and send_after are specified, the delay is
# added to the value specified in send_after. # added to the value specified in send_after.
# - `send_immediately` (optional) *USE WITH CAUTION* If specified, the message is
# sent immediately, completely ignoring the `send_after` or `delay` options
# and without waiting for the queue dispatcher to process the pending queue.
# Use this only when sending messages that need to be delivered as rapidly
# as possible (eg: account operations requiring email notification). You should
# be extremely careful using this if a message has more than one recipient
# and `unique_recip` is set, as this function may take a long time to return in
# that situation.
# #
# @param args A hash, or a reference to a hash, of arguments defining the message. # @param args A hash, or a reference to a hash, of arguments defining the message.
# @return true on success, undef on error. # @return true on success, undef on error.
@ -120,6 +134,14 @@ sub queue_message {
or return undef; or return undef;
$self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipient $recip"); $self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipient $recip");
# This will, unfortunately, produce a send for each recipient. Realistically, this
# is no different from the queue dispatcher doing the job, but it may introduce
# unacceptable delays if there are a lot of recipients and slow transports involved.
# That doesn't matter with the queue dispatcher, as that will normally be called from
# a cron job in the background, but this will happen during normal program flow.
$self -> deliver_message($msgid)
if($args -> {"send_immediately"});
} }
# Otherwise there is one message with multiple recipients. # Otherwise there is one message with multiple recipients.
@ -133,6 +155,9 @@ sub queue_message {
} }
$self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipients ".join(",", @{$args -> {"recipients"}})); $self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipients ".join(",", @{$args -> {"recipients"}}));
$self -> deliver_message($msgid)
if($args -> {"send_immediately"});
} }
return 1; return 1;
@ -345,6 +370,8 @@ sub get_sendable_messages {
# #
# @param try_failed If this is set to true, transport modules will try to resend # @param try_failed If this is set to true, transport modules will try to resend
# messages that previously failed to send. # messages that previously failed to send.
# @return The number of message send failures through any transport (0 indicates
# that all transports sent all the messages successfully), undef on error.
sub deliver_queue { sub deliver_queue {
my $self = shift; my $self = shift;
my $try_failed = shift; my $try_failed = shift;
@ -371,7 +398,7 @@ sub deliver_queue {
# Load the transport... # Load the transport...
$transport -> {"module"} = $self -> load_transport_module(id => $transport -> {"id"}) $transport -> {"module"} = $self -> load_transport_module(id => $transport -> {"id"})
or return $self -> self_error("Transport loading failed: ".$self -> {"errstr"}); or return $self -> self_error("Transport loading failed: ".$self -> errstr());
# Try to deliver each sendable message # Try to deliver each sendable message
foreach my $message (@{$messages}) { foreach my $message (@{$messages}) {
@ -383,15 +410,62 @@ sub deliver_queue {
$self -> update_status($message -> {"id"}, $self -> update_status($message -> {"id"},
$transport -> {"id"}, $transport -> {"id"},
$sent ? "sent" : "failed", $sent ? "sent" : "failed",
$sent ? undef : $transport -> {"module"} -> {"errstr"}) $sent ? undef : $transport -> {"module"} -> errstr())
or return undef; or return undef;
} }
} }
} }
$self -> {"logger"} -> log("messaging", 0, undef, "Queue delivery finished, processed ".$counts -> {"messages"}." through ".$counts -> {"transports"}." transports. ".$counts -> {"success"}." messages sent, ".($counts -> {"messages"} - $counts -> {"success"})." failed."); $self -> {"logger"} -> log("messaging", 0, undef, "Queue delivery finished, processed ".$counts -> {"messages"}." through ".$counts -> {"transports"}." transports. ".$counts -> {"success"}." messages sent, ".(($counts -> {"transports"} * $counts -> {"messages"}) - $counts -> {"success"})." failed.");
return (($counts -> {"transports"} * $counts -> {"messages"}) - $counts -> {"success"});
} }
## @method $ deliver_message($messageid)
# Given a message ID, attempt to send the message via all available transports.
# This forces immediate delivery of the specified message, if possible, and
# marks it as either sent or failed.
#
# @param messageid The ID of the message to send.
# @return The number of transport send failures (0 indicates all transports delivered
# the message successfully), undef on error.
sub deliver_message {
my $self = shift;
my $messageid = shift;
$self -> clear_error();
$self -> {"logger"} -> log("messaging", 0, undef, "Starting delivery of message $messageid");
# Make sure the message is valid first.
my $message = $self -> get_message($messageid)
or return undef;
my $failures = 0;
my $transports = $self -> get_transports();
foreach my $transport (@{$transports}) {
$transport -> {"module"} = $self -> load_transport_module(id => $transport -> {"id"})
or return $self -> self_error("Transport loading failed: ".$self -> errstr());
# Tru to send the message through this transport
my $sent = $transport -> {"module"} -> deliver($message);
++$failures if(!$sent);
# Store the send status for this transport
$self -> update_status($message -> {"id"},
$transport -> {"id"},
$sent ? "sent" : "failed",
$sent ? undef : $transport -> {"module"} -> errstr())
or return undef;
}
$self -> {"logger"} -> log("messaging", 0, undef, "Message $messageid send through ".scalar(@{$transports})." transports, $failures send failures");
return $failures;
}
# ============================================================================ # ============================================================================
# Marking of various sorts # Marking of various sorts