/*
 * Decompiled with CFR 0.152.
 */
package org.komamitsu.fluency.buffer;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.komamitsu.fluency.buffer.Buffer;
import org.komamitsu.fluency.sender.Sender;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PackedForwardBuffer
extends Buffer<Config> {
    private static final Logger LOG = LoggerFactory.getLogger(PackedForwardBuffer.class);
    private final Map<String, RetentionBuffer> retentionBuffers = new HashMap<String, RetentionBuffer>();
    private final LinkedBlockingQueue<TaggableBuffer> flushableBuffers = new LinkedBlockingQueue();
    private final AtomicInteger emitCounter = new AtomicInteger();
    private final ThreadLocal<ObjectMapper> objectMapperHolder = new ThreadLocal<ObjectMapper>(){

        @Override
        protected ObjectMapper initialValue() {
            return new ObjectMapper((JsonFactory)new MessagePackFactory());
        }
    };
    private final ThreadLocal<ByteArrayOutputStream> outputStreamHolder = new ThreadLocal<ByteArrayOutputStream>(){

        @Override
        protected ByteArrayOutputStream initialValue() {
            return new ByteArrayOutputStream();
        }
    };

    private PackedForwardBuffer(Config bufferConfig) {
        super(bufferConfig);
    }

    private synchronized RetentionBuffer prepareBuffer(String tag, int writeSize) throws Buffer.BufferFullException {
        int newRetentionBufferSize;
        int origRetentionBufferSize;
        RetentionBuffer retentionBuffer = this.retentionBuffers.get(tag);
        if (retentionBuffer != null && retentionBuffer.getByteBuffer().remaining() > writeSize) {
            return retentionBuffer;
        }
        if (retentionBuffer == null) {
            origRetentionBufferSize = 0;
            newRetentionBufferSize = ((Config)this.bufferConfig).getInitialBufferSize();
        } else {
            origRetentionBufferSize = retentionBuffer.getByteBuffer().capacity();
            newRetentionBufferSize = (int)((float)retentionBuffer.getByteBuffer().capacity() * ((Config)this.bufferConfig).getBufferExpandRatio());
        }
        while (newRetentionBufferSize < writeSize) {
            newRetentionBufferSize = (int)((float)newRetentionBufferSize * ((Config)this.bufferConfig).getBufferExpandRatio());
        }
        int delta = newRetentionBufferSize - origRetentionBufferSize;
        if (this.allocatedSize.get() + delta > ((Config)this.bufferConfig).getMaxBufferSize()) {
            throw new Buffer.BufferFullException("Buffer is full. bufferConfig=" + this.bufferConfig + ", allocatedSize=" + this.allocatedSize);
        }
        this.allocatedSize.addAndGet(delta);
        RetentionBuffer newBuffer = new RetentionBuffer(ByteBuffer.allocate(newRetentionBufferSize));
        if (retentionBuffer != null) {
            retentionBuffer.getByteBuffer().flip();
            newBuffer.getByteBuffer().put(retentionBuffer.getByteBuffer());
        }
        LOG.trace("prepareBuffer(): allocate a new buffer. tag={}, buffer={}", (Object)tag, (Object)newBuffer);
        this.retentionBuffers.put(tag, newBuffer);
        return newBuffer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void append(String tag, long timestamp, Map<String, Object> data) throws IOException {
        ObjectMapper objectMapper = this.objectMapperHolder.get();
        ByteArrayOutputStream outputStream = this.outputStreamHolder.get();
        outputStream.reset();
        objectMapper.writeValue((OutputStream)outputStream, Arrays.asList(timestamp, data));
        outputStream.close();
        boolean succeeded = false;
        while (!succeeded) {
            try {
                Map<String, RetentionBuffer> map = this.retentionBuffers;
                synchronized (map) {
                    RetentionBuffer buffer = this.prepareBuffer(tag, outputStream.size());
                    buffer.getByteBuffer().put(outputStream.toByteArray());
                    succeeded = true;
                    buffer.getLastUpdatedTimeMillis().set(System.currentTimeMillis());
                    this.moveRetentionBufferIfNeeded(tag, buffer);
                    if (this.emitCounter.incrementAndGet() % 1000 == 0) {
                        this.moveRetentionBuffersToFlushable(false);
                    }
                }
            }
            catch (Buffer.BufferFullException e) {
                LOG.warn("Buffer is full. Maybe you'd better increase the buffer size.", (Throwable)e);
                try {
                    TimeUnit.MILLISECONDS.sleep(500L);
                }
                catch (InterruptedException e1) {
                    LOG.warn("Interrupted", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private synchronized void moveRetentionBufferIfNeeded(String tag, RetentionBuffer buffer) throws IOException {
        if (buffer.getByteBuffer().position() > ((Config)this.bufferConfig).getBufferRetentionSize()) {
            this.moveRetentionBufferToFlushable(tag, buffer);
        }
    }

    private synchronized void moveRetentionBuffersToFlushable(boolean force) throws IOException {
        long expiredThreshold = System.currentTimeMillis() - (long)((Config)this.bufferConfig).getBufferRetentionTimeMillis();
        for (Map.Entry<String, RetentionBuffer> entry : this.retentionBuffers.entrySet()) {
            if (entry.getValue() == null || !force && entry.getValue().getLastUpdatedTimeMillis().get() >= expiredThreshold) continue;
            this.moveRetentionBufferToFlushable(entry.getKey(), entry.getValue());
        }
    }

    private synchronized void moveRetentionBufferToFlushable(String tag, RetentionBuffer buffer) throws IOException {
        try {
            LOG.trace("moveRetentionBufferToFlushable(): tag={}, buffer={}", (Object)tag, (Object)buffer);
            this.flushableBuffers.put(new TaggableBuffer(tag, buffer.getByteBuffer()));
            this.retentionBuffers.put(tag, null);
        }
        catch (InterruptedException e) {
            throw new IOException("Failed to move retention buffer due to interruption", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flushInternal(Sender sender) throws IOException {
        this.moveRetentionBuffersToFlushable(false);
        TaggableBuffer flushableBuffer = null;
        while ((flushableBuffer = this.flushableBuffers.poll()) != null) {
            try {
                this.allocatedSize.addAndGet(-flushableBuffer.getByteBuffer().capacity());
                ByteArrayOutputStream header = new ByteArrayOutputStream();
                MessagePacker messagePacker = MessagePack.newDefaultPacker((OutputStream)header);
                LOG.trace("flushInternal(): bufferUsage={}, flushableBuffer={}", (Object)Float.valueOf(this.getBufferUsage()), (Object)flushableBuffer);
                String tag = flushableBuffer.getTag();
                ByteBuffer byteBuffer = flushableBuffer.getByteBuffer();
                messagePacker.packArrayHeader(2);
                messagePacker.packString(tag);
                messagePacker.packRawStringHeader(byteBuffer.position());
                messagePacker.flush();
                Sender sender2 = sender;
                synchronized (sender2) {
                    sender.send(ByteBuffer.wrap(header.toByteArray()));
                    byteBuffer.flip();
                    sender.send(byteBuffer);
                }
            }
            catch (Throwable e) {
                try {
                    this.flushableBuffers.put(flushableBuffer);
                    this.allocatedSize.addAndGet(flushableBuffer.getByteBuffer().capacity());
                }
                catch (InterruptedException e1) {
                    LOG.error("Interrupted during restoring fetched flushableBuffer. It can be lost. flushableBuffer={}", (Object)flushableBuffer);
                }
                if (e instanceof IOException) {
                    throw (IOException)e;
                }
                throw new RuntimeException("Failed to send flushableBuffer to fluentd", e);
            }
        }
    }

    @Override
    public synchronized void closeInternal(Sender sender) throws IOException {
        this.moveRetentionBuffersToFlushable(true);
        this.retentionBuffers.clear();
        this.flush(sender);
    }

    public static class Config
    extends Buffer.Config<PackedForwardBuffer, Config> {
        private int initialBufferSize = 524288;
        private float bufferExpandRatio = 2.0f;
        private int bufferRetentionSize = 0x400000;
        private int bufferRetentionTimeMillis = 500;

        public int getInitialBufferSize() {
            return this.initialBufferSize;
        }

        public Config setInitialBufferSize(int initialBufferSize) {
            this.initialBufferSize = initialBufferSize;
            return this;
        }

        public float getBufferExpandRatio() {
            return this.bufferExpandRatio;
        }

        public Config setBufferExpandRatio(float bufferExpandRatio) {
            this.bufferExpandRatio = bufferExpandRatio;
            return this;
        }

        public int getBufferRetentionSize() {
            return this.bufferRetentionSize;
        }

        public Config setBufferRetentionSize(int bufferRetentionSize) {
            this.bufferRetentionSize = bufferRetentionSize;
            return this;
        }

        public int getBufferRetentionTimeMillis() {
            return this.bufferRetentionTimeMillis;
        }

        public Config setBufferRetentionTimeMillis(int bufferRetentionTimeMillis) {
            this.bufferRetentionTimeMillis = bufferRetentionTimeMillis;
            return this;
        }

        @Override
        public String toString() {
            return "Config{initialBufferSize=" + this.initialBufferSize + ", bufferExpandRatio=" + this.bufferExpandRatio + ", bufferRetentionSize=" + this.bufferRetentionSize + ", bufferRetentionTimeMillis=" + this.bufferRetentionTimeMillis + "} " + super.toString();
        }

        @Override
        public PackedForwardBuffer createInstance() {
            return new PackedForwardBuffer(this);
        }
    }

    private static class TaggableBuffer {
        private final String tag;
        private final ByteBuffer byteBuffer;

        public TaggableBuffer(String tag, ByteBuffer byteBuffer) {
            this.tag = tag;
            this.byteBuffer = byteBuffer;
        }

        public String getTag() {
            return this.tag;
        }

        public ByteBuffer getByteBuffer() {
            return this.byteBuffer;
        }

        public String toString() {
            return "TaggableBuffer{tag='" + this.tag + '\'' + ", byteBuffer=" + this.byteBuffer + '}';
        }
    }

    private static class RetentionBuffer {
        private final AtomicLong lastUpdatedTimeMillis = new AtomicLong();
        private final ByteBuffer byteBuffer;

        public RetentionBuffer(ByteBuffer byteBuffer) {
            this.byteBuffer = byteBuffer;
        }

        public AtomicLong getLastUpdatedTimeMillis() {
            return this.lastUpdatedTimeMillis;
        }

        public ByteBuffer getByteBuffer() {
            return this.byteBuffer;
        }

        public String toString() {
            return "ExpirableBuffer{lastUpdatedTimeMillis=" + this.lastUpdatedTimeMillis + ", byteBuffer=" + this.byteBuffer + '}';
        }
    }
}

