/*
 * Decompiled with CFR 0.152.
 */
package org.komamitsu.fluency.buffer;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.komamitsu.fluency.buffer.Buffer;
import org.komamitsu.fluency.sender.Sender;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageBuffer
extends Buffer<Config> {
    private static final Logger LOG = LoggerFactory.getLogger(MessageBuffer.class);
    private final LinkedBlockingQueue<ByteBuffer> messages = new LinkedBlockingQueue();
    private final ObjectMapper objectMapper = new ObjectMapper((JsonFactory)new MessagePackFactory());
    private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    private MessageBuffer(Config bufferConfig) {
        super(bufferConfig);
    }

    @Override
    public synchronized void append(String tag, long timestamp, Map<String, Object> data) throws IOException {
        this.outputStream.reset();
        this.objectMapper.writeValue((OutputStream)this.outputStream, Arrays.asList(tag, timestamp, data));
        this.outputStream.close();
        if (this.allocatedSize.get() + this.outputStream.size() > ((Config)this.bufferConfig).getMaxBufferSize()) {
            throw new Buffer.BufferFullException("Buffer is full. bufferConfig=" + this.bufferConfig + ", allocatedSize=" + this.allocatedSize);
        }
        this.messages.add(ByteBuffer.wrap(this.outputStream.toByteArray()));
        this.allocatedSize.getAndAdd(this.outputStream.size());
    }

    @Override
    public synchronized void flushInternal(Sender sender) throws IOException {
        ByteBuffer message = null;
        while ((message = this.messages.poll()) != null) {
            try {
                this.allocatedSize.addAndGet(-message.capacity());
                sender.send(message);
            }
            catch (Throwable e) {
                try {
                    this.messages.put(message);
                    this.allocatedSize.addAndGet(message.capacity());
                }
                catch (InterruptedException e1) {
                    LOG.error("Interrupted during restoring fetched message. It can be lost. message={}", (Object)message);
                }
                if (e instanceof IOException) {
                    throw (IOException)e;
                }
                throw new RuntimeException("Failed to send message to fluentd", e);
            }
        }
    }

    @Override
    public void closeInternal(Sender sender) throws IOException {
        this.messages.clear();
    }

    public static class Config
    extends Buffer.Config<MessageBuffer, Config> {
        @Override
        public MessageBuffer createInstance() {
            return new MessageBuffer(this);
        }
    }
}

