/*
 * Decompiled with CFR 0.152.
 */
package com.liferay.petra.salesforce.client.streaming;

import com.liferay.petra.salesforce.client.BaseSalesforceClientImpl;
import com.liferay.petra.salesforce.client.streaming.SalesforceStreamingClient;
import com.liferay.petra.string.StringBundler;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforceStreamingClientImpl
extends BaseSalesforceClientImpl
implements SalesforceStreamingClient {
    private static final Logger _logger = LoggerFactory.getLogger(SalesforceStreamingClientImpl.class);
    private BayeuxClient _bayeuxClient;
    private final HttpClient _httpClient = new HttpClient();
    private int _transportTimeout = 1;

    @Override
    public boolean connect() throws ConnectionException {
        if (this._bayeuxClient == null) {
            this.afterPropertiesSet();
        }
        if (this._bayeuxClient == null) {
            throw new ConnectionException();
        }
        if (this._bayeuxClient.isConnected()) {
            return true;
        }
        this._bayeuxClient.handshake();
        boolean connected = this._bayeuxClient.waitFor(10000L, BayeuxClient.State.CONNECTED, new BayeuxClient.State[0]);
        if (_logger.isInfoEnabled()) {
            _logger.info("Connected: {}", (Object)connected);
        }
        return connected;
    }

    public void destroy() {
        if (this._bayeuxClient.isConnected()) {
            boolean disconnected = false;
            while (!disconnected) {
                disconnected = this.disconnect();
            }
        }
        try {
            this._httpClient.stop();
        }
        catch (Exception e) {
            _logger.error("Unable to stop http client", e);
        }
    }

    @Override
    public boolean disconnect() {
        if (this._bayeuxClient.isDisconnected()) {
            return true;
        }
        this._bayeuxClient.disconnect();
        boolean disconnected = this._bayeuxClient.waitFor(10000L, BayeuxClient.State.DISCONNECTED, new BayeuxClient.State[0]);
        if (_logger.isInfoEnabled()) {
            _logger.info("Disconnected: {}", (Object)disconnected);
        }
        return disconnected;
    }

    @Override
    public Channel getChannel(String name) {
        return this._bayeuxClient.getChannel(name);
    }

    @Override
    public int getTransportTimeout() {
        return this._transportTimeout;
    }

    @Override
    public void setTransportTimeout(int transportTimeout) {
        this._transportTimeout = transportTimeout;
    }

    @Override
    protected void afterPropertiesSet() {
        super.afterPropertiesSet();
        try {
            PartnerConnection partnerConnection = this.getPartnerConnection();
            ConnectorConfig connectorConfig = partnerConnection.getConfig();
            HashMap<String, Object> options = new HashMap<String, Object>();
            options.put("maxNetworkDelay", this._transportTimeout * 6000);
            this._httpClient.start();
            URL url = new URL(connectorConfig.getServiceEndpoint());
            this._bayeuxClient = new BayeuxClient(StringBundler.concat((String[])new String[]{url.getProtocol(), "://", url.getHost(), "/cometd/37.0"}), new SalesforceTransport(connectorConfig.getSessionId(), options, this._httpClient), new ClientTransport[0]);
            ClientSessionChannel handshakeClientSessionChannel = this._bayeuxClient.getChannel("/meta/handshake");
            handshakeClientSessionChannel.addListener(new SalesforceMessageListener());
            ClientSessionChannel connectClientSessionChannel = this._bayeuxClient.getChannel("/meta/connect");
            connectClientSessionChannel.addListener(new SalesforceMessageListener());
            ClientSessionChannel subscribeClientSessionChannel = this._bayeuxClient.getChannel("/meta/subscribe");
            subscribeClientSessionChannel.addListener(new SalesforceMessageListener());
        }
        catch (Exception e) {
            _logger.error(e.getMessage(), e);
        }
    }

    private class SalesforceTransport
    extends LongPollingTransport {
        private final String _sessionId;

        public SalesforceTransport(String sessionId, Map<String, Object> options, HttpClient httpClient) {
            super(options, httpClient);
            this._sessionId = sessionId;
        }

        @Override
        protected void customize(Request request) {
            super.customize(request);
            request.header("Authorization", "OAuth " + this._sessionId);
        }
    }

    private class SalesforceMessageListener
    implements ClientSessionChannel.MessageListener {
        private SalesforceMessageListener() {
        }

        @Override
        public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Received message: {}", (Object)message);
            }
            if (!message.isSuccessful()) {
                _logger.error("Unable to send message");
                if (message.get("error") != null) {
                    _logger.error((String)message.get("error"));
                }
                if (message.get("exception") != null) {
                    Exception e = (Exception)message.get("exception");
                    e.printStackTrace();
                }
                SalesforceStreamingClientImpl.this._bayeuxClient.disconnect();
            }
        }
    }
}

