package org.zalando.spring.boot.nakadi.config;

import com.google.common.collect.Sets;
import java.io.IOException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.zalando.fahrschein.AuthorizationBuilder;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.StreamParameters;
import org.zalando.fahrschein.SubscriptionBuilder;
import org.zalando.fahrschein.domain.Authorization;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.spring.boot.nakadi.NakadiListener;

/* loaded from: input_file:org/zalando/spring/boot/nakadi/config/FahrscheinNakadiConsumer.class */
/**
 * {@link NakadiConsumer} implementation backed by a Fahrschein {@link NakadiClient}.
 *
 * <p>Every call to {@link #runnable(NakadiListener)} requests a subscription described by the
 * supplied {@link ConsumerConfig} and wraps the resulting stream in an {@link IORunnable}
 * that delivers to the given listener.
 *
 * <p>The bean name and the event publisher received through the {@link BeanNameAware} and
 * {@link ApplicationEventPublisherAware} callbacks are stored, but nothing in this class
 * reads them.
 */
public class FahrscheinNakadiConsumer implements NakadiConsumer, BeanNameAware, ApplicationEventPublisherAware {
    private final NakadiClient nakadiClient;
    private final ConsumerConfig consumerConfig;
    private String beanName;
    private ApplicationEventPublisher eventPublisher;

    /**
     * Creates a consumer that talks to Nakadi through the given client.
     *
     * @param nakadiClient   client used to create subscriptions and open streams
     * @param consumerConfig settings describing the subscription and the stream parameters
     */
    public FahrscheinNakadiConsumer(NakadiClient nakadiClient, ConsumerConfig consumerConfig) {
        this.nakadiClient = nakadiClient;
        this.consumerConfig = consumerConfig;
    }

    /**
     * Stores the publisher handed over by the container.
     *
     * @param applicationEventPublisher publisher to remember
     */
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    /**
     * Stores the name under which this bean is registered.
     *
     * @param str the bean name
     */
    public void setBeanName(String str) {
        this.beanName = str;
    }

    /**
     * Builds a runnable that streams the configured subscription to the listener.
     *
     * @param nakadiListener listener providing the event type and receiving the events
     * @param <Type>         event type handled by the listener
     * @return runnable returned by the Fahrschein stream builder
     * @throws IOException if requesting the subscription or building the runnable fails
     */
    @Override
    public <Type> IORunnable runnable(NakadiListener<Type> nakadiListener) throws IOException {
        return nakadiClient
                .stream(getSubscription())
                .withStreamParameters(getStreamParameters())
                .runnable(nakadiListener.getEventType(), nakadiListener);
    }

    /**
     * Requests the subscription described by the consumer configuration.
     *
     * <p>The subscription uses the configured application name, topics and consumer group.
     * Its authorization is fixed in code: anyone may read, and the service
     * {@code stups_nakajima} is added as admin.
     *
     * @return the subscription returned by {@code subscribe()}
     * @throws IOException if the subscribe call fails
     */
    private Subscription getSubscription() throws IOException {
        SubscriptionBuilder builder = nakadiClient
                .subscription(consumerConfig.getApplicationName(), Sets.newHashSet(consumerConfig.getTopics()))
                .withConsumerGroup(consumerConfig.getConsumerGroup());

        Authorization.AuthorizationAttribute[] readers = {Authorization.AuthorizationAttribute.ANYONE};
        Authorization authorization = AuthorizationBuilder.authorization()
                .withReaders(readers)
                .addAdmin("service", "stups_nakajima")
                .build();
        builder = builder.withAuthorization(authorization);

        // Anything other than Position.END (including an unset value) reads from the beginning.
        if (Position.END.equals(consumerConfig.getReadFrom())) {
            builder = builder.readFromEnd();
        } else {
            builder = builder.readFromBegin();
        }
        return builder.subscribe();
    }

    /**
     * Translates the optional stream settings of the consumer configuration into
     * Fahrschein {@link StreamParameters}.
     *
     * <p>Only settings that are non-null in the configuration are applied; every other
     * value stays as a freshly constructed {@link StreamParameters} provides it.
     *
     * @return the stream parameters to use, never {@code null}
     */
    protected StreamParameters getStreamParameters() {
        StreamParametersConfig configured = consumerConfig.getStreamParameters();
        StreamParameters result = new StreamParameters();
        if (configured == null) {
            return result;
        }

        if (configured.getBatchFlushTimeout() != null) {
            result = result.withBatchFlushTimeout(configured.getBatchFlushTimeout().intValue());
        }
        if (configured.getBatchLimit() != null) {
            result = result.withBatchLimit(configured.getBatchLimit().intValue());
        }
        if (configured.getMaxUncommittedEvents() != null) {
            result = result.withMaxUncommittedEvents(configured.getMaxUncommittedEvents().intValue());
        }
        if (configured.getStreamKeepAliveLimit() != null) {
            result = result.withStreamKeepAliveLimit(configured.getStreamKeepAliveLimit().intValue());
        }
        if (configured.getStreamLimit() != null) {
            result = result.withStreamLimit(configured.getStreamLimit().intValue());
        }
        if (configured.getStreamTimeout() != null) {
            result = result.withStreamTimeout(configured.getStreamTimeout().intValue());
        }
        return result;
    }
}
