package org.culturegraph.mf.plumbing;

import org.culturegraph.mf.framework.FluxCommand;
import org.culturegraph.mf.framework.StreamReceiver;
import org.culturegraph.mf.framework.annotations.Description;
import org.culturegraph.mf.framework.annotations.In;
import org.culturegraph.mf.framework.annotations.Out;
import org.culturegraph.mf.framework.helpers.DefaultStreamPipe;

@Out(StreamReceiver.class)
@FluxCommand("merge-batch-stream")
@Description("Merges a sequence of batchSize records")
@In(StreamReceiver.class)
/* loaded from: input_file:org/culturegraph/mf/plumbing/StreamBatchMerger.class */
public final class StreamBatchMerger extends DefaultStreamPipe<StreamReceiver> {
    public static final long DEFAULT_BATCH_SIZE = 1;
    private long batchSize = 1;
    private long recordCount;

    public void setBatchSize(long j) {
        this.batchSize = j;
    }

    public long getBatchSize() {
        return this.batchSize;
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultStreamPipe, org.culturegraph.mf.framework.StreamReceiver
    public void startRecord(String str) {
        if (this.recordCount == 0) {
            ((StreamReceiver) getReceiver()).startRecord(str);
        }
        this.recordCount++;
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultStreamPipe, org.culturegraph.mf.framework.StreamReceiver
    public void endRecord() {
        if (this.recordCount >= this.batchSize) {
            ((StreamReceiver) getReceiver()).endRecord();
            this.recordCount = 0L;
        }
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultStreamPipe, org.culturegraph.mf.framework.StreamReceiver
    public void startEntity(String str) {
        ((StreamReceiver) getReceiver()).startEntity(str);
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultStreamPipe, org.culturegraph.mf.framework.StreamReceiver
    public void endEntity() {
        ((StreamReceiver) getReceiver()).endEntity();
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultStreamPipe, org.culturegraph.mf.framework.StreamReceiver
    public void literal(String str, String str2) {
        ((StreamReceiver) getReceiver()).literal(str, str2);
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultSender
    protected void onResetStream() {
        this.recordCount = 0L;
    }

    @Override // org.culturegraph.mf.framework.helpers.DefaultSender
    protected void onCloseStream() {
        if (this.recordCount > 0) {
            ((StreamReceiver) getReceiver()).endRecord();
            this.recordCount = 0L;
        }
    }
}
