package org.apache.plc4x.camel;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/plc4x/camel/Plc4XConsumer.class */
/**
 * Camel {@link Consumer} that subscribes to a PLC field through a PLC4X connection and turns
 * every {@link PlcSubscriptionEvent} handed to {@link #accept(PlcSubscriptionEvent)} into a
 * Camel {@link Exchange} that is passed to the configured processor.
 *
 * <p>NOTE(review): nothing in this class registers the instance as the listener of the
 * subscription created in {@link #doStart()}; presumably that wiring happens elsewhere —
 * confirm.
 */
public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<PlcSubscriptionEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);

    /** Name under which the field is subscribed and under which event values are looked up. */
    private static final String FIELD_NAME = "default";

    private final Plc4XEndpoint endpoint;
    private final AsyncProcessor processor;
    private ExceptionHandler exceptionHandler;
    private final PlcConnection plcConnection;
    private final String fieldQuery;
    // Captured from the endpoint; not read anywhere inside this class.
    private final Class<?> dataType;
    // Result of the subscription made in doStart(); stays null until doStart() has succeeded.
    private PlcSubscriptionResponse subscriptionResponse;

    /**
     * Creates the consumer and obtains the PLC connection from the endpoint's driver manager.
     *
     * @param plc4XEndpoint endpoint supplying the data type, Camel context, driver manager,
     *                      endpoint URI and field address
     * @param processor     processor that receives the exchanges created in
     *                      {@link #accept(PlcSubscriptionEvent)}
     * @throws PlcException if setting up the consumer fails with a PLC4X error (presumably
     *                      while obtaining the connection)
     */
    public Plc4XConsumer(Plc4XEndpoint plc4XEndpoint, Processor processor) throws PlcException {
        this.endpoint = plc4XEndpoint;
        this.dataType = plc4XEndpoint.getDataType();
        this.processor = AsyncProcessorConverterHelper.convert(processor);
        this.exceptionHandler = new LoggingExceptionHandler(plc4XEndpoint.getCamelContext(), getClass());
        // Strip the leading "plc4x:" scheme (plus up to two slashes) to get the raw PLC4X URL.
        String connectionUrl = plc4XEndpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
        this.plcConnection = plc4XEndpoint.getPlcDriverManager().getConnection(connectionUrl);
        this.fieldQuery = plc4XEndpoint.getAddress();
    }

    @Override
    public String toString() {
        return "Plc4XConsumer[" + this.endpoint + "]";
    }

    /**
     * @return the endpoint this consumer was created for
     */
    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    /**
     * @return the handler that receives exceptions raised while processing an event
     */
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandler;
    }

    /**
     * Replaces the handler that receives exceptions raised while processing an event.
     *
     * @param exceptionHandler the new handler
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Requests a cyclic subscription (3 second interval) for the configured field and blocks
     * until the subscription response is available.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the response
     * @throws ExecutionException   if the subscription request completes exceptionally
     */
    protected void doStart() throws InterruptedException, ExecutionException {
        this.subscriptionResponse = (PlcSubscriptionResponse) this.plcConnection.subscriptionRequestBuilder()
            .addCyclicField(FIELD_NAME, this.fieldQuery, Duration.of(3L, ChronoUnit.SECONDS))
            .build()
            .execute()
            .get();
    }

    /**
     * Cancels the subscription (waiting at most 5 seconds) and closes the PLC connection.
     *
     * <p>The connection is closed even when unsubscribing fails or when no subscription was
     * ever established, so a failed or skipped start does not leak the connection.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the unsubscription
     * @throws ExecutionException   if the unsubscription request completes exceptionally
     * @throws TimeoutException     if the unsubscription does not complete within 5 seconds
     */
    protected void doStop() throws InterruptedException, ExecutionException, TimeoutException {
        try {
            // subscriptionResponse is null when doStart() never completed successfully.
            if (this.subscriptionResponse != null) {
                this.plcConnection.unsubscriptionRequestBuilder()
                    .addHandles(this.subscriptionResponse.getSubscriptionHandles())
                    .build()
                    .execute()
                    .get(5L, TimeUnit.SECONDS);
                this.subscriptionResponse = null;
            }
        } finally {
            // Always release the connection; a close failure is logged, not propagated.
            try {
                this.plcConnection.close();
            } catch (Exception e) {
                LOGGER.error("Error closing connection", e);
            }
        }
    }

    /**
     * Wraps the values of the subscribed field carried by the event into a new exchange and
     * hands it to the processor. Any exception raised on the way is passed to the exception
     * handler instead of being thrown.
     *
     * @param plcSubscriptionEvent the event delivered for the subscription
     */
    @Override // java.util.function.Consumer
    public void accept(PlcSubscriptionEvent plcSubscriptionEvent) {
        LOGGER.debug("Received {}", plcSubscriptionEvent);
        try {
            Exchange exchange = this.endpoint.createExchange();
            exchange.getIn().setBody(unwrapIfSingle(plcSubscriptionEvent.getAllObjects(FIELD_NAME)));
            this.processor.process(exchange);
        } catch (Exception e) {
            this.exceptionHandler.handleException(e);
        }
    }

    /**
     * Reduces a collection to the most specific body value.
     *
     * @param collection the values to unwrap; may be {@code null}
     * @return {@code null} for a {@code null} or empty collection, the sole element for a
     *         single-element collection, otherwise the collection itself
     */
    private Object unwrapIfSingle(Collection<?> collection) {
        if (collection == null || collection.isEmpty()) {
            return null;
        }
        return collection.size() == 1 ? collection.iterator().next() : collection;
    }
}
