/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.zookeeper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.curator.framework.CuratorFramework;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsStorage;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffset;

/**
 * {@link OffsetsStorage} implementation that keeps offsets in Zookeeper, accessed through Curator.
 *
 * <p>Each offset lives in its own znode at {@code /consumers/<subscriptionId>/offsets/<qualifiedTopicName>/<partition>}
 * and is stored as the decimal text of the offset, encoded in UTF-8.
 */
public class ZookeeperOffsetsStorage implements OffsetsStorage {
    private static final String OFFSET_PATTERN_PATH = "/consumers/%s/offsets/%s";
    private final CuratorFramework curatorFramework;

    /**
     * @param curatorFramework Curator client injected under the name {@code kafkaCurator}
     */
    @Inject
    public ZookeeperOffsetsStorage(@Named(value = "kafkaCurator") CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    /**
     * Overwrites the stored offset of a partition, but only when the currently stored offset is
     * strictly greater than the requested one; otherwise the stored value is left untouched.
     *
     * <p>NOTE(review): the guard means this method can only move an offset backwards — presumably
     * intentional (rewind/retransmission); confirm against callers before changing it.
     *
     * @param subscription    subscription whose offset should be changed
     * @param partitionOffset partition number and the offset to store for it
     * @throws InternalProcessingException if reading, parsing or writing the offset fails
     */
    @Override
    public void setSubscriptionOffset(Subscription subscription, PartitionOffset partitionOffset) {
        try {
            // Resolve the znode path once; it is needed for both the read and the write.
            String offsetPath = getPartitionOffsetPath(subscription, partitionOffset.getPartition());
            long storedOffset = convertByteArrayToLong(curatorFramework.getData().forPath(offsetPath));
            if (storedOffset > partitionOffset.getOffset()) {
                byte[] payload = Long.toString(partitionOffset.getOffset()).getBytes(Charsets.UTF_8);
                curatorFramework.setData().forPath(offsetPath, payload);
            }
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Decodes znode content (UTF-8 decimal text) into an offset.
     *
     * @throws NumberFormatException if the decoded text is not a valid {@code long}
     */
    private static long convertByteArrayToLong(byte[] data) {
        return Long.parseLong(new String(data, Charsets.UTF_8));
    }

    /**
     * Reads the offset currently stored for the given partition of a subscription.
     *
     * @param subscription subscription whose offset should be read
     * @param partitionId  partition number
     * @return the stored offset
     * @throws InternalProcessingException if reading or parsing the stored value fails
     */
    @Override
    public long getSubscriptionOffset(Subscription subscription, int partitionId) {
        try {
            byte[] data = curatorFramework.getData().forPath(getPartitionOffsetPath(subscription, partitionId));
            // Decode with the same explicit UTF-8 charset that is used when the value is written,
            // instead of relying on the platform default charset.
            return convertByteArrayToLong(data);
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Builds the znode path holding the offset of one partition:
     * the subscription's offsets path followed by {@code /<partition>}.
     */
    @VisibleForTesting
    protected String getPartitionOffsetPath(Subscription subscription, int partition) {
        return Joiner.on("/").join(getOffsetPath(subscription), partition);
    }

    /**
     * Builds the parent path of all partition offsets for a subscription, formatted from the
     * subscription id and its qualified topic name.
     */
    private static String getOffsetPath(Subscription subscription) {
        return String.format(OFFSET_PATTERN_PATH, subscription.getId(), subscription.getQualifiedTopicName());
    }
}

