package io.zeebe.broker.exporter.stream;

import io.zeebe.broker.exporter.ExporterObjectMapper;
import io.zeebe.broker.exporter.record.value.DeploymentRecordValueImpl;
import io.zeebe.broker.exporter.record.value.IncidentRecordValueImpl;
import io.zeebe.broker.exporter.record.value.JobBatchRecordValueImpl;
import io.zeebe.broker.exporter.record.value.JobRecordValueImpl;
import io.zeebe.broker.exporter.record.value.MessageRecordValueImpl;
import io.zeebe.broker.exporter.record.value.MessageSubscriptionRecordValueImpl;
import io.zeebe.broker.exporter.record.value.RaftRecordValueImpl;
import io.zeebe.broker.exporter.record.value.VariableDocumentRecordValueImpl;
import io.zeebe.broker.exporter.record.value.VariableRecordValueImpl;
import io.zeebe.broker.exporter.record.value.WorkflowInstanceCreationRecordValueImpl;
import io.zeebe.broker.exporter.record.value.WorkflowInstanceRecordValueImpl;
import io.zeebe.broker.exporter.record.value.WorkflowInstanceSubscriptionRecordValueImpl;
import io.zeebe.broker.exporter.record.value.deployment.DeployedWorkflowImpl;
import io.zeebe.broker.exporter.record.value.deployment.DeploymentResourceImpl;
import io.zeebe.broker.exporter.record.value.job.HeadersImpl;
import io.zeebe.broker.exporter.record.value.raft.RaftMemberImpl;
import io.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.zeebe.broker.exporter.stream.ExporterRecord;
import io.zeebe.broker.exporter.util.ControlledTestExporter;
import io.zeebe.broker.exporter.util.PojoConfigurationExporter;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.state.DefaultZeebeDbFactory;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessorRule;
import io.zeebe.db.ZeebeDb;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.exporter.api.record.Record;
import io.zeebe.exporter.api.record.RecordValue;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.msgpack.value.LongValue;
import io.zeebe.protocol.BpmnElementType;
import io.zeebe.protocol.ErrorType;
import io.zeebe.protocol.VariableDocumentUpdateSemantic;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.zeebe.protocol.impl.record.value.deployment.ResourceType;
import io.zeebe.protocol.impl.record.value.deployment.Workflow;
import io.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.zeebe.protocol.impl.record.value.variable.VariableRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceCreationRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.ExporterIntent;
import io.zeebe.protocol.intent.IncidentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.intent.RaftIntent;
import io.zeebe.protocol.intent.VariableDocumentIntent;
import io.zeebe.protocol.intent.VariableIntent;
import io.zeebe.protocol.intent.WorkflowInstanceCreationIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.raft.event.RaftConfigurationEvent;
import io.zeebe.raft.event.RaftConfigurationEventMember;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.TestUtil;
import io.zeebe.test.util.collection.Maps;
import io.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/exporter/stream/ExporterStreamProcessorTest.class */
public class ExporterStreamProcessorTest {
    private static final int PARTITION_ID = 1;
    private static final ExporterObjectMapper OBJECT_MAPPER = new ExporterObjectMapper();
    private static final Map<String, Object> VARIABLES = Collections.singletonMap("foo", TestJarExporter.FOO);
    private static final String VARIABLES_JSON = OBJECT_MAPPER.toJson(VARIABLES);
    private static final DirectBuffer VARIABLES_MSGPACK = new UnsafeBuffer(OBJECT_MAPPER.toMsgpack(VARIABLES));
    private static final Map<String, Object> CUSTOM_HEADERS = Collections.singletonMap("workerVersion", 42);
    private static final DirectBuffer CUSTOM_HEADERS_MSGPACK = new UnsafeBuffer(OBJECT_MAPPER.toMsgpack(CUSTOM_HEADERS));

    @Rule
    public StreamProcessorRule rule = new StreamProcessorRule(1, DefaultZeebeDbFactory.defaultFactory(ExporterColumnFamilies.class));
    private List<ControlledTestExporter> exporters;
    private ExporterStreamProcessorState state;

