public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware
| Modifier and Type | Class and Description |
|---|---|
protected class |
MessageDatabase.LastAckMarshaller |
protected class |
MessageDatabase.MessageKeysMarshaller |
protected class |
MessageDatabase.Metadata |
protected class |
MessageDatabase.StoredDestinationMarshaller |
| Constructor and Description |
|---|
MessageDatabase() |
| Modifier and Type | Method and Description |
|---|---|
protected void |
checkpointCleanup(boolean cleanup) |
protected void |
clearStoreStats(KahaDestination kahaDestination)
Clear the counter for the destination, if one exists.
|
void |
close() |
protected abstract void |
configureMetadata() |
protected void |
decrementAndSubSizeToStoreStat(KahaDestination kahaDestination,
long size) |
protected void |
decrementAndSubSizeToStoreStat(KahaDestination kahaDestination,
String subKey,
long size) |
protected void |
decrementAndSubSizeToStoreStat(String kahaDestKey,
long size) |
protected void |
decrementAndSubSizeToStoreStat(String kahaDestKey,
String subKey,
long size) |
void |
doStart() |
void |
doStop(ServiceStopper stopper) |
void |
forgetRecoveredAcks(ArrayList<org.apache.activemq.command.MessageAck> acks,
boolean rollback) |
long |
getCheckpointInterval() |
long |
getCleanupInterval() |
int |
getCompactAcksAfterNoGC() |
File |
getDirectory() |
File |
getDirectoryArchive() |
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination |
getExistingStoredDestination(KahaDestination destination,
Transaction tx) |
int |
getFailoverProducersAuditDepth() |
int |
getIndexCacheSize() |
File |
getIndexDirectory() |
float |
getIndexLFUEvictionFactor() |
int |
getIndexWriteBatchSize() |
Location[] |
getInProgressTxLocationRange() |
Journal |
getJournal() |
long |
getJournalDiskSyncInterval() |
String |
getJournalDiskSyncStrategy() |
HashSet<Integer> |
getJournalFilesBeingReplicated() |
int |
getJournalMaxFileLength() |
int |
getJournalMaxWriteBatchSize() |
org.apache.activemq.store.kahadb.MessageDatabase.LastAck |
getLastAck(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
Location |
getLastUpdatePosition() |
int |
getMaxFailoverProducersToTrack() |
protected MessageDatabase.Metadata |
getMetadata() |
PageFile |
getPageFile() |
String |
getPreallocationScope() |
String |
getPreallocationStrategy() |
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination |
getStoredDestination(KahaDestination destination,
Transaction tx) |
long |
getStoredMessageCount(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
long |
getStoredMessageSize(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
protected MessageStoreStatistics |
getStoreStats(String kahaDestKey)
Locate the storeMessageSize counter for this KahaDestination
|
protected MessageStoreSubscriptionStatistics |
getSubStats(String kahaDestKey) |
String |
getTransactions() |
void |
incrementalRecover() |
protected void |
incrementAndAddSizeToStoreStat(KahaDestination kahaDestination,
long size)
Update MessageStoreStatistics
|
protected void |
incrementAndAddSizeToStoreStat(KahaDestination kahaDestination,
String subKey,
long size) |
protected void |
incrementAndAddSizeToStoreStat(String kahaDestKey,
long size) |
protected void |
incrementAndAddSizeToStoreStat(String kahaDestKey,
String subKey,
long size) |
boolean |
isArchiveCorruptedIndex() |
boolean |
isArchiveDataLogs() |
boolean |
isCheckForCorruptJournalFiles() |
boolean |
isChecksumJournalFiles() |
boolean |
isCompactAcksIgnoresStoreGrowth()
Returns whether Ack compaction will ignore that the store is still growing
and run more often.
|
boolean |
isDeleteAllMessages() |
boolean |
isEnableAckCompaction()
Returns whether Ack compaction is enabled
|
boolean |
isEnableIndexDiskSyncs() |
boolean |
isEnableIndexPageCaching() |
boolean |
isEnableIndexRecoveryFile() |
boolean |
isEnableJournalDiskSyncs()
Deprecated.
Use getJournalDiskSyncStrategy() instead. |
boolean |
isEnableSubscriptionStatistics() |
boolean |
isFailIfDatabaseIsLocked() |
boolean |
isIgnoreMissingJournalfiles() |
boolean |
isUseIndexLFRUEviction() |
protected String |
key(KahaDestination destination) |
void |
load() |
JournalCommand<?> |
load(Location location)
Loads a previously stored JournalMessage
|
protected boolean |
matchType(Destination destination,
KahaDestination.DestinationType type)
Determine whether this Destination matches the DestinationType
|
void |
open() |
protected void |
process(KahaAddMessageCommand command,
Location location,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware runWithIndexLock) |
protected void |
process(KahaCommitCommand command,
Location location,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before) |
protected void |
process(KahaPrepareCommand command,
Location location) |
protected void |
process(KahaRemoveDestinationCommand command,
Location location) |
protected void |
process(KahaRemoveMessageCommand command,
Location location) |
protected void |
process(KahaRewrittenDataFileCommand command,
Location location) |
protected void |
process(KahaRollbackCommand command,
Location location) |
protected void |
process(KahaSubscriptionCommand command,
Location location) |
protected void |
process(KahaUpdateMessageCommand command,
Location location) |
protected void |
processLocation(Location location) |
protected void |
recoverIndex(Transaction tx) |
void |
setArchiveCorruptedIndex(boolean archiveCorruptedIndex) |
void |
setArchiveDataLogs(boolean archiveDataLogs) |
void |
setBrokerService(BrokerService brokerService) |
void |
setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) |
void |
setCheckpointInterval(long checkpointInterval) |
void |
setChecksumJournalFiles(boolean checksumJournalFiles) |
void |
setCleanupInterval(long cleanupInterval) |
void |
setCompactAcksAfterNoGC(int compactAcksAfterNoGC)
Sets the number of GC cycles in which no journal logs were removed before an attempt is made to
move forward all the acks in the last log that contains them and is otherwise unreferenced.
|
void |
setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth)
Configure whether Ack compaction will occur regardless of continued growth of the
journal logs (continued growth indicates that the store has not run out of space yet).
|
void |
setDeleteAllMessages(boolean deleteAllMessages) |
void |
setDirectory(File directory) |
void |
setDirectoryArchive(File directoryArchive) |
void |
setEnableAckCompaction(boolean enableAckCompaction)
Configure if the Ack compaction task should be enabled to run
|
void |
setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) |
void |
setEnableIndexPageCaching(boolean enableIndexPageCaching) |
void |
setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) |
void |
setEnableIndexWriteAsync(boolean enableIndexWriteAsync) |
void |
setEnableJournalDiskSyncs(boolean syncWrites)
Deprecated.
Use setJournalDiskSyncStrategy(String) instead. |
void |
setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics)
Enable caching statistics for each subscription to allow non-blocking
retrieval of metrics.
|
void |
setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) |
void |
setFailoverProducersAuditDepth(int failoverProducersAuditDepth) |
void |
setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) |
void |
setIndexCacheSize(int indexCacheSize) |
void |
setIndexDirectory(File indexDirectory) |
void |
setIndexLFUEvictionFactor(float indexLFUEvictionFactor) |
void |
setIndexWriteBatchSize(int setIndexWriteBatchSize) |
void |
setJournalDiskSyncInterval(long journalDiskSyncInterval) |
void |
setJournalDiskSyncStrategy(String journalDiskSyncStrategy) |
void |
setJournalMaxFileLength(int journalMaxFileLength) |
void |
setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) |
void |
setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) |
void |
setPreallocationScope(String preallocationScope) |
void |
setPreallocationStrategy(String preallocationStrategy) |
void |
setUseIndexLFRUEviction(boolean useIndexLFRUEviction) |
Location |
store(JournalCommand<?> data) |
Location |
store(JournalCommand<?> data,
boolean sync,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
Runnable after) |
Location |
store(JournalCommand<?> data,
boolean sync,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
Runnable after,
Runnable onJournalStoreComplete)
All updates are funneled through this method.
|
Location |
store(JournalCommand<?> data,
Runnable onJournalStoreComplete) |
ByteSequence |
toByteSequence(JournalCommand<?> data) |
void |
trackRecoveredAcks(ArrayList<org.apache.activemq.command.MessageAck> acks) |
void |
unload() |
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start, stop
protected BrokerService brokerService
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
public static final int LOG_SLOW_ACCESS_TIME
public static final File DEFAULT_DIRECTORY
protected static final org.apache.activemq.protobuf.Buffer UNMATCHED
protected MessageDatabase.Metadata metadata
protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller
protected boolean failIfDatabaseIsLocked
protected boolean deleteAllMessages
protected File indexDirectory
protected ScheduledExecutorService scheduler
protected String journalDiskSyncStrategy
protected boolean archiveDataLogs
protected File directoryArchive
protected AtomicLong journalSize
protected AtomicBoolean opened
protected boolean forceRecoverIndex
protected final AtomicReference<Location> lastAsyncJournalUpdate
protected final ReentrantReadWriteLock indexLock
protected final HashMap<String,org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination> storedDestinations
protected final ConcurrentMap<String,MessageStore> storeCache
protected final LinkedHashMap<org.apache.activemq.command.TransactionId,List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
protected final Set<String> ackedAndPrepared
protected final Set<String> rolledBackAcks
public MessageDatabase()
public void doStart() throws Exception
doStart in class ServiceSupport
Throws: Exception
public void doStop(ServiceStopper stopper) throws Exception
doStop in class ServiceSupport
Throws: Exception
public void open() throws IOException
Throws: IOException
public void load() throws IOException
Throws: IOException
public void close() throws IOException, InterruptedException
Throws: IOException, InterruptedException
public void unload() throws IOException, InterruptedException
Throws: IOException, InterruptedException
public Location[] getInProgressTxLocationRange()
public String getTransactions()
protected void recoverIndex(Transaction tx) throws IOException
Throws: IOException
public void incrementalRecover() throws IOException
Throws: IOException
public Location getLastUpdatePosition() throws IOException
Throws: IOException
protected void checkpointCleanup(boolean cleanup) throws IOException
Throws: IOException
public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException
Throws: IOException
public Location store(JournalCommand<?> data) throws IOException
Throws: IOException
public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException
Throws: IOException
public Location store(JournalCommand<?> data, boolean sync, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before, Runnable after) throws IOException
Throws: IOException
public Location store(JournalCommand<?> data, boolean sync, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException
Throws: IOException
public JournalCommand<?> load(Location location) throws IOException
Parameters: location -
Throws: IOException
protected void process(KahaAddMessageCommand command, Location location, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware runWithIndexLock) throws IOException
Throws: IOException
protected void process(KahaUpdateMessageCommand command, Location location) throws IOException
Throws: IOException
protected void process(KahaRemoveMessageCommand command, Location location) throws IOException
Throws: IOException
protected void process(KahaRemoveDestinationCommand command, Location location) throws IOException
Throws: IOException
protected void process(KahaSubscriptionCommand command, Location location) throws IOException
Throws: IOException
protected void processLocation(Location location)
protected void process(KahaCommitCommand command, Location location, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before) throws IOException
Throws: IOException
protected void process(KahaPrepareCommand command, Location location)
protected void process(KahaRollbackCommand command, Location location) throws IOException
Throws: IOException
protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException
Throws: IOException
public HashSet<Integer> getJournalFilesBeingReplicated()
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException
Throws: IOException
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException
Throws: IOException
protected void clearStoreStats(KahaDestination kahaDestination)
Parameters: kahaDestination -
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size)
Parameters: kahaDestination -, size -
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size)
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size)
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size)
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size)
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size)
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size)
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size)
protected MessageStoreStatistics getStoreStats(String kahaDestKey)
Parameters: kahaDestination -
protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey)
protected boolean matchType(Destination destination, KahaDestination.DestinationType type)
Parameters: destination -, type -
public org.apache.activemq.store.kahadb.MessageDatabase.LastAck getLastAck(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
Throws: IOException
public long getStoredMessageCount(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
Throws: IOException
public long getStoredMessageSize(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
Throws: IOException
protected String key(KahaDestination destination)
public void trackRecoveredAcks(ArrayList<org.apache.activemq.command.MessageAck> acks)
public void forgetRecoveredAcks(ArrayList<org.apache.activemq.command.MessageAck> acks, boolean rollback) throws IOException
Throws: IOException
protected abstract void configureMetadata()
public int getJournalMaxWriteBatchSize()
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
public File getDirectory()
public void setDirectory(File directory)
public boolean isDeleteAllMessages()
public void setDeleteAllMessages(boolean deleteAllMessages)
public void setIndexWriteBatchSize(int setIndexWriteBatchSize)
public int getIndexWriteBatchSize()
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
public boolean isEnableJournalDiskSyncs()
Deprecated. Use getJournalDiskSyncStrategy() instead.
public void setEnableJournalDiskSyncs(boolean syncWrites)
Deprecated. Use setJournalDiskSyncStrategy(String) instead.
Parameters: syncWrites -
public String getJournalDiskSyncStrategy()
public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy)
public long getJournalDiskSyncInterval()
public void setJournalDiskSyncInterval(long journalDiskSyncInterval)
public long getCheckpointInterval()
public void setCheckpointInterval(long checkpointInterval)
public long getCleanupInterval()
public void setCleanupInterval(long cleanupInterval)
public void setJournalMaxFileLength(int journalMaxFileLength)
public int getJournalMaxFileLength()
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
public int getMaxFailoverProducersToTrack()
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
public int getFailoverProducersAuditDepth()
public PageFile getPageFile() throws IOException
Throws: IOException
public Journal getJournal() throws IOException
Throws: IOException
protected MessageDatabase.Metadata getMetadata()
public boolean isFailIfDatabaseIsLocked()
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
public boolean isIgnoreMissingJournalfiles()
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
public int getIndexCacheSize()
public void setIndexCacheSize(int indexCacheSize)
public boolean isCheckForCorruptJournalFiles()
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
public boolean isChecksumJournalFiles()
public void setChecksumJournalFiles(boolean checksumJournalFiles)
public void setBrokerService(BrokerService brokerService)
setBrokerService in interface BrokerServiceAware
public boolean isArchiveDataLogs()
public void setArchiveDataLogs(boolean archiveDataLogs)
Parameters: archiveDataLogs - the archiveDataLogs to set
public File getDirectoryArchive()
public void setDirectoryArchive(File directoryArchive)
Parameters: directoryArchive - the directoryArchive to set
public boolean isArchiveCorruptedIndex()
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
public float getIndexLFUEvictionFactor()
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
public boolean isUseIndexLFRUEviction()
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
public void setEnableIndexPageCaching(boolean enableIndexPageCaching)
public boolean isEnableIndexDiskSyncs()
public boolean isEnableIndexRecoveryFile()
public boolean isEnableIndexPageCaching()
public File getIndexDirectory()
public void setIndexDirectory(File indexDirectory)
public String getPreallocationScope()
public void setPreallocationScope(String preallocationScope)
public String getPreallocationStrategy()
public void setPreallocationStrategy(String preallocationStrategy)
public int getCompactAcksAfterNoGC()
public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC)
A value of -1 will disable this feature.
Parameters: compactAcksAfterNoGC - Number of empty GC cycles before we rewrite old ACKS.
public boolean isCompactAcksIgnoresStoreGrowth()
public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth)
Parameters: compactAcksIgnoresStoreGrowth - the compactAcksIgnoresStoreGrowth to set
public boolean isEnableAckCompaction()
public void setEnableAckCompaction(boolean enableAckCompaction)
Parameters: enableAckCompaction -
public boolean isEnableSubscriptionStatistics()
public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics)
Parameters: enableSubscriptionStatistics -
Copyright © 2005–2016 The Apache Software Foundation. All rights reserved.