/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.zookeeper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.curator.framework.CuratorFramework;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaZookeeperPaths;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsStorage;

/**
 * {@link OffsetsStorage} implementation that keeps subscription offsets in ZooKeeper nodes
 * accessed through a {@link CuratorFramework} client.
 *
 * <p>Offsets are stored as the decimal text of a {@code long}, encoded in UTF-8. The node path for a
 * given subscription, topic and partition is computed by
 * {@link #getPartitionOffsetPath(Subscription, KafkaTopicName, int)}.
 */
public class ZookeeperOffsetsStorage
implements OffsetsStorage {
    /** Sentinel offset value treated as "no offset stored". */
    public static final int OFFSET_MISSING = -1;
    private final CuratorFramework curatorFramework;
    private final KafkaNamesMapper kafkaNamesMapper;

    /**
     * @param curatorFramework client used for all ZooKeeper reads and writes (injected under the
     *                         name {@code kafkaCurator})
     * @param kafkaNamesMapper mapper used to derive the consumer group id of a subscription
     */
    @Inject
    public ZookeeperOffsetsStorage(@Named(value="kafkaCurator") CuratorFramework curatorFramework, KafkaNamesMapper kafkaNamesMapper) {
        this.curatorFramework = curatorFramework;
        this.kafkaNamesMapper = kafkaNamesMapper;
    }

    /**
     * Stores the given partition offset for the subscription.
     *
     * <p>The offset is written when the offset node does not exist yet, when the stored value equals
     * {@link #OFFSET_MISSING}, or when the stored value is greater than the given offset. In every
     * other case the stored value is left untouched.
     * NOTE(review): this means an existing offset is only ever moved backwards here - presumably
     * intended for retransmission; confirm against callers.
     *
     * @param subscription    subscription whose offset is stored
     * @param partitionOffset topic, partition and offset to store
     * @throws InternalProcessingException wrapping any failure of the underlying ZooKeeper calls or
     *                                     of parsing the currently stored value
     */
    @Override
    public void setSubscriptionOffset(Subscription subscription, PartitionOffset partitionOffset) {
        try {
            String offsetPath = getPartitionOffsetPath(subscription, partitionOffset.getTopic(), partitionOffset.getPartition());
            byte[] encodedOffset = Long.toString(partitionOffset.getOffset()).getBytes(Charsets.UTF_8);

            if (curatorFramework.checkExists().forPath(offsetPath) == null) {
                // Create the node together with its payload so it never holds Curator's default
                // data, which would not parse as a long on a later read.
                curatorFramework.create().creatingParentsIfNeeded().forPath(offsetPath, encodedOffset);
                return;
            }

            long currentOffset = convertByteArrayToLong(curatorFramework.getData().forPath(offsetPath));
            // The node already exists at this point, so a stored OFFSET_MISSING value is overwritten
            // in place instead of attempting to create the node again.
            if (currentOffset == OFFSET_MISSING || currentOffset > partitionOffset.getOffset()) {
                curatorFramework.setData().forPath(offsetPath, encodedOffset);
            }
        }
        catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Decodes node data written by this class: UTF-8 text holding a decimal {@code long}.
     *
     * @throws NumberFormatException if the decoded text is not a valid {@code long}
     */
    private long convertByteArrayToLong(byte[] data) {
        return Long.parseLong(new String(data, Charsets.UTF_8));
    }

    /**
     * Reads the offset stored for the given subscription, topic and partition.
     *
     * @param subscription   subscription whose offset is read
     * @param kafkaTopicName Kafka topic the partition belongs to
     * @param partitionId    partition number
     * @return the stored offset
     * @throws InternalProcessingException wrapping any failure of the underlying ZooKeeper read
     *                                     (including a missing node) or of parsing the stored value
     */
    @Override
    public long getSubscriptionOffset(Subscription subscription, KafkaTopicName kafkaTopicName, int partitionId) {
        try {
            String offsetPath = getPartitionOffsetPath(subscription, kafkaTopicName, partitionId);
            // Decode with the same explicit charset that is used when writing, rather than the
            // platform default.
            return convertByteArrayToLong(curatorFramework.getData().forPath(offsetPath));
        }
        catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Builds the ZooKeeper path of the offset node for the subscription's consumer group, the given
     * topic and the given partition.
     */
    @VisibleForTesting
    protected String getPartitionOffsetPath(Subscription subscription, KafkaTopicName kafkaTopicName, int partition) {
        return KafkaZookeeperPaths.partitionOffsetPath(kafkaNamesMapper.toConsumerGroupId(subscription), kafkaTopicName, partition);
    }
}

