package io.knotx.databridge.api.reactivex;

import io.knotx.databridge.api.DataSourceAdapterProxy;
import io.knotx.databridge.api.DataSourceAdapterRequest;
import io.knotx.databridge.api.DataSourceAdapterResponse;
import io.knotx.dataobjects.ClientResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.Single;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

/* loaded from: input_file:io/knotx/databridge/api/reactivex/AbstractDataSourceAdapterProxy.class */
public abstract class AbstractDataSourceAdapterProxy implements DataSourceAdapterProxy {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataSourceAdapterProxy.class);

    protected abstract Single<DataSourceAdapterResponse> processRequest(DataSourceAdapterRequest dataSourceAdapterRequest);

    @Override // io.knotx.databridge.api.DataSourceAdapterProxy
    public void process(DataSourceAdapterRequest dataSourceAdapterRequest, Handler<AsyncResult<DataSourceAdapterResponse>> handler) {
        processRequest(dataSourceAdapterRequest).subscribe(dataSourceAdapterResponse -> {
            handler.handle(Future.succeededFuture(dataSourceAdapterResponse));
        }, th -> {
            LOGGER.error("Error happened during Adapter Request processing", th);
            handler.handle(Future.succeededFuture(getErrorResponse(th)));
        });
    }

    protected DataSourceAdapterResponse getErrorResponse(Throwable th) {
        return new DataSourceAdapterResponse().setResponse(new ClientResponse().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).setBody(Buffer.buffer(th.getMessage())));
    }
}
