/*
 * Decompiled with CFR 0.152.
 */
package org.komamitsu.fluency.sender;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.komamitsu.fluency.sender.AckTokenSerDe;
import org.komamitsu.fluency.sender.MessagePackAckTokenSerDe;
import org.komamitsu.fluency.sender.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TCPSender
implements Sender {
    private static final Logger LOG = LoggerFactory.getLogger(TCPSender.class);
    private static final Charset CHARSET_FOR_ERRORLOG = Charset.forName("UTF-8");
    private final AtomicReference<SocketChannel> channel = new AtomicReference();
    private final String host;
    private final int port;
    private final byte[] optionBuffer = new byte[256];
    private final AckTokenSerDe ackTokenSerDe = new MessagePackAckTokenSerDe();

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public TCPSender(String host, int port) throws IOException {
        this.port = port;
        this.host = host;
    }

    public TCPSender(int port) throws IOException {
        this("127.0.0.1", port);
    }

    public TCPSender(String host) throws IOException {
        this(host, 24224);
    }

    public TCPSender() throws IOException {
        this("127.0.0.1", 24224);
    }

    private SocketChannel getOrOpenChannel() throws IOException {
        if (this.channel.get() == null) {
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(this.host, this.port));
            socketChannel.socket().setTcpNoDelay(true);
            socketChannel.socket().setSoTimeout(5000);
            this.channel.set(socketChannel);
        }
        return this.channel.get();
    }

    @Override
    public synchronized void send(ByteBuffer data) throws IOException {
        try {
            LOG.trace("send(): sender.host={}, sender.port={}", (Object)this.getHost(), (Object)this.getPort());
            this.getOrOpenChannel().write(data);
        }
        catch (IOException e) {
            this.channel.set(null);
            throw e;
        }
    }

    @Override
    public synchronized void send(List<ByteBuffer> dataList) throws IOException {
        try {
            LOG.trace("send(): sender.host={}, sender.port={}", (Object)this.getHost(), (Object)this.getPort());
            this.getOrOpenChannel().write(dataList.toArray(new ByteBuffer[dataList.size()]));
        }
        catch (IOException e) {
            this.channel.set(null);
            throw e;
        }
    }

    @Override
    public synchronized void sendWithAck(List<ByteBuffer> dataList, byte[] ackToken) throws IOException {
        this.send(dataList);
        this.send(ByteBuffer.wrap(this.ackTokenSerDe.pack(ackToken)));
        ByteBuffer byteBuffer = ByteBuffer.wrap(this.optionBuffer);
        this.getOrOpenChannel().read(byteBuffer);
        byte[] unpackedToken = this.ackTokenSerDe.unpack(this.optionBuffer);
        if (!Arrays.equals(ackToken, unpackedToken)) {
            throw new UnmatchedAckException("Ack tokens don't matched: expected=" + new String(ackToken, CHARSET_FOR_ERRORLOG) + ", got=" + new String(unpackedToken, CHARSET_FOR_ERRORLOG));
        }
    }

    @Override
    public synchronized void close() throws IOException {
        SocketChannel socketChannel = this.channel.getAndSet(null);
        if (socketChannel != null) {
            socketChannel.close();
        }
    }

    public static class UnmatchedAckException
    extends IOException {
        public UnmatchedAckException(String message) {
            super(message);
        }
    }
}

