/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.List;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.PulsarCommandSender;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.channel.ChannelPromise;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarCommandSenderImpl
implements PulsarCommandSender {
    private static final Logger log = LoggerFactory.getLogger(PulsarCommandSenderImpl.class);
    private final BrokerInterceptor interceptor;
    private final ServerCnx cnx;

    public PulsarCommandSenderImpl(BrokerInterceptor interceptor, ServerCnx cnx) {
        this.interceptor = interceptor;
        this.cnx = cnx;
    }

    @Override
    public void sendPartitionMetadataResponse(PulsarApi.ServerError error, String errorMsg, long requestId) {
        PulsarApi.BaseCommand command = Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getPartitionMetadataResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendPartitionMetadataResponse(int partitions, long requestId) {
        PulsarApi.BaseCommand command = Commands.newPartitionMetadataResponseCommand(partitions, requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getPartitionMetadataResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendSuccessResponse(long requestId) {
        PulsarApi.BaseCommand command = Commands.newSuccessCommand(requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getSuccess().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendErrorResponse(long requestId, PulsarApi.ServerError error, String message) {
        PulsarApi.BaseCommand command = Commands.newErrorCommand(requestId, error, message);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getError().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion) {
        PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getProducerSuccess().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion) {
        PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId, schemaVersion);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getProducerSuccess().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId, long entryId) {
        PulsarApi.BaseCommand command = Commands.newSendReceiptCommand(producerId, sequenceId, highestId, ledgerId, entryId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getSendReceipt().getMessageId().recycle();
        command.getSendReceipt().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendSendError(long producerId, long sequenceId, PulsarApi.ServerError error, String errorMsg) {
        PulsarApi.BaseCommand command = Commands.newSendErrorCommand(producerId, sequenceId, error, errorMsg);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getSendError().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendGetTopicsOfNamespaceResponse(List<String> topics, long requestId) {
        PulsarApi.BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getGetTopicsOfNamespaceResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) {
        PulsarApi.BaseCommand command = Commands.newGetSchemaResponseCommand(requestId, schema, version);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getGetSchemaResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendGetSchemaErrorResponse(long requestId, PulsarApi.ServerError error, String errorMessage) {
        PulsarApi.BaseCommand command = Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getGetSchemaResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendGetOrCreateSchemaResponse(long requestId, SchemaVersion schemaVersion) {
        PulsarApi.BaseCommand command = Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getGetOrCreateSchemaResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendGetOrCreateSchemaErrorResponse(long requestId, PulsarApi.ServerError error, String errorMessage) {
        PulsarApi.BaseCommand command = Commands.newGetOrCreateSchemaResponseErrorCommand(requestId, error, errorMessage);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getGetOrCreateSchemaResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize) {
        PulsarApi.BaseCommand command = Commands.newConnectedCommand(clientProtocolVersion, maxMessageSize);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getConnected().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative, PulsarApi.CommandLookupTopicResponse.LookupType response, long requestId, boolean proxyThroughServiceUrl) {
        PulsarApi.BaseCommand command = Commands.newLookupResponseCommand(brokerServiceUrl, brokerServiceUrlTls, authoritative, response, requestId, proxyThroughServiceUrl);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getLookupTopicResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendLookupResponse(PulsarApi.ServerError error, String errorMsg, long requestId) {
        PulsarApi.BaseCommand command = Commands.newLookupErrorResponseCommand(error, errorMsg, requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        command.getLookupTopicResponse().recycle();
        command.recycle();
        this.cnx.ctx().writeAndFlush(outBuf);
    }

    @Override
    public void sendActiveConsumerChange(long consumerId, boolean isActive) {
        if (!Commands.peerSupportsActiveConsumerListener(this.cnx.getRemoteEndpointProtocolVersion())) {
            return;
        }
        this.cnx.ctx().writeAndFlush(Commands.newActiveConsumerChange(consumerId, isActive), this.cnx.ctx().voidPromise());
    }

    @Override
    public void sendSuccess(long requestId) {
        this.cnx.ctx().writeAndFlush(Commands.newSuccess(requestId));
    }

    @Override
    public void sendError(long requestId, PulsarApi.ServerError error, String message) {
        this.cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message));
    }

    @Override
    public void sendReachedEndOfTopic(long consumerId) {
        if (this.cnx.getRemoteEndpointProtocolVersion() >= 9) {
            log.info("[{}] Notifying consumer that end of topic has been reached", (Object)this);
            this.cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
        }
    }

    public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, RedeliveryTracker redeliveryTracker) {
        ChannelHandlerContext ctx = this.cnx.ctx();
        ChannelPromise writePromise = ctx.newPromise();
        ctx.channel().eventLoop().execute(() -> {
            for (int i = 0; i < entries.size(); ++i) {
                Entry entry = (Entry)entries.get(i);
                if (entry == null) continue;
                int batchSize = batchSizes.getBatchSize(i);
                if (batchSize > 1 && !this.cnx.isBatchMessageCompatibleVersion()) {
                    log.warn("[{}-{}] Consumer doesn't support batch messages -  consumerId {}, msg id {}-{}", new Object[]{topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId()});
                    ctx.close();
                    entry.release();
                    continue;
                }
                PulsarApi.MessageIdData.Builder messageIdBuilder = PulsarApi.MessageIdData.newBuilder();
                PulsarApi.MessageIdData messageId = messageIdBuilder.setLedgerId(entry.getLedgerId()).setEntryId(entry.getEntryId()).setPartition(partitionIdx).build();
                ByteBuf metadataAndPayload = entry.getDataBuffer();
                metadataAndPayload.retain();
                if (this.cnx.getRemoteEndpointProtocolVersion() < PulsarApi.ProtocolVersion.v11.getNumber()) {
                    Commands.skipChecksumIfPresent(metadataAndPayload);
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{} with batchSize {}", new Object[]{topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId(), batchSize});
                }
                int redeliveryCount = 0;
                PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
                if (redeliveryTracker.contains(position)) {
                    redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
                }
                ctx.write(this.cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
                messageId.recycle();
                messageIdBuilder.recycle();
                entry.release();
            }
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
            batchSizes.recyle();
            if (batchIndexesAcks != null) {
                batchIndexesAcks.recycle();
            }
        });
        return writePromise;
    }

    private void safeIntercept(PulsarApi.BaseCommand command, ServerCnx cnx) {
        try {
            this.interceptor.onPulsarCommand(command, cnx);
        }
        catch (Exception e) {
            log.error("Failed to execute command {} on broker interceptor.", (Object)command.getType(), (Object)e);
        }
    }
}

