/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue;
import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.ClientTopology;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ActivationHandler;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelInitializer;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;

/**
 * Queues {@link HotRodOperation}s destined for a single server address and writes them to a
 * Netty {@link Channel} once that channel is connected and has been marked as accepting requests.
 *
 * <p>Operations may be enqueued from any thread via {@link #sendOperation(HotRodOperation)};
 * the queue is drained and encoded by {@link #sendOperations()}, which is scheduled on the
 * channel's event loop. The instance itself is the queue consumer (see
 * {@link #accept(HotRodOperation)}), which encodes each drained operation into a shared batch
 * buffer that is flushed once per drain pass.
 */
public class OperationChannel
implements MessagePassingQueue.Consumer<HotRodOperation<?>> {
    private static final Log log = LogFactory.getLog(OperationChannel.class);
    public static final AttributeKey<OperationChannel> OPERATION_CHANNEL_ATTRIBUTE_KEY = AttributeKey.newInstance("hotrod-operation");
    /** Chunk size handed to the unbounded MPSC queue constructor. */
    private static final int QUEUE_CHUNK_SIZE = 128;
    /** Upper bound on the number of operations encoded in a single drain pass. */
    private static final int MAX_OPERATIONS_PER_DRAIN = 256;

    /** Cached runnable so that scheduling a drain does not allocate a new method reference each time. */
    private final Runnable SEND_OPERATIONS = this::sendOperations;
    private final SocketAddress address;
    private final ChannelInitializer newChannelInvoker;
    /**
     * Holds the future of the connect attempt in progress (or completed); {@code null} when no
     * attempt is outstanding. It is completed normally by {@link #markAcceptingRequests()} and
     * exceptionally on connect failure or {@link #reconnect(Throwable)}.
     */
    private final AtomicReference<CompletableFuture<Void>> attemptedConnect = new AtomicReference<>();
    private final Function<String, ClientTopology> currentCacheTopologyFunction;
    private final BiConsumer<OperationChannel, Throwable> connectionFailureListener;
    private final MpscUnboundedArrayQueue<HotRodOperation<?>> queue = new MpscUnboundedArrayQueue<>(QUEUE_CHUNK_SIZE);
    private volatile Channel channel;
    private boolean acceptingRequests;
    Codec codec;
    HeaderDecoder headerDecoder;
    /** Batch buffer filled by {@link #accept(HotRodOperation)} and flushed by {@link #sendOperations()}. */
    ByteBuf buffer;

    /**
     * Creates a channel wrapper that is not yet connected.
     *
     * @param unresolvedAddress            target address; asserted to be unresolved when it is an {@link InetSocketAddress}
     * @param channelInitializer           used to create the underlying Netty channel
     * @param currentCacheTopologyFunction maps a cache name to the topology passed to the codec when writing headers
     * @param connectionFailureListener    invoked with {@code (this, null)} when a connect succeeds and with
     *                                     {@code (this, TransportException)} when it fails
     */
    OperationChannel(SocketAddress unresolvedAddress, ChannelInitializer channelInitializer, Function<String, ClientTopology> currentCacheTopologyFunction, BiConsumer<OperationChannel, Throwable> connectionFailureListener) {
        assert (!(unresolvedAddress instanceof InetSocketAddress) || ((InetSocketAddress)unresolvedAddress).isUnresolved());
        this.address = unresolvedAddress;
        this.newChannelInvoker = channelInitializer;
        this.currentCacheTopologyFunction = currentCacheTopologyFunction;
        this.connectionFailureListener = connectionFailureListener;
    }

    /**
     * Creates an {@code OperationChannel} and immediately starts a connect attempt.
     *
     * @return the new instance; the connect attempt may still be in progress
     */
    public static OperationChannel createAndStart(SocketAddress address, ChannelInitializer newChannelInvoker, Function<String, ClientTopology> currentCacheTopologyFunction, BiConsumer<OperationChannel, Throwable> connectionFailureListener) {
        OperationChannel operationChannel = new OperationChannel(address, newChannelInvoker, currentCacheTopologyFunction, connectionFailureListener);
        operationChannel.attemptConnect();
        return operationChannel;
    }

    /**
     * Starts a connect attempt unless one is already registered.
     *
     * @return the stage of the already-registered attempt if there is one, otherwise the stage of
     *         the attempt started by this call. On a successful connect the stage is not completed
     *         here; it is completed by {@link #markAcceptingRequests()}.
     */
    CompletionStage<Void> attemptConnect() {
        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
        CompletableFuture<Void> prev = this.attemptedConnect.compareAndExchange(null, connectFuture);
        if (prev != null) {
            // Another attempt is already registered; share its outcome instead of connecting twice.
            return prev;
        }
        this.channel = null;
        ChannelFuture channelFuture = this.newChannelInvoker.createChannel();
        channelFuture.addListener(f -> {
            if (f.isSuccess()) {
                Channel c = channelFuture.channel();
                assert (c.eventLoop().inEventLoop());
                c.attr(OPERATION_CHANNEL_ATTRIBUTE_KEY).set(this);
                this.channel = c;
                this.headerDecoder = c.pipeline().get(HeaderDecoder.class);
                this.codec = this.headerDecoder.getConfiguration().version().getCodec();
                // A null throwable tells the listener that the connection succeeded.
                this.connectionFailureListener.accept(this, null);
                this.channel.pipeline().fireUserEventTriggered(ActivationHandler.ACTIVATION_EVENT);
                log.tracef("OperationChannel %s connect complete to %s", this, c);
            } else {
                Throwable cause = f.cause();
                log.tracef("Connection attempt to %s encountered exception for %s", this.address, cause);
                connectFuture.completeExceptionally(cause);
                // Clear only our own attempt so that a later call can start a fresh one.
                this.attemptedConnect.compareAndSet(connectFuture, null);
                TransportException transportCause = new TransportException(cause, this.address);
                this.connectionFailureListener.accept(this, transportCause);
            }
        });
        return connectFuture;
    }

    /**
     * Replaces the codec used to encode operations. Asserted to run on the channel's event loop.
     */
    public void setCodec(Codec codec) {
        assert (this.channel.eventLoop().inEventLoop());
        this.codec = codec;
    }

    /**
     * @return {@code true} when a connect attempt is registered and its future is done
     */
    public boolean isAcceptingRequests() {
        CompletableFuture<Void> f = this.attemptedConnect.get();
        return f != null && f.isDone();
    }

    /**
     * Schedules, on the channel's event loop, completion of the pending connect future, enabling
     * of request sending, and an immediate drain of any queued operations.
     */
    public void markAcceptingRequests() {
        this.channel.eventLoop().submit(() -> {
            CompletableFuture<Void> pendingConnect = this.attemptedConnect.get();
            if (pendingConnect == null) {
                // The attempt was cleared (connect failure or reconnect) before this task ran.
                // Previously this surfaced as a NullPointerException swallowed by the discarded
                // task future; skip explicitly and leave acceptingRequests untouched.
                log.tracef("OperationChannel %s has no pending connect attempt, not marking as accepting requests", this);
                return;
            }
            pendingConnect.complete(null);
            this.acceptingRequests = true;
            this.sendOperations();
        });
    }

    /**
     * Encodes and flushes a single operation immediately, bypassing the queue. When the current
     * codec reports it is unsafe for the handshake, the safe handshake protocol version's codec
     * is used instead.
     *
     * @throws IllegalArgumentException if not invoked from the channel's event loop
     */
    public void forceSendOperation(HotRodOperation<?> operation) {
        // Read the volatile field once so the event-loop check and the write use the same channel.
        Channel ch = this.channel;
        if (!ch.eventLoop().inEventLoop()) {
            throw new IllegalArgumentException("Force sent operation " + operation + " are required to be sent in the event loop only " + ch.eventLoop());
        }
        log.tracef("Immediately sending operation %s to channel %s", operation, ch);
        long messageId = this.headerDecoder.registerOperation(operation);
        ByteBuf buf = ch.alloc().buffer();
        boolean encoded = false;
        try {
            Codec codecToUse = this.codec.isUnsafeForTheHandshake() ? ProtocolVersion.SAFE_HANDSHAKE_PROTOCOL_VERSION.getCodec() : this.codec;
            codecToUse.writeHeader(buf, messageId, this.currentCacheTopologyFunction.apply(operation.getCacheName()), operation);
            operation.writeOperationRequest(ch, buf, codecToUse);
            encoded = true;
        } finally {
            if (!encoded) {
                // Encoding failed: the buffer will never reach the channel, so release it here
                // instead of leaking it. The original exception keeps propagating.
                buf.release();
            }
        }
        ch.writeAndFlush(buf, ch.voidPromise());
    }

    /**
     * Enqueues an operation. If a channel is currently set, a drain is scheduled on its event
     * loop; otherwise a connect attempt is triggered and the operation stays queued.
     */
    public void sendOperation(HotRodOperation<?> operation) {
        this.queue.offer(operation);
        Channel ch = this.channel;
        if (ch != null) {
            log.tracef("Enqueued operation %s to send to channel %s", operation, ch);
            ch.eventLoop().execute(this.SEND_OPERATIONS);
        } else {
            log.tracef("Enqueued operation %s to send to address %s when connected", operation, this.address);
            this.attemptConnect();
        }
    }

    /**
     * Drops the current channel, fails any registered connect future with {@code t}, and starts a
     * new connect attempt if this channel had previously been accepting requests.
     *
     * @param t the failure that triggered the reconnect
     * @return the operations that were still queued; they are removed from the queue
     */
    public Iterable<HotRodOperation<?>> reconnect(Throwable t) {
        assert (this.channel == null || this.channel.eventLoop().inEventLoop());
        Channel previous = this.channel;
        this.channel = null;
        CompletableFuture<Void> future = this.attemptedConnect.getAndSet(null);
        if (future != null) {
            future.completeExceptionally(t);
        }
        if (this.acceptingRequests) {
            log.tracef("Attempting to reconnect channel %s after exception %s", previous, t);
            this.acceptingRequests = false;
            this.attemptConnect();
        } else {
            log.tracef("Channel %s was never fully accepted, not reconnecting after exception %s", previous, t);
        }
        return this.drainQueue();
    }

    /**
     * Encodes up to {@link #MAX_OPERATIONS_PER_DRAIN} queued operations into the batch buffer,
     * flushes it, and reschedules itself if operations remain. Does nothing while the channel is
     * absent or not yet accepting requests.
     */
    private void sendOperations() {
        Channel ch = this.channel;
        assert (ch == null || ch.eventLoop().inEventLoop());
        if (!this.acceptingRequests || this.queue.isEmpty() || ch == null) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.tracef("OperationChannel %s Sending commands: %s enqueue to send to channel %s", this, this.queue.size(), ch);
        }
        this.queue.drain(this, MAX_OPERATIONS_PER_DRAIN);
        if (this.buffer != null && this.buffer.isReadable()) {
            log.tracef("Flushing commands to channel %s", ch);
            ch.writeAndFlush(this.buffer, ch.voidPromise());
            // Ownership of the buffer passed to the channel; the next pass allocates a new one.
            this.buffer = null;
        }
        if (log.isTraceEnabled()) {
            log.tracef("Queue size after: %s", this.queue.size());
        }
        if (!this.queue.isEmpty()) {
            log.tracef("Resubmitting as more operations in queue after sending");
            ch.eventLoop().execute(this.SEND_OPERATIONS);
        }
    }

    /**
     * @return the address this channel connects to
     */
    public SocketAddress getAddress() {
        return this.address;
    }

    /**
     * Queue consumer callback: registers the operation with the header decoder and encodes its
     * header and request into the shared batch buffer. Failures are logged at trace level and do
     * not propagate, so the remaining operations of the drain pass are still processed.
     */
    @Override
    public void accept(HotRodOperation<?> operation) {
        Channel ch = this.channel;
        // Writer index at which this operation starts; -1 until the batch buffer is available.
        int rollbackIndex = -1;
        try {
            if (this.buffer == null) {
                this.buffer = ch.alloc().buffer();
            }
            rollbackIndex = this.buffer.writerIndex();
            long messageId = this.headerDecoder.registerOperation(operation);
            this.codec.writeHeader(this.buffer, messageId, this.currentCacheTopologyFunction.apply(operation.getCacheName()), operation);
            operation.writeOperationRequest(ch, this.buffer, this.codec);
        }
        catch (Throwable t) {
            log.tracef(t, "Encountered exception while attempting to write to channel %s", ch);
            if (rollbackIndex >= 0) {
                // Discard the bytes of the half-encoded operation so they are not flushed together
                // with the operations that follow in the same batch.
                this.buffer.writerIndex(rollbackIndex);
            }
            // NOTE(review): the operation may already be registered with the header decoder and is
            // not completed here; confirm whether it should be failed explicitly.
        }
    }

    /**
     * @return the live queue of operations that have been enqueued but not yet drained
     */
    public Queue<HotRodOperation<?>> pendingChannelOperations() {
        return this.queue;
    }

    /**
     * Stops request sending, closes the current channel if there is one, and removes all queued
     * operations.
     *
     * @return the operations that were still queued
     */
    public List<HotRodOperation<?>> close() {
        this.acceptingRequests = false;
        // Single volatile read: the field may be cleared concurrently between check and use.
        Channel ch = this.channel;
        if (ch != null) {
            ch.close();
        }
        return this.drainQueue();
    }

    /**
     * @return the current channel, or {@code null} when none is set
     */
    public Channel getChannel() {
        return this.channel;
    }

    @Override
    public String toString() {
        return "OperationChannel{address=" + this.address + ", queue.size=" + this.queue.size() + ", channel=" + this.channel + ", acceptingRequests=" + this.acceptingRequests + "}";
    }

    /** Removes every queued operation and returns them in queue order. */
    private List<HotRodOperation<?>> drainQueue() {
        List<HotRodOperation<?>> drained = new ArrayList<>();
        this.queue.drain(drained::add, Integer.MAX_VALUE);
        return drained;
    }
}