    @Test
    public void shouldConfigureAllExportersProperlyOnStart() throws InterruptedException {
        Map[] mapArr = {newConfig("foo", TestJarExporter.FOO), newConfig(TestJarExporter.FOO, "foo")};
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(mapArr);
        CountDownLatch countDownLatch = new CountDownLatch(this.exporters.size());
        Iterator<ControlledTestExporter> it = this.exporters.iterator();
        while (it.hasNext()) {
            it.next().onOpen(controller -> {
                countDownLatch.countDown();
            });
        }
        this.rule.initStreamProcessor((zeebeDb, dbContext) -> {
            return new ExporterStreamProcessor(zeebeDb, dbContext, 1, createMockedExporters);
        }).start();
        Assertions.assertThat(countDownLatch.await(5L, TimeUnit.SECONDS)).isTrue();
        for (int i = 0; i < this.exporters.size(); i++) {
            Assertions.assertThat(this.exporters.get(i).getContext().getConfiguration().getId()).isEqualTo(createMockedExporters.get(i).getId());
            Assertions.assertThat(this.exporters.get(i).getContext().getConfiguration().getArguments()).isEqualTo(mapArr[i]);
            Assertions.assertThat(this.exporters.get(i).getContext().getLogger()).isNotNull();
            Assertions.assertThat(this.exporters.get(i).getController()).isNotNull();
        }
    }

    @Test
    public void shouldInstantiateConfigurationClass() {
        HashMap hashMap = new HashMap();
        hashMap.put(TestJarExporter.FOO, "baz");
        hashMap.put("y", Double.valueOf(32.12d));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("foo", TestJarExporter.FOO);
        hashMap2.put("x", Integer.valueOf(WorkflowInstanceStreamProcessorRule.WORKFLOW_KEY));
        hashMap2.put("nested", hashMap);
        ExporterDescriptor exporterDescriptor = new ExporterDescriptor("instantiateConfiguration", PojoConfigurationExporter.class, hashMap2);
        this.rule.runStreamProcessor((zeebeDb, dbContext) -> {
            return new ExporterStreamProcessor(zeebeDb, dbContext, 1, Collections.singletonList(exporterDescriptor));
        });
        PojoConfigurationExporter.PojoExporterConfiguration pojoExporterConfiguration = PojoConfigurationExporter.configuration;
        Assertions.assertThat(pojoExporterConfiguration.foo).isEqualTo(TestJarExporter.FOO);
        Assertions.assertThat(pojoExporterConfiguration.x).isEqualTo(WorkflowInstanceStreamProcessorRule.WORKFLOW_KEY);
        Assertions.assertThat(pojoExporterConfiguration.nested.bar).isEqualTo("baz");
        Assertions.assertThat(pojoExporterConfiguration.nested.y).isEqualTo(32.12d);
    }

    @Test
    public void shouldCloseAllExportersOnClose() {
        boolean[] zArr = {false, false};
        StreamProcessorControl runStreamProcessor = this.rule.runStreamProcessor((zeebeDb, dbContext) -> {
            return createStreamProcessor(zeebeDb, zArr.length);
        });
        this.exporters.get(0).onClose(() -> {
            zArr[0] = true;
        });
        this.exporters.get(1).onClose(() -> {
            zArr[1] = true;
        });
        for (int i = 0; i < this.exporters.size(); i++) {
            Assertions.assertThat(zArr[i]).isFalse();
        }
        runStreamProcessor.close();
        for (int i2 = 0; i2 < this.exporters.size(); i2++) {
            Assertions.assertThat(zArr[i2]).isTrue();
        }
    }

