/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.Map;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerConnector {
    private static final Logger log = LoggerFactory.getLogger(WorkerConnector.class);
    private final String connName;
    private final ConnectorStatus.Listener statusListener;
    private final ConnectorContext ctx;
    private final Connector connector;
    private Map<String, String> config;
    private State state;

    public WorkerConnector(String connName, Connector connector, ConnectorContext ctx, ConnectorStatus.Listener statusListener) {
        this.connName = connName;
        this.ctx = ctx;
        this.connector = connector;
        this.statusListener = statusListener;
        this.state = State.INIT;
    }

    public void initialize(ConnectorConfig connectorConfig) {
        try {
            this.config = connectorConfig.originalsStrings();
            log.debug("Initializing connector {} with config {}", (Object)this.connName, (Object)this.config);
            this.connector.initialize(new ConnectorContext(){

                @Override
                public void requestTaskReconfiguration() {
                    WorkerConnector.this.ctx.requestTaskReconfiguration();
                }

                @Override
                public void raiseError(Exception e) {
                    log.error("Connector raised an error {}", (Object)WorkerConnector.this.connName, (Object)e);
                    WorkerConnector.this.onFailure(e);
                    WorkerConnector.this.ctx.raiseError(e);
                }
            });
        }
        catch (Throwable t) {
            log.error("Error initializing connector {}", (Object)this.connName, (Object)t);
            this.onFailure(t);
        }
    }

    private boolean doStart() {
        try {
            switch (this.state) {
                case STARTED: {
                    return false;
                }
                case INIT: 
                case STOPPED: {
                    this.connector.start(this.config);
                    this.state = State.STARTED;
                    return true;
                }
            }
            throw new IllegalArgumentException("Cannot start connector in state " + (Object)((Object)this.state));
        }
        catch (Throwable t) {
            log.error("Error while starting connector {}", (Object)this.connName, (Object)t);
            this.onFailure(t);
            return false;
        }
    }

    private void onFailure(Throwable t) {
        this.statusListener.onFailure(this.connName, t);
        this.state = State.FAILED;
    }

    private void resume() {
        if (this.doStart()) {
            this.statusListener.onResume(this.connName);
        }
    }

    private void start() {
        if (this.doStart()) {
            this.statusListener.onStartup(this.connName);
        }
    }

    public boolean isRunning() {
        return this.state == State.STARTED;
    }

    private void pause() {
        try {
            switch (this.state) {
                case STOPPED: {
                    return;
                }
                case STARTED: {
                    this.connector.stop();
                }
                case INIT: {
                    this.statusListener.onPause(this.connName);
                    this.state = State.STOPPED;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Cannot pause connector in state " + (Object)((Object)this.state));
                }
            }
        }
        catch (Throwable t) {
            log.error("Error while shutting down connector {}", (Object)this.connName, (Object)t);
            this.statusListener.onFailure(this.connName, t);
            this.state = State.FAILED;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        try {
            if (this.state == State.STARTED) {
                this.connector.stop();
            }
            this.state = State.STOPPED;
        }
        catch (Throwable t) {
            log.error("Error while shutting down connector {}", (Object)this.connName, (Object)t);
            this.state = State.FAILED;
        }
        finally {
            this.statusListener.onShutdown(this.connName);
        }
    }

    public void transitionTo(TargetState targetState) {
        if (this.state == State.FAILED) {
            log.warn("Cannot transition connector {} to {} since it has failed", (Object)this.connName, (Object)targetState);
            return;
        }
        log.debug("Transition connector {} to {}", (Object)this.connName, (Object)targetState);
        if (targetState == TargetState.PAUSED) {
            this.pause();
        } else if (targetState == TargetState.STARTED) {
            if (this.state == State.INIT) {
                this.start();
            } else {
                this.resume();
            }
        } else {
            throw new IllegalArgumentException("Unhandled target state " + (Object)((Object)targetState));
        }
    }

    public boolean isSinkConnector() {
        return SinkConnector.class.isAssignableFrom(this.connector.getClass());
    }

    public Connector connector() {
        return this.connector;
    }

    public String toString() {
        return "WorkerConnector{connName='" + this.connName + '\'' + ", connector=" + this.connector + '}';
    }

    private static enum State {
        INIT,
        STOPPED,
        STARTED,
        FAILED;

    }
}

