package com.hivemq.persistence.local.xodus;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.exceptions.UnrecoverableException;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extensions.iteration.BucketChunkResult;
import com.hivemq.migration.meta.PersistenceType;
import com.hivemq.persistence.PersistenceStartup;
import com.hivemq.persistence.RetainedMessage;
import com.hivemq.persistence.local.xodus.bucket.Bucket;
import com.hivemq.persistence.payload.PublishPayloadPersistence;
import com.hivemq.persistence.retained.RetainedMessageLocalPersistence;
import com.hivemq.util.LocalPersistenceFileUtil;
import com.hivemq.util.PublishUtil;
import com.hivemq.util.ThreadPreConditions;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import jetbrains.exodus.ByteIterable;
import jetbrains.exodus.ExodusException;
import jetbrains.exodus.env.Cursor;
import jetbrains.exodus.env.StoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LazySingleton
/* loaded from: input_file:com/hivemq/persistence/local/xodus/RetainedMessageXodusLocalPersistence.class */
public class RetainedMessageXodusLocalPersistence extends XodusLocalPersistence implements RetainedMessageLocalPersistence {
    private static final Logger log = LoggerFactory.getLogger(RetainedMessageXodusLocalPersistence.class);
    public static final String PERSISTENCE_VERSION = "040500";

    @NotNull
    private final PublishPayloadPersistence payloadPersistence;

    @NotNull
    private final RetainedMessageXodusSerializer serializer;
    private final AtomicLong retainMessageCounter;

    @VisibleForTesting
    final ConcurrentHashMap<Integer, PublishTopicTree> topicTrees;

    @Inject
    public RetainedMessageXodusLocalPersistence(@NotNull LocalPersistenceFileUtil localPersistenceFileUtil, @NotNull PublishPayloadPersistence publishPayloadPersistence, @NotNull EnvironmentUtil environmentUtil, @NotNull PersistenceStartup persistenceStartup) {
        super(environmentUtil, localPersistenceFileUtil, persistenceStartup, InternalConfigurations.PERSISTENCE_BUCKET_COUNT.get(), InternalConfigurations.RETAINED_MESSAGE_PERSISTENCE_TYPE.get().equals(PersistenceType.FILE));
        this.retainMessageCounter = new AtomicLong(0L);
        this.topicTrees = new ConcurrentHashMap<>();
        this.payloadPersistence = publishPayloadPersistence;
        this.serializer = new RetainedMessageXodusSerializer();
        for (int i = 0; i < this.bucketCount; i++) {
            this.topicTrees.put(Integer.valueOf(i), new PublishTopicTree());
        }
    }

    /**
     * @return the name of this persistence, {@code "retained_messages"}
     */
    @Override // com.hivemq.persistence.local.xodus.XodusLocalPersistence
    @NotNull
    protected String getName() {
        final String persistenceName = "retained_messages";
        return persistenceName;
    }

    /**
     * @return the version string of this persistence, i.e. {@link #PERSISTENCE_VERSION}
     */
    @Override // com.hivemq.persistence.local.xodus.XodusLocalPersistence
    @NotNull
    protected String getVersion() {
        // PERSISTENCE_VERSION is the compile-time constant "040500".
        return PERSISTENCE_VERSION;
    }

    /**
     * @return the Xodus store configuration used by this persistence,
     *         {@code StoreConfig.WITHOUT_DUPLICATES}
     */
    @Override // com.hivemq.persistence.local.xodus.XodusLocalPersistence
    @NotNull
    protected StoreConfig getStoreConfig() {
        final StoreConfig storeConfig = StoreConfig.WITHOUT_DUPLICATES;
        return storeConfig;
    }

    /**
     * @return the class-level logger of this persistence
     */
    @Override // com.hivemq.persistence.local.xodus.XodusLocalPersistence
    @NotNull
    protected Logger getLogger() {
        return RetainedMessageXodusLocalPersistence.log;
    }

