package io.zeebe.logstreams.processor;

import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamFailureListener;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.spi.ReadableSnapshot;
import io.zeebe.logstreams.spi.SnapshotPolicy;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.logstreams.spi.SnapshotWriter;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.state.ComposedState;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.state.TransitionState;
import io.zeebe.util.state.WaitState;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController.class */
public class StreamProcessorController implements Actor {
    /** Message template for a failed reprocessing run; format arguments are the processor name and a source event position. */
    public static final String ERROR_MESSAGE_REPROCESSING_FAILED = "Stream processor '%s' failed to reprocess. Cannot find source event position: %d";
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    // Identifiers of the transitions wired into the state machine below.
    protected static final int TRANSITION_DEFAULT = 0;
    protected static final int TRANSITION_OPEN = 1;
    protected static final int TRANSITION_CLOSE = 2;
    protected static final int TRANSITION_FAIL = 3;
    protected static final int TRANSITION_PROCESS = 4;
    protected static final int TRANSITION_SNAPSHOT = 5;
    protected static final int TRANSITION_RECOVER = 6;
    protected static final int TRANSITION_REPROCESS = 7;

    // Collaborators; all of them are taken from the StreamProcessorContext in the constructor.
    protected final StreamProcessor streamProcessor;
    protected final StreamProcessorContext streamProcessorContext;
    protected final DeferredCommandContext streamProcessorCmdQueue;
    protected final LogStreamReader logStreamReader;
    protected final LogStreamWriter logStreamWriter;
    protected final SnapshotPolicy snapshotPolicy;
    protected final SnapshotStorage snapshotStorage;
    protected final ActorScheduler actorScheduler;
    // Assigned by openAsync() when the controller is scheduled; closed by ClosedState.
    protected ActorReference actorRef;
    protected final EventFilter eventFilter;
    protected final EventFilter reprocessingEventFilter;
    protected final boolean isReadOnlyProcessor;

    // One instance per state; they must be initialized before the state machine that references them.
    protected final State<Context> openingState = new OpeningState();
    protected final State<Context> openedState = new OpenedState();
    protected final State<Context> processState = new ProcessState();
    protected final State<Context> snapshottingState = new SnapshottingState();
    protected final State<Context> recoveringState = new RecoveringState();
    protected final State<Context> prepareReprocessingState = new PrepareReprocessingState();
    protected final State<Context> reprocessingState = new ReprocessingState();
    protected final State<Context> closingSnapshottingState = new ClosingSnapshottingState();
    protected final State<Context> closingState = new ClosingState();
    protected final State<Context> closedState = new ClosedState();
    protected final State<Context> failedState = new FailedState();

    // Transition table: one line per (source state, transition, target state). The machine starts closed.
    protected final StateMachineAgent<Context> stateMachineAgent = new StateMachineAgent<>(
        StateMachine.builder(stateMachine -> {
            return new Context(stateMachine);
        })
            .initialState(closedState)
            .from(openingState).take(TRANSITION_DEFAULT).to(recoveringState)
            .from(openingState).take(TRANSITION_FAIL).to(failedState)
            .from(recoveringState).take(TRANSITION_DEFAULT).to(prepareReprocessingState)
            .from(recoveringState).take(TRANSITION_FAIL).to(failedState)
            .from(prepareReprocessingState).take(TRANSITION_DEFAULT).to(openedState)
            .from(prepareReprocessingState).take(TRANSITION_REPROCESS).to(reprocessingState)
            .from(prepareReprocessingState).take(TRANSITION_FAIL).to(failedState)
            .from(reprocessingState).take(TRANSITION_DEFAULT).to(openedState)
            .from(reprocessingState).take(TRANSITION_FAIL).to(failedState)
            .from(openedState).take(TRANSITION_PROCESS).to(processState)
            .from(openedState).take(TRANSITION_CLOSE).to(closingSnapshottingState)
            .from(openedState).take(TRANSITION_FAIL).to(failedState)
            .from(processState).take(TRANSITION_DEFAULT).to(openedState)
            .from(processState).take(TRANSITION_SNAPSHOT).to(snapshottingState)
            .from(processState).take(TRANSITION_FAIL).to(failedState)
            .from(processState).take(TRANSITION_CLOSE).to(closingSnapshottingState)
            .from(snapshottingState).take(TRANSITION_DEFAULT).to(openedState)
            .from(snapshottingState).take(TRANSITION_FAIL).to(failedState)
            .from(snapshottingState).take(TRANSITION_CLOSE).to(closingSnapshottingState)
            .from(failedState).take(TRANSITION_CLOSE).to(closingState)
            .from(failedState).take(TRANSITION_OPEN).to(openedState)
            .from(failedState).take(TRANSITION_RECOVER).to(recoveringState)
            .from(closingSnapshottingState).take(TRANSITION_DEFAULT).to(closingState)
            .from(closingSnapshottingState).take(TRANSITION_FAIL).to(closingState)
            .from(closingState).take(TRANSITION_DEFAULT).to(closedState)
            .from(closedState).take(TRANSITION_OPEN).to(openingState)
            .build());

