/*
 * Decompiled with CFR 0.152.
 */
package org.xsocket.connection;

import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xsocket.DataConverter;
import org.xsocket.connection.AbstractMemoryManager;
import org.xsocket.connection.ConnectionUtils;
import org.xsocket.connection.IIoHandlerCallback;
import org.xsocket.connection.IoChainableHandler;
import org.xsocket.connection.IoProvider;
import org.xsocket.connection.IoQueue;
import org.xsocket.connection.IoSocketDispatcher;
import org.xsocket.connection.IoUnsynchronizedMemoryManager;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
final class IoSocketHandler
extends IoChainableHandler {
    private static final Logger LOG = Logger.getLogger(IoSocketHandler.class.getName());
    private static final int MAXSIZE_LOG_READ = 2000;
    private static final Map<String, Class> SUPPORTED_OPTIONS = new HashMap<String, Class>();
    private boolean isConnected = false;
    private boolean isLogicalClosed = false;
    private boolean isDisconnect = false;
    private boolean isDetached = true;
    private final SocketChannel channel;
    private IoSocketDispatcher dispatcher;
    private AbstractMemoryManager memoryManager;
    private final IoQueue sendQueue = new IoQueue();
    private int maxChunkSize = 0;
    private IWriteTask pendingWriteTask = null;
    private final String id;
    private boolean isRetryRead = true;
    private long openTime = -1L;
    private long lastTimeReceivedMillis = System.currentTimeMillis();
    private long lastTimeSentMillis = System.currentTimeMillis();
    private long receivedBytes = 0L;
    private long sendBytes = 0L;

    IoSocketHandler(SocketChannel channel, IoSocketDispatcher dispatcher, String connectionId) throws IOException {
        super(null);
        assert (channel != null);
        this.channel = channel;
        this.openTime = System.currentTimeMillis();
        channel.configureBlocking(false);
        this.dispatcher = dispatcher;
        this.id = connectionId;
        this.maxChunkSize = channel.socket().getSendBufferSize();
    }

    @Override
    public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException {
        this.setPreviousCallback(callbackHandler);
        while (!this.getChannel().finishConnect()) {
            this.getChannel().configureBlocking(true);
            this.getChannel().finishConnect();
            this.getChannel().configureBlocking(false);
        }
        this.updateDispatcher(this.dispatcher);
    }

    private void updateDispatcher(IoSocketDispatcher dispatcher) throws IOException {
        this.dispatcher = dispatcher;
        dispatcher.register(this, 1);
    }

    @Override
    public void setRetryRead(boolean isRetryRead) {
        this.isRetryRead = isRetryRead;
    }

    @Override
    public boolean reset() {
        try {
            this.sendQueue.drain();
            this.resumeRead();
            return super.reset();
        }
        catch (Exception e) {
            return false;
        }
    }

    void setMemoryManager(AbstractMemoryManager memoryManager) {
        this.memoryManager = memoryManager;
    }

    boolean isDetached() {
        return this.isDetached;
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public int getPendingWriteDataSize() {
        return this.sendQueue.getSize() + super.getPendingWriteDataSize();
    }

    @Override
    public boolean hasDataToSend() {
        return !this.sendQueue.isEmpty();
    }

    @Override
    public boolean isSecure() {
        return false;
    }

    @Override
    public void setOption(String name, Object value) throws IOException {
        IoProvider.setOption(this.channel.socket(), name, value);
    }

    @Override
    public Object getOption(String name) throws IOException {
        return IoProvider.getOption(this.channel.socket(), name);
    }

    @Override
    public Map<String, Class> getOptions() {
        return Collections.unmodifiableMap(SUPPORTED_OPTIONS);
    }

    void checkConnection() {
        if (!this.channel.isOpen()) {
            this.getPreviousCallback().onConnectionAbnormalTerminated();
        }
    }

    void setDetached(boolean isDetached) {
        this.isDetached = isDetached;
    }

    void onRegisteredEvent() throws IOException {
        block3: {
            if (!this.isConnected) {
                this.isConnected = true;
                try {
                    this.getPreviousCallback().onConnect();
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) break block3;
                    LOG.fine("error occured by performing onConnect " + this.id + " reason: " + e.toString());
                }
            }
        }
    }

    void onRegisteredFailedEvent(IOException ioe) throws IOException {
        block3: {
            if (!this.isConnected) {
                this.isConnected = true;
                try {
                    this.getPreviousCallback().onConnectException(ioe);
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) break block3;
                    LOG.fine("error occured by performing onConnectException " + this.id + " reason: " + e.toString());
                }
            }
        }
    }

    long onReadableEvent() throws IOException {
        assert (Thread.currentThread().getName().startsWith("xDispatcher")) : "receiveQueue can only be accessed by the dispatcher thread";
        long read = 0L;
        ByteBuffer[] received = this.readSocket();
        if (received != null) {
            int size = 0;
            for (ByteBuffer byteBuffer : received) {
                size += byteBuffer.remaining();
            }
            read += (long)size;
            this.getPreviousCallback().onData(received, size);
            this.getPreviousCallback().onPostData();
        }
        this.checkPreallocatedReadMemory();
        return read;
    }

    void onDirectUnregisteredWriteEvent() throws IOException {
        this.onWriteableEvent(false);
    }

    void onWriteableEvent() throws IOException {
        this.onWriteableEvent(true);
    }

    private void onWriteableEvent(boolean isUnsetWriteSelector) throws IOException {
        assert (ConnectionUtils.isDispatcherThread());
        boolean isMoreDateToWrite = this.writeSocket();
        if (isMoreDateToWrite) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("[" + this.id + "] remaining data to send. initiate sending of the remaining (" + DataConverter.toFormatedBytesSize(this.sendQueue.getSize()) + ")");
            }
            this.dispatcher.initializeWrite(this, false, false);
        } else {
            if (isUnsetWriteSelector) {
                this.dispatcher.unsetWriteSelectionKeyNow(this, true);
            }
            if (this.isLogicalClosed) {
                this.realClose();
            }
        }
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.finest("[" + this.getId() + "] writeable event handled");
        }
    }

    @Override
    public void write(ByteBuffer[] buffers) throws ClosedChannelException, IOException {
        this.addToWriteQueue(buffers);
    }

    void incSentBytes(int addSize) {
        this.lastTimeSentMillis = System.currentTimeMillis();
        this.sendBytes += (long)addSize;
        this.dispatcher.incSentBytes(addSize);
    }

    @Override
    public void flush() throws IOException {
        if (!this.sendQueue.isEmpty()) {
            this.dispatcher.initializeWrite(this, true, true);
        }
    }

    @Override
    public void addToWriteQueue(ByteBuffer[] buffers) {
        if (buffers != null) {
            this.sendQueue.append(buffers);
        }
    }

    @Override
    public void close(boolean immediate) throws IOException {
        if (immediate || this.sendQueue.isEmpty()) {
            this.realClose();
        } else {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("postpone close until remaning data to write (" + this.sendQueue.getSize() + ") has been written");
            }
            this.isLogicalClosed = true;
            this.dispatcher.initializeWrite(this, true, true);
        }
    }

    void closeSilence(boolean immediate) {
        block2: {
            try {
                this.close(immediate);
            }
            catch (IOException ioe) {
                if (!LOG.isLoggable(Level.FINE)) break block2;
                LOG.fine("error occured by closing " + this.getId() + " " + DataConverter.toString(ioe));
            }
        }
    }

    private void realClose() {
        block10: {
            block9: {
                block8: {
                    try {
                        this.dispatcher.deregister(this);
                    }
                    catch (Exception e) {
                        if (!LOG.isLoggable(Level.FINE)) break block8;
                        LOG.fine("error occured by deregistering connection " + this.id + " on dispatcher. reason: " + e.toString());
                    }
                }
                try {
                    this.channel.close();
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("connection " + this.id + " has been closed");
                    }
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) break block9;
                    LOG.fine("error occured by closing connection " + this.id + " reason: " + e.toString());
                }
            }
            if (!this.isDisconnect) {
                this.isDisconnect = true;
                try {
                    this.getPreviousCallback().onDisconnect();
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) break block10;
                    LOG.fine("error occured by calling onDisconnect " + this.id + " reason: " + e.toString());
                }
            }
        }
    }

    @Override
    public boolean isOpen() {
        return this.channel.isOpen();
    }

    public SocketChannel getChannel() {
        return this.channel;
    }

    @Override
    public void suspendRead() throws IOException {
        IoSocketDispatcher.setBypassingWriteAllowed(false);
        this.dispatcher.suspendRead(this);
    }

    @Override
    public void resumeRead() throws IOException {
        this.dispatcher.resumeRead(this);
    }

    @Override
    public boolean isReadSuspended() {
        return !this.dispatcher.isReadable(this);
    }

    private ByteBuffer[] readSocket() throws IOException {
        assert (Thread.currentThread().getName().startsWith("xDispatcher")) : "receiveQueue can only be accessed by the dispatcher thread";
        ByteBuffer[] received = null;
        int read = 0;
        this.lastTimeReceivedMillis = System.currentTimeMillis();
        if (this.isOpen()) {
            assert (this.memoryManager instanceof IoUnsynchronizedMemoryManager);
            ByteBuffer readBuffer = this.memoryManager.acquireMemoryStandardSizeOrPreallocated(8192);
            int pos = readBuffer.position();
            int limit = readBuffer.limit();
            try {
                read = this.channel.read(readBuffer);
            }
            catch (IOException ioe) {
                readBuffer.position(pos);
                readBuffer.limit(limit);
                this.memoryManager.recycleMemory(readBuffer);
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("[" + this.id + "] error occured while reading channel: " + ioe.toString());
                }
                throw ioe;
            }
            switch (read) {
                case -1: {
                    this.memoryManager.recycleMemory(readBuffer);
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + this.id + "] channel has reached end-of-stream (maybe closed by peer)");
                    }
                    ClosedChannelException cce = new ClosedChannelException();
                    throw cce;
                }
                case 0: {
                    this.memoryManager.recycleMemory(readBuffer);
                    return null;
                }
            }
            int remainingFreeSize = readBuffer.remaining();
            ByteBuffer dataBuffer = this.memoryManager.extractAndRecycleMemory(readBuffer, read);
            received = new ByteBuffer[]{dataBuffer};
            this.receivedBytes += (long)read;
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("[" + this.id + "] received (" + (dataBuffer.limit() - dataBuffer.position()) + " bytes, total " + (this.receivedBytes + (long)read) + " bytes): " + DataConverter.toTextOrHexString(new ByteBuffer[]{dataBuffer.duplicate()}, "UTF-8", 2000));
            }
            if (remainingFreeSize == 0 && this.isRetryRead) {
                if (read < this.memoryManager.getPreallocationBufferSize()) {
                    ByteBuffer[] repeatedReceived;
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + this.id + "] complete read buffer has been used, initiating repeated read");
                    }
                    if ((repeatedReceived = this.readSocket()) != null) {
                        ByteBuffer[] newReceived = new ByteBuffer[received.length + 1];
                        newReceived[0] = dataBuffer;
                        System.arraycopy(repeatedReceived, 0, newReceived, 1, repeatedReceived.length);
                        received = newReceived;
                        return received;
                    }
                    return received;
                }
                return received;
            }
            return received;
        }
        return null;
    }

    private void checkPreallocatedReadMemory() throws IOException {
        assert (Thread.currentThread().getName().startsWith("xDispatcher"));
        this.memoryManager.preallocate();
    }

    private boolean writeSocket() throws IOException {
        if (this.isOpen()) {
            IWriteTask writeTask = null;
            writeTask = this.pendingWriteTask != null ? this.pendingWriteTask : TaskFactory.newTask(this.sendQueue, this.maxChunkSize);
            IWriteResult result = writeTask.write(this);
            if (result.isAllWritten()) {
                this.pendingWriteTask = null;
                result.notifyWriteCallback();
                return !this.sendQueue.isEmpty();
            }
            this.pendingWriteTask = writeTask;
            return true;
        }
        if (LOG.isLoggable(Level.FINEST)) {
            if (!this.isOpen()) {
                LOG.finest("[" + this.getId() + "] couldn't write send queue to socket because socket is already closed (sendQueuesize=" + DataConverter.toFormatedBytesSize(this.sendQueue.getSize()) + ")");
            }
            if (this.sendQueue.isEmpty()) {
                LOG.finest("[" + this.getId() + "] nothing to write, because send queue is empty ");
            }
        }
        return false;
    }

    @Override
    public InetAddress getLocalAddress() {
        return this.channel.socket().getLocalAddress();
    }

    @Override
    public int getLocalPort() {
        return this.channel.socket().getLocalPort();
    }

    @Override
    public InetAddress getRemoteAddress() {
        InetAddress addr = this.channel.socket().getInetAddress();
        return addr;
    }

    @Override
    public int getRemotePort() {
        return this.channel.socket().getPort();
    }

    @Override
    public long getLastTimeReceivedMillis() {
        return this.lastTimeReceivedMillis;
    }

    @Override
    public long getLastTimeSendMillis() {
        return this.lastTimeSentMillis;
    }

    @Override
    public long getNumberOfReceivedBytes() {
        return this.receivedBytes;
    }

    @Override
    public long getNumberOfSendBytes() {
        return this.sendBytes;
    }

    @Override
    public String getRegisteredOpsInfo() {
        return this.dispatcher.getRegisteredOpsInfo(this);
    }

    @Override
    public void hardFlush() throws IOException {
        this.flush();
    }

    @Override
    public String toString() {
        try {
            SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss,S");
            StringBuilder sb = new StringBuilder();
            sb.append("(" + this.channel.socket().getInetAddress().toString() + ":" + this.channel.socket().getPort() + " -> " + this.channel.socket().getLocalAddress().toString() + ":" + this.channel.socket().getLocalPort() + ")");
            if (this.isReadSuspended()) {
                sb.append(" SUSPENDED");
            }
            sb.append(" received=" + DataConverter.toFormatedBytesSize(this.receivedBytes) + ", sent=" + DataConverter.toFormatedBytesSize(this.sendBytes) + ", age=" + DataConverter.toFormatedDuration(System.currentTimeMillis() - this.openTime) + ", lastReceived=" + df.format(new Date(this.lastTimeReceivedMillis)) + ", sendQueueSize=" + DataConverter.toFormatedBytesSize(this.sendQueue.getSize()) + " [" + this.id + "]");
            return sb.toString();
        }
        catch (Throwable e) {
            return super.toString();
        }
    }

    static {
        SUPPORTED_OPTIONS.put("SOL_SOCKET.SO_RCVBUF", Integer.class);
        SUPPORTED_OPTIONS.put("SOL_SOCKET.SO_SNDBUF", Integer.class);
        SUPPORTED_OPTIONS.put("SOL_SOCKET.SO_REUSEADDR", Boolean.class);
        SUPPORTED_OPTIONS.put("SOL_SOCKET.SO_KEEPALIVE", Boolean.class);
        SUPPORTED_OPTIONS.put("IPPROTO_TCP.TCP_NODELAY", Boolean.class);
        SUPPORTED_OPTIONS.put("SOL_SOCKET.SO_LINGER", Integer.class);
    }

    private static final class MultiBufferWriteResult
    implements IWriteResult {
        private final ByteBuffer[] buffers;
        private final IoSocketHandler handler;

        public MultiBufferWriteResult(IoSocketHandler handler, ByteBuffer ... buffers) {
            this.buffers = buffers;
            this.handler = handler;
        }

        public boolean isAllWritten() {
            return true;
        }

        public void notifyWriteCallback() {
            for (ByteBuffer buffer : this.buffers) {
                try {
                    this.handler.getPreviousCallback().onWritten(buffer);
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) continue;
                    LOG.fine("error occured by performing onWritten callback " + e.toString());
                }
            }
        }
    }

    private static final class SingleBufferWriteResult
    implements IWriteResult {
        private final ByteBuffer buffer;
        private final IoSocketHandler handler;

        public SingleBufferWriteResult(IoSocketHandler handler, ByteBuffer buffer) {
            this.buffer = buffer;
            this.handler = handler;
        }

        public boolean isAllWritten() {
            return true;
        }

        public void notifyWriteCallback() {
            block2: {
                try {
                    this.handler.getPreviousCallback().onWritten(this.buffer);
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) break block2;
                    LOG.fine("error occured by performing onWritten callback " + e.toString());
                }
            }
        }
    }

    private static final class ErrorWriteResult
    implements IWriteResult {
        private final IOException ioe;
        private final ByteBuffer[] buffers;
        private final IoSocketHandler handler;

        public ErrorWriteResult(IoSocketHandler handler, IOException ioe, ByteBuffer ... buffers) {
            this.buffers = buffers;
            this.ioe = ioe;
            this.handler = handler;
        }

        public boolean isAllWritten() {
            return true;
        }

        public void notifyWriteCallback() {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("error " + this.ioe.toString() + " occured by writing " + this.ioe.toString());
            }
            for (ByteBuffer buffer : this.buffers) {
                try {
                    this.handler.getPreviousCallback().onWriteException(this.ioe, buffer);
                }
                catch (Exception e) {
                    if (!LOG.isLoggable(Level.FINE)) continue;
                    LOG.fine("error occured by notifying that write exception (" + e.toString() + ") has been occured " + e.toString());
                }
            }
        }
    }

    private static final class EmptyWriteResult
    implements IWriteResult {
        private EmptyWriteResult() {
        }

        public boolean isAllWritten() {
            return true;
        }

        public void notifyWriteCallback() {
        }
    }

    private static final class IncompleteWriteResult
    implements IWriteResult {
        private IncompleteWriteResult() {
        }

        public boolean isAllWritten() {
            return false;
        }

        public void notifyWriteCallback() {
        }
    }

    private static interface IWriteResult {
        public boolean isAllWritten();

        public void notifyWriteCallback();
    }

    private static final class DirectWriteTask
    implements IWriteTask {
        private ByteBuffer bufferToWrite = null;

        private DirectWriteTask() {
        }

        boolean addData(ByteBuffer bufferToWrite) {
            if (bufferToWrite.hasRemaining()) {
                this.bufferToWrite = bufferToWrite;
                return true;
            }
            return false;
        }

        public IWriteResult write(IoSocketHandler handler) throws IOException {
            try {
                int written = handler.channel.write(this.bufferToWrite);
                handler.incSentBytes(written);
                if (!this.bufferToWrite.hasRemaining()) {
                    SingleBufferWriteResult writeResult = new SingleBufferWriteResult(handler, this.bufferToWrite);
                    this.release();
                    return writeResult;
                }
                return INCOMPLETE_WRITE_RESULT;
            }
            catch (IOException ioe) {
                return new ErrorWriteResult(handler, ioe, this.bufferToWrite);
            }
        }

        private void release() {
            this.bufferToWrite = null;
            TaskFactory.reuseWriteProcessor(this);
        }
    }

    private static final class MergingWriteTask
    implements IWriteTask {
        private int writeBufferSize = 8192;
        private ByteBuffer writeBuffer = ByteBuffer.allocateDirect(this.writeBufferSize);
        private ByteBuffer[] buffersToWrite = null;

        private MergingWriteTask() {
        }

        boolean addData(ByteBuffer[] buffers, int maxChunkSize) {
            this.buffersToWrite = buffers;
            if (this.writeBufferSize < maxChunkSize) {
                this.writeBufferSize = maxChunkSize;
                this.writeBuffer = ByteBuffer.allocateDirect(this.writeBufferSize);
            }
            int countBuffers = this.buffersToWrite.length;
            for (int i = 0; i < countBuffers; ++i) {
                this.writeBuffer.put(this.buffersToWrite[i]);
            }
            if (this.writeBuffer.position() == 0) {
                return false;
            }
            this.writeBuffer.flip();
            return true;
        }

        public IWriteResult write(IoSocketHandler handler) throws IOException {
            try {
                int written = handler.channel.write(this.writeBuffer);
                handler.incSentBytes(written);
                if (!this.writeBuffer.hasRemaining()) {
                    MultiBufferWriteResult writeResult = new MultiBufferWriteResult(handler, this.buffersToWrite);
                    this.release();
                    return writeResult;
                }
                return INCOMPLETE_WRITE_RESULT;
            }
            catch (IOException ioe) {
                return new ErrorWriteResult(handler, ioe, this.buffersToWrite);
            }
        }

        private void release() {
            this.buffersToWrite = null;
            this.writeBuffer.clear();
            TaskFactory.reuseWriteProcessor(this);
        }
    }

    private static final class EmptyWriteTask
    implements IWriteTask {
        private EmptyWriteTask() {
        }

        public IWriteResult write(IoSocketHandler handler) throws IOException {
            return EMPTY_WRITE_RESULT;
        }
    }

    private static interface IWriteTask {
        public static final IncompleteWriteResult INCOMPLETE_WRITE_RESULT = new IncompleteWriteResult();
        public static final IWriteResult EMPTY_WRITE_RESULT = new EmptyWriteResult();

        public IWriteResult write(IoSocketHandler var1) throws IOException;
    }

    private static final class TaskFactory {
        private static final EmptyWriteTask EMPTY_WRITE_TASK = new EmptyWriteTask();
        private static ThreadLocal<MergingWriteTask> freeMergingWriteTaskThreadLocal = new ThreadLocal();
        private static ThreadLocal<DirectWriteTask> freeDirectWriteTaskThreadLocal = new ThreadLocal();

        private TaskFactory() {
        }

        static IWriteTask newTask(IoQueue sendQueue, int maxChunkSize) {
            ByteBuffer[] buffersToWrite = sendQueue.drain(maxChunkSize);
            if (buffersToWrite == null) {
                return TaskFactory.createEmptyWriteTask();
            }
            if (buffersToWrite.length > 1) {
                MergingWriteTask mergingWriteTask = TaskFactory.createMergingWriteTask();
                boolean dataToWrite = mergingWriteTask.addData(buffersToWrite, maxChunkSize);
                if (dataToWrite) {
                    return mergingWriteTask;
                }
                return TaskFactory.createEmptyWriteTask();
            }
            DirectWriteTask directWriteTask = TaskFactory.createDirectWriteTask();
            boolean dataToWrite = directWriteTask.addData(buffersToWrite[0]);
            if (dataToWrite) {
                return directWriteTask;
            }
            return TaskFactory.createEmptyWriteTask();
        }

        static EmptyWriteTask createEmptyWriteTask() {
            return EMPTY_WRITE_TASK;
        }

        static MergingWriteTask createMergingWriteTask() {
            assert (ConnectionUtils.isDispatcherThread());
            MergingWriteTask meringWriteTask = freeMergingWriteTaskThreadLocal.get();
            if (meringWriteTask != null) {
                freeMergingWriteTaskThreadLocal.remove();
            } else {
                meringWriteTask = new MergingWriteTask();
            }
            return meringWriteTask;
        }

        static DirectWriteTask createDirectWriteTask() {
            assert (ConnectionUtils.isDispatcherThread());
            DirectWriteTask directWriteTask = freeDirectWriteTaskThreadLocal.get();
            if (directWriteTask != null) {
                freeDirectWriteTaskThreadLocal.remove();
            } else {
                directWriteTask = new DirectWriteTask();
            }
            return directWriteTask;
        }

        static void reuseWriteProcessor(MergingWriteTask meringWriteProcessor) {
            assert (ConnectionUtils.isDispatcherThread());
            freeMergingWriteTaskThreadLocal.set(meringWriteProcessor);
        }

        static void reuseWriteProcessor(DirectWriteTask directWriteProcessor) {
            assert (ConnectionUtils.isDispatcherThread());
            freeDirectWriteTaskThreadLocal.set(directWriteProcessor);
        }
    }
}