    /**
     * Post-construction hook (annotated with {@code @PostConstruct}); delegates unchanged to
     * the superclass implementation.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.hivemq.persistence.local.xodus.XodusLocalPersistence
    @PostConstruct
    public void postConstruct() {
        super.postConstruct();
    }

    /**
     * Rebuilds the in-memory state from the entries stored in every bucket.
     *
     * <p>For each stored entry this increments the payload reference counter (bootstrap variant) for
     * the entry's publish id, adds the entry's topic to the topic tree of the bucket and increments
     * the retained message counter.
     *
     * @throws UnrecoverableException if Xodus reports an {@link ExodusException} while reading a bucket
     */
    @Override // com.hivemq.persistence.local.xodus.XodusLocalPersistence
    public void init() {
        for (int i = 0; i < this.buckets.length; i++) {
            final Bucket bucket = this.buckets[i];
            final int bucketIndex = i;
            try {
                bucket.getEnvironment().executeInReadonlyTransaction(txn -> {
                    // try-with-resources closes the cursor on every exit path, including a failing getNext().
                    try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                        while (cursor.getNext()) {
                            final RetainedMessage message =
                                    this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue()));
                            this.payloadPersistence.incrementReferenceCounterOnBootstrap(message.getPublishId());

                            final String topic =
                                    this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey()));
                            this.topicTrees.get(bucketIndex).add(topic);
                            this.retainMessageCounter.incrementAndGet();
                        }
                    }
                });
            } catch (ExodusException e) {
                log.error("An error occurred while preparing the Retained Message persistence.");
                log.debug("Original Exception:", e);
                throw new UnrecoverableException(false);
            }
        }
    }

    /**
     * Walks over every stored entry of every bucket and increments the payload reference counter
     * (bootstrap variant) for the entry's publish id.
     *
     * @throws UnrecoverableException if Xodus reports an {@link ExodusException} while reading a bucket
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public void bootstrapPayloads() {
        try {
            for (Bucket bucket : this.buckets) {
                bucket.getEnvironment().executeInReadonlyTransaction(txn -> {
                    // try-with-resources closes the cursor on every exit path, including a failing getNext().
                    try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                        while (cursor.getNext()) {
                            final RetainedMessage message =
                                    this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue()));
                            this.payloadPersistence.incrementReferenceCounterOnBootstrap(message.getPublishId());
                        }
                    }
                });
            }
        } catch (ExodusException e) {
            log.error("An error occurred while preparing the Retained Message persistence.");
            log.debug("Original Exception:", e);
            throw new UnrecoverableException(false);
        }
    }

    /**
     * Removes every retained message of one bucket.
     *
     * <p>The bucket's topic tree is replaced by a fresh, empty one. Then, inside an exclusive
     * transaction, each stored entry has its payload reference counter decremented, the retained
     * message counter is decremented, and the entry is deleted.
     *
     * @param i index of the bucket to clear
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public void clear(int i) {
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        this.topicTrees.put(i, new PublishTopicTree());
        final Bucket bucket = this.buckets[i];
        bucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            // try-with-resources closes the cursor on every exit path, including a failing getNext().
            try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                while (cursor.getNext()) {
                    final RetainedMessage message =
                            this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue()));
                    this.payloadPersistence.decrementReferenceCounter(message.getPublishId());
                    this.retainMessageCounter.decrementAndGet();
                    cursor.deleteCurrent();
                }
            }
        });
    }

    /**
     * @return the current value of the retained message counter
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public long size() {
        final long currentCount = this.retainMessageCounter.get();
        return currentCount;
    }

    /**
     * Removes the retained message stored for a topic, if there is one.
     *
     * <p>When an entry exists it is deleted from the store, the topic is removed from the bucket's
     * topic tree, the payload reference counter of the stored message is decremented and the
     * retained message counter is decremented. When no entry exists only a trace message is logged.
     *
     * @param str the topic whose retained message is removed; must not be null
     * @param i   index of the bucket that holds the topic
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public void remove(@NotNull String str, int i) {
        Preconditions.checkNotNull(str, "Topic must not be null");
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        final Bucket bucket = this.buckets[i];
        bucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            // Lookup and delete share one key, encoded by the serializer exactly as put() and get()
            // encode it, so the entry that was found is the entry that gets deleted.
            final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(str));
            final ByteIterable storedValue = bucket.getStore().get(txn, key);
            if (storedValue == null) {
                log.trace("Removing retained message for topic {} (no message was stored previously)", str);
                return;
            }
            final RetainedMessage storedMessage =
                    this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(storedValue));
            log.trace("Removing retained message for topic {}", str);
            bucket.getStore().delete(txn, key);
            this.topicTrees.get(i).remove(str);
            this.payloadPersistence.decrementReferenceCounter(storedMessage.getPublishId());
            this.retainMessageCounter.decrementAndGet();
        });
    }

    /**
     * Looks up the retained message of a topic, starting the lookup with attempt number 0.
     *
     * @param str the topic to look up
     * @param i   index of the bucket that holds the topic
     * @return the retained message including its payload, or {@code null} when the lookup yields nothing
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    @Nullable
    public RetainedMessage get(@NotNull String str, int i) {
        final int firstAttempt = 0;
        return tryGetLocally(str, firstAttempt, i);
    }

    /**
     * Reads the retained message of a topic from its bucket, retrying while the payload is missing.
     *
     * <p>The result is {@code null} when no entry is stored or when the expiry check reports the
     * message as expired. When an entry exists but the payload persistence returns no payload, the
     * lookup is repeated; once the attempt number has reached 100 a warning is logged and
     * {@code null} is returned.
     *
     * @param str the topic to look up; must not be null
     * @param i   the attempt number to start with
     * @param i2  index of the bucket that holds the topic
     * @return the retained message with its payload set, or {@code null}
     */
    private RetainedMessage tryGetLocally(@NotNull String str, int i, int i2) {
        Preconditions.checkNotNull(str, "Topic must not be null");
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        final Bucket bucket = this.buckets[i2];

        int attempt = i;
        while (true) {
            // Set by the transaction when an entry exists but its payload could not be loaded.
            final AtomicBoolean payloadMissing = new AtomicBoolean(false);
            final RetainedMessage found = bucket.getEnvironment().computeInReadonlyTransaction(txn -> {
                final ByteIterable storedValue =
                        bucket.getStore().get(txn, XodusUtils.bytesToByteIterable(this.serializer.serializeKey(str)));
                if (storedValue == null) {
                    return null;
                }
                final RetainedMessage message =
                        this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(storedValue));
                final byte[] payload = this.payloadPersistence.getPayloadOrNull(message.getPublishId());
                if (payload == null) {
                    payloadMissing.set(true);
                    return null;
                }
                if (PublishUtil.checkExpiry(message.getTimestamp(), message.getMessageExpiryInterval())) {
                    return null;
                }
                message.setMessage(payload);
                return message;
            });

            if (!payloadMissing.get()) {
                return found;
            }
            if (attempt >= 100) {
                log.warn("No payload was found for the retained message on topic {}.", str);
                return null;
            }
            attempt++;
        }
    }

    /**
     * Stores a retained message for a topic, replacing a previously stored one.
     *
     * <p>When an entry already exists for the topic, the new value overwrites it and the payload
     * reference counter of the previous message is decremented. Otherwise the value is stored, the
     * retained message counter is incremented and the topic is added to the bucket's topic tree.
     *
     * @param retainedMessage the message to store; must not be null
     * @param str             the topic the message is stored for; must not be null
     * @param i               index of the bucket that holds the topic
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public void put(@NotNull RetainedMessage retainedMessage, @NotNull String str, int i) {
        Preconditions.checkNotNull(str, "Topic must not be null");
        Preconditions.checkNotNull(retainedMessage, "Retained message must not be null");
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        final Bucket bucket = this.buckets[i];
        bucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(str));
                final boolean entryExists = cursor.getSearchKey(key) != null;
                if (!entryExists) {
                    bucket.getStore().put(
                            txn, key, XodusUtils.bytesToByteIterable(this.serializer.serializeValue(retainedMessage)));
                    log.trace("Creating new retained message for topic {}", str);
                    this.retainMessageCounter.incrementAndGet();
                    this.topicTrees.get(i).add(str);
                    return;
                }
                // Read the previous message first: its payload reference is released after the overwrite.
                final RetainedMessage previous =
                        this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue()));
                log.trace("Replacing retained message for topic {}", str);
                bucket.getStore().put(
                        txn, key, XodusUtils.bytesToByteIterable(this.serializer.serializeValue(retainedMessage)));
                this.payloadPersistence.decrementReferenceCounter(previous.getPublishId());
            }
        });
    }

    /**
     * Queries the topic tree of one bucket.
     *
     * @param str the argument handed to the topic tree lookup
     * @param i   index of the bucket; must be in {@code [0, bucketCount)}
     * @return the set of topics returned by the bucket's topic tree for {@code str}
     * @throws IllegalArgumentException if the bucket index is out of range
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    @NotNull
    public Set<String> getAllTopics(@NotNull String str, int i) {
        final boolean outOfRange = i < 0 || i >= this.bucketCount;
        Preconditions.checkArgument(!outOfRange, "Bucket index out of range");
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        final PublishTopicTree bucketTree = this.topicTrees.get(i);
        return bucketTree.get(str);
    }

    /**
     * Removes expired retained messages from one bucket.
     *
     * <p>Does nothing when this persistence is already stopped. Otherwise every entry of the bucket
     * is checked with {@code PublishUtil.checkExpiry}; an expired entry is deleted from the store,
     * its topic is removed from the bucket's topic tree, its payload reference counter is
     * decremented and the retained message counter is decremented.
     *
     * @param i index of the bucket; must be in {@code [0, bucketCount)}
     * @throws IllegalArgumentException if the bucket index is out of range
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public void cleanUp(int i) {
        Preconditions.checkArgument(i >= 0 && i < this.bucketCount, "Bucket index out of range");
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        if (this.stopped.get()) {
            return;
        }
        final Bucket bucket = this.buckets[i];
        bucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            // try-with-resources guarantees the cursor is closed on the normal path as well.
            try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                while (cursor.getNext()) {
                    final RetainedMessage message =
                            this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue()));
                    if (!PublishUtil.checkExpiry(message.getTimestamp(), message.getMessageExpiryInterval())) {
                        continue;
                    }
                    // Read the key before deleting the entry the cursor points at.
                    final String topic =
                            this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey()));
                    cursor.deleteCurrent();
                    this.topicTrees.get(i).remove(topic);
                    this.payloadPersistence.decrementReferenceCounter(message.getPublishId());
                    this.retainMessageCounter.decrementAndGet();
                }
            }
        });
    }

    /**
     * Reads the next chunk of retained messages from one bucket.
     *
     * <p>Iteration starts at the first entry when {@code str} is {@code null}; otherwise at the first
     * key greater than or equal to {@code str}, skipping the entry whose key equals {@code str}.
     * Entries that are expired or whose payload cannot be loaded are skipped. Collection stops when
     * the bucket is exhausted or the summed estimated in-memory size of the collected messages has
     * reached {@code i2}.
     *
     * @param i   index of the bucket to read
     * @param str the last topic of the previous chunk, or {@code null} to start from the beginning
     * @param i2  size limit for the chunk, compared against the summed estimated in-memory size
     * @return the collected messages keyed by topic, a flag telling whether the bucket is exhausted,
     *         the last collected topic (or {@code str} when nothing was collected) and the bucket index
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    @NotNull
    public BucketChunkResult<Map<String, RetainedMessage>> getAllRetainedMessagesChunk(int i, @Nullable String str, int i2) {
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        final Bucket bucket = this.buckets[i];
        return bucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final ImmutableMap.Builder<String, RetainedMessage> chunk = ImmutableMap.builder();
            int collectedSize = 0;
            String lastTopic = str;

            try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                boolean hasEntry;
                if (str == null) {
                    hasEntry = cursor.getNext();
                } else {
                    final ByteIterable resumeKey = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(str));
                    if (cursor.getSearchKeyRange(resumeKey) == null) {
                        // No key at or after the resume point: the bucket is exhausted.
                        return new BucketChunkResult<>(chunk.build(), true, str, i);
                    }
                    // The resume key itself was delivered with the previous chunk, so step past it.
                    hasEntry = !cursor.getKey().equals(resumeKey) || cursor.getNext();
                }

                for (; hasEntry && collectedSize < i2; hasEntry = cursor.getNext()) {
                    final String topic = XodusUtils.byteIterableToString(cursor.getKey());
                    final RetainedMessage message =
                            this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue()));
                    if (PublishUtil.checkExpiry(message.getTimestamp(), message.getMessageExpiryInterval())) {
                        continue;
                    }
                    final byte[] payload = this.payloadPersistence.getPayloadOrNull(message.getPublishId());
                    if (payload == null) {
                        log.warn("Could not dereference payload for retained message on topic \"{}\" with payload id \"{}\".", topic, message.getPublishId());
                        continue;
                    }
                    message.setMessage(payload);
                    lastTopic = topic;
                    collectedSize += message.getEstimatedSizeInMemory();
                    chunk.put(topic, message);
                }
                return new BucketChunkResult<>(chunk.build(), !hasEntry, lastTopic, i);
            }
        });
    }

    /**
     * Passes every stored entry of every bucket to the given callback.
     *
     * <p>Each bucket is read inside its own read-only transaction; the callback receives the
     * deserialized key and the deserialized value of each entry.
     *
     * @param itemCallback callback invoked once per stored entry
     */
    @Override // com.hivemq.persistence.retained.RetainedMessageLocalPersistence
    public void iterate(@NotNull RetainedMessageLocalPersistence.ItemCallback itemCallback) {
        ThreadPreConditions.startsWith(ThreadPreConditions.SINGLE_WRITER_THREAD_PREFIX);
        for (Bucket bucket : this.buckets) {
            bucket.getEnvironment().executeInReadonlyTransaction(txn -> {
                // try-with-resources closes the cursor on every exit path, including a failing getNext().
                try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                    while (cursor.getNext()) {
                        itemCallback.onItem(
                                this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey())),
                                this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(cursor.getValue())));
                    }
                }
            });
        }
    }
}
