package io.numaproj.numaflow.sourcer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.source.v1.SourceOuterClass;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import lombok.Generated;

/* loaded from: input_file:io/numaproj/numaflow/sourcer/OutputObserverImpl.class */
class OutputObserverImpl implements OutputObserver {
    StreamObserver<SourceOuterClass.ReadResponse> responseObserver;

    @Override // io.numaproj.numaflow.sourcer.OutputObserver
    public void send(Message message) {
        this.responseObserver.onNext(buildResponse(message));
    }

    private SourceOuterClass.ReadResponse buildResponse(Message message) {
        return SourceOuterClass.ReadResponse.newBuilder().setResult(SourceOuterClass.ReadResponse.Result.newBuilder().addAllKeys(message.getKeys() == null ? new ArrayList<>() : List.of((Object[]) message.getKeys())).setPayload(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue())).setEventTime(Timestamp.newBuilder().setSeconds(message.getEventTime().getEpochSecond()).setNanos(message.getEventTime().getNano())).setOffset(SourceOuterClass.Offset.newBuilder().setOffset(message.getOffset().getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getOffset().getValue())).setPartitionId(message.getOffset().getPartitionId().intValue())).putAllHeaders(message.getHeaders() != null ? message.getHeaders() : new HashMap<>()).m2029build()).m1981build();
    }

    @Generated
    public OutputObserverImpl(StreamObserver<SourceOuterClass.ReadResponse> streamObserver) {
        this.responseObserver = streamObserver;
    }
}
