Multi-transport support.

This commit is contained in:
Chris 2012-08-15 14:13:28 +01:00
parent d322edfda5
commit 69ec3668d5
2 changed files with 317 additions and 21 deletions

View File

@ -22,6 +22,7 @@
package Message;
use strict;
use base qw(SystemModule);
use Utils qw(hash_or_hashref);
# ============================================================================
# Constructor
@ -36,8 +37,70 @@ use base qw(SystemModule);
sub new {
my $invocant = shift;
my $class = ref($invocant) || $invocant;
my $self = $class -> SUPER::new(@_)
or return undef;
return $class -> SUPER::new(@_);
return SystemModule::set_error("No module object available.") if(!$self -> {"module"});
return $self;
# ============================================================================
# Transport handling
## @method $ get_transports($include_inactive)
# Obtain a list of currently defined message transports. This will return an array of
# transport hashes describing the currently defined transports.
# @param include_inactive Include all transports, even if they are marked as inactive.
# @return A reference to an array of transport record hashrefs.
sub get_transports {
my $self = shift;
my $include_inactive = shift;
$self -> clear_error();
my $transh = $self -> {"dbh"} -> prepare("SELECT *
FROM `".$self -> {"settings"} -> {"database"} -> {"message_transports"}."`".
($include_inactive ? "" : " WHERE enabled = 1"));
$transh -> execute()
or return $self -> self_error("Unable to perform message transport lookup: ". $self -> {"dbh"} -> errstr);
return $transh -> fetchall_arrayref({});
## @method $ load_transport_module($args)
# Attempt to load an create an instance of a Message::Transport module.
# @param modulename The name of the transport module to load.
# @return A reference to an instance of the requested transport module on success,
# undef on error.
sub load_transport_module {
my $self = shift;
my $args = hash_or_hashref(@_);
# Work out which field is being searched on
my $field;
if($args -> {"id"}) {
$field = "id";
} elsif($args -> {"name"}) {
$field = "name";
} else {
return $self -> self_error("Incorrect arguments to load_transport_module: id or name not provided");
my $modh = $self -> {"dbh"} -> prepare("SELECT perl_module
FROM `".$self -> {"settings"} -> {"database"} -> {"message_transports"}."`
WHERE $field = ?");
$modh -> execute($args -> {$field})
or return $self -> self_error("Unable to execute transport module lookup: ".$self -> {"dbh"} -> errstr);
my $modname = $modh -> fetchrow_arrayref()
or return $self -> self_error("Unable to fetch module name for transport module: entry does not exist");
return $self -> {"module"} -> load_module($modname -> [0]);

View File

@ -23,10 +23,9 @@
package Message::Queue;
use strict;
use base qw(SystemModule);
use base qw(Message);
use Utils qw(hash_or_hashref);
# ============================================================================
# Constructor
@ -51,10 +50,7 @@ sub new {
"deleted" => 1,
"deleted_id" => 1,
"message_ident" => 1,
"status" => 1,
"send_after" => 1,
"sent_time" => 1,
"error_message" => 1
return $self;
@ -123,6 +119,8 @@ sub queue_message {
$self -> _add_recipient($msgid, $recip)
or return undef;
$self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipient $recip");
# Otherwise there is one message with multiple recipients.
@ -134,6 +132,8 @@ sub queue_message {
$self -> _add_recipient($msgid, $recip)
or return undef;
$self -> {"logger"} -> log("messaging", 0, undef, "Queued message $msgid with recipients ".join(",", @{$args -> {"recipients"}}));
return 1;
@ -142,7 +142,10 @@ sub queue_message {
## @method $ delete_message(%args)
# Delete a message from the queue. This will actually mark the message as deleted,
# messages are never really removed. Supported arguments are:
# messages are never really removed, they are simply marked as deleted. Note that
# this will deleted the message in a way that it will no longer be visible to either
# sender or recipient, and it will not be sent via external transports should any
# be in use. Supported arguments are:
# - userid The ID of the user deleting the message. If undef, it is assumed the
# system is deleting the message.
@ -151,6 +154,9 @@ sub queue_message {
# - ident A message ident to search for and delete any messages that have it set.
# This allows a group of messages to be deleted in one go.
# @note This function will not mark messages as deleted if they have been sent or
# viewed by a recipient. Only unsent, unviewed messages may be deleted.
# @param args A hash, or reference to a hash, of arguments specifying the message
# to delete.
# @return The number of messages deleted on success (which maybe 0!), undef on error.
@ -171,6 +177,77 @@ sub delete_message {
## @method $ sender_delete_message($senderid, $messageid, $queuedelete)
# Allow the sender of a message to delete it. Normally this will simply delete the message
# from the sender's message list view (ie: it marks their view of the message as deleted),
# if 'queuedelete' is true, and the message has not been sent or viewed, this will
# delete the message in the message queue as well. Note, as with other delete methods,
# this does not actually remove the message from the queue, it is simply marked as deleted.
# @param senderid The ID of the message sender.
# @param messageid The ID of the message to delete.
# @param queuedelete If true, the message in the queue is marked as deleted as well as
# in the sender's view.
# @return true on success, undef on error.
sub sender_delete_message {
my $self = shift;
my $senderid = shift;
my $messageid = shift;
my $queuedelete = shift;
$self -> clear_error();
$self -> {"logger"} -> log("messaging", $senderid, undef, "Deleting sender view of message $messageid");
# It's always possible to delete the message from the sender's view unless it is already so
my $nukeh = $self -> {"dbh"} -> prepare("UPDATE `".$self -> {"settings"} -> {"database"} -> {"message_sender"}."`
WHERE message_id = ?
AND sender_id = ?
AND deleted IS NULL");
my $result = $nukeh -> execute($messageid, $senderid);
return $self -> self_error("Unable to perform sender message delete: ". $self -> {"dbh"} -> errstr) if(!$result);
return $self -> self_error("Sender message delete failed, no rows updated") if($result eq "0E0");
if($queuedelete) {
my $count = $self -> delete_message(id => $messageid,
userid => $senderid);
return undef if(!defined($count));
return 1;
## @method $ recipient_delete_message($recipientid, $messageid)
# Allow an individual recipient of a message to mark it as deleted in their view. This can
# be done regardless of the message status.
# @param recipientid The ID of the recipient user.
# @param messageid The ID of the message to mark as deleted.
# @return true on success, undef on error.
sub recipient_delete_message {
my $self = shift;
my $recipientid = shift;
my $messageid = shift;
$self -> clear_error();
$self -> {"logger"} -> log("messaging", $recipientid, undef, "Deleting recipient view of message $messageid");
my $nukeh = $self -> {"dbh"} -> prepare("UPDATE `".$self -> {"settings"} -> {"database"} -> {"message_recipients"}."`
WHERE message_id = ?
AND recipient_id = ?
AND deleted IS NULL");
my $result = $nukeh -> execute($messageid, $recipientid);
return $self -> self_error("Unable to perform recipient message delete: ". $self -> {"dbh"} -> errstr) if(!$result);
return $self -> self_error("Recipient message delete failed, no rows updated") if($result eq "0E0");
return 1;
# ============================================================================
# Retrieval
@ -219,27 +296,111 @@ sub get_messages {
## @method $ get_sendable_messages($include_failed)
# Fetch a list of all messages that can be sent at this time. This will look up all
# messages with a send_after that is less than or equal to the current time that
# have not yet been sent.
## @method $ get_sendable_messages($transportid, $include_failed)
# Fetch a list of all messages that can be sent at this time by the specified transport.
# This will look up all messages with a send_after that is less than or equal to the
# current time that have not yet been sent.
# @param transportid The ID of the transport requesting sendable messages.
# @param include_failed If true, messages that have failed are included in the
# list of returned messages.
# @return A reference to an array of hashrefs on success, undef on error. Note that
# if there are no matching messages, this returns a reference to an empty array.
sub get_sendable_messages {
my $self = shift;
my $transportid = shift;
my $include_failed = shift;
my $fieldspec = [ { field => "send_after", op => "<=", value => time() } ];
if($include_failed) {
push(@{$fieldspec}, {"orgroup" => [ { field => "status", op => "=", value => "pending" }, { field => "status", op => "=", value => "failed" } ]});
} else {
push(@{$fieldspec}, {field => "status", op => "=", value => "pending" } );
# Sendable messages are messsages that have a send_after field value less than the current
# time, and a status of "pending" (or "failed") for the specified transport....
my $sendh = $self -> {"dbh"} -> prepare("SELECT
FROM `".$self -> {"settings"} -> {"database"} -> {"message_queue"}."` AS m,
`".$self -> {"settings"} -> {"database"} -> {"message_status"}."` AS s
WHERE s.message_id =
AND s.transport_id = ?
AND m.deleted IS NULL
AND m.send_after < UNIX_TIMESTAMP() ".
($include_failed ? "AND (s.status = 'pending' OR s.status = 'failed') "
: "AND s.status = 'pending'"));
$sendh -> execute($transportid)
or return $self -> self_error("Unable to perform message transport lookup: ". $self -> {"dbh"} -> errstr);
my $results = [];
while(my $mid = $sendh -> fetchrow_arrayref()) {
my $message = $self -> get_message($mid -> [0])
or return undef;
push(@{$results}, $message);
return $self -> _get_by_fields($fieldspec);
return $results;
# ============================================================================
# Marking of various sorts
## @method $ update_status($messageid, $transportid, $status, $message)
# Update the status of the specified message for the specified transport, setting
# its status message if needed.
# @param messageid The ID of the message to update.
# @param transportid The ID of the transport setting the status.
# @param status The status to set the message to. Must be "pending", "sent", or "failed"
# @param message Optional message status, may be undef.
# @return true on success, undef on error.
sub update_status {
my $self = shift;
my $messageid = shift;
my $transportid = shift;
my $status = shift;
my $message = shift;
$self -> clear_error();
$status = "pending" unless($status eq "sent" || $status eq "failed");
$self -> {"logger"} -> log("messaging", 0, undef, "Updating status of $messageid for transport $transportid: $status [".($message || "No status message")."]");
my $stateh = $self -> {"dbh"} -> prepare("UPDATE `".$self -> {"settings"} -> {"database"} -> {"message_status"}."`
SET status_time = UNIX_TIMESTAMP(), status = ?, status_message = ?
WHERE message_id = ?
AND transport_id = ?");
my $result = $stateh -> execute($status, $message, $messageid, $transportid);
return $self -> self_error("Unable to perform message status update: ". $self -> {"dbh"} -> errstr) if(!$result);
return $self -> self_error("Message status update failed, no rows updated") if($result eq "0E0");
return 1;
## @method $ mark_recipient_read($messageid, $recipientid)
# Mark the specified message as read by the recipient. This allows local transports or
# other services that may be able to detect when a user opens a message to record the
# user having read the message.
# @param messageid The ID of the message the recipient is reading.
# @param recipientid The ID of the recipient reading the message.
# @return true on success, undef on error.
sub mark_recipient_read {
my $self = shift;
my $messageid = shift;
my $recipientid = shift;
$self -> clear_error();
$self -> {"logger"} -> log("messaging", $recipientid, undef, "Marking $messageid as read");
my $stateh = $self -> {"dbh"} -> prepare("UPDATE `".$self -> {"settings"} -> {"database"} -> {"message_recipients"}."`
WHERE message_id = ?
AND recipient_id = ?");
my $result = $stateh -> execute($messageid, $recipientid);
return $self -> self_error("Unable to perform recipient view update: ". $self -> {"dbh"} -> errstr) if(!$result);
return $self -> self_error("Message recipient view failed, no rows updated") if($result eq "0E0");
return 1;
@ -271,6 +432,26 @@ sub _queue_message {
my $msgid = $self -> {"dbh"} -> {"mysql_insertid"}
or return $self -> self_error("Unable to obtain new message id");
# Add the sender data if possible
if($args -> {"userid"}) {
$newh = $self -> {"dbh"} -> prepare("INSERT INTO `".$self -> {"settings"} -> {"database"} -> {"message_sender"}."`
(message_id, sender_id) VALUES(?, ?)");
$result = $newh -> execute($msgid, $args -> {"userid"});
return $self -> self_error("Unable to perform message sender insert: ". $self -> {"dbh"} -> errstr) if(!$result);
return $self -> self_error("Message sender insert failed, no rows inserted") if($result eq "0E0");
# Now add transport status entries for each available transport
$newh = $self -> {"dbh"} -> prepare("INSERT INTO `".$self -> {"settings"} -> {"database"} -> {"message_status"}."`
(message_id, transport_id, status_time) VALUES(?, ?, ?)");
my $transports = $self -> get_transports();
foreach my $transport (@{$transports}) {
$result = $newh -> execute($msgid, $transport -> {"id"}, $args -> {"now"});
return $self -> self_error("Unable to perform message transport status insert: ". $self -> {"dbh"} -> errstr) if(!$result);
return $self -> self_error("Message transport status insert failed, no rows inserted") if($result eq "0E0");
return $msgid;
@ -303,7 +484,8 @@ sub _add_recipient {
## @method private $ _delete_by_field($field, $value, $userid, $deleted)
# Attempt to delete messages where the specified field contains the value given.
# Note that this *does not* remove the message from the table, it simply marks
# it as deleted so that get_message() will not normally return it.
# it as deleted so that get_message() will not normally return it. This will not
# delete messages that have been sent or viewed.
# @param field The database table field to search for messages on.
# @param value When a given message has this value in the specified field, it is
@ -323,15 +505,40 @@ sub _delete_by_field {
# Force valid field
$field = "id" unless($field && ($field eq "id" || $field eq "message_ident"));
$self -> {"logger"} -> log("messaging", $userid || 0, undef, "Attempting delete of message(s) with $field = '$value'");
# Check that the message has no views
my $viewh = $self -> {"dbh"} -> prepare("SELECT COUNT(*)
FROM `".$self -> {"settings"} -> {"database"} -> {"message_queue"}."` AS q,
`".$self -> {"settings"} -> {"database"} -> {"message_recipients"}."` AS r
WHERE q.$field = ?
AND r.message_id =
AND r.viewed IS NOT NULL");
$viewh -> execute($value)
or return $self -> self_error("Unable to perform message view check: ". $self -> {"dbh"} -> errstr);
my $views = $viewh -> fetchrow_arrayref()
or return $self -> self_error("Unable to obtain message view count. This should not happen!");
# If the view count is non-zero, the message or message group can not be deleted.
return 0 if($views -> [0]);
$self -> {"logger"} -> log("messaging", $userid || 0, undef, "Delete of message(s) with $field = '$value' passed view check");
# Otherwise, nobody is marked as having seen the message, if it hasn't been sent, mark it as deleted
my $nukeh = $self -> {"dbh"} -> prepare("UPDATE `".$self -> {"settings"} -> {"database"} -> {"message_queue"}."`
SET deleted = ?, deleted_id = ?
WHERE $field = ?
AND status != 'sent'
AND deleted IS NULL");
my $result = $nukeh -> execute($deleted, $userid, $value);
return $self -> self_error("Unable to perform message delete: ". $self -> {"dbh"} -> errstr) if(!$result);
# Result should contain the number of rows updated.
return 0 if($result eq "0E0"); # need a special case for the zero rows, just in case...
$self -> {"logger"} -> log("messaging", $userid || 0, undef, "Delete of message(s) with $field = '$value' marked $result messages");
return $result;
@ -419,25 +626,51 @@ sub _get_by_fields {
FROM `".$self -> {"settings"} -> {"database"} -> {"message_queue"}."`
$where".($permit_deleted ? "" : " AND deleted IS NULL"));
my $reciph = $self -> {"dbh"} -> prepare("SELECT recipient_id
my $sendh = $self -> {"dbh"} -> prepare("SELECT *
FROM `".$self -> {"settings"} -> {"database"} -> {"message_sender"}."`
WHERE message_id = ?");
my $reciph = $self -> {"dbh"} -> prepare("SELECT *
FROM `".$self -> {"settings"} -> {"database"} -> {"message_recipients"}."`
WHERE message_id = ?");
my $statush = $self -> {"dbh"} -> prepare("SELECT *
FROM `".$self -> {"settings"} -> {"database"} -> {"message_status"}."`
WHERE message_id = ?");
$fetch -> execute(@fetch_bind)
or return $self -> self_error("Unable to perform message lookup: ". $self -> {"dbh"} -> errstr);
while(my $message = $fetch -> fetchrow_hashref()) {
# Fetch the message sender info
$sendh -> execute($message -> {"id"})
or return $self -> self_error("Unable to perform message ".$message -> {"id"}." sender lookup: ". $self -> {"dbh"} -> errstr);
# This fetch may return undef, which is fine - sender is not a required field
$message -> {"sender"} = $sendh -> fetchrow_hashref();
# Fetch the message recipients, and store the list in the message
$reciph -> execute($message -> {"id"})
or return $self -> self_error("Unable to perform message ".$message -> {"id"}." recipient lookup: ". $self -> {"dbh"} -> errstr);
$message -> {"recipients"} = [];
while(my $recipient = $reciph -> fetchrow_arrayref()) {
push(@{$message -> {"recipients"}}, $recipient -> [0]);
while(my $recipient = $reciph -> fetchrow_hashref()) {
push(@{$message -> {"recipients"}}, $recipient);
# Messages must have at least one recipient, or they are broken
return $self -> self_error("Message ".$message -> {"id"}." has no recorded recipients. This should not happen.")
if(!scalar(@{$message -> {"recipients"}}));
# fetch and store the status values for each transport
$statush -> execute($message -> {"id"})
or return $self -> self_error("Unable to perform message ".$message -> {"id"}." sender lookup: ". $self -> {"dbh"} -> errstr);
$message -> {"status"} = {};
while(my $status = $statush -> fetchrow_hashref()) {
$message -> {"status"} -> {$status -> {"transport_id"}} = $status
push(@{$results}, $message);