/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

public class StreamProcessorHealthTest {
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1L);
    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
    private StreamProcessor streamProcessor;
    private TypedStreamWriter mockedLogStreamWriter;
    private AtomicBoolean shouldFlushThrowException;
    private AtomicInteger invocation;
    private AtomicBoolean shouldFailErrorHandlingInTransaction;
    private AtomicBoolean shouldProcessingThrowException;

    @Before
    public void before() {
        this.invocation = new AtomicInteger();
        this.shouldFlushThrowException = new AtomicBoolean();
        this.shouldFailErrorHandlingInTransaction = new AtomicBoolean();
        this.shouldProcessingThrowException = new AtomicBoolean();
    }

    @After
    public void tearDown() {
        this.shouldFlushThrowException.set(false);
        this.shouldFailErrorHandlingInTransaction.set(false);
        this.shouldProcessingThrowException.set(false);
    }

    @Test
    public void shouldBeHealthyOnStart() {
        this.streamProcessor = this.streamProcessorRule.startTypedStreamProcessor((processors, context) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class)));
        TestUtil.waitUntil(() -> this.streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
    }

    @Test
    public void shouldMarkUnhealthyWhenOnErrorHandlingWriteEventFails() {
        this.streamProcessor = this.getErrorProneStreamProcessor();
        HealthStatusCheck healthStatusCheck = HealthStatusCheck.of(this.streamProcessor);
        this.streamProcessorRule.getActorSchedulerRule().submitActor((Actor)healthStatusCheck);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.HEALTHY));
        this.shouldFlushThrowException.set(true);
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.UNHEALTHY));
    }

    @Test
    public void shouldMarkUnhealthyWhenProcessingOnWriteEventFails() {
        this.streamProcessor = this.getErrorProneStreamProcessor();
        TestUtil.waitUntil(() -> this.streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
        this.shouldProcessingThrowException.set(false);
        this.shouldFlushThrowException.set(true);
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> this.streamProcessor.getHealthStatus() == HealthStatus.UNHEALTHY);
    }

    @Test
    public void shouldMarkUnhealthyWhenExceptionErrorHandlingInTransaction() {
        this.shouldProcessingThrowException.set(true);
        this.streamProcessor = this.getErrorProneStreamProcessor();
        HealthStatusCheck healthStatusCheck = HealthStatusCheck.of(this.streamProcessor);
        this.streamProcessorRule.getActorSchedulerRule().submitActor((Actor)healthStatusCheck);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.HEALTHY));
        this.shouldFailErrorHandlingInTransaction.set(true);
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.UNHEALTHY));
    }

    @Test
    public void shouldBecomeHealthyWhenErrorIsResolved() {
        this.shouldFlushThrowException.set(true);
        this.streamProcessor = this.getErrorProneStreamProcessor();
        TestUtil.waitUntil(() -> this.streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> this.streamProcessor.getHealthStatus() == HealthStatus.UNHEALTHY);
        this.shouldFlushThrowException.set(false);
        TestUtil.waitUntil(() -> this.streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
    }

    private StreamProcessor getErrorProneStreamProcessor() {
        this.streamProcessor = this.streamProcessorRule.startTypedStreamProcessorNotAwaitOpening(processingContext -> {
            MutableZeebeState zeebeState = processingContext.getZeebeState();
            this.mockedLogStreamWriter = new WrappedStreamWriter();
            processingContext.logStreamWriter(this.mockedLogStreamWriter);
            return TypedRecordProcessors.processors((KeyGenerator)zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

                public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                    StreamProcessorHealthTest.this.invocation.getAndIncrement();
                    if (StreamProcessorHealthTest.this.shouldProcessingThrowException.get()) {
                        throw new RuntimeException("Expected failure on processing");
                    }
                }
            });
        });
        return this.streamProcessor;
    }

    private final class WrappedStreamWriter
    implements TypedStreamWriter {
        private WrappedStreamWriter() {
        }

        public void appendRejection(TypedRecord<? extends RecordValue> command, RejectionType type, String reason) {
        }

        public void configureSourceContext(long sourceRecordPosition) {
        }

        public void appendFollowUpEvent(long key, Intent intent, RecordValue value) {
            if (StreamProcessorHealthTest.this.shouldFailErrorHandlingInTransaction.get()) {
                throw new RuntimeException("Expected failure on append followup event");
            }
        }

        public void appendNewCommand(Intent intent, RecordValue value) {
        }

        public void appendFollowUpCommand(long key, Intent intent, RecordValue value) {
        }

        public void reset() {
        }

        public long flush() {
            if (StreamProcessorHealthTest.this.shouldFlushThrowException.get()) {
                throw new RuntimeException("Expected failure on flush");
            }
            return 1L;
        }
    }

    private static final class HealthStatusCheck
    extends Actor {
        private final StreamProcessor streamProcessor;

        private HealthStatusCheck(StreamProcessor streamProcessor) {
            this.streamProcessor = streamProcessor;
        }

        public static HealthStatusCheck of(StreamProcessor streamProcessor) {
            return new HealthStatusCheck(streamProcessor);
        }

        public boolean hasHealthStatus(HealthStatus healthStatus) {
            return (Boolean)this.actor.call(() -> this.streamProcessor.getHealthStatus() == healthStatus).join(5L, TimeUnit.SECONDS);
        }
    }
}

