/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import com.google.common.net.HostAndPort;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.inject.Inject;
import kafka.api.ConsumerMetadataRequest;
import kafka.api.RequestOrResponse;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.network.BlockingChannel;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.ReadingConsumerMetadataException;

/**
 * Creates {@link BlockingChannel} instances pointing at the coordinator broker
 * of a given consumer group.
 *
 * <p>The coordinator is discovered by sending a {@link ConsumerMetadataRequest}
 * to a single bootstrap broker taken from the configured broker list.
 */
public class BlockingChannelFactory {
    private final HostAndPort broker;
    private final int readTimeout;

    /**
     * Builds the factory from configuration: one broker is picked from
     * {@code Configs.KAFKA_BROKER_LIST} and the read timeout is taken from
     * {@code Configs.KAFKA_CONSUMER_METADATA_READ_TIMEOUT}.
     *
     * @param configFactory source of the broker list and read timeout
     * @throws IllegalArgumentException if the broker list contains no usable entry
     */
    @Inject
    public BlockingChannelFactory(ConfigFactory configFactory) {
        this(HostAndPort.fromString(findAnyBroker(configFactory.getStringProperty(Configs.KAFKA_BROKER_LIST))),
                configFactory.getIntProperty(Configs.KAFKA_CONSUMER_METADATA_READ_TIMEOUT));
    }

    /**
     * Picks one broker address out of a comma-separated list.
     *
     * <p>Entries are trimmed and blank entries are skipped, so lists such as
     * {@code "host1:9092, host2:9092"} or {@code ",host1:9092"} still yield a
     * parseable address.
     *
     * @param brokerList comma-separated {@code host:port} entries
     * @return the first non-blank entry, without surrounding whitespace
     * @throws IllegalArgumentException if no non-blank entry is present
     */
    private static String findAnyBroker(String brokerList) {
        return Arrays.stream(brokerList.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "No broker address found in broker list: '" + brokerList + "'"));
    }

    /**
     * @param broker      bootstrap broker used to query consumer metadata
     * @param readTimeout read timeout passed to every {@link BlockingChannel} built here
     */
    public BlockingChannelFactory(HostAndPort broker, int readTimeout) {
        this.broker = broker;
        this.readTimeout = readTimeout;
    }

    /**
     * Builds a channel addressed at the coordinator of the given consumer group.
     *
     * <p>This method does not call {@code connect()} on the returned channel.
     *
     * @param consumerGroupId group whose coordinator should be looked up
     * @return a new channel configured with the coordinator's host and port
     * @throws ReadingConsumerMetadataException if the metadata response carries an error code
     */
    public BlockingChannel create(ConsumerGroupId consumerGroupId) {
        ConsumerMetadataResponse metadataResponse = this.readConsumerMetadata(consumerGroupId);
        if (metadataResponse.errorCode() != ErrorMapping.NoError()) {
            throw new ReadingConsumerMetadataException(metadataResponse.errorCode());
        }
        Broker coordinator = metadataResponse.coordinator();
        return new BlockingChannel(coordinator.host(), coordinator.port(),
                BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), this.readTimeout);
    }

    /**
     * Asks the bootstrap broker for the consumer metadata of the given group.
     *
     * @param consumerGroupId group to query
     * @return the parsed metadata response (its error code is not inspected here)
     */
    private ConsumerMetadataResponse readConsumerMetadata(ConsumerGroupId consumerGroupId) {
        BlockingChannel channel = new BlockingChannel(this.broker.getHostText(), this.broker.getPort(),
                BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), this.readTimeout);
        try {
            channel.connect();
            channel.send(new ConsumerMetadataRequest(consumerGroupId.asString(),
                    ConsumerMetadataRequest.CurrentVersion(), 0, "0"));
            return ConsumerMetadataResponse.readFrom(channel.receive().buffer());
        } finally {
            // Always release the bootstrap connection, including when send/receive/parse fails.
            channel.disconnect();
        }
    }
}

