package com.emc.mongoose.storage.driver.kafka;

import com.emc.mongoose.base.config.IllegalConfigurationException;
import com.emc.mongoose.base.data.DataInput;
import com.emc.mongoose.base.item.DataItem;
import com.emc.mongoose.base.item.Item;
import com.emc.mongoose.base.item.ItemFactory;
import com.emc.mongoose.base.item.op.OpType;
import com.emc.mongoose.base.item.op.Operation;
import com.emc.mongoose.base.item.op.data.DataOperation;
import com.emc.mongoose.base.item.op.path.PathOperation;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.storage.Credential;
import com.emc.mongoose.storage.driver.kafka.cache.AdminClientCreateFunctionImpl;
import com.emc.mongoose.storage.driver.kafka.cache.ProducerCreateFunctionImpl;
import com.emc.mongoose.storage.driver.kafka.cache.TopicCreateFunctionImpl;
import com.emc.mongoose.storage.driver.preempt.PreemptStorageDriverBase;
import com.github.akurilov.commons.concurrent.ContextAwareThreadFactory;
import com.github.akurilov.commons.lang.Exceptions;
import com.github.akurilov.confuse.Config;
import java.io.EOFException;
import java.io.IOException;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;

/* loaded from: input_file:com/emc/mongoose/storage/driver/kafka/KafkaStorageDriver.class */
public class KafkaStorageDriver<I extends Item, O extends Operation<I>> extends PreemptStorageDriverBase<I, O> {
    // Storage node addresses in "host:port" form; the constructor appends the configured port when missing.
    private final String[] endpointAddrs;
    // Whether the item name is used as the produced record key ("key-enabled" option).
    private final boolean useKey;
    // Values below are read from the configuration by the constructor and copied into the producer/consumer
    // properties by createConfig / createConsumerConfig.
    private final int requestSizeLimit;
    private final int batchSize;
    private final int sndBuf;
    private final int rcvBuf;
    private final int linger;
    private final long buffer;
    private final String compression;
    // Timeout passed to KafkaConsumer.poll when reading records.
    private final Duration readTimeout;
    // Limits the count of the operations being executed simultaneously.
    private final Semaphore concurrencyThrottle;
    // Round-robin counter used by nextEndpointAddr to pick the next endpoint address.
    private final AtomicInteger rrc;
    // Producer properties per node address.
    private final Map<String, Properties> configCache;
    // Admin client factory functions per producer properties, and the admin clients per node address.
    private final Map<Properties, AdminClientCreateFunctionImpl> adminClientCreateFuncCache;
    private final Map<String, AdminClient> adminClientCache;
    // Producer factory functions per producer properties, and the per-thread producers per node address.
    private final Map<Properties, ProducerCreateFunctionImpl> producerCreateFuncCache;
    private final ThreadLocal<Map<String, KafkaProducer>> threadLocalProducerCache;
    // Topic factory functions per admin client, and the topics per topic name (operation destination path).
    private final Map<AdminClient, TopicCreateFunctionImpl> topicCreateFuncCache;
    private final Map<String, NewTopic> topicCache;
    // Configured headers without expression markers ("#{", "${", "%{") in the name or value.
    protected final Map<String, String> sharedHeaders;
    // Configured headers whose name or value contains an expression marker.
    protected final Map<String, String> dynamicHeaders;
    // Set once list(...) has returned its single item; subsequent list(...) calls throw EOFException.
    private volatile boolean listWasCalled;