    @Test
    public void shouldRestartEachExporterFromCorrectPosition() {
        StreamProcessorControl runStreamProcessor = this.rule.runStreamProcessor((zeebeDb, dbContext) -> {
            return createStreamProcessor(zeebeDb, 2);
        });
        AtomicLong atomicLong = new AtomicLong();
        runStreamProcessor.blockAfterEvent(loggedEvent -> {
            return atomicLong.incrementAndGet() == 2;
        });
        long writeEvent = writeEvent();
        long writeEvent2 = writeEvent();
        runStreamProcessor.getClass();
        TestUtil.waitUntil(runStreamProcessor::isBlocked);
        runStreamProcessor.unblock();
        TestUtil.waitUntil(() -> {
            return this.exporters.get(0).getExportedRecords().size() == 2;
        });
        TestUtil.waitUntil(() -> {
            return this.exporters.get(1).getExportedRecords().size() == 2;
        });
        this.exporters.get(0).getController().updateLastExportedRecordPosition(writeEvent2);
        this.exporters.get(1).getController().updateLastExportedRecordPosition(writeEvent);
        runStreamProcessor.close();
        runStreamProcessor.blockAfterEvent(loggedEvent2 -> {
            return loggedEvent2.getPosition() == writeEvent2;
        });
        runStreamProcessor.start();
        runStreamProcessor.getClass();
        TestUtil.waitUntil(runStreamProcessor::isBlocked);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords()).hasSize(0);
        Assertions.assertThat(this.exporters.get(1).getExportedRecords()).hasSize(1);
        Assertions.assertThat(this.exporters.get(1).getExportedRecords().get(0).getPosition()).isEqualTo(writeEvent2);
    }

    @Test
    public void shouldRecoverPositionsFromLogStream() {
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(1);
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor((zeebeDb, dbContext) -> {
            ExporterStreamProcessor exporterStreamProcessor = new ExporterStreamProcessor(zeebeDb, dbContext, 1, createMockedExporters);
            this.state = exporterStreamProcessor.getState();
            return exporterStreamProcessor;
        });
        long writeEvent = writeEvent();
        long writeExporterEvent = writeExporterEvent(createMockedExporters.get(0).getId(), writeEvent);
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeExporterEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        Assertions.assertThat(this.state.getPosition(createMockedExporters.get(0).getId())).isEqualTo(writeEvent);
    }

    @Test
    public void shouldRetryExportingOnException() {
        StreamProcessorControl runStreamProcessor = this.rule.runStreamProcessor((zeebeDb, dbContext) -> {
            return createStreamProcessor(zeebeDb, 3);
        });
        AtomicLong atomicLong = new AtomicLong(3L);
        this.exporters.get(1).onExport(record -> {
            if (atomicLong.getAndDecrement() > 0) {
                throw new RuntimeException("Export failed (expected)");
            }
        });
        long writeEvent = writeEvent();
        long writeEvent2 = writeEvent();
        runStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent2;
        });
        runStreamProcessor.getClass();
        TestUtil.waitUntil(runStreamProcessor::isBlocked);
        Iterator<ControlledTestExporter> it = this.exporters.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(it.next().getExportedRecords()).extracting("position").containsExactly(new Object[]{Long.valueOf(writeEvent), Long.valueOf(writeEvent2)});
        }
    }

    @Test
    public void shouldExecuteScheduledTask() throws Exception {
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(1);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Duration ofSeconds = Duration.ofSeconds(10L);
        ControlledTestExporter controlledTestExporter = this.exporters.get(0);
        controlledTestExporter.onExport(record -> {
            Controller controller = controlledTestExporter.getController();
            countDownLatch.getClass();
            controller.scheduleTask(ofSeconds, countDownLatch::countDown);
            countDownLatch2.countDown();
        });
        this.rule.runStreamProcessor((zeebeDb, dbContext) -> {
            return createStreamProcessor(zeebeDb, (List<ExporterDescriptor>) createMockedExporters);
        });
        writeEvent();
        countDownLatch2.await();
        this.rule.getClock().addTime(ofSeconds.plusSeconds(20L));
        countDownLatch.await();
    }

    @Test
    public void shouldNotExportExporterRecords() {
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(1);
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor((zeebeDb, dbContext) -> {
            return new ExporterStreamProcessor(zeebeDb, dbContext, 1, createMockedExporters);
        });
        long writeEvent = writeEvent();
        long writeExporterEvent = writeExporterEvent(createMockedExporters.get(0).getId(), writeEvent);
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeExporterEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords()).hasSize(1);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords().get(0).getPosition()).isEqualTo(writeEvent);
    }

    @Test
    public void shouldExportDeploymentEvent() {
        ResourceType resourceType = ResourceType.BPMN_XML;
        DirectBuffer wrapString = BufferUtil.wrapString("contents");
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        ((DeploymentResource) deploymentRecord.resources().add()).setResourceName(BufferUtil.wrapString("resource")).setResourceType(resourceType).setResource(wrapString);
        ((Workflow) deploymentRecord.workflows().add()).setBpmnProcessId(BufferUtil.wrapString("testProcess")).setKey(123L).setResourceName(BufferUtil.wrapString("resource")).setVersion(12);
        assertRecordExported(DeploymentIntent.CREATE, deploymentRecord, new DeploymentRecordValueImpl(OBJECT_MAPPER, Collections.singletonList(new DeployedWorkflowImpl("testProcess", "resource", 123L, 12)), Collections.singletonList(new DeploymentResourceImpl(BufferUtil.bufferAsArray(wrapString), io.zeebe.exporter.api.record.value.deployment.ResourceType.BPMN_XML, "resource"))));
    }

    @Test
    public void shouldExportIncidentRecord() {
        ErrorType errorType = ErrorType.IO_MAPPING_ERROR;
        assertRecordExported(IncidentIntent.CREATED, new IncidentRecord().setElementInstanceKey(34L).setWorkflowInstanceKey(10L).setElementId(BufferUtil.wrapString("activity")).setBpmnProcessId(BufferUtil.wrapString("process")).setErrorMessage("error").setErrorType(errorType).setJobKey(123L).setVariableScopeKey(34L), new IncidentRecordValueImpl(OBJECT_MAPPER, errorType.name(), "error", "process", "activity", 10L, 34L, 123L, 34L));
    }

    @Test
    public void shouldExportJobRecord() {
        JobRecord errorMessage = new JobRecord().setWorker(BufferUtil.wrapString("myWorker")).setType(BufferUtil.wrapString("myType")).setVariables(VARIABLES_MSGPACK).setRetries(12).setDeadline(13L).setErrorMessage("failed message");
        errorMessage.getHeaders().setBpmnProcessId(BufferUtil.wrapString("test-process")).setWorkflowKey(13L).setWorkflowDefinitionVersion(12).setWorkflowInstanceKey(1234L).setElementId(BufferUtil.wrapString("activity")).setElementInstanceKey(123L);
        errorMessage.setCustomHeaders(CUSTOM_HEADERS_MSGPACK);
        assertRecordExported(JobIntent.CREATED, errorMessage, new JobRecordValueImpl(OBJECT_MAPPER, VARIABLES_JSON, "myType", "myWorker", Instant.ofEpochMilli(13L), new HeadersImpl("test-process", "activity", 123L, 1234L, 13L, 12), CUSTOM_HEADERS, 12, "failed message"));
    }

    @Test
    public void shouldExportMessageRecord() {
        assertRecordExported(MessageIntent.PUBLISHED, new MessageRecord().setCorrelationKey(BufferUtil.wrapString("test-key")).setName(BufferUtil.wrapString("test-message")).setVariables(VARIABLES_MSGPACK).setTimeToLive(12L).setMessageId(BufferUtil.wrapString("test-id")), new MessageRecordValueImpl(OBJECT_MAPPER, VARIABLES_JSON, "test-message", "test-id", "test-key", 12L));
    }

    @Test
    public void shouldExportMessageSubscriptionRecord() {
        assertRecordExported(MessageSubscriptionIntent.CORRELATE, new MessageSubscriptionRecord().setElementInstanceKey(1L).setMessageName(BufferUtil.wrapString("name")).setWorkflowInstanceKey(1L).setCorrelationKey(BufferUtil.wrapString("key")), new MessageSubscriptionRecordValueImpl(OBJECT_MAPPER, "name", "key", 1L, 1L));
    }

    @Test
    public void shouldExportRaftRecord() {
        List list = (List) IntStream.of(4).boxed().collect(Collectors.toList());
        RaftConfigurationEvent raftConfigurationEvent = new RaftConfigurationEvent();
        list.forEach(num -> {
            ((RaftConfigurationEventMember) raftConfigurationEvent.members().add()).setNodeId(num.intValue());
        });
        assertRecordExported(RaftIntent.MEMBER_ADDED, raftConfigurationEvent, new RaftRecordValueImpl(OBJECT_MAPPER, (List) list.stream().map((v1) -> {
            return new RaftMemberImpl(v1);
        }).collect(Collectors.toList())));
    }

    @Test
    public void shouldExportWorkflowInstanceRecord() {
        BpmnElementType bpmnElementType = BpmnElementType.SERVICE_TASK;
        assertRecordExported(WorkflowInstanceIntent.ELEMENT_ACTIVATING, new WorkflowInstanceRecord().setElementId("activity").setBpmnElementType(bpmnElementType).setBpmnProcessId(BufferUtil.wrapString("test-process")).setVersion(12).setWorkflowKey(13L).setWorkflowInstanceKey(1234L).setFlowScopeKey(123L), new WorkflowInstanceRecordValueImpl(OBJECT_MAPPER, "test-process", "activity", 12, 13L, 1234L, 123L, bpmnElementType));
    }

    @Test
    public void shouldExportWorkflowInstanceSubscriptionRecord() {
        assertRecordExported(WorkflowInstanceSubscriptionIntent.OPENED, new WorkflowInstanceSubscriptionRecord().setElementInstanceKey(123L).setMessageName(BufferUtil.wrapString("test-message")).setSubscriptionPartitionId(2).setWorkflowInstanceKey(1345L).setVariables(VARIABLES_MSGPACK), new WorkflowInstanceSubscriptionRecordValueImpl(OBJECT_MAPPER, VARIABLES_JSON, "test-message", 1345L, 123L));
    }

    @Test
    public void shouldExportJobBatchRecord() {
        JobBatchRecord truncated = new JobBatchRecord().setMaxJobsToActivate(1).setTimeout(2L).setType("type").setWorker("worker").setTruncated(true);
        ((LongValue) truncated.jobKeys().add()).setValue(3L);
        JobRecord jobRecord = (JobRecord) truncated.jobs().add();
        jobRecord.setWorker(BufferUtil.wrapString("worker")).setType(BufferUtil.wrapString("type")).setVariables(VARIABLES_MSGPACK).setRetries(3).setErrorMessage("failed message").setDeadline(1000L);
        jobRecord.getHeaders().setBpmnProcessId(BufferUtil.wrapString("test-process")).setWorkflowKey(13L).setWorkflowDefinitionVersion(12).setWorkflowInstanceKey(1234L).setElementId(BufferUtil.wrapString("activity")).setElementInstanceKey(123L);
        assertRecordExported(JobBatchIntent.ACTIVATED, truncated, new JobBatchRecordValueImpl(OBJECT_MAPPER, "type", "worker", Duration.ofMillis(2L), 1, Arrays.asList(3L), Arrays.asList(new JobRecordValueImpl(OBJECT_MAPPER, VARIABLES_JSON, "type", "worker", Instant.ofEpochMilli(1000L), new HeadersImpl("test-process", "activity", 123L, 1234L, 13L, 12), Collections.EMPTY_MAP, 3, "failed message")), truncated.getTruncated()));
    }

    @Test
    public void shouldExportVariableRecord() {
        assertRecordExported(VariableIntent.CREATED, new VariableRecord().setName(BufferUtil.wrapString("x")).setValue(MsgPackUtil.asMsgPack("1")).setScopeKey(3L).setWorkflowInstanceKey(2L).setWorkflowKey(4L), new VariableRecordValueImpl(OBJECT_MAPPER, "x", "1", 3L, 2L, 4L));
    }

    @Test
    public void shouldExportVariableDocumentRecord() {
        VariableDocumentUpdateSemantic variableDocumentUpdateSemantic = VariableDocumentUpdateSemantic.LOCAL;
        Map of = Maps.of(new Map.Entry[]{Assertions.entry("foo", TestJarExporter.FOO), Assertions.entry("baz", "boz")});
        assertRecordExported(VariableDocumentIntent.UPDATED, new VariableDocumentRecord().setScopeKey(1L).setUpdateSemantics(variableDocumentUpdateSemantic).setDocument(MsgPackUtil.asMsgPack(of)), new VariableDocumentRecordValueImpl(OBJECT_MAPPER, 1L, variableDocumentUpdateSemantic, of));
    }

    @Test
    public void shouldExportWorkflowInstanceCreationRecord() {
        Map of = Maps.of(new Map.Entry[]{Assertions.entry("foo", TestJarExporter.FOO), Assertions.entry("baz", "boz")});
        assertRecordExported(WorkflowInstanceCreationIntent.CREATED, new WorkflowInstanceCreationRecord().setBpmnProcessId("process").setKey(1L).setVersion(1).setVariables(MsgPackUtil.asMsgPack(of)).setInstanceKey(2L), new WorkflowInstanceCreationRecordValueImpl(OBJECT_MAPPER, "process", 1, 1L, 2L, of));
    }

    @Test
    public void shouldUpdateLastExportedPositionOnClose() {
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor((zeebeDb, dbContext) -> {
            return createStreamProcessor(zeebeDb, 1);
        });
        initStreamProcessor.start();
        long writeEvent = writeEvent();
        ControlledTestExporter controlledTestExporter = this.exporters.get(0);
        TestUtil.waitUntil(() -> {
            return controlledTestExporter.getExportedRecords().size() == 1;
        });
        controlledTestExporter.onClose(() -> {
            controlledTestExporter.getController().updateLastExportedRecordPosition(writeEvent);
        });
        initStreamProcessor.close();
        long writeEvent2 = writeEvent();
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent2;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        List<Record> exportedRecords = controlledTestExporter.getExportedRecords();
        Assertions.assertThat(exportedRecords).hasSize(1);
        Assertions.assertThat(exportedRecords.get(0).getPosition()).isEqualTo(writeEvent);
        ControlledTestExporter controlledTestExporter2 = this.exporters.get(0);
        Assertions.assertThat(controlledTestExporter).isNotEqualTo(controlledTestExporter2);
        List<Record> exportedRecords2 = controlledTestExporter2.getExportedRecords();
        Assertions.assertThat(exportedRecords2).hasSize(1);
        Assertions.assertThat(exportedRecords2.get(0).getPosition()).isEqualTo(writeEvent2);
    }

    private ExporterStreamProcessor createStreamProcessor(ZeebeDb zeebeDb, List<ExporterDescriptor> list) {
        return new ExporterStreamProcessor(zeebeDb, zeebeDb.createContext(), 1, list);
    }

    private ExporterStreamProcessor createStreamProcessor(ZeebeDb zeebeDb, int i) {
        return new ExporterStreamProcessor(zeebeDb, zeebeDb.createContext(), 1, createMockedExporters(i));
    }

    private List<ExporterDescriptor> createMockedExporters(int i) {
        return createMockedExporters(i, new Map[0]);
    }

    private List<ExporterDescriptor> createMockedExporters(Map... mapArr) {
        return createMockedExporters(mapArr.length, mapArr);
    }

    private List<ExporterDescriptor> createMockedExporters(int i, Map[] mapArr) {
        ArrayList arrayList = new ArrayList(i);
        this.exporters = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            Map map = mapArr.length > 0 ? mapArr[i2] : null;
            ControlledTestExporter controlledTestExporter = (ControlledTestExporter) Mockito.spy(new ControlledTestExporter());
            ExporterDescriptor exporterDescriptor = (ExporterDescriptor) Mockito.spy(new ExporterDescriptor(String.valueOf(i2), controlledTestExporter.getClass(), map));
            ((ExporterDescriptor) Mockito.doAnswer(invocationOnMock -> {
                return controlledTestExporter;
            }).when(exporterDescriptor)).newInstance();
            this.exporters.add(controlledTestExporter);
            arrayList.add(exporterDescriptor);
        }
        return arrayList;
    }

    private long writeEvent() {
        return this.rule.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());
    }

    private long writeExporterEvent(String str, long j) {
        UnpackedObject exporterRecord = new ExporterRecord();
        ExporterRecord.ExporterPosition exporterPosition = (ExporterRecord.ExporterPosition) exporterRecord.getPositions().add();
        exporterPosition.setId(str);
        exporterPosition.setPosition(j);
        return this.rule.writeEvent(ExporterIntent.EXPORTED, exporterRecord);
    }

    private Map<String, Object> newConfig(String... strArr) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < strArr.length; i += 2) {
            hashMap.put(strArr[i], strArr[i + 1]);
        }
        return hashMap;
    }

    private void assertRecordExported(Intent intent, UnpackedObject unpackedObject, RecordValue recordValue) {
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor((zeebeDb, dbContext) -> {
            return createStreamProcessor(zeebeDb, 1);
        });
        long writeEvent = this.rule.writeEvent(intent, unpackedObject);
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        List<Record> exportedRecords = this.exporters.get(0).getExportedRecords();
        Assertions.assertThat(exportedRecords).hasSize(1);
        Record record = exportedRecords.get(0);
        LoggedEvent withPosition = this.rule.events().withPosition(writeEvent);
        RecordMetadata recordMetadata = new RecordMetadata();
        io.zeebe.exporter.api.record.Assertions.assertThat(record).hasPosition(withPosition.getPosition()).hasRaftTerm(withPosition.getRaftTerm()).hasSourceRecordPosition(withPosition.getSourceEventPosition()).hasProducerId(withPosition.getProducerId()).hasKey(withPosition.getKey()).hasTimestamp(Instant.ofEpochMilli(withPosition.getTimestamp()));
        withPosition.readMetadata(recordMetadata);
        io.zeebe.exporter.api.record.Assertions.assertThat(record.getMetadata()).hasIntent(recordMetadata.getIntent()).hasPartitionId(1).hasRecordType(recordMetadata.getRecordType()).hasRejectionType(recordMetadata.getRejectionType()).hasRejectionReason(BufferUtil.bufferAsString(recordMetadata.getRejectionReason())).hasValueType(recordMetadata.getValueType());
        io.zeebe.exporter.api.record.Assertions.assertThat(record).hasValue(recordValue);
    }
}
