/*
 * Decompiled with CFR 0.152.
 */
package org.komamitsu.fluency.flusher;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.komamitsu.fluency.buffer.Buffer;
import org.komamitsu.fluency.flusher.Flusher;
import org.komamitsu.fluency.sender.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncFlusher
extends Flusher {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncFlusher.class);
    private final BlockingQueue<Boolean> waitQueue = new LinkedBlockingQueue<Boolean>();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final Runnable task = new Runnable(){

        @Override
        public void run() {
            while (!AsyncFlusher.this.executorService.isShutdown()) {
                try {
                    Boolean ignorable = (Boolean)AsyncFlusher.this.waitQueue.poll(AsyncFlusher.this.flusherConfig.getFlushIntervalMillis(), TimeUnit.MILLISECONDS);
                    AsyncFlusher.this.buffer.flush(AsyncFlusher.this.sender);
                    AsyncFlusher.this.waitQueue.clear();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                catch (IOException e) {
                    LOG.error("Failed to flush", (Throwable)e);
                }
            }
            AsyncFlusher.this.closeBuffer();
        }
    };

    private AsyncFlusher(Buffer buffer, Sender sender, Config flusherConfig) {
        super(buffer, sender, flusherConfig);
        this.executorService.execute(this.task);
    }

    @Override
    protected void flushInternal(boolean force) throws IOException {
        if (force) {
            try {
                this.waitQueue.put(true);
            }
            catch (InterruptedException e) {
                LOG.warn("Failed to wake up the flushing thread", (Throwable)e);
            }
        }
    }

    @Override
    protected void closeInternal() throws IOException {
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("1st awaitTermination was interrupted", (Throwable)e);
            Thread.currentThread().interrupt();
        }
        if (!this.executorService.isTerminated()) {
            this.executorService.shutdownNow();
        }
    }

    public static class Config
    extends Flusher.Config<AsyncFlusher> {
        @Override
        public AsyncFlusher createInstance(Buffer buffer, Sender sender) {
            return new AsyncFlusher(buffer, sender, this);
        }
    }
}

