/*
 * Decompiled with CFR 0.152.
 */
package org.komamitsu.fluency.sender;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.komamitsu.fluency.sender.Sender;
import org.komamitsu.fluency.sender.TCPSender;
import org.komamitsu.fluency.sender.failuredetect.FailureDetectStrategy;
import org.komamitsu.fluency.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.sender.failuredetect.PhiAccrualFailureDetectStrategy;
import org.komamitsu.fluency.sender.heartbeat.Heartbeater;
import org.komamitsu.fluency.sender.heartbeat.TCPHeartbeater;
import org.komamitsu.fluency.util.Tuple;
import org.msgpack.core.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiSender
implements Sender {
    private static final Logger LOG = LoggerFactory.getLogger(MultiSender.class);
    @VisibleForTesting
    final List<Tuple<TCPSender, FailureDetector>> sendersAndFailureDetectors = new ArrayList<Tuple<TCPSender, FailureDetector>>();

    public MultiSender(List<TCPSender> senders, FailureDetectStrategy.Config failureDetectStrategyConfig, Heartbeater.Config heartbeaterConfig) throws IOException {
        for (TCPSender sender : senders) {
            Object config = heartbeaterConfig.dupDefaultConfig();
            ((Heartbeater.Config)config).setHost(sender.getHost());
            ((Heartbeater.Config)config).setPort(sender.getPort());
            ((Heartbeater.Config)config).setIntervalMillis(heartbeaterConfig.getIntervalMillis());
            FailureDetector failureDetector = new FailureDetector(failureDetectStrategyConfig, (Heartbeater.Config)config);
            this.sendersAndFailureDetectors.add(new Tuple<TCPSender, FailureDetector>(sender, failureDetector));
        }
    }

    public MultiSender(List<TCPSender> senders, Heartbeater.Config heartbeaterConfig) throws IOException {
        this(senders, new PhiAccrualFailureDetectStrategy.Config(), heartbeaterConfig);
    }

    public MultiSender(List<TCPSender> senders) throws IOException {
        this(senders, new PhiAccrualFailureDetectStrategy.Config(), new TCPHeartbeater.Config());
    }

    @Override
    public synchronized void send(ByteBuffer data) throws IOException {
        this.sendInternal(Arrays.asList(data), null);
    }

    @Override
    public synchronized void send(List<ByteBuffer> dataList) throws IOException {
        this.sendInternal(dataList, null);
    }

    @Override
    public void sendWithAck(List<ByteBuffer> dataList, byte[] ackToken) throws IOException {
        this.sendInternal(dataList, ackToken);
    }

    private synchronized void sendInternal(List<ByteBuffer> dataList, byte[] ackToken) throws AllNodesUnavailableException {
        ArrayList<Integer> positions = new ArrayList<Integer>(dataList.size());
        for (ByteBuffer byteBuffer : dataList) {
            positions.add(byteBuffer.position());
        }
        for (Tuple tuple : this.sendersAndFailureDetectors) {
            TCPSender sender = (TCPSender)tuple.getFirst();
            FailureDetector failureDetector = (FailureDetector)tuple.getSecond();
            LOG.trace("send(): hb.host={}, hb.port={}, isAvailable={}", new Object[]{failureDetector.getHeartbeater().getHost(), failureDetector.getHeartbeater().getPort(), failureDetector.isAvailable()});
            if (!failureDetector.isAvailable()) continue;
            try {
                if (ackToken == null) {
                    sender.send(dataList);
                } else {
                    sender.sendWithAck(dataList, ackToken);
                }
                return;
            }
            catch (IOException e) {
                LOG.error("Failed to send: sender=" + sender + ". Trying to use next sender...", (Throwable)e);
                for (int i = 0; i < dataList.size(); ++i) {
                    dataList.get(i).position((Integer)positions.get(i));
                }
                failureDetector.onFailure(e);
            }
        }
        throw new AllNodesUnavailableException("All nodes are unavailable");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        IOException firstException = null;
        for (Tuple<TCPSender, FailureDetector> senderAndFailureDetector : this.sendersAndFailureDetectors) {
            TCPSender sender = senderAndFailureDetector.getFirst();
            FailureDetector failureDetector = senderAndFailureDetector.getSecond();
            try {
                sender.close();
            }
            catch (IOException e) {
                if (firstException != null) continue;
                firstException = e;
            }
            finally {
                try {
                    failureDetector.close();
                }
                catch (IOException e) {
                    if (firstException != null) continue;
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static class AllNodesUnavailableException
    extends IOException {
        public AllNodesUnavailableException(String s) {
            super(s);
        }
    }
}