    /* renamed from: com.emc.mongoose.storage.driver.kafka.KafkaStorageDriver$1, reason: invalid class name */
    /* loaded from: input_file:com/emc/mongoose/storage/driver/kafka/KafkaStorageDriver$1.class */
    /**
     * Compiler-generated lookup table backing the {@code switch} statements over {@link OpType}.
     * Maps {@code OpType.ordinal()} to a dense case label: NOOP=1, CREATE=2, READ=3, DELETE=4, LIST=5.
     * Any other constant keeps the array default of 0 and therefore falls into the {@code default} branch.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$emc$mongoose$base$item$op$OpType = new int[OpType.values().length];

        static {
            // Each constant is mapped in its own try block so that a constant missing from the OpType version
            // available at run time (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$com$emc$mongoose$base$item$op$OpType[OpType.NOOP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // the constant is absent: leave the slot at 0
            }
            try {
                $SwitchMap$com$emc$mongoose$base$item$op$OpType[OpType.CREATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // the constant is absent: leave the slot at 0
            }
            try {
                $SwitchMap$com$emc$mongoose$base$item$op$OpType[OpType.READ.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // the constant is absent: leave the slot at 0
            }
            try {
                $SwitchMap$com$emc$mongoose$base$item$op$OpType[OpType.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // the constant is absent: leave the slot at 0
            }
            try {
                $SwitchMap$com$emc$mongoose$base$item$op$OpType[OpType.LIST.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // the constant is absent: leave the slot at 0
            }
        }
    }

    /* loaded from: input_file:com/emc/mongoose/storage/driver/kafka/KafkaStorageDriver$IoWorkerThreadFactory.class */
    /**
     * Factory of the I/O worker threads. The threads are named {@code io_worker_<stepId>#<n>}, are created
     * with the daemon flag set and carry the logging thread context captured when the factory was built.
     */
    final class IoWorkerThreadFactory extends ContextAwareThreadFactory {

        /**
         * Worker thread which closes the Kafka producers found in the thread-local producer cache
         * before delegating to the regular interruption.
         */
        final class IoWorkerThread extends ContextAwareThreadFactory.ContextAwareThread {
            public IoWorkerThread(Runnable runnable, String str, boolean z, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, Map<String, String> map) {
                super(runnable, str, z, uncaughtExceptionHandler, map);
            }

            /**
             * Closes every cached producer (waiting up to 10 seconds for each), empties the cache and then
             * interrupts the thread.
             */
            public final void interrupt() {
                // NOTE(review): ThreadLocal.get() resolves against the thread that CALLS interrupt(); when that is
                // not this worker thread, the map read here is the caller's cache, not the worker's own - confirm.
                final Map<String, KafkaProducer> producers = KafkaStorageDriver.this.threadLocalProducerCache.get();
                producers
                    .values()
                    .parallelStream()
                    .forEach(producer -> producer.close(Duration.ofSeconds(10L)));
                producers.clear();
                super.interrupt();
            }
        }

        public IoWorkerThreadFactory() {
            super("io_worker_" + KafkaStorageDriver.this.stepId, true, ThreadContext.getContext());
        }

        /**
         * Creates the next numbered worker thread for the given task.
         *
         * @param runnable the task to run in the new thread
         * @return the new, not yet started, worker thread
         */
        public final Thread newThread(Runnable runnable) {
            final String workerName = this.threadNamePrefix + "#" + this.threadNumber.incrementAndGet();
            return new IoWorkerThread(runnable, workerName, this.daemonFlag, exceptionHandler, this.threadContext);
        }
    }

