/*
 * Decompiled with CFR 0.152.
 */
package org.komamitsu.fluency.buffer;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.komamitsu.fluency.buffer.Buffer;
import org.komamitsu.fluency.sender.Sender;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PackedForwardBuffer
extends Buffer<Config> {
    private static final Logger LOG = LoggerFactory.getLogger(PackedForwardBuffer.class);
    private final Map<String, ExpirableBuffer> appendedChunks = new HashMap<String, ExpirableBuffer>();
    private final LinkedBlockingQueue<TaggableBuffer> flushableChunks = new LinkedBlockingQueue();
    private final AtomicInteger emitCounter = new AtomicInteger();
    private final AtomicReference<Long> lastAppendedChunksChecked = new AtomicReference();
    private final ThreadLocal<ObjectMapper> objectMapperHolder = new ThreadLocal<ObjectMapper>(){

        @Override
        protected ObjectMapper initialValue() {
            return new ObjectMapper((JsonFactory)new MessagePackFactory());
        }
    };
    private final ThreadLocal<ByteArrayOutputStream> outputStreamHolder = new ThreadLocal<ByteArrayOutputStream>(){

        @Override
        protected ByteArrayOutputStream initialValue() {
            return new ByteArrayOutputStream();
        }
    };

    private PackedForwardBuffer(Config bufferConfig) {
        super(bufferConfig);
    }

    private synchronized ExpirableBuffer prepareBuffer(String tag, int writeSize) throws Buffer.BufferFullException {
        int newChunkSize;
        int origChunkSize;
        ExpirableBuffer chunk = this.appendedChunks.get(tag);
        if (chunk != null && chunk.getByteBuffer().remaining() > writeSize) {
            return chunk;
        }
        if (chunk == null) {
            origChunkSize = 0;
            newChunkSize = ((Config)this.bufferConfig).getBuffInitialSize();
        } else {
            origChunkSize = chunk.getByteBuffer().capacity();
            newChunkSize = (int)((float)chunk.getByteBuffer().capacity() * ((Config)this.bufferConfig).getBuffExpandRatio());
        }
        while (newChunkSize < writeSize) {
            newChunkSize = (int)((float)newChunkSize * ((Config)this.bufferConfig).getBuffExpandRatio());
        }
        int delta = newChunkSize - origChunkSize;
        if (this.totalSize.get() + delta > ((Config)this.bufferConfig).getBufferSize()) {
            throw new Buffer.BufferFullException("Buffer is full. bufferConfig=" + this.bufferConfig + ", totalSize=" + this.totalSize);
        }
        this.totalSize.addAndGet(delta);
        ExpirableBuffer newBuffer = new ExpirableBuffer(ByteBuffer.allocate(newChunkSize));
        if (chunk != null) {
            chunk.getByteBuffer().flip();
            newBuffer.getByteBuffer().put(chunk.getByteBuffer());
        }
        LOG.trace("prepareBuffer(): allocate a new buffer. tag={}, buffer={}", (Object)tag, (Object)newBuffer);
        this.appendedChunks.put(tag, newBuffer);
        return newBuffer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void append(String tag, long timestamp, Map<String, Object> data) throws IOException {
        ObjectMapper objectMapper = this.objectMapperHolder.get();
        ByteArrayOutputStream outputStream = this.outputStreamHolder.get();
        outputStream.reset();
        objectMapper.writeValue((OutputStream)outputStream, Arrays.asList(timestamp, data));
        outputStream.close();
        boolean succeeded = false;
        while (!succeeded) {
            try {
                Map<String, ExpirableBuffer> map = this.appendedChunks;
                synchronized (map) {
                    ExpirableBuffer buffer = this.prepareBuffer(tag, outputStream.size());
                    buffer.getByteBuffer().put(outputStream.toByteArray());
                    succeeded = true;
                    buffer.getLastUpdatedTimeMillis().set(System.currentTimeMillis());
                    this.moveChunkIfNeeded(tag, buffer);
                    if (this.emitCounter.incrementAndGet() % 1000 == 0) {
                        this.moveChunks(false);
                    }
                }
            }
            catch (Buffer.BufferFullException e) {
                LOG.warn("Buffer is full. Maybe you'd better increase the buffer size.", (Throwable)e);
                try {
                    TimeUnit.MILLISECONDS.sleep(500L);
                }
                catch (InterruptedException e1) {
                    LOG.warn("Interrupted", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private synchronized void moveChunkIfNeeded(String tag, ExpirableBuffer buffer) throws IOException {
        if (buffer.getByteBuffer().position() > ((Config)this.bufferConfig).getChunkSize()) {
            this.moveChunk(tag, buffer);
        }
    }

    private synchronized void moveChunks(boolean force) throws IOException {
        long expiredThreshold = System.currentTimeMillis() - (long)((Config)this.bufferConfig).getChunkRetentionTimeMillis();
        for (Map.Entry<String, ExpirableBuffer> entry : this.appendedChunks.entrySet()) {
            if (entry.getValue() == null || !force && entry.getValue().getLastUpdatedTimeMillis().get() >= expiredThreshold) continue;
            this.moveChunk(entry.getKey(), entry.getValue());
        }
    }

    private synchronized void moveChunk(String tag, ExpirableBuffer buffer) throws IOException {
        try {
            LOG.trace("moveChunk(): tag={}, buffer={}", (Object)tag, (Object)buffer);
            this.flushableChunks.put(new TaggableBuffer(tag, buffer.getByteBuffer()));
            this.appendedChunks.put(tag, null);
        }
        catch (InterruptedException e) {
            throw new IOException("Failed to move chunk due to interruption", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flushInternal(Sender sender) throws IOException {
        long now = System.currentTimeMillis();
        Long lastAppendedChunksChecked = this.lastAppendedChunksChecked.get();
        if (lastAppendedChunksChecked == null) {
            this.lastAppendedChunksChecked.set(now);
        } else if (lastAppendedChunksChecked < now - 500L) {
            this.moveChunks(false);
            this.lastAppendedChunksChecked.set(now);
        }
        TaggableBuffer chunk = null;
        while ((chunk = this.flushableChunks.poll()) != null) {
            try {
                this.totalSize.addAndGet(-chunk.getByteBuffer().capacity());
                ByteArrayOutputStream header = new ByteArrayOutputStream();
                MessagePacker messagePacker = MessagePack.newDefaultPacker((OutputStream)header);
                LOG.trace("flushInternal(): bufferUsage={}, chunk={}", (Object)Float.valueOf(this.getBufferUsage()), (Object)chunk);
                String tag = chunk.getTag();
                ByteBuffer byteBuffer = chunk.getByteBuffer();
                messagePacker.packArrayHeader(2);
                messagePacker.packString(tag);
                messagePacker.packRawStringHeader(byteBuffer.position());
                messagePacker.flush();
                Sender sender2 = sender;
                synchronized (sender2) {
                    sender.send(ByteBuffer.wrap(header.toByteArray()));
                    byteBuffer.flip();
                    sender.send(byteBuffer);
                }
            }
            catch (Throwable e) {
                try {
                    this.flushableChunks.put(chunk);
                    this.totalSize.addAndGet(chunk.getByteBuffer().capacity());
                }
                catch (InterruptedException e1) {
                    LOG.error("Interrupted during restoring fetched chunk. It can be lost. chunk={}", (Object)chunk);
                }
                if (e instanceof IOException) {
                    throw (IOException)e;
                }
                throw new RuntimeException("Failed to send chunk to fluentd", e);
            }
        }
    }

    @Override
    public synchronized void closeInternal(Sender sender) throws IOException {
        this.moveChunks(true);
        this.appendedChunks.clear();
        this.flush(sender);
    }

    public static class Config
    extends Buffer.Config<PackedForwardBuffer, Config> {
        private int buffInitialSize = 524288;
        private float buffExpandRatio = 2.0f;
        private int chunkSize = 0x400000;
        private int chunkRetentionTimeMillis = 500;

        public int getBuffInitialSize() {
            return this.buffInitialSize;
        }

        public Config setBuffInitialSize(int buffInitialSize) {
            this.buffInitialSize = buffInitialSize;
            return this;
        }

        public float getBuffExpandRatio() {
            return this.buffExpandRatio;
        }

        public Config setBuffExpandRatio(float buffExpandRatio) {
            this.buffExpandRatio = buffExpandRatio;
            return this;
        }

        public int getChunkSize() {
            return this.chunkSize;
        }

        public Config setChunkSize(int chunkSize) {
            this.chunkSize = chunkSize;
            return this;
        }

        public int getChunkRetentionTimeMillis() {
            return this.chunkRetentionTimeMillis;
        }

        public Config setChunkRetentionTimeMillis(int chunkRetentionTimeMillis) {
            this.chunkRetentionTimeMillis = chunkRetentionTimeMillis;
            return this;
        }

        @Override
        public String toString() {
            return "Config{buffInitialSize=" + this.buffInitialSize + ", buffExpandRatio=" + this.buffExpandRatio + ", chunkSize=" + this.chunkSize + ", chunkRetentionTimeMillis=" + this.chunkRetentionTimeMillis + "} " + super.toString();
        }

        @Override
        public PackedForwardBuffer createInstance() {
            return new PackedForwardBuffer(this);
        }
    }

    private static class TaggableBuffer {
        private final String tag;
        private final ByteBuffer byteBuffer;

        public TaggableBuffer(String tag, ByteBuffer byteBuffer) {
            this.tag = tag;
            this.byteBuffer = byteBuffer;
        }

        public String getTag() {
            return this.tag;
        }

        public ByteBuffer getByteBuffer() {
            return this.byteBuffer;
        }

        public String toString() {
            return "TaggableBuffer{tag='" + this.tag + '\'' + ", byteBuffer=" + this.byteBuffer + '}';
        }
    }

    private static class ExpirableBuffer {
        private final AtomicLong lastUpdatedTimeMillis = new AtomicLong();
        private final ByteBuffer byteBuffer;

        public ExpirableBuffer(ByteBuffer byteBuffer) {
            this.byteBuffer = byteBuffer;
        }

        public AtomicLong getLastUpdatedTimeMillis() {
            return this.lastUpdatedTimeMillis;
        }

        public ByteBuffer getByteBuffer() {
            return this.byteBuffer;
        }

        public String toString() {
            return "ExpirableBuffer{lastUpdatedTimeMillis=" + this.lastUpdatedTimeMillis + ", byteBuffer=" + this.byteBuffer + '}';
        }
    }
}

