Interface RReliableQueue<V>

All Superinterfaces:
RDestroyable, RExpirable, RExpirableAsync, RObject, RObjectAsync, RReliableQueueAsync<V>

public interface RReliableQueue<V> extends RExpirable, RReliableQueueAsync<V>, RDestroyable
Reliable queue implementation based on the Stream object.

Unlike regular Valkey- or Redis-based queues, this implementation provides the following features:

  • Message acknowledgment to confirm successful processing
  • Message negative acknowledgment to redeliver a message, or to delete it if no dead letter queue (DLQ) is defined
  • Redundancy and synchronous replication
  • Deduplication by id or hash within a defined time interval
  • Bulk operations
  • Configurable queue size limit
  • Configurable message size limit
  • Configurable message expiration timeout
  • Configurable message visibility timeout
  • Configurable message priority
  • Configurable message delay
  • Configurable message delivery limit
  • Automatic redelivery of unacknowledged messages
  • Dead letter queue support for failed message handling
Author:
Nikita Koksharov
  • Method Details

    • setConfig

      void setConfig(QueueConfig config)
      Sets the configuration for this reliable queue.
      Parameters:
      config - the queue configuration to apply
    • setConfigIfAbsent

      boolean setConfigIfAbsent(QueueConfig config)
      Attempts to set the configuration for this reliable queue.

      This method only applies the configuration if no configuration has been set previously.

      Parameters:
      config - the queue configuration to apply
      Returns:
      true if the configuration was successfully applied, false if a configuration already exists
    • size

      int size()
      Returns the number of messages in the queue that are ready for polling, excluding delayed and unacknowledged messages.
      Returns:
      the number of messages ready for polling
    • countDelayedMessages

      int countDelayedMessages()
      Returns the number of delayed messages in the queue.

      Delayed messages are those scheduled for future delivery and not yet available for consumption.

      Returns:
      the number of delayed messages
    • countUnacknowledgedMessages

      int countUnacknowledgedMessages()
      Returns the number of unacknowledged messages in the queue.

      Unacknowledged messages are those that have been delivered to consumers but not yet acknowledged as successfully processed.

      Returns:
      the number of unacknowledged messages
    • isEmpty

      boolean isEmpty()
      Checks if the queue is empty.

      A queue is considered empty when it contains no messages in any state (ready, delayed, or unacknowledged).

      Returns:
      true if the queue is empty, false otherwise
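      The relationship between size(), countDelayedMessages(), countUnacknowledgedMessages() and isEmpty() can be pictured with a small in-memory model. This is only a sketch of the counting rules stated above, not Redisson code: the class QueueStateModel and its helper methods addReady and addDelayed are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy in-memory model of the message states described above.
// This is NOT Redisson code; every name here is hypothetical.
class QueueStateModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Set<String> delayed = new HashSet<>();
    private final Set<String> unacknowledged = new HashSet<>();

    void addReady(String id) {
        ready.addLast(id);
    }

    void addDelayed(String id) {
        delayed.add(id);
    }

    // Takes the head of the ready messages and tracks it as unacknowledged.
    String poll() {
        String id = ready.pollFirst();
        if (id != null) {
            unacknowledged.add(id);
        }
        return id;
    }

    // Counts only messages that are ready for polling.
    int size() {
        return ready.size();
    }

    int countDelayedMessages() {
        return delayed.size();
    }

    int countUnacknowledgedMessages() {
        return unacknowledged.size();
    }

    // Empty only when no message exists in any of the three states.
    boolean isEmpty() {
        return ready.isEmpty() && delayed.isEmpty() && unacknowledged.isEmpty();
    }
}
```

      In this model a polled message leaves the ready count but still keeps the queue non-empty until it is acknowledged or removed.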
    • clear

      boolean clear()
      Removes all messages from the queue.

      This operation clears messages in all states (ready, delayed, and unacknowledged).

      Returns:
      true if the queue existed and has been cleared, otherwise false
    • poll

      Message<V> poll()
      Retrieves and removes the head of this queue, or returns null if this queue is empty.

      The retrieved message remains unacknowledged until explicitly acknowledged using the acknowledge(QueueAckArgs) or negativeAcknowledge(QueueNegativeAckArgs) method.

      Returns:
      the message at the head of this queue, or null if this queue is empty
    • poll

      Message<V> poll(QueuePollArgs args)
      Retrieves and removes the head of this queue with the specified polling arguments.

      The retrieved message remains unacknowledged until explicitly acknowledged using the acknowledge(QueueAckArgs) or negativeAcknowledge(QueueNegativeAckArgs) method.

      Parameters:
      args - polling arguments
      Returns:
      the message at the head of this queue, or null if this queue is empty
    • pollMany

      List<Message<V>> pollMany(QueuePollArgs pargs)
      Retrieves and removes multiple messages from the queue with the specified polling arguments.

      This batch operation is more efficient than polling messages individually.

      The retrieved messages remain unacknowledged until explicitly acknowledged using the acknowledge(QueueAckArgs) or negativeAcknowledge(QueueNegativeAckArgs) method.

      Parameters:
      pargs - polling arguments
      Returns:
      a list of retrieved messages
    • acknowledge

      void acknowledge(QueueAckArgs args)
      Acknowledges the successful processing of a message.

      Once acknowledged, a message is permanently removed from the queue and will not be redelivered.

      Parameters:
      args - acknowledgment arguments
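      The poll and acknowledge cycle can be sketched with a toy in-memory model. It assumes the common visibility-timeout pattern, in which a polled message that is not acknowledged before its timeout expires becomes available again; the source lists a visibility timeout and automatic redelivery as features but does not spell out the mechanism, so treat that part as an assumption. The class AckModel, its explicit nowMillis clock parameter and its String ids are hypothetical and are not part of the Redisson API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model of poll/acknowledge with a visibility timeout.
// NOT Redisson code; names and the explicit clock are hypothetical.
class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();
    // message id -> time (ms) at which its visibility timeout expires
    private final Map<String, Long> unacknowledged = new HashMap<>();
    private final long visibilityMillis;

    AckModel(long visibilityMillis) {
        this.visibilityMillis = visibilityMillis;
    }

    void add(String id) {
        ready.addLast(id);
    }

    // Returns the head message, or null if nothing is ready at this time.
    String poll(long nowMillis) {
        redeliverExpired(nowMillis);
        String id = ready.pollFirst();
        if (id != null) {
            unacknowledged.put(id, nowMillis + visibilityMillis);
        }
        return id;
    }

    // An acknowledged message is dropped for good and never redelivered.
    void acknowledge(String id) {
        unacknowledged.remove(id);
    }

    // Assumed behavior: expired unacknowledged messages become ready again.
    private void redeliverExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = unacknowledged.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                ready.addLast(entry.getKey());
                it.remove();
            }
        }
    }
}
```

      The design point the model illustrates is that removal from the ready messages happens at poll time, while permanent deletion happens only at acknowledge time.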
    • contains

      boolean contains(String id)
      Checks if the queue contains a message with the specified ID.
      Parameters:
      id - the message ID to check
      Returns:
      true if a message with the specified ID exists in the queue, false otherwise
    • containsMany

      int containsMany(String... ids)
      Checks if the queue contains messages with the specified IDs.
      Parameters:
      ids - the message IDs to check
      Returns:
      the number of matching messages found in the queue
    • remove

      boolean remove(QueueRemoveArgs args)
      Removes a specific message from the queue.

      This operation can remove messages in any state (ready, delayed, or unacknowledged).

      Parameters:
      args - removal arguments
      Returns:
      true if the message was successfully removed, false if the message was not found
    • removeMany

      int removeMany(QueueRemoveArgs args)
      Removes multiple messages from the queue in a single operation.
      Parameters:
      args - removal arguments
      Returns:
      the number of messages successfully removed
    • move

      int move(QueueMoveArgs args)
      Moves messages between queues.
      Parameters:
      args - move arguments
      Returns:
      the number of messages successfully moved
    • add

      Message<V> add(QueueAddArgs<V> params)
      Adds a message to the queue with the specified parameters.

      Returns null if the message has not been added for one of the following reasons:

      • The message was deduplicated by id or hash
      • The configured queue size limit has been reached and the queue is full
      Parameters:
      params - parameters for the message to be added
      Returns:
      the added message with its assigned ID and metadata, or null if the message was not added, for example when a timeout is defined and no space becomes available in the full queue
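      The two rejection cases for add can be sketched with a toy in-memory model: deduplication by id within a time interval, and a queue size limit. This is an illustration of the rules listed above, not Redisson code. The class AddModel, its parameters and the explicit nowMillis clock are hypothetical, and the model returns immediately on a full queue, whereas a real call with a timeout may wait for space.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of add() with deduplication by id and a size limit.
// NOT Redisson code; all names and parameters are hypothetical.
class AddModel {
    // deduplication id -> time (ms) until which duplicates are rejected
    private final Map<String, Long> dedupUntil = new HashMap<>();
    private final int maxSize;
    private int size;

    AddModel(int maxSize) {
        this.maxSize = maxSize;
    }

    // Returns the id of the accepted message, or null if it was not added.
    String add(String dedupId, long nowMillis, long dedupIntervalMillis) {
        Long until = dedupUntil.get(dedupId);
        if (until != null && nowMillis < until) {
            return null; // duplicate inside the deduplication interval
        }
        if (size >= maxSize) {
            return null; // queue size limit reached
        }
        dedupUntil.put(dedupId, nowMillis + dedupIntervalMillis);
        size++;
        return dedupId;
    }

    int size() {
        return size;
    }
}
```

      A caller that must distinguish the two null cases would need extra information, since the return value alone does not say why the message was not added.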
    • addMany

      List<Message<V>> addMany(QueueAddArgs<V> params)
      Adds multiple messages to the queue in a single operation.

      This batch operation is more efficient than adding messages individually.

      Messages may not be added for one of the following reasons:

      • A message was deduplicated by id or hash
      • The configured queue size limit has been reached and the queue is full
      Parameters:
      params - parameters for the messages to be added
      Returns:
      a list of added messages with their assigned IDs and metadata, or an empty list if a timeout is defined and no space becomes available in the full queue
    • getDeadLetterQueueSources

      Set<String> getDeadLetterQueueSources()
      Returns the names of the source queues that use this reliable queue as a dead letter queue.

      This only applies if this queue is configured as a dead letter queue in the source queue configurations.

      Returns:
      a set of source queue names
    • listAll

      List<Message<V>> listAll()
      Returns all messages in the queue that are ready to be retrieved by poll(), without removing them.

      This operation is useful for inspection and debugging purposes.

      Returns:
      a list of all messages in the queue
    • listAll

      List<Message<V>> listAll(Codec headersCodec)
      Returns all messages in the queue that are ready to be retrieved by poll(), using the specified codec for message header values.
      Parameters:
      headersCodec - the codec to use for deserializing message header values
      Returns:
      a list of all messages in the queue
    • get

      Message<V> get(String id)
      Returns the message with the specified id.
      Parameters:
      id - the message id
      Returns:
      the message
    • get

      Message<V> get(Codec headersCodec, String id)
      Returns the message with the specified id, applying the specified codec to its headers.
      Parameters:
      headersCodec - the codec for headers
      id - the message id
      Returns:
      the message
    • getAll

      List<Message<V>> getAll(String... ids)
      Returns the messages with the specified ids.
      Parameters:
      ids - the message ids
      Returns:
      a list of messages
    • getAll

      List<Message<V>> getAll(Codec headersCodec, String... ids)
      Returns the messages with the specified ids, applying the specified codec to their headers.
      Parameters:
      headersCodec - the codec for headers
      ids - the message ids
      Returns:
      a list of messages
    • negativeAcknowledge

      void negativeAcknowledge(QueueNegativeAckArgs args)
      Explicitly marks a message as failed or rejected.
      Parameters:
      args - arguments specifying the message to negatively acknowledge
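      The two outcomes of a negative acknowledgment can be sketched with a toy in-memory model. It assumes that a message marked as failed is made available for redelivery, and that a message marked as rejected is moved to the dead letter queue or deleted when no DLQ is defined. That mapping is an interpretation of the feature list above, not something this page states per argument type. The class NackModel and its method names are hypothetical and are not part of the Redisson API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of negative acknowledgment outcomes.
// NOT Redisson code; names and the failed/rejected mapping are assumptions.
class NackModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Set<String> unacknowledged = new HashSet<>();
    private final List<String> deadLetters = new ArrayList<>();
    private final boolean hasDeadLetterQueue;

    NackModel(boolean hasDeadLetterQueue) {
        this.hasDeadLetterQueue = hasDeadLetterQueue;
    }

    void add(String id) {
        ready.addLast(id);
    }

    String poll() {
        String id = ready.pollFirst();
        if (id != null) {
            unacknowledged.add(id);
        }
        return id;
    }

    // Assumed "failed" outcome: the message becomes ready for redelivery.
    void negativeAcknowledgeFailed(String id) {
        if (unacknowledged.remove(id)) {
            ready.addLast(id);
        }
    }

    // Assumed "rejected" outcome: moved to the DLQ if one is defined,
    // otherwise the message is simply deleted.
    void negativeAcknowledgeRejected(String id) {
        if (unacknowledged.remove(id) && hasDeadLetterQueue) {
            deadLetters.add(id);
        }
    }

    int size() {
        return ready.size();
    }

    List<String> deadLetters() {
        return deadLetters;
    }
}
```

      The model leaves out redelivery delays and the delivery limit, both of which would affect when and how often a failed message comes back.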
    • addListener

      String addListener(QueueEventListener listener)
      Adds a queue event listener.
      Parameters:
      listener - the queue event listener
      Returns:
      the listener id
    • removeListener

      void removeListener(String id)
      Removes the queue listener with the specified id.
      Parameters:
      id - the listener id
    • disableOperation

      void disableOperation(QueueOperation operation)
      Disables the specified queue operation.
      Parameters:
      operation - the queue operation to disable
    • enableOperation

      void enableOperation(QueueOperation operation)
      Enables the specified queue operation.
      Parameters:
      operation - the queue operation to enable