/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.zookeeper;

import com.codahale.metrics.Timer;
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaZookeeperPaths;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.offset.FailedToCommitOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsToCommit;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;

/**
 * {@link MessageCommitter} that stores consumer offsets as znode data through a
 * {@link CuratorFramework} client.
 *
 * <p>The znode path for each offset is produced by
 * {@link KafkaZookeeperPaths#partitionOffsetPath} from the consumer group id, the Kafka topic
 * name and the partition number. The payload is the decimal string form of the offset encoded
 * as UTF-8.
 */
public class ZookeeperMessageCommitter
implements MessageCommitter {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperMessageCommitter.class);
    private final CuratorFramework curatorFramework;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final HermesMetrics metrics;

    /**
     * Creates a committer.
     *
     * @param curatorFramework client used for all znode reads/writes/deletes
     * @param kafkaNamesMapper maps a subscription name to its consumer group id
     * @param metrics source of the timer that measures every single-offset commit
     */
    public ZookeeperMessageCommitter(CuratorFramework curatorFramework, KafkaNamesMapper kafkaNamesMapper, HermesMetrics metrics) {
        this.curatorFramework = curatorFramework;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.metrics = metrics;
    }

    /**
     * Commits every offset of every subscription contained in {@code offsetsToCommit}.
     *
     * <p>Each offset is committed independently and timed with the
     * {@code offset-committer.single-commit.zookeeper} timer. A failure of one offset is logged,
     * recorded in the result, and does not stop the remaining offsets from being committed.
     *
     * @param offsetsToCommit offsets grouped by subscription
     * @return the offsets whose commit threw an exception; empty when all commits succeeded
     */
    @Override
    public FailedToCommitOffsets commitOffsets(OffsetsToCommit offsetsToCommit) {
        FailedToCommitOffsets failedOffsets = new FailedToCommitOffsets();
        for (SubscriptionName subscriptionName : offsetsToCommit.subscriptionNames()) {
            for (SubscriptionPartitionOffset offset : offsetsToCommit.batchFor(subscriptionName)) {
                // try-with-resources closes the timer context and still lets the exception reach
                // the catch clause. A hand-written finally that leaves via `continue` would
                // silently discard the exception and the failure would never be reported.
                try (Timer.Context timing = this.metrics.timer("offset-committer.single-commit.zookeeper").time()) {
                    this.commitOffset(offset);
                }
                catch (Exception exception) {
                    // Throwable passed as the last argument so the logger records the cause.
                    logger.warn("Failed to commit offset {}", offset, exception);
                    failedOffsets.add(offset);
                }
            }
        }
        return failedOffsets;
    }

    /**
     * Writes a single offset to its znode, creating the node (and missing parents) when the
     * node does not exist yet.
     *
     * @param partitionOffset offset to store
     * @throws Exception any error raised by the Curator client
     */
    private void commitOffset(SubscriptionPartitionOffset partitionOffset) throws Exception {
        byte[] data = String.valueOf(partitionOffset.getOffset()).getBytes(StandardCharsets.UTF_8);
        String offsetPath = this.offsetPath(partitionOffset.getSubscriptionName(), partitionOffset.getKafkaTopicName(), partitionOffset.getPartition());
        try {
            this.curatorFramework.setData().forPath(offsetPath, data);
        }
        catch (KeeperException.NoNodeException ex) {
            // First commit for this partition: the znode is missing, so create it with the data.
            this.curatorFramework.create().creatingParentsIfNeeded().forPath(offsetPath, data);
        }
    }

    /**
     * Deletes the znode that holds the offset of the given subscription, Kafka topic and
     * partition.
     *
     * @param topicName topic the subscription belongs to
     * @param subscriptionName name of the subscription
     * @param topic Kafka topic whose offset should be removed
     * @param partition partition number
     * @throws Exception any error raised by the Curator client while deleting the node
     */
    @Override
    public void removeOffset(TopicName topicName, String subscriptionName, KafkaTopicName topic, int partition) throws Exception {
        String offsetPath = this.offsetPath(new SubscriptionName(subscriptionName, topicName), topic, partition);
        this.curatorFramework.delete().forPath(offsetPath);
    }

    /** Builds the znode path under which the offset for the given coordinates is stored. */
    private String offsetPath(SubscriptionName subscriptionName, KafkaTopicName topic, int partition) {
        return KafkaZookeeperPaths.partitionOffsetPath(this.kafkaNamesMapper.toConsumerGroupId(subscriptionName), topic, partition);
    }
}

