/*
 * Decompiled with CFR 0.152.
 */
package kafka.network;

import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.api.RequestKeys$;
import kafka.network.AbstractServerThread;
import kafka.network.BoundedByteBufferReceive;
import kafka.network.InvalidRequestException;
import kafka.network.Processor$;
import kafka.network.Receive;
import kafka.network.Send;
import kafka.network.SocketServerStats;
import kafka.utils.Time;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.ScalaObject;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005=b!C\u0001\u0003\t\u0003\u0005\t\u0011\u0001\u0003\u0007\u0005%\u0001&o\\2fgN|'O\u0003\u0002\u0004\t\u00059a.\u001a;x_J\\'\"A\u0003\u0002\u000b-\fgm[1\u0014\u0007\u000191\u0002\u0005\u0002\t\u00135\t!!\u0003\u0002\u000b\u0005\t!\u0012IY:ue\u0006\u001cGoU3sm\u0016\u0014H\u000b\u001b:fC\u0012\u0004\"\u0001D\b\u000e\u00035Q\u0011AD\u0001\u0006g\u000e\fG.Y\u0005\u0003!5\u00111bU2bY\u0006|%M[3di\"A!\u0003\u0001BC\u0002\u0013\u0005A#\u0001\biC:$G.\u001a:NCB\u0004\u0018N\\4\u0004\u0001U\tQ\u0003\u0005\u0002\u001739\u0011\u0001bF\u0005\u00031\t\tq\u0001S1oI2,'/\u0003\u0002\u001b7\tq\u0001*\u00198eY\u0016\u0014X*\u00199qS:<'B\u0001\r\u0003\u0011!i\u0002A!A!\u0002\u0013)\u0012a\u00045b]\u0012dWM]'baBLgn\u001a\u0011\t\u0011}\u0001!Q1A\u0005\u0002\u0001\nA\u0001^5nKV\t\u0011\u0005\u0005\u0002#K5\t1E\u0003\u0002%\t\u0005)Q\u000f^5mg&\u0011ae\t\u0002\u0005)&lW\r\u0003\u0005)\u0001\t\u0005\t\u0015!\u0003\"\u0003\u0015!\u0018.\\3!\u0011!Q\u0003A!b\u0001\n\u0003Y\u0013!B:uCR\u001cX#\u0001\u0017\u0011\u0005!i\u0013B\u0001\u0018\u0003\u0005E\u0019vnY6fiN+'O^3s'R\fGo\u001d\u0005\ta\u0001\u0011\t\u0011)A\u0005Y\u000511\u000f^1ug\u0002B\u0001B\r\u0001\u0003\u0006\u0004%\taM\u0001\u000f[\u0006D(+Z9vKN$8+\u001b>f+\u0005!\u0004C\u0001\u00076\u0013\t1TBA\u0002J]RD\u0001\u0002\u000f\u0001\u0003\u0002\u0003\u0006I\u0001N\u0001\u0010[\u0006D(+Z9vKN$8+\u001b>fA!)!\b\u0001C\u0001w\u00051A(\u001b8jiz\"R\u0001P\u001f?\u007f\u0001\u0003\"\u0001\u0003\u0001\t\u000bII\u0004\u0019A\u000b\t\u000b}I\u0004\u0019A\u0011\t\u000b)J\u0004\u0019\u0001\u0017\t\u000bIJ\u0004\u0019\u0001\u001b\t\u000f\t\u0003!\u0019!C\u0005\u0007\u0006qa.Z<D_:tWm\u0019;j_:\u001cX#\u0001#\u0011\u0007\u0015ce*D\u0001G\u0015\t9\u0005*\u0001\u0006d_:\u001cWO\u001d:f]RT!!\u0013&\u0002\tU$\u0018\u000e\u001c\u0006\u0002\u0017\u0006!!.\u0019<b\u0013\tieIA\u000bD_:\u001cWO\u001d:f]Rd\u0015N\\6fIF+X-^3\u0011\u0005=#V\"\u0001)\u000b\u0005E\u0013\u0016\u0001C2iC:tW\r\\:\u000b\u0005MS\u0015a\u00018j_&\
u0011Q\u000b\u0015\u0002\u000e'>\u001c7.\u001a;DQ\u0006tg.\u001a7\t\r]\u0003\u0001\u0015!\u0003E\u0003=qWm^\"p]:,7\r^5p]N\u0004\u0003bB-\u0001\u0005\u0004%IAW\u0001\u000ee\u0016\fX/Z:u\u0019><w-\u001a:\u0016\u0003m\u0003\"\u0001X2\u000e\u0003uS!AX0\u0002\u000b1|w\r\u000e6\u000b\u0005\u0001\f\u0017AB1qC\u000eDWMC\u0001c\u0003\ry'oZ\u0005\u0003Iv\u0013a\u0001T8hO\u0016\u0014\bB\u00024\u0001A\u0003%1,\u0001\bsKF,Xm\u001d;M_\u001e<WM\u001d\u0011\t\u000b!\u0004A\u0011I5\u0002\u0007I,h\u000eF\u0001k!\ta1.\u0003\u0002m\u001b\t!QK\\5u\u0011\u0015q\u0007\u0001\"\u0003p\u0003\u0015\u0019Gn\\:f)\tQ\u0007\u000fC\u0003r[\u0002\u0007!/A\u0002lKf\u0004\"aT:\n\u0005Q\u0004&\u0001D*fY\u0016\u001cG/[8o\u0017\u0016L\b\"\u0002<\u0001\t\u00039\u0018AB1dG\u0016\u0004H\u000f\u0006\u0002kq\")\u00110\u001ea\u0001\u001d\u0006i1o\\2lKR\u001c\u0005.\u00198oK2DQa\u001f\u0001\u0005\n%\fqcY8oM&<WO]3OK^\u001cuN\u001c8fGRLwN\\:\t\u000bu\u0004A\u0011\u0002@\u0002\r!\fg\u000e\u001a7f)\u0015y\u00181BA\u0007!\u0015a\u0011\u0011AA\u0003\u0013\r\t\u0019!\u0004\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0007!\t9!C\u0002\u0002\n\t\u0011AaU3oI\")\u0011\u000f a\u0001e\"9\u0011q\u0002?A\u0002\u0005E\u0011a\u0002:fcV,7\u000f\u001e\t\u0004\u0011\u0005M\u0011bAA\u000b\u0005\t9!+Z2fSZ,\u0007bBA\r\u0001\u0011\u0005\u00111D\u0001\u0005e\u0016\fG\rF\u0002k\u0003;Aa!]A\f\u0001\u0004\u0011\bbBA\u0011\u0001\u0011\u0005\u00111E\u0001\u0006oJLG/\u001a\u000b\u0004U\u0006\u0015\u0002BB9\u0002 \u0001\u0007!\u000fC\u0004\u0002*\u0001!I!a\u000b\u0002\u0015\rD\u0017M\u001c8fY\u001a{'\u000fF\u0002O\u0003[Aa!]A\u0014\u0001\u0004\u0011\b")
/**
 * Selector-driven worker that reads requests from socket channels, hands them to a
 * handler chosen by {@code handlerMapping}, and writes back any response.
 *
 * <p>New channels are handed over through {@link #accept(SocketChannel)}, which puts them on a
 * concurrent queue and wakes the selector; {@code configureNewConnections()} later registers
 * them for reads. Per-connection state (the in-progress {@link Receive} or the pending
 * {@link Send}) is kept as the attachment of the channel's {@link SelectionKey}.
 *
 * <p>NOTE(review): this source is decompiler output for a Scala class. The main loop in
 * {@link #run()} could not be recovered, and the {@code $anonfun$...} classes referenced in
 * {@code close} are compiler-generated closures defined outside this file.
 */