    /**
     * Creates the driver and reads its settings from the given configuration.
     *
     * <p>Reads the {@code driver} section (create headers, {@code key-enabled}, {@code request-size},
     * {@code buffer-memory}, {@code compression-type}, {@code read-timeoutMillis}) and the {@code net} section
     * ({@code node.port}, {@code node.addrs}, {@code sndBuf}, {@code rcvBuf}, {@code linger}). Node addresses
     * without an explicit port get the configured port appended.
     *
     * @param str passed to the superclass unchanged (presumably the load step id - confirm against the base class)
     * @param dataInput passed to the superclass unchanged
     * @param config the configuration to read the {@code driver} and {@code net} sections from
     * @param z passed to the superclass unchanged
     * @param i passed to the superclass and also used as the Kafka producer batch size
     * @throws IllegalConfigurationException declared by the superclass constructor contract
     */
    public KafkaStorageDriver(String str, DataInput dataInput, Config config, boolean z, int i) throws IllegalConfigurationException {
        super(str, dataInput, config, z, i);
        this.rrc = new AtomicInteger(0);
        this.configCache = new ConcurrentHashMap<>();
        this.adminClientCreateFuncCache = new ConcurrentHashMap<>();
        this.adminClientCache = new ConcurrentHashMap<>();
        this.producerCreateFuncCache = new ConcurrentHashMap<>();
        this.threadLocalProducerCache = ThreadLocal.withInitial(ConcurrentHashMap::new);
        this.topicCreateFuncCache = new ConcurrentHashMap<>();
        this.topicCache = new ConcurrentHashMap<>();
        this.sharedHeaders = new HashMap<>();
        this.dynamicHeaders = new HashMap<>();
        this.listWasCalled = false;

        final Config driverConfig = config.configVal("driver");
        final Config createConfig = driverConfig.configVal("create");
        final Map<String, String> headers = createConfig.mapVal("headers");
        for (final Map.Entry<String, String> header : headers.entrySet()) {
            final String headerName = header.getKey();
            final String headerValue = header.getValue();
            // a header is dynamic as soon as its name or value carries an expression marker
            if (hasExpressionMarker(headerName) || hasExpressionMarker(headerValue)) {
                this.dynamicHeaders.put(headerName, headerValue);
            } else {
                this.sharedHeaders.put(headerName, headerValue);
            }
        }
        this.useKey = createConfig.boolVal("key-enabled");
        this.requestSizeLimit = driverConfig.intVal("request-size");
        this.batchSize = i;

        final Config netConfig = config.configVal("net");
        this.buffer = driverConfig.longVal("buffer-memory");
        this.compression = driverConfig.stringVal("compression-type");
        final Config nodeConfig = netConfig.configVal("node");
        final int defaultPort = nodeConfig.intVal("port");
        final List<String> nodeAddrs = nodeConfig.listVal("addrs");
        this.endpointAddrs = nodeAddrs.toArray(new String[0]);
        for (int idx = 0; idx < this.endpointAddrs.length; idx++) {
            if (!this.endpointAddrs[idx].contains(":")) {
                this.endpointAddrs[idx] = this.endpointAddrs[idx] + ":" + defaultPort;
            }
        }
        this.sndBuf = netConfig.intVal("sndBuf");
        this.rcvBuf = netConfig.intVal("rcvBuf");
        this.linger = netConfig.intVal("linger");

        final long readTimeoutMillis = driverConfig.longVal("read-timeoutMillis");
        // A non-positive value means "wait indefinitely". Duration.ofDays(Long.MAX_VALUE) cannot be used for that:
        // it overflows while converting the days to seconds and throws ArithmeticException. The largest
        // millisecond value is representable and also survives Duration.toMillis().
        this.readTimeout = Duration.ofMillis(readTimeoutMillis > 0 ? readTimeoutMillis : Long.MAX_VALUE);
        this.concurrencyThrottle = new Semaphore(this.concurrencyLimit > 0 ? this.concurrencyLimit : Integer.MAX_VALUE);
        this.requestAuthTokenFunc = null;
        this.requestNewPathFunc = null;
    }

    /** Tells whether the given text contains one of the expression markers: "#{", "${" or "%{". */
    private static boolean hasExpressionMarker(final String text) {
        return text.contains("#{") || text.contains("${") || text.contains("%{");
    }

    /**
     * Supplies the factory used to create the I/O worker threads.
     *
     * @return a new {@link IoWorkerThreadFactory} bound to this driver instance
     */
    protected ThreadFactory ioWorkerThreadFactory() {
        final ThreadFactory workerFactory = new IoWorkerThreadFactory();
        return workerFactory;
    }

    /**
     * Tells whether the given range of operations should be executed as a batch.
     * This driver answers {@code true} unconditionally; the arguments are not inspected.
     *
     * @param list the operations
     * @param i not used
     * @param i2 not used
     * @return always {@code true}
     */
    protected final boolean isBatch(final List<O> list, final int i, final int i2) {
        return Boolean.TRUE;
    }

    /**
     * Executes a single operation by wrapping it into a one-element list and dispatching it on the operation
     * class and type.
     *
     * <p>Data operations: NOOP, CREATE (produce a record) and READ (consume a record) are supported, any other
     * type raises an {@link AssertionError}. Path operations: NOOP, CREATE and DELETE are dispatched, LIST
     * raises an {@link AssertionError} and any other type (including READ) is silently ignored.
     *
     * @param o the operation to execute
     */
    protected final void execute(O o) {
        final OpType opType = o.type();
        if (o instanceof DataOperation) {
            switch (opType) {
                case NOOP:
                    noop(List.of(o));
                    break;
                case CREATE:
                    produceRecords(List.of(o));
                    break;
                case READ:
                    consumeRecords(List.of(o));
                    break;
                default:
                    throw new AssertionError("Unsupported records operation type: " + opType);
            }
        } else if (o instanceof PathOperation) {
            switch (opType) {
                case NOOP:
                    noop(List.of(o));
                    break;
                case CREATE:
                    createTopics(List.of(o));
                    break;
                case DELETE:
                    deleteTopics(List.of(o));
                    break;
                case LIST:
                    throw new AssertionError("Unsupported topics operation type: " + opType);
                default:
                    // READ and any remaining type: nothing to do
                    break;
            }
        } else {
            throw new AssertionError("Unsupported operation class: " + o.getClass());
        }
    }

