package servicepatterns.basepatterns.ackreqrep;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import de.unistuttgart.isw.sfsc.commonjava.registry.CallbackRegistry;
import de.unistuttgart.isw.sfsc.commonjava.util.scheduling.MaxTimesRepetition;
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.outputmanagement.OutputPublisher;
import de.unistuttgart.isw.sfsc.framework.messagingpatterns.ackreqrep.Reply;
import de.unistuttgart.isw.sfsc.framework.messagingpatterns.ackreqrep.RequestOrAcknowledge;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:servicepatterns/basepatterns/ackreqrep/AckServerConsumer.class */
public final class AckServerConsumer implements BiConsumer<ByteString, ByteString> {
    private static final Logger logger = LoggerFactory.getLogger(AckServerConsumer.class);
    private final Supplier<Integer> idGenerator;
    private final OutputPublisher publisher;
    private final CallbackRegistry callbackRegistry;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ByteString serverTopic;
    private final Function<ByteString, AckServerResult> serverFunction;
    private final int timeoutMs;
    private final int sendRateMs;
    private final int sendMaxTries;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: servicepatterns.basepatterns.ackreqrep.AckServerConsumer$1, reason: invalid class name */
    /* loaded from: input_file:servicepatterns/basepatterns/ackreqrep/AckServerConsumer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$de$unistuttgart$isw$sfsc$framework$messagingpatterns$ackreqrep$RequestOrAcknowledge$RequestOrAcknowledgeCase = new int[RequestOrAcknowledge.RequestOrAcknowledgeCase.values().length];

        static {
            try {
                $SwitchMap$de$unistuttgart$isw$sfsc$framework$messagingpatterns$ackreqrep$RequestOrAcknowledge$RequestOrAcknowledgeCase[RequestOrAcknowledge.RequestOrAcknowledgeCase.REQUEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$de$unistuttgart$isw$sfsc$framework$messagingpatterns$ackreqrep$RequestOrAcknowledge$RequestOrAcknowledgeCase[RequestOrAcknowledge.RequestOrAcknowledgeCase.ACKNOWLEDGE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AckServerConsumer(OutputPublisher outputPublisher, CallbackRegistry callbackRegistry, ScheduledExecutorService scheduledExecutorService, Function<ByteString, AckServerResult> function, ByteString byteString, int i, int i2, int i3) {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.idGenerator = atomicInteger::getAndIncrement;
        this.publisher = outputPublisher;
        this.callbackRegistry = callbackRegistry;
        this.scheduledExecutorService = scheduledExecutorService;
        this.serverTopic = byteString;
        this.serverFunction = function;
        this.timeoutMs = i;
        this.sendRateMs = i2;
        this.sendMaxTries = i3;
    }

    @Override // java.util.function.BiConsumer
    public void accept(ByteString byteString, ByteString byteString2) {
        try {
            RequestOrAcknowledge parseFrom = RequestOrAcknowledge.parseFrom(byteString2);
            switch (AnonymousClass1.$SwitchMap$de$unistuttgart$isw$sfsc$framework$messagingpatterns$ackreqrep$RequestOrAcknowledge$RequestOrAcknowledgeCase[parseFrom.getRequestOrAcknowledgeCase().ordinal()]) {
                case 1:
                    RequestOrAcknowledge.Request request = parseFrom.getRequest();
                    int expectedReplyId = request.getExpectedReplyId();
                    ByteString replyTopic = request.getReplyTopic();
                    AckServerResult apply = this.serverFunction.apply(request.getRequestPayload());
                    int intValue = this.idGenerator.get().intValue();
                    Reply wrapReply = wrapReply(intValue, this.serverTopic, expectedReplyId, apply.getResponse());
                    MaxTimesRepetition scheduleMaxTimes = MaxTimesRepetition.scheduleMaxTimes(this.scheduledExecutorService, () -> {
                        this.publisher.publish(replyTopic, wrapReply);
                    }, this.sendRateMs, this.sendMaxTries);
                    this.callbackRegistry.addCallback(intValue, byteString3 -> {
                        scheduleMaxTimes.close();
                        apply.getOnDeliverySuccess().run();
                    }, this.timeoutMs, apply.getOnDeliveryFail());
                    break;
                case 2:
                    this.callbackRegistry.performCallback(parseFrom.getAcknowledge().getAcknowledgeId(), (ByteString) null);
                    break;
                default:
                    logger.warn("Received unsupported message type {}", parseFrom.getRequestOrAcknowledgeCase());
                    break;
            }
        } catch (InvalidProtocolBufferException e) {
            logger.warn("Received malformed message", e);
        }
    }

    Reply wrapReply(int i, ByteString byteString, int i2, Message message) {
        return Reply.newBuilder().setAcknowledgeTopic(byteString).setExpectedAcknowledgeId(i).setReplyId(i2).setReplyPayload(message.toByteString()).build();
    }
}