    protected final LogStreamFailureListener logStreamFailureListener = new StreamFailureListener();
    // True from a successful openAsync() scheduling until ClosedState resets it.
    protected final AtomicBoolean isRunning = new AtomicBoolean(false);

    // Shared failure handling for states: log, move to the failed state, fail any pending future.
    private final BiConsumer<Context, Exception> stateFailureHandler = (context, failure) -> {
        LOG.error("Stream processor '{}' failed.", name(), failure);
        context.take(TRANSITION_FAIL);
        context.completeFutureExceptionally(failure);
    };

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$ClosedState.class */
    /**
     * Terminal wait state. The first time it runs after the controller was running, it completes
     * the pending future and closes the actor reference; afterwards it does nothing.
     */
    private class ClosedState implements WaitState<Context> {
        private ClosedState() {
        }

        public void work(Context context) {
            final boolean wasRunning = isRunning.compareAndSet(true, false);
            if (!wasRunning) {
                return;
            }
            context.completeFuture();
            actorRef.close();
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$ClosingSnapshottingState.class */
    /**
     * First step of closing: if an event has been read, attempts to write a snapshot, then always
     * continues with the default transition.
     */
    private class ClosingSnapshottingState implements State<Context> {
        private ClosingSnapshottingState() {
        }

        public int doWork(Context context) throws Exception {
            final boolean hasReadEvent = context.getEvent() != null;
            if (hasReadEvent) {
                // The result is intentionally ignored: closing proceeds whether or not a snapshot was written.
                ensureSnapshotWritten(context);
            }
            context.take(TRANSITION_DEFAULT);
            return 1;
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$ClosingState.class */
    /**
     * Releases resources on close: notifies the stream processor, closes the context's log stream
     * reader and unregisters the failure listener, then takes the default transition.
     */
    private class ClosingState implements TransitionState<Context> {
        private ClosingState() {
        }

        public void work(Context context) {
            streamProcessor.onClose();

            streamProcessorContext.getLogStreamReader().close();
            streamProcessorContext.getLogStream().removeFailureListener(logStreamFailureListener);

            context.take(TRANSITION_DEFAULT);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$Context.class */
    /**
     * Mutable state shared by the states of the controller's state machine: the current event,
     * the tracked positions (all initially -1) and an optional future to complete.
     */
    public class Context extends SimpleStateMachineContext {
        private LoggedEvent event;
        private long lastSuccessfulProcessedEventPosition = -1L;
        private long lastWrittenEventPosition = -1L;
        private long lastSourceEventPosition = -1L;
        private long snapshotPosition = -1L;
        private long failedEventPosition = -1L;
        private CompletableFuture<Void> future;

        Context(StateMachine<Context> stateMachine) {
            super(stateMachine);
        }

        public LoggedEvent getEvent() {
            return event;
        }

        public void setEvent(LoggedEvent loggedEvent) {
            event = loggedEvent;
        }

        /** Completes the pending future successfully, if there is one, and forgets it. */
        public void completeFuture() {
            final CompletableFuture<Void> pending = future;
            if (pending == null) {
                return;
            }
            pending.complete(null);
            future = null;
        }

        /** Completes the pending future with the given failure, if there is one, and forgets it. */
        public void completeFutureExceptionally(Throwable th) {
            final CompletableFuture<Void> pending = future;
            if (pending == null) {
                return;
            }
            pending.completeExceptionally(th);
            future = null;
        }

        /** Replaces the pending future; a previously set future is not completed. */
        public void setFuture(CompletableFuture<Void> completableFuture) {
            future = completableFuture;
        }

        public long getLastSuccessfulProcessedEventPosition() {
            return lastSuccessfulProcessedEventPosition;
        }

        public void setLastSuccessfulProcessedEventPosition(long j) {
            lastSuccessfulProcessedEventPosition = j;
        }

        public long getLastWrittenEventPosition() {
            return lastWrittenEventPosition;
        }

        public void setLastWrittenEventPosition(long j) {
            lastWrittenEventPosition = j;
        }

        public long getFailedEventPosition() {
            return failedEventPosition;
        }

        public void setFailedEventPosition(long j) {
            failedEventPosition = j;
        }

        public void setSnapshotPosition(long j) {
            snapshotPosition = j;
        }

        public long getSnapshotPosition() {
            return snapshotPosition;
        }

        public long getLastSourceEventPosition() {
            return lastSourceEventPosition;
        }

        public void setLastSourceEventPosition(long j) {
            lastSourceEventPosition = j;
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$FailedState.class */
    /**
     * Wait state entered via TRANSITION_FAIL. It performs no work itself; the state machine
     * leaves it only through the close, open or recover transitions configured for it.
     */
    private class FailedState implements WaitState<Context> {
        private FailedState() {
        }

        public void work(Context context) {
            // Intentionally empty: nothing to do while failed.
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$OpenedState.class */
    /**
     * Idle/poll state: drains the processor's command queue and, unless the processor is
     * suspended, reads the next event and hands it to the process state if the filter accepts it.
     */
    private class OpenedState implements State<Context> {
        private OpenedState() {
        }

        public int doWork(Context context) {
            final int commandWork = streamProcessorCmdQueue.doWork();

            if (streamProcessor.isSuspended() || !logStreamReader.hasNext()) {
                return commandWork;
            }

            final LoggedEvent currentEvent = logStreamReader.next();
            context.setEvent(currentEvent);

            // A missing filter accepts every event; rejected events are simply skipped.
            final boolean accepted = eventFilter == null || eventFilter.applies(currentEvent);
            if (accepted) {
                context.take(TRANSITION_PROCESS);
            }
            return commandWork + 1;
        }

        public void onFailure(Context context, Exception exc) {
            stateFailureHandler.accept(context, exc);
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$OpeningState.class */
    /**
     * Binds the reader and writer to the context's log stream and (re-)registers the failure
     * listener before taking the default transition.
     */
    private class OpeningState implements TransitionState<Context> {
        private OpeningState() {
        }

        public void work(Context context) {
            final LogStream targetStream = streamProcessorContext.getLogStream();

            logStreamReader.wrap(targetStream);
            logStreamWriter.wrap(targetStream);

            // Remove before registering — presumably to avoid a duplicate registration on re-open.
            targetStream.removeFailureListener(logStreamFailureListener);
            targetStream.registerFailureListener(logStreamFailureListener);

            context.take(TRANSITION_DEFAULT);
        }

        public void onFailure(Context context, Exception exc) {
            stateFailureHandler.accept(context, exc);
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$PrepareReprocessingState.class */
    /**
     * Decides whether events after the snapshot must be reprocessed. Scans the rest of the log
     * for events produced by this processor and remembers the highest source event position.
     */
    private class PrepareReprocessingState implements State<Context> {
        private PrepareReprocessingState() {
        }

        public int doWork(Context context) throws Exception {
            int nextTransition = TRANSITION_DEFAULT;

            if (!isReadOnlyProcessor && logStreamReader.hasNext()) {
                final long lastSourceEventPosition = findLastSourceEvent(context);
                // The scan consumed the reader; rewind to the first event after the snapshot.
                logStreamReader.seek(context.getSnapshotPosition() + 1);

                if (lastSourceEventPosition > context.getSnapshotPosition()) {
                    context.setLastSourceEventPosition(lastSourceEventPosition);
                    nextTransition = TRANSITION_REPROCESS;
                }
            }

            context.take(nextTransition);
            // NOTE(review): the pending future is completed here even when reprocessing still
            // has to run — confirm this is the intended point to signal "opened".
            context.completeFuture();
            return 1;
        }

        /**
         * Reads the log to its end and returns the highest source event position of events that
         * were produced by this processor and pass the reprocessing filter, or the snapshot
         * position if none is greater.
         */
        private long findLastSourceEvent(Context context) {
            long highestSourcePosition = context.getSnapshotPosition();

            while (logStreamReader.hasNext()) {
                final LoggedEvent candidate = logStreamReader.next();

                if (candidate.getProducerId() != streamProcessorContext.getId()) {
                    continue;
                }
                if (reprocessingEventFilter != null && !reprocessingEventFilter.applies(candidate)) {
                    continue;
                }

                final long sourceEventPosition = candidate.getSourceEventPosition();
                if (sourceEventPosition > 0 && sourceEventPosition > highestSourcePosition) {
                    highestSourcePosition = sourceEventPosition;
                }
            }
            return highestSourcePosition;
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$ProcessState.class */
    /**
     * Processes the current event in four steps: process, execute side effects, write the
     * resulting event, update state. The last step also decides whether a snapshot is due.
     */
    private class ProcessState extends ComposedState<Context> {
        private EventProcessor eventProcessor;
        private long eventPosition;

        // Step 1: obtain an event processor; without one the event is skipped.
        private ComposedState.Step<Context> processEventStep = context -> {
            this.eventProcessor = StreamProcessorController.this.streamProcessor.onEvent(context.getEvent());
            if (this.eventProcessor == null) {
                context.take(StreamProcessorController.TRANSITION_DEFAULT);
                return false;
            }
            this.eventProcessor.processEvent();
            return true;
        };

        // Step 2: side effects; the step's result is whatever the event processor reports.
        private ComposedState.Step<Context> sideEffectsStep = context -> this.eventProcessor.executeSideEffects();

        // Step 3: stamp the writer with producer id and source event, then let the processor write.
        private ComposedState.Step<Context> writeEventStep = context -> {
            StreamProcessorController.this.logStreamWriter
                .producerId(StreamProcessorController.this.streamProcessorContext.getId())
                .sourceEvent(
                    StreamProcessorController.this.streamProcessorContext.getLogStream().getPartitionId(),
                    context.getEvent().getPosition());

            this.eventPosition = this.eventProcessor.writeEvent(StreamProcessorController.this.logStreamWriter);
            return this.eventPosition >= 0;
        };

        // Step 4: commit processor state, record positions and pick the follow-up transition.
        private ComposedState.FailSafeStep<Context> updateStateStep = context -> {
            this.eventProcessor.updateState();
            StreamProcessorController.this.streamProcessor.afterEvent();
            context.setLastSuccessfulProcessedEventPosition(context.getEvent().getPosition());

            int nextTransition = StreamProcessorController.TRANSITION_DEFAULT;
            // Only a strictly positive position counts as "an event was written".
            if (this.eventPosition > 0) {
                context.setLastWrittenEventPosition(this.eventPosition);
                if (StreamProcessorController.this.snapshotPolicy.apply(context.getEvent().getPosition())) {
                    nextTransition = StreamProcessorController.TRANSITION_SNAPSHOT;
                }
            }
            context.take(nextTransition);
        };

        private ProcessState() {
        }

        protected List<ComposedState.Step<Context>> steps() {
            return Arrays.asList(this.processEventStep, this.sideEffectsStep, this.writeEventStep, this.updateStateStep);
        }

        public void onFailure(Context context, Exception exc) {
            StreamProcessorController.LOG.error("The log stream processor '{}' failed to process event. It stop processing further events.", StreamProcessorController.this.name(), exc);
            context.take(StreamProcessorController.TRANSITION_FAIL);
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$RecoveringState.class */
    /**
     * Resets the processor's state resource, restores it from the last snapshot (if any),
     * positions the reader right after the snapshot and opens the stream processor.
     */
    private class RecoveringState implements TransitionState<Context> {
        private RecoveringState() {
        }

        public void work(Context context) throws Exception {
            streamProcessor.getStateResource().reset();

            final ReadableSnapshot latestSnapshot = snapshotStorage.getLastSnapshot(streamProcessorContext.getName());
            long recoveredPosition = -1;

            if (latestSnapshot != null) {
                latestSnapshot.recoverFromSnapshot(streamProcessor.getStateResource());
                recoveredPosition = latestSnapshot.getPosition();

                // The event the snapshot refers to must exist in the log.
                final boolean snapshotEventFound = logStreamReader.seek(recoveredPosition) && logStreamReader.hasNext();
                if (!snapshotEventFound) {
                    throw new IllegalStateException(String.format("Stream processor '%s' failed to recover. Cannot find event with the snapshot position in target log stream.", StreamProcessorController.this.name()));
                }
                logStreamReader.seek(recoveredPosition + 1);
            }

            streamProcessor.onOpen(streamProcessorContext);
            context.setSnapshotPosition(recoveredPosition);
            context.take(TRANSITION_DEFAULT);
        }

        public void onFailure(Context context, Exception exc) {
            stateFailureHandler.accept(context, exc);
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$ReprocessingState.class */
    /**
     * Replays events, one per invocation, until the last source event position recorded in the
     * context has been reprocessed. Replayed events only update processor state; nothing is
     * written and no side effects are executed.
     */
    private class ReprocessingState implements State<Context> {
        private ReprocessingState() {
        }

        /**
         * Reprocesses the next event.
         *
         * @return always 1 (one unit of work)
         * @throws IllegalStateException if the log ends, or the reader passes the last source
         *     event position, before that position was found
         */
        public int doWork(Context context) {
            final long lastSourceEventPosition = context.getLastSourceEventPosition();

            if (!StreamProcessorController.this.logStreamReader.hasNext()) {
                throw new IllegalStateException(String.format(StreamProcessorController.ERROR_MESSAGE_REPROCESSING_FAILED, StreamProcessorController.this.streamProcessorContext.getName(), Long.valueOf(lastSourceEventPosition)));
            }

            final LoggedEvent currentEvent = StreamProcessorController.this.logStreamReader.next();
            final long currentPosition = currentEvent.getPosition();
            if (currentPosition > lastSourceEventPosition) {
                throw new IllegalStateException(String.format(StreamProcessorController.ERROR_MESSAGE_REPROCESSING_FAILED, StreamProcessorController.this.streamProcessorContext.getName(), Long.valueOf(lastSourceEventPosition)));
            }

            reprocessEvent(currentEvent);

            if (currentPosition == lastSourceEventPosition) {
                // Reached the last event this processor had handled before: reprocessing is done.
                context.take(StreamProcessorController.TRANSITION_DEFAULT);
                context.completeFuture();
            }
            return 1;
        }

        /**
         * Applies the event to the processor state if the event filter accepts it.
         *
         * @throws RuntimeException wrapping any failure of the processor, with the original
         *     exception attached as cause
         */
        private void reprocessEvent(LoggedEvent loggedEvent) {
            if (StreamProcessorController.this.eventFilter != null && !StreamProcessorController.this.eventFilter.applies(loggedEvent)) {
                return;
            }
            try {
                final EventProcessor onEvent = StreamProcessorController.this.streamProcessor.onEvent(loggedEvent);
                if (onEvent != null) {
                    onEvent.processEvent();
                    onEvent.updateState();
                    StreamProcessorController.this.streamProcessor.afterEvent();
                }
            } catch (Exception e) {
                // Pass the exception as the cause; as a surplus format argument it was silently dropped.
                throw new RuntimeException(String.format("Stream processor '%s' failed to reprocess event: %s", StreamProcessorController.this.streamProcessorContext.getName(), loggedEvent), e);
            }
        }

        public void onFailure(Context context, Exception exc) {
            StreamProcessorController.this.stateFailureHandler.accept(context, exc);
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$SnapshottingState.class */
    /**
     * Stays active until the snapshot step reports completion, then takes the default transition.
     * Reports no work while the snapshot cannot be written yet.
     */
    private class SnapshottingState implements State<Context> {
        private SnapshottingState() {
        }

        public int doWork(Context context) {
            if (!ensureSnapshotWritten(context)) {
                return 0;
            }
            context.take(TRANSITION_DEFAULT);
            return 1;
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$StreamFailureListener.class */
    /**
     * Bridges log stream failure notifications into the state machine. Both callbacks only
     * enqueue a command on the state machine agent; the actual work runs when the agent
     * executes that command.
     */
    private class StreamFailureListener implements LogStreamFailureListener {
        private StreamFailureListener() {
        }

        /** Moves the state machine to the failed state (if allowed) and remembers the failed position. */
        @Override // io.zeebe.logstreams.log.LogStreamFailureListener
        public void onFailed(long j) {
            StreamProcessorController.this.stateMachineAgent.addCommand(context -> {
                if (context.tryTake(StreamProcessorController.TRANSITION_FAIL)) {
                    context.setFailedEventPosition(j);
                }
            });
        }

        /**
         * Leaves the failed state. If the failed position is not beyond the last event written by
         * this processor, a full recovery is triggered; otherwise processing resumes, re-reading
         * the current event when it was not processed successfully.
         */
        @Override // io.zeebe.logstreams.log.LogStreamFailureListener
        public void onRecovered() {
            StreamProcessorController.this.stateMachineAgent.addCommand(context -> {
                long failedEventPosition = context.getFailedEventPosition();
                if (failedEventPosition >= 0) {
                    if (failedEventPosition <= context.getLastWrittenEventPosition()) {
                        context.take(StreamProcessorController.TRANSITION_RECOVER);
                    } else {
                        // The failure may have happened before any event was read; then there is
                        // nothing to re-read and dereferencing the event would throw.
                        if (context.getEvent() != null) {
                            long position = context.getEvent().getPosition();
                            if (position > context.getLastSuccessfulProcessedEventPosition()) {
                                StreamProcessorController.this.logStreamReader.seek(position);
                            }
                        }
                        context.take(StreamProcessorController.TRANSITION_OPEN);
                    }
                }
                context.setFailedEventPosition(-1L);
            });
        }
    }

    /**
     * Creates a controller whose collaborators (scheduler, processor, reader, writer, snapshot
     * policy/storage, command queue, filters and read-only flag) are all taken from the given
     * context. The controller is not scheduled until {@link #openAsync()} is called.
     *
     * @param streamProcessorContext the context to read the collaborators from; dereferenced
     *     immediately, so it must not be null
     */
    public StreamProcessorController(StreamProcessorContext streamProcessorContext) {
        this.streamProcessorContext = streamProcessorContext;
        this.actorScheduler = streamProcessorContext.getTaskScheduler();
        this.streamProcessor = streamProcessorContext.getStreamProcessor();
        this.logStreamReader = streamProcessorContext.getLogStreamReader();
        this.logStreamWriter = streamProcessorContext.getLogStreamWriter();
        this.snapshotPolicy = streamProcessorContext.getSnapshotPolicy();
        this.snapshotStorage = streamProcessorContext.getSnapshotStorage();
        this.streamProcessorCmdQueue = streamProcessorContext.getStreamProcessorCmdQueue();
        this.eventFilter = streamProcessorContext.getEventFilter();
        this.reprocessingEventFilter = streamProcessorContext.getReprocessingEventFilter();
        this.isReadOnlyProcessor = streamProcessorContext.isReadOnlyProcessor();
    }

    /** Runs one work cycle by delegating to the state machine agent and returns its result. */
    public int doWork() {
        final int workCount = stateMachineAgent.doWork();
        return workCount;
    }

    /** Returns the name of the stream processor as reported by the stream processor context. */
    public String name() {
        final String processorName = streamProcessorContext.getName();
        return processorName;
    }

    /**
     * Requests the controller to open and, if it is not running yet, schedules it as an actor.
     *
     * @return a future that fails with {@link IllegalStateException} if the open transition is
     *     not possible in the current state, or with the scheduler's exception if scheduling
     *     fails; otherwise it is handed to the state machine context for later completion
     */
    public CompletableFuture<Void> openAsync() {
        final CompletableFuture<Void> openFuture = new CompletableFuture<>();

        stateMachineAgent.addCommand(context -> {
            if (!context.tryTake(TRANSITION_OPEN)) {
                openFuture.completeExceptionally(new IllegalStateException("Cannot open stream processor."));
                return;
            }
            context.setFuture(openFuture);
        });

        // Only the caller that flips the flag schedules the actor.
        final boolean startedNow = isRunning.compareAndSet(false, true);
        if (startedNow) {
            try {
                actorRef = actorScheduler.schedule(this);
            } catch (Exception e) {
                isRunning.set(false);
                openFuture.completeExceptionally(e);
            }
        }
        return openFuture;
    }

    /** Returns the priority the stream processor reports for the given argument. */
    public int getPriority(long j) {
        final int priority = streamProcessor.getPriority(j);
        return priority;
    }

    /**
     * Requests the controller to close. The request is queued as a state machine command.
     *
     * @return a future that fails with {@link IllegalStateException} if the close transition is
     *     not possible in the current state; otherwise it is handed to the state machine context
     *     for later completion
     */
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        stateMachineAgent.addCommand(context -> {
            if (!context.tryTake(TRANSITION_CLOSE)) {
                closeFuture.completeExceptionally(new IllegalStateException("Cannot close stream processor."));
                return;
            }
            context.setFuture(closeFuture);
        });

        return closeFuture;
    }

    /**
     * Returns whether the state machine is currently recovering, preparing reprocessing or
     * reprocessing.
     */
    public boolean isOnRecover() {
        // Read the state once so that a transition between comparisons cannot yield a false negative.
        final State<?> current = this.stateMachineAgent.getCurrentState();
        return current == this.recoveringState || current == this.prepareReprocessingState || current == this.reprocessingState;
    }

    /** Returns whether the state machine is currently in the opened, process or snapshotting state. */
    public boolean isOpen() {
        // Read the state once so that a transition between comparisons cannot yield a false negative.
        final State<?> current = this.stateMachineAgent.getCurrentState();
        return current == this.openedState || current == this.processState || current == this.snapshottingState;
    }

    /** Returns whether the state machine is currently in one of the two closing states. */
    public boolean isClosing() {
        // Read the state once so that a transition between comparisons cannot yield a false negative.
        final State<?> current = this.stateMachineAgent.getCurrentState();
        return current == this.closingState || current == this.closingSnapshottingState;
    }

    /** Returns whether the state machine is currently in the closed state. */
    public boolean isClosed() {
        final boolean closed = closedState == stateMachineAgent.getCurrentState();
        return closed;
    }

    /** Returns whether the state machine is currently in the failed state. */
    public boolean isFailed() {
        final boolean failed = failedState == stateMachineAgent.getCurrentState();
        return failed;
    }

    /** Returns the filter applied to events before processing; may be null (no filtering). */
    public EventFilter getEventFilter() {
        return eventFilter;
    }

    /** Returns the filter applied while searching for the last source event; may be null (no filtering). */
    public EventFilter getReprocessingEventFilter() {
        return reprocessingEventFilter;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes a snapshot at the last successfully processed position if one is needed and the
     * last written event is already committed.
     *
     * @return {@code true} if no snapshot is needed or a snapshot write was attempted;
     *     {@code false} if the commit position has not yet reached the last written event
     */
    public boolean ensureSnapshotWritten(Context context) {
        final long processedPosition = context.getLastSuccessfulProcessedEventPosition();
        final long writtenPosition = context.getLastWrittenEventPosition();
        final long committedPosition = streamProcessorContext.getLogStream().getCommitPosition();

        // Nothing processed since the last snapshot: already up to date.
        if (processedPosition <= context.getSnapshotPosition()) {
            return true;
        }
        // The last written event is not committed yet: try again later.
        if (committedPosition < writtenPosition) {
            return false;
        }
        writeSnapshot(context, processedPosition);
        return true;
    }

    /**
     * Writes and commits a snapshot of the processor's state resource at the given position and
     * records that position in the context. Failures are logged and the snapshot is aborted;
     * they are not propagated to the caller.
     *
     * @param context the state machine context whose snapshot position is updated on success
     * @param j the event position the snapshot is taken at
     */
    protected void writeSnapshot(Context context, long j) {
        SnapshotWriter writer = null;
        try {
            final long startedAt = System.currentTimeMillis();
            final String processorName = streamProcessorContext.getName();
            LOG.info("Write snapshot for stream processor {} at event position {}.", processorName, j);

            writer = snapshotStorage.createSnapshot(processorName, j);
            writer.writeSnapshot(streamProcessor.getStateResource());
            writer.commit();

            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            LOG.info("Creation of snapshot {} took {} ms.", processorName, elapsedMillis);
            context.setSnapshotPosition(j);
        } catch (Exception e) {
            LOG.error("Stream processor '{}' failed. Can not write snapshot.", name(), e);
            // A writer only exists if createSnapshot succeeded; discard its partial output.
            if (writer != null) {
                writer.abort();
            }
        }
    }
}