    /**
     * Executes a batch of operations. The class and the type of the FIRST operation select the handler which
     * then receives the whole batch.
     *
     * <p>Data operations: NOOP, CREATE (produce records) and READ (consume records) are supported, any other
     * type raises an {@link AssertionError}. Path operations: NOOP, CREATE and DELETE are dispatched, LIST
     * raises an {@link AssertionError} and any other type (including READ) is silently ignored.
     *
     * @param list the batch of operations; an empty batch is a no-op
     * @throws IllegalStateException declared by the overridden contract
     */
    protected final void execute(List<O> list) throws IllegalStateException {
        if (list.isEmpty()) {
            // nothing to dispatch, and there is no first element to derive the handler from
            return;
        }
        final O first = list.get(0);
        final OpType opType = first.type();
        if (first instanceof DataOperation) {
            switch (opType) {
                case NOOP:
                    noop(list);
                    break;
                case CREATE:
                    produceRecords(list);
                    break;
                case READ:
                    consumeRecords(list);
                    break;
                default:
                    throw new AssertionError("Unsupported record operation type: " + opType);
            }
        } else if (first instanceof PathOperation) {
            switch (opType) {
                case NOOP:
                    // the whole batch must be processed: passing only the first operation would leave the
                    // remaining ones never completed
                    noop(list);
                    break;
                case CREATE:
                    createTopics(list);
                    break;
                case DELETE:
                    deleteTopics(list);
                    break;
                case LIST:
                    throw new AssertionError("Unsupported topics operation type: " + opType);
                default:
                    // READ and any remaining type: nothing to do
                    break;
            }
        } else {
            throw new AssertionError("Unsupported operation class: " + first.getClass());
        }
    }

    /**
     * Executes the given operations as no-ops: every operation gets its request/response timestamps set
     * back-to-back, data operations additionally report the item size as the transferred byte count, and
     * finally all operations are completed with the {@code SUCC} status.
     *
     * <p>One concurrency permit is held while the batch is processed and is released even if a timing call
     * throws.
     *
     * @param list the operations to complete without performing any I/O
     */
    void noop(List<O> list) {
        try {
            this.concurrencyThrottle.acquire();
        } catch (InterruptedException e) {
            // rethrown as unchecked, so the code below runs only with the permit acquired
            Exceptions.throwUnchecked(e);
        }
        try {
            for (final O op : list) {
                op.startRequest();
                op.finishRequest();
                op.startResponse();
                op.finishResponse();
                if (op instanceof DataOperation) {
                    final DataOperation<?> dataOp = (DataOperation<?>) op;
                    try {
                        dataOp.countBytesDone(dataOp.item().size());
                    } catch (IOException e) {
                        // best effort: the operation still succeeds, only the byte count is not reported
                        LogUtil.exception(Level.DEBUG, e, "{}: failed to get the item size: {}", new Object[]{this.stepId, op});
                    }
                }
            }
        } finally {
            this.concurrencyThrottle.release();
        }
        completeOperations(list, Operation.Status.SUCC);
    }

    /**
     * Assigns the given status to every operation of the list and hands each one over to
     * {@code handleCompleted}.
     *
     * @param list the operations to complete
     * @param status the status to assign
     */
    void completeOperations(List<? extends O> list, Operation.Status status) {
        for (final O finishedOp : list) {
            finishedOp.status(status);
            handleCompleted(finishedOp);
        }
    }

