/*
 * Decompiled with CFR 0.152.
 */
package org.epics.ca.impl.requests;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.epics.ca.Monitor;
import org.epics.ca.Status;
import org.epics.ca.impl.ChannelImpl;
import org.epics.ca.impl.ContextImpl;
import org.epics.ca.impl.Messages;
import org.epics.ca.impl.NotifyResponseRequest;
import org.epics.ca.impl.TCPTransport;
import org.epics.ca.impl.Transport;
import org.epics.ca.impl.TypeSupports;
import org.epics.ca.util.Holder;

/**
 * A monitor (subscription) request for a single channel.
 *
 * <p>On construction the request registers itself with both the context and the
 * channel and sends a subscription message over the supplied transport. Values
 * delivered through {@link #response(int, short, int, ByteBuffer)} are
 * deserialized into slots of the {@link Disruptor}'s ring buffer and published
 * to its consumers.
 *
 * <p>NOTE(review): {@code lastValue} and {@code overrunValue} are read and
 * written without synchronization; this presumably relies on
 * {@code response(...)} being invoked by a single producer thread - confirm
 * against the transport implementation.
 *
 * @param <T> type of the values delivered by this monitor
 */
public class MonitorRequest<T>
implements Monitor<T>,
NotifyResponseRequest {
    private static final Logger logger = Logger.getLogger(MonitorRequest.class.getName());

    /** Context obtained from the transport passed to the constructor. */
    protected final ContextImpl context;

    /** I/O identifier returned by the context when this request was registered. */
    protected final int ioid;

    /** Channel this monitor is attached to. */
    protected final ChannelImpl<?> channel;

    /** Type support used to deserialize payloads and to describe the requested data type. */
    protected final TypeSupports.TypeSupport<T> typeSupport;

    /** Event mask sent with the subscription message. */
    protected final int mask;

    /** Disruptor whose ring buffer receives the monitored values. */
    protected final Disruptor<Holder<T>> disruptor;

    /** Set once {@link #close()} has been called, making further calls no-ops. */
    protected final AtomicBoolean closed = new AtomicBoolean();

    /** Spare value instance used as the deserialization target when the ring buffer is full. */
    protected T overrunValue;

    /** Holder most recently claimed from the ring buffer by {@code response(...)}. */
    protected Holder<T> lastValue;

    /**
     * Creates the request, registers it with the context and the channel, and
     * sends the initial subscription message.
     *
     * @param channel     channel to monitor
     * @param transport   transport used to obtain the context and to send the subscription
     * @param typeSupport type support for the monitored value type
     * @param mask        event mask for the subscription
     * @param disruptor   disruptor that receives the monitored values
     */
    public MonitorRequest(ChannelImpl<?> channel, Transport transport, TypeSupports.TypeSupport<T> typeSupport, int mask, Disruptor<Holder<T>> disruptor) {
        this.channel = channel;
        this.typeSupport = typeSupport;
        this.mask = mask;
        this.disruptor = disruptor;
        this.context = transport.getContext();
        this.ioid = this.context.registerResponseRequest(this);
        channel.registerResponseRequest(this);
        this.resubscribe(transport);
    }

    /**
     * Returns the I/O identifier assigned to this request by the context.
     *
     * @return the I/O identifier
     */
    @Override
    public int getIOID() {
        return this.ioid;
    }

    /**
     * Handles a monitor response.
     *
     * <p>For {@link Status#NORMAL} the payload is deserialized and published to
     * the ring buffer. When the ring buffer has no free slot, the payload is
     * deserialized into a spare instance which then replaces the value of the
     * most recently claimed holder. Any other status cancels the request.
     *
     * @param status            CA status code of the response
     * @param dataType          data type code of the payload (not used here)
     * @param dataCount         element count passed to the deserializer
     * @param dataPayloadBuffer buffer holding the payload
     */
    @Override
    public void response(int status, short dataType, int dataCount, ByteBuffer dataPayloadBuffer) {
        Status caStatus = Status.forStatusCode(status);
        if (caStatus != Status.NORMAL) {
            this.cancel();
            return;
        }

        RingBuffer<Holder<T>> ringBuffer = this.disruptor.getRingBuffer();
        if (ringBuffer.hasAvailableCapacity(1)) {
            long next = ringBuffer.next();
            try {
                this.lastValue = ringBuffer.get(next);
                this.lastValue.value = this.typeSupport.deserialize(dataPayloadBuffer, this.lastValue.value, dataCount);
            }
            finally {
                // Always publish a claimed sequence, even if deserialization throws.
                ringBuffer.publish(next);
            }
            return;
        }

        // Ring buffer is full: deserialize into the spare instance first.
        this.overrunValue = this.typeSupport.deserialize(dataPayloadBuffer, this.overrunValue, dataCount);
        Holder<T> last = this.lastValue;
        if (last == null) {
            // No holder has been claimed by this method yet (the buffer can be
            // filled by disconnect events alone), so there is nothing to
            // overwrite; the update is dropped instead of throwing.
            return;
        }
        // Swap the spare instance with the value of the last claimed holder, so
        // the holder carries the newest data and the old instance is reused as
        // the next spare.
        T previous = last.value;
        last.value = this.overrunValue;
        this.overrunValue = previous;
    }

    /**
     * Unregisters this request from the context and the channel and halts the
     * disruptor. No message is sent to the server by this method.
     */
    @Override
    public void cancel() {
        this.context.unregisterResponseRequest(this);
        this.channel.unregisterResponseRequest(this);
        this.disruptor.halt();
    }

    /**
     * Sends a subscription message for this monitor over the given transport
     * and flushes it.
     *
     * <p>The element count is the type support's forced element count; if that
     * is zero and the channel transport's minor revision is below 13, the
     * channel's native element count is used instead.
     *
     * <p>NOTE(review): the minor revision is read from
     * {@code channel.getTransport()} rather than from {@code transport};
     * presumably both refer to the same connection - confirm.
     *
     * @param transport transport to send the subscription message on
     */
    public void resubscribe(Transport transport) {
        int dataCount = this.typeSupport.getForcedElementCount();
        if (dataCount == 0 && this.channel.getTransport().getMinorRevision() < 13) {
            dataCount = this.channel.getNativeElementCount();
        }
        Messages.createSubscriptionMessage(transport, this.typeSupport.getDataType(), dataCount, this.channel.getSID(), this.ioid, this.mask);
        transport.flush();
    }

    /**
     * Handles an exception reported for this monitor.
     *
     * <ul>
     *   <li>Unknown status code: logged as a warning and ignored.</li>
     *   <li>{@link Status#CHANDESTROY}: the request is cancelled.</li>
     *   <li>{@link Status#DISCONN}: a holder with a {@code null} value is
     *       published if the ring buffer has a free slot.</li>
     *   <li>Any other status: logged as a warning.</li>
     * </ul>
     *
     * @param errorCode    CA status code of the exception
     * @param errorMessage message accompanying the exception, may be {@code null}
     */
    @Override
    public void exception(int errorCode, String errorMessage) {
        Status status = Status.forStatusCode(errorCode);
        if (status == null) {
            logger.warning(() -> "Unknown CA status code received for monitor, code: " + errorCode + ", message: " + errorMessage);
            return;
        }
        if (status == Status.CHANDESTROY) {
            this.cancel();
        } else if (status == Status.DISCONN) {
            RingBuffer<Holder<T>> ringBuffer = this.disruptor.getRingBuffer();
            if (ringBuffer.hasAvailableCapacity(1)) {
                long next = ringBuffer.next();
                try {
                    // A null value is what consumers receive for a disconnect.
                    ringBuffer.get(next).value = null;
                }
                finally {
                    ringBuffer.publish(next);
                }
            }
        } else {
            logger.warning(() -> "Exception with CA status " + status + " received for monitor, message: " + (errorMessage != null ? errorMessage : status.getMessage()));
        }
    }

    /**
     * Returns the disruptor that receives the monitored values.
     *
     * @return the disruptor passed to the constructor
     */
    @Override
    public Disruptor<Holder<T>> getDisruptor() {
        return this.disruptor;
    }

    /**
     * Closes the monitor. Only the first call has any effect.
     *
     * <p>The request is cancelled locally first; then, if the channel currently
     * has a transport, a cancel-subscription message is sent on it. Failures
     * while sending are logged at {@link Level#FINER} and otherwise ignored.
     */
    @Override
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        // Deregister locally before the transport check so this happens even
        // when the channel has no transport.
        this.cancel();
        TCPTransport transport = this.channel.getTransport();
        if (transport == null) {
            return;
        }
        int dataCount = this.typeSupport.getForcedElementCount();
        // Use the transport that was just null-checked; fetching it from the
        // channel again could yield null if the channel disconnects meanwhile.
        if (dataCount == 0 && transport.getMinorRevision() < 13) {
            dataCount = this.channel.getNativeElementCount();
        }
        try {
            Messages.cancelSubscriptionMessage(transport, this.typeSupport.getDataType(), dataCount, this.channel.getSID(), this.ioid);
            transport.flush();
        }
        catch (Throwable th) {
            // Best effort: the subscription is already cancelled locally.
            logger.log(Level.FINER, "Failed to send 'cancel subscription' message.", th);
        }
    }
}

