/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.security.authenticator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.ReauthenticationContext;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class SaslClientAuthenticator
implements Authenticator {
    // Sentinel for saslAuthenticateVersion: while the version equals -1, tokens are sent and
    // received as raw packets instead of being wrapped in SaslAuthenticate requests/responses.
    private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
    // Source of the random jitter applied when computing the client re-authentication time.
    private static final Random RNG = new Random();
    // Inclusive bounds of the correlation-id range that nextCorrelationId() hands out.
    public static final int MAX_RESERVED_CORRELATION_ID = Integer.MAX_VALUE;
    public static final int MIN_RESERVED_CORRELATION_ID = 0x7FFFFFF8;
    private final Subject subject;
    private final String servicePrincipal;
    private final String host;
    private final String node;
    private final String mechanism;
    private final TransportLayer transportLayer;
    private final SaslClient saslClient;
    private final Map<String, ?> configs;
    // Only populated for the "GSSAPI" mechanism; null for every other mechanism.
    private final String clientPrincipalName;
    private final AuthenticateCallbackHandler callbackHandler;
    private final Time time;
    private final Logger log;
    private final ReauthInfo reauthInfo;
    // Receive currently being read; reset to null once a full packet has been consumed.
    private NetworkReceive netInBuffer;
    // Most recent outgoing send; may still be only partially written to the transport layer.
    private Send netOutBuffer;
    private SaslState saslState;
    // State requested while netOutBuffer was still unflushed; applied after the flush completes.
    private SaslState pendingSaslState;
    private int correlationId;
    // Header of the last request sent; used to parse the matching response, then cleared.
    private RequestHeader currentRequestHeader;
    private short saslAuthenticateVersion;
    private short saslHandshakeVersion;

    /**
     * Tells whether a correlation id falls inside the range this authenticator draws its own
     * request ids from.
     *
     * @param correlationId the correlation id to classify
     * @return {@code true} when the id is at or above 0x7FFFFFF8 (the value of
     *         {@code MIN_RESERVED_CORRELATION_ID}); {@code false} otherwise
     */
    public static boolean isReserved(int correlationId) {
        final int lowestReservedId = 0x7FFFFFF8;
        return lowestReservedId <= correlationId;
    }

    /**
     * Creates the authenticator, picks the starting state and builds the underlying
     * {@link SaslClient}.
     *
     * @param configs                configuration map; also handed to the SASL client factory
     * @param callbackHandler        callback handler handed to the SASL client factory
     * @param node                   id used for the sends and receives created by this instance
     * @param subject                subject under which SASL operations are executed
     * @param servicePrincipal       protocol/service name passed to the SASL client factory
     * @param host                   server host name passed to the SASL client factory
     * @param mechanism              SASL mechanism to use
     * @param handshakeRequestEnable when {@code true} start at {@code SEND_APIVERSIONS_REQUEST},
     *                               otherwise start directly at {@code INITIAL}
     * @param transportLayer         transport used for all reads and writes
     * @param time                   clock used to timestamp the end of authentication
     * @param logContext             factory for this instance's logger
     * @throws SaslAuthenticationException if any part of the setup fails
     */
    public SaslClientAuthenticator(Map<String, ?> configs, AuthenticateCallbackHandler callbackHandler, String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable, TransportLayer transportLayer, Time time, LogContext logContext) {
        // Collaborators and connection identity.
        this.configs = configs;
        this.callbackHandler = callbackHandler;
        this.node = node;
        this.subject = subject;
        this.servicePrincipal = servicePrincipal;
        this.host = host;
        this.mechanism = mechanism;
        this.transportLayer = transportLayer;
        this.time = time;
        this.log = logContext.logger(this.getClass());

        // Protocol bookkeeping: no ids issued yet, SaslAuthenticate wrapping disabled (-1).
        this.correlationId = 0;
        this.saslAuthenticateVersion = (short) -1;
        this.reauthInfo = new ReauthInfo();

        try {
            SaslState firstState;
            if (handshakeRequestEnable) {
                firstState = SaslState.SEND_APIVERSIONS_REQUEST;
            } else {
                firstState = SaslState.INITIAL;
            }
            this.setSaslState(firstState);

            // Only GSSAPI resolves a client principal from the subject; all others pass null.
            if (mechanism.equals("GSSAPI")) {
                this.clientPrincipalName = SaslClientAuthenticator.firstPrincipal(subject);
            } else {
                this.clientPrincipalName = null;
            }
            this.saslClient = this.createSaslClient();
        }
        catch (Exception e) {
            throw new SaslAuthenticationException("Failed to configure SaslClientAuthenticator", e);
        }
    }

    /**
     * Creates the {@link SaslClient} for the configured mechanism while running as the
     * configured subject.
     *
     * @return a non-null SASL client
     * @throws SaslAuthenticationException if the factory returns {@code null} or the privileged
     *         action fails; in the latter case the underlying cause is preserved
     */
    SaslClient createSaslClient() {
        try {
            // The explicit cast selects the PrivilegedExceptionAction overload of Subject.doAs.
            // Without it the lambda is ambiguous between the two overloads, and the body throws
            // a checked SaslException which only the exception-action variant permits.
            return Subject.doAs(this.subject, (PrivilegedExceptionAction<SaslClient>) () -> {
                String[] mechs = {this.mechanism};
                this.log.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}", this.clientPrincipalName, this.servicePrincipal, this.host, Arrays.toString(mechs));
                SaslClient retvalSaslClient = Sasl.createSaslClient(mechs, this.clientPrincipalName, this.servicePrincipal, this.host, this.configs, this.callbackHandler);
                if (retvalSaslClient == null) {
                    throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + this.mechanism);
                }
                return retvalSaslClient;
            });
        }
        catch (PrivilegedActionException e) {
            // Unwrap so callers see the real failure rather than the doAs wrapper.
            throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + this.mechanism, e.getCause());
        }
    }

    /**
     * Advances the client-side SASL state machine by at most one send or one receive.
     *
     * <p>If a previous send is still only partially written, this call just tries to flush it
     * and returns when the flush is still incomplete. Several {@code case} labels fall through
     * on purpose: once a response has been fully received, the follow-up request is sent within
     * the same call.
     *
     * @throws IOException           if reading from or writing to the transport layer fails
     * @throws IllegalStateException if called after the state machine reached {@code FAILED}
     */
    @Override
    public void authenticate() throws IOException {
        // Nothing new may be sent or read until the previous request is fully flushed.
        if (this.netOutBuffer != null && !this.flushNetOutBufferAndUpdateInterestOps()) {
            return;
        }
        switch (this.saslState) {
            case SEND_APIVERSIONS_REQUEST: {
                // The ApiVersions request is always built with version 0.
                ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build((short)0);
                this.send(apiVersionsRequest.toSend(this.node, this.nextRequestHeader(ApiKeys.API_VERSIONS, apiVersionsRequest.version())));
                this.setSaslState(SaslState.RECEIVE_APIVERSIONS_RESPONSE);
                break;
            }
            case RECEIVE_APIVERSIONS_RESPONSE: {
                ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse)this.receiveKafkaResponse();
                // null means the response has not fully arrived yet; retry on the next call.
                if (apiVersionsResponse == null) break;
                this.setSaslAuthenticateAndHandshakeVersions(apiVersionsResponse);
                // Remembered so a later re-authentication can reuse it instead of asking again.
                this.reauthInfo.apiVersionsResponseReceivedFromBroker = apiVersionsResponse;
                this.setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
                // Intentional fall-through: send the handshake with the version just selected.
            }
            case SEND_HANDSHAKE_REQUEST: {
                this.sendHandshakeRequest(this.saslHandshakeVersion);
                this.setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                break;
            }
            case RECEIVE_HANDSHAKE_RESPONSE: {
                SaslHandshakeResponse handshakeResponse = (SaslHandshakeResponse)this.receiveKafkaResponse();
                if (handshakeResponse == null) break;
                this.handleSaslHandshakeResponse(handshakeResponse);
                this.setSaslState(SaslState.INITIAL);
                // Intentional fall-through: send the initial token right away.
            }
            case INITIAL: {
                this.sendInitialToken();
                this.setSaslState(SaslState.INTERMEDIATE);
                break;
            }
            case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE: {
                // Re-authentication reuses the ApiVersions response of the original authentication.
                this.setSaslAuthenticateAndHandshakeVersions(this.reauthInfo.apiVersionsResponseFromOriginalAuthentication);
                this.setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST);
                // Intentional fall-through: send the handshake request immediately.
            }
            case REAUTH_SEND_HANDSHAKE_REQUEST: {
                this.sendHandshakeRequest(this.saslHandshakeVersion);
                this.setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
                break;
            }
            case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE: {
                // receiveKafkaResponse() also returns null when it set aside a response that
                // did not parse against the current request header during re-authentication.
                SaslHandshakeResponse handshakeResponse = (SaslHandshakeResponse)this.receiveKafkaResponse();
                if (handshakeResponse == null) break;
                this.handleSaslHandshakeResponse(handshakeResponse);
                this.setSaslState(SaslState.REAUTH_INITIAL);
                // Intentional fall-through: send the initial token right away.
            }
            case REAUTH_INITIAL: {
                this.sendInitialToken();
                this.setSaslState(SaslState.INTERMEDIATE);
                break;
            }
            case INTERMEDIATE: {
                byte[] serverToken = this.receiveToken();
                // True when a server token arrived and no further client token was sent for it.
                boolean noResponsesPending = serverToken != null && !this.sendSaslClientToken(serverToken, false);
                if (!this.saslClient.isComplete()) break;
                // -1 is DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER (raw-token mode).
                if (this.saslAuthenticateVersion == -1 || noResponsesPending) {
                    this.setSaslState(SaslState.COMPLETE);
                    break;
                }
                // A SaslAuthenticate request is still outstanding; wait for its response.
                this.setSaslState(SaslState.CLIENT_COMPLETE);
                break;
            }
            case CLIENT_COMPLETE: {
                byte[] serverResponse = this.receiveToken();
                if (serverResponse == null) break;
                this.setSaslState(SaslState.COMPLETE);
                break;
            }
            case COMPLETE: {
                break;
            }
            case FAILED: {
                throw new IllegalStateException("SASL handshake has already failed");
            }
        }
    }

    /**
     * Builds a SASL handshake request at the given version and sends it to the broker.
     *
     * @param version handshake request version to build
     * @throws IOException if writing to the transport layer fails
     */
    private void sendHandshakeRequest(short version) throws IOException {
        SaslHandshakeRequest request = createSaslHandshakeRequest(version);
        // Use the version the built request reports, not the argument, for the header.
        RequestHeader header = nextRequestHeader(ApiKeys.SASL_HANDSHAKE, request.version());
        send(request.toSend(node, header));
    }

    /**
     * Starts the token exchange by sending the client's first token, derived from an empty
     * challenge.
     *
     * @throws IOException if writing to the transport layer fails
     */
    private void sendInitialToken() throws IOException {
        byte[] emptyChallenge = new byte[0];
        sendSaslClientToken(emptyChallenge, true);
    }

    /**
     * Begins re-authentication, taking over from the authenticator used previously.
     *
     * <p>The previous authenticator's ApiVersions response is reused, the previous authenticator
     * is closed, the receive supplied by the context becomes the current inbound buffer, and
     * the state machine is run once from {@code REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE}.
     *
     * @param reauthenticationContext context of the re-authentication; must not be {@code null}
     * @throws IOException if closing the previous authenticator or the first step fails
     */
    @Override
    public void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException {
        Objects.requireNonNull(reauthenticationContext);
        SaslClientAuthenticator previous = (SaslClientAuthenticator) reauthenticationContext.previousAuthenticator();
        ApiVersionsResponse originalApiVersions = previous.reauthInfo.apiVersionsResponse();
        previous.close();

        long beginNanos = reauthenticationContext.reauthenticationBeginNanos();
        reauthInfo.reauthenticating(originalApiVersions, beginNanos);

        netInBuffer = reauthenticationContext.networkReceive();
        setSaslState(SaslState.REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE);
        authenticate();
    }

    /**
     * Hands out the oldest receive that was set aside during re-authentication, if any.
     *
     * @return the next set-aside receive, or an empty {@link Optional} when none is left
     */
    @Override
    public Optional<NetworkReceive> pollResponseReceivedDuringReauthentication() {
        Optional<NetworkReceive> setAside = reauthInfo.pollResponseReceivedDuringReauthentication();
        return setAside;
    }

    /**
     * Reports when this client intends to re-authenticate.
     *
     * @return the re-authentication time in nanoseconds, or {@code null} when none was computed
     */
    @Override
    public Long clientSessionReauthenticationTimeNanos() {
        Long reauthenticationTimeNanos = reauthInfo.clientSessionReauthenticationTimeNanos;
        return reauthenticationTimeNanos;
    }

    /**
     * Reports how long re-authentication took.
     *
     * @return the latency in milliseconds, or {@code null} when this instance is not
     *         re-authenticating
     */
    @Override
    public Long reauthenticationLatencyMs() {
        Long latencyMs = reauthInfo.reauthenticationLatencyMs();
        return latencyMs;
    }

    /**
     * Issues the next correlation id from the reserved range.
     *
     * <p>Whenever the counter is outside the reserved range (initially, or after the increment
     * past {@code Integer.MAX_VALUE} wrapped it to a negative value) it is reset to the start of
     * the range first.
     *
     * @return the correlation id to use for the next request
     */
    int nextCorrelationId() {
        boolean outsideReservedRange = !SaslClientAuthenticator.isReserved(correlationId);
        if (outsideReservedRange) {
            // 0x7FFFFFF8 is the value of MIN_RESERVED_CORRELATION_ID.
            correlationId = 0x7FFFFFF8;
        }
        int issued = correlationId;
        correlationId = issued + 1;
        return issued;
    }

    /**
     * Creates the header for the next outgoing request and remembers it as the header the next
     * response will be parsed against.
     *
     * @param apiKey  API of the request being sent
     * @param version version of the request being sent
     * @return the newly created header
     */
    private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
        String clientId = (String) configs.get("client.id");
        RequestHeaderData headerData = new RequestHeaderData()
                .setRequestApiKey(apiKey.id)
                .setRequestApiVersion(version)
                .setClientId(clientId)
                .setCorrelationId(nextCorrelationId());
        RequestHeader header = new RequestHeader(headerData, apiKey.requestHeaderVersion(version));
        currentRequestHeader = header;
        return header;
    }

    /**
     * Builds a handshake request announcing this client's SASL mechanism.
     *
     * @param version request version to build
     * @return the handshake request
     */
    protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
        SaslHandshakeRequestData data = new SaslHandshakeRequestData().setMechanism(mechanism);
        SaslHandshakeRequest.Builder builder = new SaslHandshakeRequest.Builder(data);
        return builder.build(version);
    }

    /**
     * Picks the SaslAuthenticate and SaslHandshake versions to use from the broker's advertised
     * API versions.
     *
     * <p>For each API the lower of the broker's maximum and this client's latest version is
     * chosen. An API missing from the response leaves the corresponding field untouched.
     *
     * @param apiVersionsResponse API versions advertised by the broker
     */
    protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVersionsResponse) {
        ApiVersionsResponseData.ApiVersionsResponseKey authenticateKey = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
        if (authenticateKey != null) {
            int negotiated = Math.min(authenticateKey.maxVersion(), ApiKeys.SASL_AUTHENTICATE.latestVersion());
            saslAuthenticateVersion = (short) negotiated;
        }

        ApiVersionsResponseData.ApiVersionsResponseKey handshakeKey = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id);
        if (handshakeKey != null) {
            int negotiated = Math.min(handshakeKey.maxVersion(), ApiKeys.SASL_HANDSHAKE.latestVersion());
            saslHandshakeVersion = (short) negotiated;
        }
    }

    /**
     * Moves the state machine to {@code saslState}, or defers the move while an outgoing send
     * is still unflushed.
     *
     * <p>A deferred state is stored in {@code pendingSaslState} and applied by
     * {@code flushNetOutBufferAndUpdateInterestOps()} once the send completes. On reaching
     * {@code COMPLETE}, the end-of-authentication timestamps are recorded and write interest on
     * the transport layer is adjusted.
     *
     * @param saslState the state to enter
     */
    private void setSaslState(SaslState saslState) {
        if (this.netOutBuffer != null && !this.netOutBuffer.completed()) {
            // Request still in flight on the wire: remember the target state for later.
            this.pendingSaslState = saslState;
            return;
        }
        this.pendingSaslState = null;
        this.saslState = saslState;
        this.log.debug("Set SASL client state to {}", (Object)saslState);
        if (saslState != SaslState.COMPLETE) {
            return;
        }
        this.reauthInfo.setAuthenticationEndAndSessionReauthenticationTimes(this.time.nanoseconds());
        if (!this.reauthInfo.reauthenticating()) {
            this.transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        } else {
            // NOTE(review): presumably write interest is re-enabled so data queued on the channel
            // before re-authentication started gets flushed; confirm against the channel code.
            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
        }
    }

    /**
     * Produces the client's answer to a server token and sends it, unless the SASL client has
     * already completed or has nothing to send.
     *
     * <p>When a SaslAuthenticate version is in effect (anything other than -1) the token is
     * wrapped in a SaslAuthenticate request; otherwise it is sent as a raw packet.
     *
     * @param serverToken token received from the server (empty for the initial exchange)
     * @param isInitial   whether this is the first token of the exchange
     * @return {@code true} if a token was sent, {@code false} if nothing was sent
     * @throws IOException if token creation or the write to the transport layer fails
     */
    private boolean sendSaslClientToken(byte[] serverToken, boolean isInitial) throws IOException {
        if (saslClient.isComplete()) {
            return false;
        }
        byte[] clientToken = createSaslToken(serverToken, isInitial);
        if (clientToken == null) {
            return false;
        }

        ByteBuffer payload;
        if (saslAuthenticateVersion == -1) {
            payload = ByteBuffer.wrap(clientToken);
        } else {
            SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(clientToken);
            SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build(saslAuthenticateVersion);
            payload = request.serialize(nextRequestHeader(ApiKeys.SASL_AUTHENTICATE, saslAuthenticateVersion));
        }
        send(new NetworkSend(node, payload));
        return true;
    }

    /**
     * Makes {@code send2} the current outgoing buffer and attempts to flush it immediately.
     *
     * @param send2 the send to write to the transport layer
     * @throws IOException if the write fails; the state machine is moved to {@code FAILED}
     *                     before the exception is rethrown
     */
    private void send(Send send2) throws IOException {
        netOutBuffer = send2;
        try {
            flushNetOutBufferAndUpdateInterestOps();
        }
        catch (IOException e) {
            setSaslState(SaslState.FAILED);
            throw e;
        }
    }

    /**
     * Tries to flush the outgoing buffer and keeps the transport layer's write interest in
     * sync with the result.
     *
     * <p>After a complete flush, write interest is removed and any state transition that was
     * deferred while the buffer was pending is applied. After a partial flush, write interest
     * is added.
     *
     * @return {@code true} if the buffer has been written completely
     * @throws IOException if writing to the transport layer fails
     */
    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
        boolean flushedCompletely = this.flushNetOutBuffer();
        if (!flushedCompletely) {
            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
            return false;
        }
        this.transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        if (this.pendingSaslState != null) {
            // The buffer is complete now, so setSaslState() applies the state instead of deferring.
            this.setSaslState(this.pendingSaslState);
        }
        return true;
    }

    /**
     * Reads from the transport layer into the current inbound buffer and, once a whole packet
     * has arrived, returns a copy of its payload.
     *
     * @return the payload bytes of a completed receive, or {@code null} if more data is needed
     * @throws IOException if reading from the transport layer fails
     */
    private byte[] receiveResponseOrToken() throws IOException {
        if (netInBuffer == null) {
            netInBuffer = new NetworkReceive(node);
        }
        netInBuffer.readFrom(transportLayer);
        if (!netInBuffer.complete()) {
            return null;
        }

        ByteBuffer payload = netInBuffer.payload();
        payload.rewind();
        byte[] serverPacket = new byte[payload.remaining()];
        payload.get(serverPacket);
        // Drop the finished receive so the next call starts a fresh one.
        netInBuffer = null;
        return serverPacket;
    }

    /**
     * Returns a principal of type "User" named after the client principal.
     *
     * @return a new {@link KafkaPrincipal}; its name is {@code null} for mechanisms other than
     *         "GSSAPI", since the client principal name is only resolved for that mechanism
     */
    @Override
    public KafkaPrincipal principal() {
        String principalName = clientPrincipalName;
        return new KafkaPrincipal("User", principalName);
    }

    /**
     * Tells whether authentication has finished successfully.
     *
     * @return {@code true} once the state machine is in {@code COMPLETE}
     */
    @Override
    public boolean complete() {
        return SaslState.COMPLETE == saslState;
    }

    /**
     * Disposes of the underlying SASL client, if one exists.
     *
     * @throws IOException if disposing of the SASL client fails
     */
    @Override
    public void close() throws IOException {
        if (saslClient == null) {
            return;
        }
        saslClient.dispose();
    }

    /**
     * Receives the next server token.
     *
     * <p>In raw-token mode (SaslAuthenticate version -1) the packet bytes are returned as-is.
     * Otherwise a SaslAuthenticate response is parsed: an error moves the state machine to
     * {@code FAILED} and is thrown, a positive session lifetime is recorded for
     * re-authentication, and the auth bytes are returned.
     *
     * @return the server token, or {@code null} if no complete response is available yet
     * @throws IOException if reading from the transport layer fails
     */
    private byte[] receiveToken() throws IOException {
        if (saslAuthenticateVersion == -1) {
            return receiveResponseOrToken();
        }

        SaslAuthenticateResponse response = (SaslAuthenticateResponse) receiveKafkaResponse();
        if (response == null) {
            return null;
        }

        Errors error = response.error();
        if (error != Errors.NONE) {
            setSaslState(SaslState.FAILED);
            String errMsg = response.errorMessage();
            // Prefer the broker-supplied message when there is one.
            if (errMsg == null) {
                throw error.exception();
            }
            throw error.exception(errMsg);
        }

        long sessionLifetimeMs = response.sessionLifetimeMs();
        if (sessionLifetimeMs > 0L) {
            reauthInfo.positiveSessionLifetimeMs = sessionLifetimeMs;
        }
        return Utils.copyArray(response.saslAuthBytes());
    }

    /**
     * Computes the client's response to a server challenge while running as the configured
     * subject.
     *
     * <p>For the initial exchange, when the mechanism has no initial response, the given token
     * is returned unchanged without consulting the SASL client.
     *
     * @param saslToken challenge received from the server; must not be {@code null}
     * @param isInitial whether this is the first token of the exchange
     * @return the token to send to the server, as produced by the SASL client
     * @throws IllegalSaslStateException   if {@code saslToken} is {@code null}
     * @throws SaslException               if evaluation fails with a Kerberos error that is
     *                                     reported as retriable
     * @throws SaslAuthenticationException if evaluation fails for any other reason
     */
    private byte[] createSaslToken(byte[] saslToken, boolean isInitial) throws SaslException {
        if (saslToken == null) {
            throw new IllegalSaslStateException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
        }
        try {
            if (isInitial && !this.saslClient.hasInitialResponse()) {
                return saslToken;
            }
            // The explicit cast selects the PrivilegedExceptionAction overload of Subject.doAs.
            // Without it the lambda is ambiguous between the two overloads, and
            // evaluateChallenge() throws a checked SaslException.
            return Subject.doAs(this.subject, (PrivilegedExceptionAction<byte[]>) () -> this.saslClient.evaluateChallenge(saslToken));
        }
        catch (PrivilegedActionException e) {
            String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
            KerberosError kerberosError = KerberosError.fromException(e);
            if (kerberosError == KerberosError.SERVER_NOT_FOUND) {
                error = error + " This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
            }
            error = error + " Kafka Client will go to AUTHENTICATION_FAILED state.";
            Throwable cause = e.getCause();
            // Retriable Kerberos errors surface as SaslException; everything else as
            // SaslAuthenticationException.
            if (kerberosError != null && kerberosError.retriable()) {
                throw new SaslException(error, cause);
            }
            throw new SaslAuthenticationException(error, cause);
        }
    }

    /**
     * Writes the outgoing buffer to the transport layer unless it is already fully written.
     *
     * @return {@code true} if the buffer is completely written after this call
     * @throws IOException if writing to the transport layer fails
     */
    private boolean flushNetOutBuffer() throws IOException {
        boolean alreadyFlushed = netOutBuffer.completed();
        if (!alreadyFlushed) {
            netOutBuffer.writeTo(transportLayer);
        }
        return netOutBuffer.completed();
    }

    /**
     * Receives a packet and parses it as the response to the most recently sent request.
     *
     * <p>If parsing fails while re-authenticating, the receive is rewound and queued in
     * {@code reauthInfo.pendingAuthenticatedReceives}, and {@code null} is returned. If parsing
     * fails otherwise, the state machine is moved to {@code FAILED} and an exception is thrown.
     *
     * @return the parsed response, or {@code null} if no complete response is available yet or
     *         the packet was set aside
     * @throws IOException               if reading from the transport layer fails
     * @throws IllegalSaslStateException if the packet cannot be parsed outside re-authentication
     */
    private AbstractResponse receiveKafkaResponse() throws IOException {
        if (netInBuffer == null) {
            netInBuffer = new NetworkReceive(node);
        }
        // Keep a handle: receiveResponseOrToken() clears the field once the receive completes.
        NetworkReceive receive = netInBuffer;
        try {
            byte[] responseBytes = receiveResponseOrToken();
            if (responseBytes == null) {
                return null;
            }
            AbstractResponse parsed = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), currentRequestHeader);
            currentRequestHeader = null;
            return parsed;
        }
        catch (SchemaException | IllegalArgumentException e) {
            if (!reauthInfo.reauthenticating()) {
                log.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
                setSaslState(SaslState.FAILED);
                throw new IllegalSaslStateException("Invalid SASL mechanism response, server may be expecting a different protocol", e);
            }
            // Could not be parsed against the current request header while re-authenticating:
            // rewind it and queue it for pollResponseReceivedDuringReauthentication().
            receive.payload().rewind();
            reauthInfo.pendingAuthenticatedReceives.add(receive);
            return null;
        }
    }

    /**
     * Validates a handshake response. Returns normally only when the response carries no error;
     * otherwise the state machine is moved to {@code FAILED} and an exception is thrown.
     *
     * @param response handshake response received from the broker
     * @throws UnsupportedSaslMechanismException if the broker reports
     *         {@code UNSUPPORTED_SASL_MECHANISM}
     * @throws IllegalSaslStateException         for every other error code
     */
    private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
        Errors error = response.error();
        if (error == Errors.NONE) {
            return;
        }
        setSaslState(SaslState.FAILED);

        if (error == Errors.UNSUPPORTED_SASL_MECHANISM) {
            throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s", mechanism, response.enabledMechanisms()));
        }
        if (error == Errors.ILLEGAL_SASL_STATE) {
            throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s", mechanism, response.enabledMechanisms()));
        }
        throw new IllegalSaslStateException(String.format("Unknown error code %s, client mechanism is %s, enabled mechanisms are %s", error, mechanism, response.enabledMechanisms()));
    }

    /**
     * Returns the name of the first principal found in the subject's principal set.
     *
     * <p>The set is locked while it is inspected.
     *
     * @param subject subject whose principals are examined
     * @return the name of the first principal returned by the set's iterator
     * @throws KafkaException if the subject has no principals
     */
    public static String firstPrincipal(Subject subject) {
        Set<Principal> principals = subject.getPrincipals();
        synchronized (principals) {
            Iterator<Principal> candidates = principals.iterator();
            if (!candidates.hasNext()) {
                throw new KafkaException("Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login");
            }
            return candidates.next().getName();
        }
    }

    /**
     * Bookkeeping for session expiry and re-authentication: the ApiVersions responses, receives
     * set aside during re-authentication, and the timestamps used to schedule and measure it.
     */
    private class ReauthInfo {
        // Non-null only while this authenticator is performing a re-authentication.
        public ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication;
        public long reauthenticationBeginNanos;
        // Receives that could not be parsed as re-authentication responses; polled in FIFO order.
        public List<NetworkReceive> pendingAuthenticatedReceives = new ArrayList<NetworkReceive>();
        public ApiVersionsResponse apiVersionsResponseReceivedFromBroker;
        // Session lifetime reported by the broker; stays null unless a positive value was seen.
        public Long positiveSessionLifetimeMs;
        public long authenticationEndNanos;
        public Long clientSessionReauthenticationTimeNanos;

        private ReauthInfo() {
        }

        /**
         * Marks this authenticator as re-authenticating.
         *
         * @param apiVersionsResponseFromOriginalAuthentication response to reuse; must not be null
         * @param reauthenticationBeginNanos                    start time of the re-authentication
         */
        public void reauthenticating(ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication, long reauthenticationBeginNanos) {
            Objects.requireNonNull(apiVersionsResponseFromOriginalAuthentication);
            this.apiVersionsResponseFromOriginalAuthentication = apiVersionsResponseFromOriginalAuthentication;
            this.reauthenticationBeginNanos = reauthenticationBeginNanos;
        }

        /** @return {@code true} if an original ApiVersions response has been registered */
        public boolean reauthenticating() {
            return apiVersionsResponseFromOriginalAuthentication != null;
        }

        /** @return the original response when re-authenticating, else the one from the broker */
        public ApiVersionsResponse apiVersionsResponse() {
            if (reauthenticating()) {
                return apiVersionsResponseFromOriginalAuthentication;
            }
            return apiVersionsResponseReceivedFromBroker;
        }

        /** @return the oldest set-aside receive (removed from the queue), or empty if none */
        public Optional<NetworkReceive> pollResponseReceivedDuringReauthentication() {
            if (pendingAuthenticatedReceives.isEmpty()) {
                return Optional.empty();
            }
            NetworkReceive oldest = pendingAuthenticatedReceives.remove(0);
            return Optional.of(oldest);
        }

        /**
         * Records when authentication ended and, if the broker reported a positive session
         * lifetime, schedules re-authentication at a random point between 85% and 95% of it.
         *
         * @param nowNanos current time in nanoseconds
         */
        public void setAuthenticationEndAndSessionReauthenticationTimes(long nowNanos) {
            authenticationEndNanos = nowNanos;
            if (positiveSessionLifetimeMs == null) {
                SaslClientAuthenticator.this.log.debug("Finished {} with no session expiration and no session re-authentication", (Object) authenticationOrReauthenticationText());
                return;
            }
            // Base share of the lifetime, plus jitter so channels do not all re-authenticate at once.
            double windowFactor = 0.85;
            double windowJitter = 0.1;
            double pctToUse = windowFactor + RNG.nextDouble() * windowJitter;
            long sessionLifetimeMsToUse = (long) (positiveSessionLifetimeMs * pctToUse);
            clientSessionReauthenticationTimeNanos = authenticationEndNanos + 1000000L * sessionLifetimeMsToUse;
            SaslClientAuthenticator.this.log.debug("Finished {} with session expiration in {} ms and session re-authentication on or after {} ms", authenticationOrReauthenticationText(), positiveSessionLifetimeMs, sessionLifetimeMsToUse);
        }

        /** @return re-authentication latency in ms (rounded), or null if not re-authenticating */
        public Long reauthenticationLatencyMs() {
            if (!reauthenticating()) {
                return null;
            }
            long elapsedNanos = authenticationEndNanos - reauthenticationBeginNanos;
            return Math.round((double) elapsedNanos / 1000.0 / 1000.0);
        }

        private String authenticationOrReauthenticationText() {
            if (reauthenticating()) {
                return "re-authentication";
            }
            return "authentication";
        }
    }

    /**
     * States of the client-side SASL state machine driven by {@code authenticate()}.
     */
    public static enum SaslState {
        // Next step: send an ApiVersions request.
        SEND_APIVERSIONS_REQUEST,
        // Waiting for the ApiVersions response.
        RECEIVE_APIVERSIONS_RESPONSE,
        // Next step: send the SASL handshake request.
        SEND_HANDSHAKE_REQUEST,
        // Waiting for the SASL handshake response.
        RECEIVE_HANDSHAKE_RESPONSE,
        // Next step: send the initial client token.
        INITIAL,
        // Exchanging tokens with the server until the SASL client reports completion.
        INTERMEDIATE,
        // SASL client is complete, but one more server response is still expected.
        CLIENT_COMPLETE,
        // Authentication finished successfully.
        COMPLETE,
        // Authentication failed; authenticate() throws if called again in this state.
        FAILED,
        // Re-authentication: apply the ApiVersions response kept from the original authentication.
        REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE,
        // Re-authentication: send the SASL handshake request.
        REAUTH_SEND_HANDSHAKE_REQUEST,
        // Re-authentication: waiting for the handshake response; other responses may arrive too.
        REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE,
        // Re-authentication: send the initial client token.
        REAUTH_INITIAL;

    }
}

