/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import com.sun.security.auth.UserPrincipal;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.KerberosName;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side {@link Authenticator} that performs a SASL handshake using the
 * {@code GSSAPI} mechanism over a {@link TransportLayer}.
 *
 * <p>The SASL client is created once in the constructor, inside
 * {@code Subject.doAs} for the supplied {@link Subject}. Each call to
 * {@link #authenticate(boolean, boolean)} advances the exchange by at most one
 * step: it flushes any pending outbound token, optionally reads one server
 * token, evaluates it and queues the response.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its mutable
 * state; presumably each instance is driven by a single thread - confirm
 * against the caller.
 */
public class SaslClientAuthenticator
implements Authenticator {
    private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);

    /** The only SASL mechanism this authenticator offers. */
    private static final String GSSAPI_MECHANISM = "GSSAPI";

    /** Object identifier passed to JGSS as the Kerberos v5 mechanism. */
    private static final String KRB5_MECHANISM_OID = "1.2.840.113554.1.2.2";

    /** Substring of the JGSS failure text that triggers the extra DNS hint in the error message. */
    private static final String UNKNOWN_SERVER_ERROR_TEXT = "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";

    /*
     * Values returned by authenticate(). They are numerically equal to 0,
     * java.nio.channels.SelectionKey.OP_READ (1) and OP_WRITE (4).
     * NOTE(review): presumably the caller treats them as selector interest
     * ops - confirm against the Authenticator contract.
     */
    private static final int INTEREST_NONE = 0;
    private static final int INTEREST_READ = 1;
    private static final int INTEREST_WRITE = 4;

    private final SaslClient saslClient;
    private final Subject subject;
    private final String servicePrincipal;
    private final String host;
    /** Identifier handed to every NetworkReceive/NetworkSend created here; always 0. */
    private final int node = 0;
    private final TransportLayer transportLayer;
    /** Partially read server token, or {@code null} when no read is in progress. */
    private NetworkReceive netInBuffer;
    /** Most recently queued client token, or {@code null} before the first token is produced. */
    private NetworkSend netOutBuffer;
    private byte[] saslToken = new byte[0];
    private SaslState saslState = SaslState.INITIAL;

    /**
     * Creates the authenticator and eagerly builds the underlying SASL client.
     *
     * @param subject          subject whose first principal is used as the client identity
     * @param transportLayer   transport used to exchange SASL tokens
     * @param servicePrincipal service name of the server principal (without realm)
     * @param host             server host name passed to the SASL client
     * @throws IOException if the subject has no principal or the SASL client cannot be created
     */
    public SaslClientAuthenticator(Subject subject, TransportLayer transportLayer, String servicePrincipal, String host) throws IOException {
        this.transportLayer = transportLayer;
        this.subject = subject;
        this.host = host;
        this.servicePrincipal = servicePrincipal;
        this.saslClient = this.createSaslClient();
    }

    /**
     * Builds the GSSAPI SASL client inside {@code Subject.doAs} for the configured subject.
     *
     * <p>When the {@code sun.security.jgss.native} system property is {@code true}, a
     * Kerberos initiator credential is first added to the subject's private credentials;
     * failure to do so is only logged.
     *
     * @return the newly created SASL client, never {@code null}
     * @throws IOException if the subject has no principal, SASL client creation fails, or
     *                     no SASL client could be produced for the mechanism
     */
    private SaslClient createSaslClient() throws IOException {
        if (Boolean.getBoolean("sun.security.jgss.native")) {
            try {
                GSSManager manager = GSSManager.getInstance();
                Oid krb5Mechanism = new Oid(KRB5_MECHANISM_OID);
                GSSCredential cred = manager.createCredential(null, GSSCredential.INDEFINITE_LIFETIME, krb5Mechanism, GSSCredential.INITIATE_ONLY);
                this.subject.getPrivateCredentials().add(cred);
            }
            catch (GSSException ex) {
                // Best effort: the handshake is still attempted without the extra credential.
                LOG.warn("Cannot add private credential to subject; authentication at the server may fail", ex);
            }
        }

        Object[] principals = this.subject.getPrincipals().toArray();
        if (principals.length == 0) {
            // Without this check the lookup below fails with an uninformative ArrayIndexOutOfBoundsException.
            throw new IOException("Cannot create SASL client: the subject has no principals");
        }
        Principal clientPrincipal = (Principal)principals[0];
        KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
        // The server realm defaults to the client's realm unless overridden by system property.
        String serverRealm = System.getProperty("kafka.server.realm", clientKerberosName.getRealm());
        // NOTE(review): constructed but never read; kept in case the KerberosName constructor
        // validates the name - confirm and remove if it does not.
        KerberosName serviceKerberosName = new KerberosName(this.servicePrincipal + "@" + serverRealm);
        final String clientPrincipalName = clientKerberosName.toString();
        final String serviceName = this.servicePrincipal;
        final String serviceHost = this.host;

        try {
            SaslClient client = Subject.doAs(this.subject, new PrivilegedExceptionAction<SaslClient>(){

                @Override
                public SaslClient run() throws SaslException {
                    LOG.debug("Client will use GSSAPI as SASL mechanism.");
                    LOG.debug("creating sasl client: client={};service={};serviceHostname={}", clientPrincipalName, serviceName, serviceHost);
                    return Sasl.createSaslClient(new String[]{GSSAPI_MECHANISM}, clientPrincipalName, serviceName, serviceHost, null, new ClientCallbackHandler(null));
                }
            });
            if (client == null) {
                // Sasl.createSaslClient returns null when no provider can supply the mechanism.
                throw new IOException("No SASL client could be created for mechanism " + GSSAPI_MECHANISM);
            }
            return client;
        }
        catch (PrivilegedActionException e) {
            // Fail construction here instead of leaving a null client that would
            // surface later as a NullPointerException in authenticate()/close().
            LOG.error("Exception while trying to create SASL client", e);
            throw new IOException("Exception while trying to create SASL client", e);
        }
    }

    /**
     * Advances the SASL exchange by at most one step.
     *
     * @param read2  whether the caller allows reading a server token in this call
     * @param write2 whether the caller allows writing the client token in this call
     * @return {@code 4} while an outbound token still has to be written, {@code 0} once the
     *         SASL client reports completion, and {@code 1} in every other case
     * @throws IOException if reading or writing fails, or if evaluating the server token
     *                     fails (the state then becomes {@link SaslState#FAILED})
     */
    @Override
    public int authenticate(boolean read2, boolean write2) throws IOException {
        // Finish sending the previous token before doing anything else.
        if (this.netOutBuffer != null && !this.flushNetOutBuffer()) {
            return INTEREST_WRITE;
        }
        if (this.saslClient.isComplete()) {
            this.saslState = SaslState.COMPLETE;
            return INTEREST_NONE;
        }

        // The first step evaluates an empty challenge; later steps use the token read from the server.
        byte[] serverToken = new byte[0];
        if (read2 && this.saslState == SaslState.INTERMEDIATE) {
            if (this.netInBuffer == null) {
                this.netInBuffer = new NetworkReceive(this.node);
            }
            long readLen = this.netInBuffer.readFrom(this.transportLayer);
            if (readLen == 0L || !this.netInBuffer.complete()) {
                // Token not fully received yet; keep the partial receive for the next call.
                return INTEREST_READ;
            }
            this.netInBuffer.payload().rewind();
            serverToken = new byte[this.netInBuffer.payload().remaining()];
            this.netInBuffer.payload().get(serverToken, 0, serverToken.length);
            this.netInBuffer = null;
        } else if (this.saslState == SaslState.INITIAL) {
            this.saslState = SaslState.INTERMEDIATE;
        }

        try {
            this.saslToken = this.createSaslToken(serverToken);
            if (this.saslToken != null) {
                this.netOutBuffer = new NetworkSend(this.node, ByteBuffer.wrap(this.saslToken));
                if (!write2 || !this.flushNetOutBuffer()) {
                    return INTEREST_WRITE;
                }
            }
        }
        catch (BufferUnderflowException be) {
            return INTEREST_READ;
        }
        catch (SaslException se) {
            this.saslState = SaslState.FAILED;
            // Keep the SaslException as the cause so the stack trace is not lost.
            throw new IOException("Unable to authenticate using SASL " + se, se);
        }
        return INTEREST_READ;
    }

    /**
     * Returns a fixed placeholder principal.
     *
     * @return a new {@link UserPrincipal} named {@code "ANONYMOUS"}
     */
    @Override
    public Principal principal() {
        return new UserPrincipal("ANONYMOUS");
    }

    /**
     * Reports whether the handshake has finished.
     *
     * @return {@code true} only when the SASL client is complete and
     *         {@link #authenticate(boolean, boolean)} has observed that completion
     */
    @Override
    public boolean isComplete() {
        return this.saslClient.isComplete() && this.saslState == SaslState.COMPLETE;
    }

    /**
     * Disposes of the underlying SASL client.
     *
     * @throws IOException if disposing of the SASL client fails
     */
    @Override
    public void close() throws IOException {
        this.saslClient.dispose();
    }

    /**
     * Evaluates a server challenge inside {@code Subject.doAs} and returns the client's response.
     *
     * @param saslToken challenge received from the server; must not be {@code null}
     * @return the response produced by the SASL client, possibly {@code null}
     * @throws SaslException if {@code saslToken} is {@code null} or the evaluation fails
     */
    private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
        if (saslToken == null) {
            throw new SaslException("Error in authenticating with a Kafka Broker: the kafka broker saslToken is null.");
        }
        try {
            return Subject.doAs(this.subject, new PrivilegedExceptionAction<byte[]>(){

                @Override
                public byte[] run() throws SaslException {
                    return SaslClientAuthenticator.this.saslClient.evaluateChallenge(saslToken);
                }
            });
        }
        catch (PrivilegedActionException e) {
            String error = "An error: (" + e + ") occurred when evaluating Kafka Brokers " + " received SASL token.";
            if (e.toString().contains(UNKNOWN_SERVER_ERROR_TEXT)) {
                error = error + " This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment.";
            }
            error = error + " Kafka Client will go to AUTH_FAILED state.";
            LOG.error(error);
            // Chain the original failure so callers can inspect the underlying cause.
            throw new SaslException(error, e);
        }
    }

    /**
     * Writes the pending outbound token to the transport if it has not been fully sent.
     *
     * @return {@code true} when the outbound buffer reports it is completed
     * @throws IOException if writing to the transport fails
     */
    private boolean flushNetOutBuffer() throws IOException {
        if (!this.netOutBuffer.completed()) {
            this.netOutBuffer.writeTo(this.transportLayer);
        }
        return this.netOutBuffer.completed();
    }

    /**
     * Callback handler given to the SASL client. It answers name and realm callbacks with
     * their defaults, password callbacks with the configured password (if any), and
     * authorize callbacks by comparing the authentication and authorization ids.
     */
    public static class ClientCallbackHandler
    implements CallbackHandler {
        private final String password;

        /**
         * @param password password to supply to {@link PasswordCallback}s, or {@code null}
         *                 when none is available
         */
        public ClientCallbackHandler(String password) {
            this.password = password;
        }

        /**
         * Handles each callback in order.
         *
         * @param callbacks callbacks issued by the SASL client
         * @throws UnsupportedCallbackException if a callback is not a name, password, realm
         *                                      or authorize callback
         */
        @Override
        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    NameCallback nc = (NameCallback)callback;
                    nc.setName(nc.getDefaultName());
                } else if (callback instanceof PasswordCallback) {
                    if (this.password != null) {
                        ((PasswordCallback)callback).setPassword(this.password.toCharArray());
                    } else {
                        LOG.warn("Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and  the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. ");
                    }
                } else if (callback instanceof RealmCallback) {
                    RealmCallback rc = (RealmCallback)callback;
                    rc.setText(rc.getDefaultText());
                } else if (callback instanceof AuthorizeCallback) {
                    AuthorizeCallback ac = (AuthorizeCallback)callback;
                    String authid = ac.getAuthenticationID();
                    String authzid = ac.getAuthorizationID();
                    // A missing authentication id is treated as "not authorized" rather than
                    // failing with a NullPointerException.
                    boolean authorized = authid != null && authid.equals(authzid);
                    ac.setAuthorized(authorized);
                    if (authorized) {
                        ac.setAuthorizedID(authzid);
                    }
                } else {
                    throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
                }
            }
        }
    }

    /** Progress of the SASL exchange as tracked by {@link #authenticate(boolean, boolean)}. */
    public enum SaslState {
        /** No step has been performed yet. */
        INITIAL,
        /** At least one step has been performed and the exchange is still in progress. */
        INTERMEDIATE,
        /** The SASL client reported completion. */
        COMPLETE,
        /** Evaluating a server token raised a {@link SaslException}. */
        FAILED;

    }
}

