From c07ce87a71750d2a860920d04745e9f5b09cbc45 Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 29 Jan 2013 15:50:20 +0000 Subject: [PATCH] Allow immediate message delivery on queue addition. This facility should allow the email_template function in Template to be deprecated and removed. --- Webperl/Message/Queue.pm | 100 ++++++++++++++++++++++++++++++++++----- 1 file changed, 87 insertions(+), 13 deletions(-) diff --git a/Webperl/Message/Queue.pm b/Webperl/Message/Queue.pm index f6be85d..73a0475 100644 --- a/Webperl/Message/Queue.pm +++ b/Webperl/Message/Queue.pm @@ -17,8 +17,14 @@ # along with this program. If not, see http://www.gnu.org/licenses/. ## @class -# This class allows messages to be added to the message queue, or retrieved from -# it in a format suitable for passing to Message::Sender. +# This class encapsulates the queuing and dispatching of messages. Messages +# can be added to the queue for later delivery (primarily via the queue_message() +# function) and sent by invoking the deliver_queue() or deliver_message() functions. +# +# Note that, in order to use the queue correctly, deliver_queue() needs to be called +# periodically to deliver messages to users. A SendMessages.pl script suitable for +# use as a cron-based message dispatcher is provided in the webperl supportfiles +# directory. package Webperl::Message::Queue; use strict; @@ -67,26 +73,34 @@ sub new { # ready to be sent at a later time by Message::Sender. The supported arguments are # as follows: # -# - subject (required) The email subject. -# - message (required) The body content to show in the email. -# - recipients (required) A reference to an array of userids. Each user will recieve +# - `subject` (required) The email subject. +# - `message` (required) The body content to show in the email. +# - `recipients` (required) A reference to an array of userids. Each user will recieve # a copy of the message. If unique_recip is not set, the recipients may be # visible to each other under some transports. -# - unique_recip (optional) If set, copy of the message is made for each recipient, +# - `unique_recip` (optional) If set, copy of the message is made for each recipient, # with one recipient per message (defaults to false). -# - ident (optional) Allow a user-definable identifier string to be attached to +# - `ident` (optional) Allow a user-definable identifier string to be attached to # the message in the queue (if unique_recip is set, and more than one recipient # is specified, the ident is set in each copy of the message). -# - userid (optional) Contains the ID of the user adding this message. If this is +# - `userid` (optional) Contains the ID of the user adding this message. If this is # undef, the message is recorded as a system-generated one. Note that the # interpretation of this field is controlled by Message::Sender - it may # be used to determine the From: address, or it may be ignored. -# - send_after (optional) Specify the unix timestamp at which the message should be +# - `send_after` (optional) Specify the unix timestamp at which the message should be # sent. If this is not specified, the creation time is used. -# - delay (optional) If specified, this introduces a delay, specified in seconds, +# - `delay` (optional) If specified, this introduces a delay, specified in seconds, # between the message beng added and the first point at which it may be # sent. Note that, if both this and send_after are specified, the delay is # added to the value specified in send_after. +# - `send_immediately` (optional) *USE WITH CAUTION* If specified, the message is +# sent immediately, completely ignoring the `send_after` or `delay` options +# and without waiting for the queue dispatcher to process the pending queue. +# Use this only when sending messages that need to be delivered as rapidly +# as possible (eg: account operations requiring email notification). You should +# be extremely careful using this if a message has more than one recipient +# and `unique_recip` is set, as this function may take a long time to return in +# that situation. # # @param args A hash, or a reference to a hash, of arguments defining the message. # @return true on success, undef on error. @@ -120,6 +134,14 @@ sub queue_message { or return undef; $self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipient $recip"); + + # This will, unfortunately, produce a send for each recipient. Realistically, this + # is no different from the queue dispatcher doing the job, but it may introduce + # unacceptable delays if there are a lot of recipients and slow transports involved. + # That doesn't matter with the queue dispatcher, as that will normally be called from + # a cron job in the background, but this will happen during normal program flow. + $self -> deliver_message($msgid) + if($args -> {"send_immediately"}); } # Otherwise there is one message with multiple recipients. @@ -133,6 +155,9 @@ sub queue_message { } $self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipients ".join(",", @{$args -> {"recipients"}})); + + $self -> deliver_message($msgid) + if($args -> {"send_immediately"}); } return 1; @@ -345,6 +370,8 @@ sub get_sendable_messages { # # @param try_failed If this is set to true, transport modules will try to resend # messages that previously failed to send. +# @return The number of message send failures through any transport (0 indicates +# that all transports sent all the messages successfully), undef on error. sub deliver_queue { my $self = shift; my $try_failed = shift; @@ -371,7 +398,7 @@ sub deliver_queue { # Load the transport... $transport -> {"module"} = $self -> load_transport_module(id => $transport -> {"id"}) - or return $self -> self_error("Transport loading failed: ".$self -> {"errstr"}); + or return $self -> self_error("Transport loading failed: ".$self -> errstr()); # Try to deliver each sendable message foreach my $message (@{$messages}) { @@ -383,15 +410,62 @@ sub deliver_queue { $self -> update_status($message -> {"id"}, $transport -> {"id"}, $sent ? "sent" : "failed", - $sent ? undef : $transport -> {"module"} -> {"errstr"}) + $sent ? undef : $transport -> {"module"} -> errstr()) or return undef; } } } - $self -> {"logger"} -> log("messaging", 0, undef, "Queue delivery finished, processed ".$counts -> {"messages"}." through ".$counts -> {"transports"}." transports. ".$counts -> {"success"}." messages sent, ".($counts -> {"messages"} - $counts -> {"success"})." failed."); + $self -> {"logger"} -> log("messaging", 0, undef, "Queue delivery finished, processed ".$counts -> {"messages"}." through ".$counts -> {"transports"}." transports. ".$counts -> {"success"}." messages sent, ".(($counts -> {"transports"} * $counts -> {"messages"}) - $counts -> {"success"})." failed."); + + return (($counts -> {"transports"} * $counts -> {"messages"}) - $counts -> {"success"}); } +## @method $ deliver_message($messageid) +# Given a message ID, attempt to send the message via all available transports. +# This forces immediate delivery of the specified message, if possible, and +# marks it as either sent or failed. +# +# @param messageid The ID of the message to send. +# @return The number of transport send failures (0 indicates all transports delivered +# the message successfully), undef on error. +sub deliver_message { + my $self = shift; + my $messageid = shift; + + $self -> clear_error(); + + $self -> {"logger"} -> log("messaging", 0, undef, "Starting delivery of message $messageid"); + + # Make sure the message is valid first. + my $message = $self -> get_message($messageid) + or return undef; + + my $failures = 0; + + my $transports = $self -> get_transports(); + foreach my $transport (@{$transports}) { + $transport -> {"module"} = $self -> load_transport_module(id => $transport -> {"id"}) + or return $self -> self_error("Transport loading failed: ".$self -> errstr()); + + # Tru to send the message through this transport + my $sent = $transport -> {"module"} -> deliver($message); + + ++$failures if(!$sent); + + # Store the send status for this transport + $self -> update_status($message -> {"id"}, + $transport -> {"id"}, + $sent ? "sent" : "failed", + $sent ? undef : $transport -> {"module"} -> errstr()) + or return undef; + } + + $self -> {"logger"} -> log("messaging", 0, undef, "Message $messageid send through ".scalar(@{$transports})." transports, $failures send failures"); + + return $failures; +} + # ============================================================================ # Marking of various sorts