/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.message.tracker;

import com.codahale.metrics.Timer;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBObject;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.inject.Inject;
import pl.allegro.tech.hermes.api.SentMessageTraceStatus;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.message.tracker.LogSchemaAware;
import pl.allegro.tech.hermes.common.message.tracker.QueueMetrics;
import pl.allegro.tech.hermes.common.message.tracker.mongo.MongoQueueCommitter;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.Metrics;
import pl.allegro.tech.hermes.common.time.Clock;
import pl.allegro.tech.hermes.consumers.consumer.receiver.Message;
import pl.allegro.tech.hermes.consumers.message.tracker.LogRepository;

public class MongoLogRepository
implements LogRepository,
LogSchemaAware {
    private final Clock clock;
    private final BlockingQueue<DBObject> queue;
    private final String clusterName;

    @Inject
    public MongoLogRepository(DB database, Clock clock, HermesMetrics metrics, ConfigFactory config) {
        this(database, clock, metrics, config.getIntProperty(Configs.TRACKER_MONGODB_QUEUE_CAPACITY), config.getIntProperty(Configs.TRACKER_MONGODB_COMMIT_INTERVAL), config.getStringProperty(Configs.KAFKA_CLUSTER_NAME));
    }

    public MongoLogRepository(DB database, Clock clock, HermesMetrics metrics, int queueSize, int commitInterval, String clusterName) {
        this.clock = clock;
        this.queue = new LinkedBlockingQueue<DBObject>(queueSize);
        this.clusterName = clusterName;
        QueueMetrics.registerCurrentSizeGauge(this.queue, (Metrics.Gauge)Metrics.Gauge.CONSUMER_TRACKER_QUEUE_SIZE, (HermesMetrics)metrics);
        QueueMetrics.registerRemainingCapacityGauge(this.queue, (Metrics.Gauge)Metrics.Gauge.CONSUMER_TRACKER_REMAINING_CAPACITY, (HermesMetrics)metrics);
        MongoQueueCommitter.scheduleCommitAtFixedRate(this.queue, (String)"sent_messages", (DB)database, (Timer)metrics.timer(Metrics.Timer.CONSUMER_TRACKER_COMMIT_LATENCY), (int)commitInterval);
    }

    @Override
    public void logSuccessful(Message message, long timestamp, String topicName, String subscriptionName) {
        this.queue.offer((DBObject)this.subscriptionLog(message, timestamp, topicName, subscriptionName, SentMessageTraceStatus.SUCCESS));
    }

    @Override
    public void logFailed(Message message, long timestamp, String topicName, String subscriptionName, String reason) {
        this.queue.offer((DBObject)this.subscriptionLog(message, timestamp, topicName, subscriptionName, SentMessageTraceStatus.FAILED).append("reason", (Object)reason));
    }

    @Override
    public void logDiscarded(Message message, long timestamp, String topicName, String subscriptionName, String reason) {
        this.queue.offer((DBObject)this.subscriptionLog(message, timestamp, topicName, subscriptionName, SentMessageTraceStatus.DISCARDED).append("reason", (Object)reason));
    }

    @Override
    public void logInflight(Message message, long timestamp, String topicName, String subscriptionName) {
        this.queue.offer((DBObject)this.subscriptionLog(message, timestamp, topicName, subscriptionName, SentMessageTraceStatus.INFLIGHT));
    }

    private BasicDBObject subscriptionLog(Message message, long timestamp, String topicName, String subscriptionName, SentMessageTraceStatus status) {
        return new BasicDBObject().append("messageId", (Object)message.getId().get()).append("createdAt", (Object)this.clock.getDate()).append("timestamp", (Object)timestamp).append("topicName", (Object)topicName).append("subscription", (Object)subscriptionName).append("partition", (Object)message.getPartition()).append("offset", (Object)message.getOffset()).append("status", (Object)status.toString()).append("cluster", (Object)this.clusterName);
    }
}

