/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.logstreams.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventFilter;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.logstreams.processor.StreamProcessorErrorHandler;
import io.zeebe.logstreams.spi.SnapshotPositionProvider;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.buffer.BufferReader;

public class StreamProcessorService
implements Service<StreamProcessorController> {
    private final Injector<LogStream> sourceStreamInjector = new Injector();
    private final Injector<LogStream> targetStreamInjector = new Injector();
    private final Injector<SnapshotStorage> snapshotStorageInjector = new Injector();
    private final Injector<ActorScheduler> actorSchedulerInjector = new Injector();
    private final String name;
    private final int id;
    private final StreamProcessor streamProcessor;
    protected MetadataFilter customEventFilter;
    protected EventFilter customReprocessingEventFilter;
    protected boolean readOnly;
    protected StreamProcessorErrorHandler errorHandler;
    protected final MetadataFilter versionFilter = m -> {
        if (m.getProtocolVersion() > 1) {
            throw new RuntimeException(String.format("Cannot handle event with version newer than what is implemented by broker (%d > %d)", m.getProtocolVersion(), 1));
        }
        return true;
    };
    protected SnapshotPositionProvider snapshotPositionProvider;
    private StreamProcessorController streamProcessorController;

    public StreamProcessorService(String name, int id, StreamProcessor streamProcessor) {
        this.name = name;
        this.id = id;
        this.streamProcessor = streamProcessor;
    }

    public StreamProcessorService eventFilter(MetadataFilter eventFilter) {
        this.customEventFilter = eventFilter;
        return this;
    }

    public StreamProcessorService reprocessingEventFilter(EventFilter reprocessingEventFilter) {
        this.customReprocessingEventFilter = reprocessingEventFilter;
        return this;
    }

    public StreamProcessorService readOnly(boolean readOnly) {
        this.readOnly = readOnly;
        return this;
    }

    public StreamProcessorService snapshotPositionProvider(SnapshotPositionProvider snapshotPositionProvider) {
        this.snapshotPositionProvider = snapshotPositionProvider;
        return this;
    }

    public StreamProcessorService errorHandler(StreamProcessorErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
        return this;
    }

    public void start(ServiceStartContext ctx) {
        LogStream sourceStream = (LogStream)this.sourceStreamInjector.getValue();
        LogStream targetStream = (LogStream)this.targetStreamInjector.getValue();
        SnapshotStorage snapshotStorage = (SnapshotStorage)this.snapshotStorageInjector.getValue();
        ActorScheduler actorScheduler = (ActorScheduler)this.actorSchedulerInjector.getValue();
        MetadataFilter metadataFilter = this.versionFilter;
        if (this.customEventFilter != null) {
            metadataFilter = metadataFilter.and(this.customEventFilter);
        }
        MetadataEventFilter eventFilter = new MetadataEventFilter(metadataFilter);
        MetadataEventFilter reprocessingEventFilter = new MetadataEventFilter(this.versionFilter);
        if (this.customReprocessingEventFilter != null) {
            reprocessingEventFilter = reprocessingEventFilter.and(this.customReprocessingEventFilter);
        }
        if (this.errorHandler == null) {
            this.errorHandler = new DefaultStreamProcessorErrorHandler();
        }
        this.streamProcessorController = LogStreams.createStreamProcessor((String)this.name, (int)this.id, (StreamProcessor)this.streamProcessor).sourceStream(sourceStream).targetStream(targetStream).snapshotStorage(snapshotStorage).snapshotPositionProvider(this.snapshotPositionProvider).actorScheduler(actorScheduler).eventFilter((EventFilter)eventFilter).reprocessingEventFilter((EventFilter)reprocessingEventFilter).errorHandler(this.errorHandler).readOnly(this.readOnly).build();
        ctx.async(this.streamProcessorController.openAsync());
    }

    public StreamProcessorController get() {
        return this.streamProcessorController;
    }

    public void stop(ServiceStopContext ctx) {
        ctx.async(this.streamProcessorController.closeAsync());
    }

    public Injector<SnapshotStorage> getSnapshotStorageInjector() {
        return this.snapshotStorageInjector;
    }

    public Injector<ActorScheduler> getActorSchedulerInjector() {
        return this.actorSchedulerInjector;
    }

    public Injector<LogStream> getSourceStreamInjector() {
        return this.sourceStreamInjector;
    }

    public Injector<LogStream> getTargetStreamInjector() {
        return this.targetStreamInjector;
    }

    public StreamProcessorController getStreamProcessorController() {
        return this.streamProcessorController;
    }

    protected static class DefaultStreamProcessorErrorHandler
    implements StreamProcessorErrorHandler {
        protected DefaultStreamProcessorErrorHandler() {
        }

        public boolean canHandle(Exception error) {
            return false;
        }

        public boolean onError(LoggedEvent failedEvent, Exception error) {
            return false;
        }
    }

    protected static class MetadataEventFilter
    implements EventFilter {
        protected final BrokerEventMetadata metadata = new BrokerEventMetadata();
        protected final MetadataFilter metadataFilter;

        public MetadataEventFilter(MetadataFilter metadataFilter) {
            this.metadataFilter = metadataFilter;
        }

        public boolean applies(LoggedEvent event) {
            event.readMetadata((BufferReader)this.metadata);
            return this.metadataFilter.applies(this.metadata);
        }
    }
}