public class Processor
extends AbstractServerThread
implements ScalaObject {
    /** Maps a request type id plus the received request to the handler that serves it. */
    private final Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping;
    /** Clock used to time request handling. */
    private final Time time;
    /** Sink for bytes-read, bytes-written and per-request timing figures. */
    private final SocketServerStats stats;
    /** Size bound passed to every {@link BoundedByteBufferReceive} created by {@link #read}. */
    private final int maxRequestSize;
    /** Channels handed over by {@link #accept} that are not yet registered with the selector. */
    private final ConcurrentLinkedQueue<SocketChannel> newConnections;
    /** Dedicated logger ({@code kafka.request.logger}) for per-request trace output. */
    private final Logger requestLogger;

    /** @return the request-type-to-handler mapping supplied at construction */
    public Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping() {
        return this.handlerMapping;
    }

    /** @return the clock supplied at construction */
    public Time time() {
        return this.time;
    }

    /** @return the statistics collector supplied at construction */
    public SocketServerStats stats() {
        return this.stats;
    }

    /** @return the request size bound supplied at construction */
    public int maxRequestSize() {
        return this.maxRequestSize;
    }

    private ConcurrentLinkedQueue<SocketChannel> newConnections() {
        return this.newConnections;
    }

    private Logger requestLogger() {
        return this.requestLogger;
    }

    /**
     * Thread entry point.
     *
     * <p>The decompiler failed on this method (see the trace below), so the body is only a
     * placeholder that always throws. NOTE(review): the real loop presumably calls
     * {@code configureNewConnections()}, {@link #read}, {@link #write} and {@code close} —
     * recover it from the original Scala source or bytecode before relying on this class.
     *
     * @throws IllegalStateException always, because the original body is missing
     */
    @Override
    public void run() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Back jump on a try block [egrp 0[TRYBLOCK] [0, 1, 2 : 81->159)] java.io.EOFException
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op02WithProcessedDataAndRefs.insertExceptionBlocks(Op02WithProcessedDataAndRefs.java:2283)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:415)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Tears down the connection behind {@code key$1} and drops its attachment.
     *
     * <p>Three actions are run through {@code Utils.swallow}, with the attachment cleared
     * between the second and the third. NOTE(review): the actions live in compiler-generated
     * closure classes that are not visible here; judging by their constructor arguments the
     * first two operate on the channel and the last on the key — confirm against the original
     * source.
     *
     * @param key$1 selection key of the connection to close
     */
    private void close(SelectionKey key$1) {
        SocketChannel channel$1 = (SocketChannel)key$1.channel();
        if (this.logger().isDebugEnabled()) {
            this.logger().debug("Closing connection from " + channel$1.socket().getRemoteSocketAddress());
        }
        Utils$.MODULE$.swallow(new $anonfun$close$4(this), new $anonfun$close$1(this, channel$1));
        Utils$.MODULE$.swallow(new $anonfun$close$5(this), new $anonfun$close$2(this, channel$1));
        key$1.attach(null);
        Utils$.MODULE$.swallow(new $anonfun$close$6(this), new $anonfun$close$3(this, key$1));
    }

    /**
     * Queues a newly accepted channel for this processor and wakes the selector so the
     * channel can be picked up.
     *
     * @param socketChannel the channel to hand over
     */
    public void accept(SocketChannel socketChannel) {
        this.newConnections().add(socketChannel);
        this.selector().wakeup();
    }

    /**
     * Registers every queued channel with the selector for read readiness.
     *
     * <p>The queue is drained by polling until it returns {@code null} rather than by testing
     * {@code size()}, which on a {@link ConcurrentLinkedQueue} walks the whole queue.
     */
    private void configureNewConnections() {
        SocketChannel channel;
        while ((channel = this.newConnections().poll()) != null) {
            if (this.logger().isDebugEnabled()) {
                this.logger().debug("Listening to new connection from " + channel.socket().getRemoteSocketAddress());
            }
            channel.register(this.selector(), SelectionKey.OP_READ);
        }
    }

    /**
     * Dispatches a fully received request to its handler and records how long the handler took.
     *
     * <p>The request type id is the next {@code short} in the request buffer.
     *
     * @param key     selection key of the originating connection (used for trace logging)
     * @param request the completed request
     * @return the handler's optional response
     * @throws InvalidRequestException if the mapping yields no handler, or if request tracing
     *                                 is enabled and the type id is not a known request key
     */
    private Option<Send> handle(SelectionKey key, Receive request) {
        short requestTypeId = request.buffer().getShort();
        if (this.requestLogger().isTraceEnabled()) {
            this.traceRequest(key, requestTypeId);
        }
        Function1<Receive, Option<Send>> handler = this.handlerMapping().apply(BoxesRunTime.boxToShort(requestTypeId), request);
        if (handler == null) {
            throw new InvalidRequestException("No handler found for request");
        }
        long start = this.time().nanoseconds();
        Option<Send> maybeSend = handler.apply(request);
        this.stats().recordRequest(requestTypeId, this.time().nanoseconds() - start);
        return maybeSend;
    }

    /**
     * Emits the per-request trace line naming the request kind and the remote address.
     *
     * <p>NOTE(review): unknown type ids are rejected here, i.e. only while trace logging is
     * enabled; with tracing off the id goes straight to {@code handlerMapping}. Kept as in
     * the original — confirm whether the mapping performs the same validation.
     *
     * @throws InvalidRequestException if {@code requestTypeId} matches no known request key
     */
    private void traceRequest(SelectionKey key, short requestTypeId) {
        String kind;
        if (requestTypeId == RequestKeys$.MODULE$.Produce()) {
            kind = "produce";
        } else if (requestTypeId == RequestKeys$.MODULE$.Fetch()) {
            kind = "fetch";
        } else if (requestTypeId == RequestKeys$.MODULE$.MultiFetch()) {
            kind = "multi-fetch";
        } else if (requestTypeId == RequestKeys$.MODULE$.MultiProduce()) {
            kind = "multi-produce";
        } else if (requestTypeId == RequestKeys$.MODULE$.Offsets()) {
            kind = "offset";
        } else {
            throw new InvalidRequestException("No mapping found for handler id " + requestTypeId);
        }
        this.requestLogger().trace("Handling " + kind + " request from " + this.channelFor(key).socket().getRemoteSocketAddress());
    }

    /**
     * Reads available bytes for the connection behind {@code key}.
     *
     * <p>The in-progress request is stored as the key's attachment and created on first use.
     * A negative read count closes the connection. Once the request is complete it is handled;
     * if the handler produced a response, that response becomes the attachment and the key
     * switches to write interest. An incomplete request keeps the key on read interest.
     *
     * @param key selection key that is ready for reading
     */
    public void read(SelectionKey key) {
        SocketChannel socketChannel = this.channelFor(key);
        Receive request = (Receive)key.attachment();
        if (request == null) {
            request = new BoundedByteBufferReceive(this.maxRequestSize());
            key.attach(request);
        }
        int bytesRead = request.readFrom(socketChannel);
        if (this.logger().isTraceEnabled()) {
            this.logger().trace(bytesRead + " bytes read from " + socketChannel.socket().getRemoteSocketAddress());
        }
        if (bytesRead < 0) {
            // End-of-stream marker, not a byte count: close without touching the byte statistics.
            this.close(key);
            return;
        }
        this.stats().recordBytesRead(bytesRead);
        if (request.complete()) {
            Option<Send> maybeResponse = this.handle(key, request);
            // Drop the finished request; the slot is reused for the response, if any.
            key.attach(null);
            if (maybeResponse.isDefined()) {
                key.attach(maybeResponse.get());
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } else {
            // More of the request is still to come.
            key.interestOps(SelectionKey.OP_READ);
            this.selector().wakeup();
        }
    }

    /**
     * Writes as much of the pending response (the key's attachment) as the channel accepts.
     *
     * <p>When the response is complete the attachment is cleared and the key returns to read
     * interest; otherwise the key stays on write interest.
     *
     * @param key selection key that is ready for writing; its attachment must be a {@link Send}
     */
    public void write(SelectionKey key) {
        Send response = (Send)key.attachment();
        SocketChannel socketChannel = this.channelFor(key);
        int written = response.writeTo(socketChannel);
        this.stats().recordBytesWritten(written);
        if (this.logger().isTraceEnabled()) {
            this.logger().trace(written + " bytes written to " + socketChannel.socket().getRemoteSocketAddress());
        }
        if (response.complete()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        } else {
            key.interestOps(SelectionKey.OP_WRITE);
            this.selector().wakeup();
        }
    }

    /** Returns the key's channel cast to {@link SocketChannel}. */
    private SocketChannel channelFor(SelectionKey key) {
        return (SocketChannel)key.channel();
    }

    /**
     * Creates a processor with an empty new-connection queue.
     *
     * @param handlerMapping maps a request type id and request to its handler
     * @param time           clock used to time request handling
     * @param stats          collector for byte counts and request timings
     * @param maxRequestSize size bound applied to each incoming request
     */
    public Processor(Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping, Time time, SocketServerStats stats, int maxRequestSize) {
        this.handlerMapping = handlerMapping;
        this.time = time;
        this.stats = stats;
        this.maxRequestSize = maxRequestSize;
        this.newConnections = new ConcurrentLinkedQueue<SocketChannel>();
        this.requestLogger = Logger.getLogger("kafka.request.logger");
    }
}

