package io.zeebe.broker.logstreams.processor;

import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamBatchWriter;
import io.zeebe.logstreams.log.LogStreamBatchWriterImpl;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.RecordMetadata;
import io.zeebe.protocol.intent.Intent;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/broker/logstreams/processor/TypedStreamWriterImpl.class */
public class TypedStreamWriterImpl implements TypedStreamWriter, TypedBatchWriter {
    protected final Consumer<RecordMetadata> noop = recordMetadata -> {
    };
    protected RecordMetadata metadata = new RecordMetadata();
    protected final Map<Class<? extends UnpackedObject>, ValueType> typeRegistry;
    protected final LogStream stream;
    protected LogStreamWriter writer;
    protected LogStreamBatchWriter batchWriter;
    protected int producerId;
    protected long sourceRecordPosition;

    public TypedStreamWriterImpl(LogStream logStream, Map<ValueType, Class<? extends UnpackedObject>> map) {
        this.stream = logStream;
        this.metadata.protocolVersion(1);
        this.writer = new LogStreamWriterImpl(logStream);
        this.batchWriter = new LogStreamBatchWriterImpl(logStream);
        this.typeRegistry = new HashMap();
        map.forEach((valueType, cls) -> {
            this.typeRegistry.put(cls, valueType);
        });
    }

    public void configureSourceContext(int i, long j) {
        this.producerId = i;
        this.sourceRecordPosition = j;
    }

    protected void initMetadata(RecordType recordType, Intent intent, UnpackedObject unpackedObject) {
        this.metadata.reset();
        ValueType valueType = this.typeRegistry.get(unpackedObject.getClass());
        this.metadata.recordType(recordType);
        this.metadata.valueType(valueType);
        this.metadata.intent(intent);
    }

    private long writeRecord(long j, RecordType recordType, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
        this.writer.reset();
        this.writer.producerId(this.producerId);
        if (this.sourceRecordPosition >= 0) {
            this.writer.sourceRecordPosition(this.sourceRecordPosition);
        }
        initMetadata(recordType, intent, unpackedObject);
        consumer.accept(this.metadata);
        if (j >= 0) {
            this.writer.key(j);
        } else {
            this.writer.positionAsKey();
        }
        return this.writer.metadataWriter(this.metadata).valueWriter(unpackedObject).tryWrite();
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeNewCommand(Intent intent, UnpackedObject unpackedObject) {
        return writeRecord(-1L, RecordType.COMMAND, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeFollowUpCommand(long j, Intent intent, UnpackedObject unpackedObject) {
        return writeRecord(j, RecordType.COMMAND, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeFollowUpCommand(long j, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
        return writeRecord(j, RecordType.COMMAND, intent, unpackedObject, consumer);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeNewEvent(Intent intent, UnpackedObject unpackedObject) {
        return writeRecord(-1L, RecordType.EVENT, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeFollowUpEvent(long j, Intent intent, UnpackedObject unpackedObject) {
        return writeRecord(j, RecordType.EVENT, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeFollowUpEvent(long j, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
        return writeRecord(j, RecordType.EVENT, intent, unpackedObject, consumer);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeRejection(TypedRecord<? extends UnpackedObject> typedRecord) {
        return writeRecord(typedRecord.getKey(), RecordType.COMMAND_REJECTION, typedRecord.getMetadata().getIntent(), typedRecord.getValue(), this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeRejection(TypedRecord<? extends UnpackedObject> typedRecord, Consumer<RecordMetadata> consumer) {
        return writeRecord(typedRecord.getKey(), RecordType.COMMAND_REJECTION, typedRecord.getMetadata().getIntent(), typedRecord.getValue(), consumer);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addNewCommand(Intent intent, UnpackedObject unpackedObject) {
        return addRecord(-1L, RecordType.COMMAND, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addFollowUpCommand(long j, Intent intent, UnpackedObject unpackedObject) {
        return addRecord(j, RecordType.COMMAND, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addNewEvent(Intent intent, UnpackedObject unpackedObject) {
        return addRecord(-1L, RecordType.EVENT, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addFollowUpEvent(long j, Intent intent, UnpackedObject unpackedObject) {
        return addRecord(j, RecordType.EVENT, intent, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addFollowUpEvent(long j, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
        return addRecord(j, RecordType.EVENT, intent, unpackedObject, consumer);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addRejection(TypedRecord<? extends UnpackedObject> typedRecord) {
        return addRecord(typedRecord.getKey(), RecordType.COMMAND_REJECTION, typedRecord.getMetadata().getIntent(), typedRecord.getValue(), this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addRejection(TypedRecord<? extends UnpackedObject> typedRecord, Consumer<RecordMetadata> consumer) {
        return addRecord(typedRecord.getKey(), RecordType.COMMAND_REJECTION, typedRecord.getMetadata().getIntent(), typedRecord.getValue(), consumer);
    }

    private TypedBatchWriter addRecord(long j, RecordType recordType, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
        initMetadata(recordType, intent, unpackedObject);
        consumer.accept(this.metadata);
        LogStreamBatchWriter.LogEntryBuilder event = this.batchWriter.event();
        if (j >= 0) {
            event.key(j);
        } else {
            event.positionAsKey();
        }
        event.metadataWriter(this.metadata).valueWriter(unpackedObject).done();
        return this;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public long write() {
        return this.batchWriter.tryWrite();
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public TypedBatchWriter newBatch() {
        this.batchWriter.reset();
        this.batchWriter.producerId(this.producerId);
        if (this.sourceRecordPosition >= 0) {
            this.batchWriter.sourceRecordPosition(this.sourceRecordPosition);
        }
        return this;
    }
}