    /**
     * Produces one Kafka record per operation. The topic is the operation destination path, the value is the
     * operation item and the key is the item name when keys are enabled. The node address of the first
     * operation selects the (cached) producer and admin client.
     *
     * <p>A concurrency permit is acquired per record and released by {@link #handleRecordProduce} from the send
     * callback. If submitting a record fails synchronously, its permit is released here, because the callback
     * is not going to run for it. On failure only the operations which were NOT handed over to the producer
     * are completed with {@code FAIL_UNKNOWN}: the already submitted ones are completed by their callbacks and
     * must not be completed twice.
     *
     * @param list the record creation operations; must not be empty
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    void produceRecords(List<O> list) {
        final String nodeAddr = list.get(0).nodeAddr();
        // count of the operations whose completion is already owned by a send callback
        int submitted = 0;
        try {
            final Properties config = this.configCache.computeIfAbsent(nodeAddr, this::createConfig);
            final KafkaProducer producer = this.threadLocalProducerCache.get().computeIfAbsent(nodeAddr, this.producerCreateFuncCache.computeIfAbsent(config, ProducerCreateFunctionImpl::new));
            final AdminClient adminClient = this.adminClientCache.computeIfAbsent(nodeAddr, this.adminClientCreateFuncCache.computeIfAbsent(config, AdminClientCreateFunctionImpl::new));
            for (final O op : list) {
                final DataOperation recordOp = (DataOperation) op;
                final String topicName = recordOp.dstPath();
                this.topicCache.computeIfAbsent(topicName, this.topicCreateFuncCache.computeIfAbsent(adminClient, TopicCreateFunctionImpl::new));
                final DataItem recordItem = recordOp.item();
                final ProducerRecord record = new ProducerRecord(topicName, this.useKey ? recordItem.name() : null, recordItem);
                final long recordSize = recordItem.size();
                this.concurrencyThrottle.acquire();
                try {
                    recordOp.startRequest();
                    producer.send(record, (metadata, failure) -> handleRecordProduce(recordOp, recordSize, failure));
                } catch (final Throwable submitFailure) {
                    // the callback will never be invoked for this record, so give the permit back here
                    this.concurrencyThrottle.release();
                    throw submitFailure;
                }
                submitted++;
                try {
                    recordOp.finishRequest();
                } catch (final IllegalStateException ignored) {
                    // presumably the send callback may have already moved the operation past the request
                    // phase - the original code ignores this case deliberately
                }
            }
            producer.flush();
        } catch (Throwable th) {
            com.emc.mongoose.base.Exceptions.throwUncheckedIfInterrupted(th);
            LogUtil.exception(Level.DEBUG, th, "Producing records failure", new Object[0]);
            completeOperations(list.subList(submitted, list.size()), Operation.Status.FAIL_UNKNOWN);
        }
    }

    /**
     * Send callback handler: marks the response phase of the operation, gives back the concurrency permit
     * taken by {@code produceRecords} and completes the operation, either as failed (when a failure cause is
     * given) or as successful with the given byte count.
     *
     * @param dataOperation the operation the produced record belongs to
     * @param j the record size in bytes, reported as the transferred byte count on success
     * @param th the failure cause, or {@code null} if the record was produced successfully
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    void handleRecordProduce(DataOperation dataOperation, long j, Throwable th) {
        try {
            dataOperation.startResponse();
            dataOperation.finishResponse();
        } finally {
            // the permit must be returned even if the timing calls reject the current operation state
            this.concurrencyThrottle.release();
        }
        final O completedOp = (O) dataOperation;
        if (th != null) {
            completeFailedOperation(completedOp, th);
        } else {
            dataOperation.countBytesDone(j);
            completeOperation(completedOp, Operation.Status.SUCC);
        }
    }

    /**
     * Consumes one Kafka record per operation. While there are operations left, a new consumer limited to the
     * remaining count ({@code max.poll.records}) is created and polled once with the configured read timeout;
     * every polled record completes the next pending operation with {@code SUCC} and the serialized value size
     * as the byte count. A poll which returns fewer records than needed is followed by another round.
     *
     * <p>One concurrency permit is held per round and is released only if it was acquired. If a round fails,
     * the failure is logged and the operations still pending are completed with {@code FAIL_UNKNOWN} instead
     * of retrying forever.
     *
     * @param list the record read operations; must not be empty
     */
    void consumeRecords(List<O> list) {
        final String nodeAddr = list.get(0).nodeAddr();
        final int total = list.size();
        int remaining = total;
        while (remaining > 0) {
            final Properties consumerConfig = createConsumerConfig(nodeAddr);
            consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(remaining));
            try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(consumerConfig)) {
                // acquired outside of the try/finally so that an interrupted acquire does not release a permit
                // which was never taken
                this.concurrencyThrottle.acquire();
                try {
                    for (int pendingIdx = total - remaining; pendingIdx < total; pendingIdx++) {
                        final O pendingOp = list.get(pendingIdx);
                        pendingOp.startRequest();
                        pendingOp.finishRequest();
                    }
                    for (final ConsumerRecord<Object, Object> polledRecord : consumer.poll(this.readTimeout)) {
                        final O servedOp = list.get(total - remaining);
                        servedOp.startResponse();
                        servedOp.finishResponse();
                        ((DataOperation<?>) servedOp).countBytesDone(polledRecord.serializedValueSize());
                        completeOperation(servedOp, Operation.Status.SUCC);
                        remaining--;
                    }
                } finally {
                    this.concurrencyThrottle.release();
                }
            } catch (Throwable th) {
                com.emc.mongoose.base.Exceptions.throwUncheckedIfInterrupted(th);
                LogUtil.exception(Level.DEBUG, th, "Consuming records failure", new Object[0]);
                // a persistent failure (e.g. a rejected consumer configuration) would otherwise spin this loop
                // forever without ever completing the pending operations
                completeOperations(list.subList(total - remaining, total), Operation.Status.FAIL_UNKNOWN);
                return;
            }
        }
    }

    /**
     * Handler of the topic (path) creation operations. Currently does nothing: the given operations are
     * neither executed nor completed here.
     *
     * @param list the topic creation operations
     */
    void createTopics(List<O> list) {
    }

    /**
     * Handler of the topic (path) deletion operations. Currently does nothing: the given operations are
     * neither executed nor completed here.
     *
     * @param list the topic deletion operations
     */
    void deleteTopics(List<O> list) {
    }

    /**
     * Assigns the given status to the operation and hands it over to {@code handleCompleted}.
     *
     * @param o the operation to complete
     * @param status the resulting status
     */
    void completeOperation(final O o, final Operation.Status status) {
        o.status(status);
        this.handleCompleted(o);
    }

    /**
     * Logs the failure cause at the DEBUG level and completes the operation with the {@code FAIL_UNKNOWN}
     * status.
     *
     * @param o the failed operation
     * @param th the failure cause
     */
    void completeFailedOperation(final O o, final Throwable th) {
        final Object[] logArgs = {this.stepId, o};
        LogUtil.exception(Level.DEBUG, th, "{}: operation failed: {}", logArgs);
        completeOperation(o, Operation.Status.FAIL_UNKNOWN);
    }

    /**
     * Builds the Kafka producer properties for the given bootstrap address from the driver settings: batch
     * size, request size limit, buffer memory, compression type, socket buffer sizes, linger time and the
     * key/value serializer classes.
     *
     * <p>The serializer classes are resolved by name; if one of them cannot be found, the failure is logged at
     * the DEBUG level and the properties are returned without the entries not set yet.
     *
     * @param str the bootstrap server address
     * @return the new producer properties
     */
    Properties createConfig(String str) {
        final Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", str);
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(this.batchSize));
        producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(this.requestSizeLimit));
        producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Long.valueOf(this.buffer));
        producerProps.put("compression.type", this.compression);
        producerProps.put("send.buffer.bytes", Integer.valueOf(this.sndBuf));
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(this.linger));
        producerProps.put("receive.buffer.bytes", Integer.valueOf(this.rcvBuf));
        try {
            final Class<?> keySerializer = Class.forName("org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
            final Class<?> valueSerializer = Class.forName("com.emc.mongoose.storage.driver.kafka.io.DataItemSerializer");
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        } catch (ClassNotFoundException notFound) {
            final Object[] logArgs = {this.stepId};
            LogUtil.exception(Level.DEBUG, notFound, "{}: operation failed", logArgs);
        }
        return producerProps;
    }

    /**
     * Builds the Kafka consumer properties for the given bootstrap address: the address itself and the socket
     * send/receive buffer sizes.
     *
     * <p>NOTE(review): no key/value deserializer is set here - verify that the consumer can be constructed from
     * these properties alone.
     *
     * @param str the bootstrap server address
     * @return the new consumer properties
     */
    Properties createConsumerConfig(String str) {
        final Properties consumerProps = new Properties();
        final Integer sendBufferBytes = Integer.valueOf(this.sndBuf);
        final Integer receiveBufferBytes = Integer.valueOf(this.rcvBuf);
        consumerProps.put("bootstrap.servers", str);
        consumerProps.put("send.buffer.bytes", sendBufferBytes);
        consumerProps.put("receive.buffer.bytes", receiveBufferBytes);
        return consumerProps;
    }

    /**
     * Picks the next endpoint address in the round-robin order.
     *
     * @return the endpoint address selected by the shared counter
     */
    String nextEndpointAddr() {
        // The counter wraps around to negative values after Integer.MAX_VALUE increments; the % operator would
        // then yield a negative index, while floorMod always stays within [0, length).
        final int next = Math.floorMod(this.rrc.getAndIncrement(), this.endpointAddrs.length);
        return this.endpointAddrs[next];
    }

    /**
     * Prepares the operation for execution: runs the superclass preparation (its result is not used) and
     * assigns the next round-robin endpoint address if the operation has no node address yet.
     *
     * @param o the operation to prepare
     * @return always {@code true}
     */
    protected boolean prepare(O o) {
        super.prepare(o);
        final boolean addrMissing = o.nodeAddr() == null;
        if (addrMissing) {
            o.nodeAddr(nextEndpointAddr());
        }
        return true;
    }

    /**
     * Not supported by this driver: the constructor resets the path request function, so this method is not
     * expected to be reached.
     *
     * @param str not used
     * @return never returns normally
     * @throws AssertionError always
     */
    protected String requestNewPath(final String str) {
        final String reason = "Should not be invoked";
        throw new AssertionError(reason);
    }

    /**
     * Releases the driver resources: runs the superclass close logic, closes every cached admin client
     * (waiting up to 10 seconds for each) and empties all the shared caches. The per-thread producer caches
     * are not touched here.
     *
     * @throws IOException if the superclass close logic fails
     * @throws IllegalStateException declared by the overridden contract
     */
    protected void doClose() throws IOException, IllegalStateException {
        super.doClose();
        this.configCache.clear();
        this.adminClientCreateFuncCache.clear();
        this.adminClientCache
            .values()
            .parallelStream()
            .forEach(cachedClient -> cachedClient.close(Duration.ofSeconds(10L)));
        this.adminClientCache.clear();
        this.producerCreateFuncCache.clear();
        this.topicCreateFuncCache.clear();
        this.topicCache.clear();
    }

    /**
     * Not supported by this driver: the constructor resets the auth token request function, so this method is
     * not expected to be reached.
     *
     * @param credential not used
     * @return never returns normally
     * @throws AssertionError always
     */
    protected String requestNewAuthToken(final Credential credential) {
        final String reason = "Should not be invoked";
        throw new AssertionError(reason);
    }

    /**
     * Lists the items. The first invocation returns a single zero-sized item named by concatenating
     * {@code str} and {@code str2}; every following invocation signals the end of the listing.
     *
     * @param itemFactory the factory used to create the listed item
     * @param str the first part of the item name
     * @param str2 the second part of the item name
     * @param i not used
     * @param i2 not used
     * @param i3 not used
     * @return a mutable list containing the single item
     * @throws IOException {@link EOFException} if the listing was already performed
     */
    public List<I> list(ItemFactory<I> itemFactory, String str, String str2, int i, I i2, int i3) throws IOException {
        if (this.listWasCalled) {
            throw new EOFException();
        }
        final List<I> listed = new ArrayList<>(1);
        final I onlyItem = itemFactory.getItem(str + str2, 0L, 0L);
        listed.add(onlyItem);
        this.listWasCalled = true;
        return listed;
    }

    /**
     * Intentionally does nothing: this driver takes its socket buffer sizes from the configuration
     * (see the constructor) and ignores both arguments here.
     *
     * @param j not used
     * @param opType not used
     */
    public void adjustIoBuffers(long j, OpType opType) {
    }
}
