package io.debezium.pipeline.notification.channels;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.SnapshotRecord;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.schema.DataCollectionId;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/notification/channels/SinkNotificationChannel.class */
public class SinkNotificationChannel implements NotificationChannel, ConnectChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SinkNotificationChannel.class);
    public static final Field NOTIFICATION_TOPIC = Field.create("notification.sink.topic.name").withDisplayName("Notification topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the notifications. This is required in case 'sink' is in the list of enabled channels").withValidation(SinkNotificationChannel::validateNotificationTopicName);
    public static final String CHANNEL_NAME = "sink";
    private BlockingConsumer<SourceRecord> consumer;
    private Schema keySchema;
    private Schema valueSchema;
    private String topicName;

    /* loaded from: input_file:io/debezium/pipeline/notification/channels/SinkNotificationChannel$EmptyOffsetContext.class */
    private static class EmptyOffsetContext implements OffsetContext {
        private EmptyOffsetContext() {
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public Map<String, ?> getOffset() {
            return Map.of();
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public Schema getSourceInfoSchema() {
            return null;
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public Struct getSourceInfo() {
            return null;
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public boolean isSnapshotRunning() {
            return false;
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public void markSnapshotRecord(SnapshotRecord snapshotRecord) {
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public void preSnapshotStart() {
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public void preSnapshotCompletion() {
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public void postSnapshotCompletion() {
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public void event(DataCollectionId dataCollectionId, Instant instant) {
        }

        @Override // io.debezium.pipeline.spi.OffsetContext
        public TransactionContext getTransactionContext() {
            return null;
        }
    }

    private static int validateNotificationTopicName(Configuration configuration, Field field, Field.ValidationOutput validationOutput) {
        String string = configuration.getString(field);
        int i = 0;
        if (configuration.getList(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS).contains(CHANNEL_NAME) && string == null) {
            validationOutput.accept(field, string, "Notification topic name must be provided when kafka notification channel is enabled");
            i = 0 + 1;
        }
        return i;
    }

    @Override // io.debezium.pipeline.notification.channels.NotificationChannel
    public void init(CommonConnectorConfig commonConnectorConfig) {
        this.topicName = commonConnectorConfig.getNotificationTopic();
    }

    @Override // io.debezium.pipeline.notification.channels.ConnectChannel
    public void initConnectChannel(SchemaFactory schemaFactory, BlockingConsumer<SourceRecord> blockingConsumer) {
        this.consumer = blockingConsumer;
        this.keySchema = schemaFactory.notificationKeySchema(SchemaNameAdjuster.NO_OP);
        this.valueSchema = schemaFactory.notificationValueSchema(SchemaNameAdjuster.NO_OP);
    }

    @Override // io.debezium.pipeline.notification.channels.NotificationChannel
    public String name() {
        return CHANNEL_NAME;
    }

    @Override // io.debezium.pipeline.notification.channels.NotificationChannel
    public void send(Notification notification) {
        send(notification, Offsets.of(Collections.singletonMap(Map::of, new EmptyOffsetContext())));
    }

    @Override // io.debezium.pipeline.notification.channels.ConnectChannel
    public <P extends Partition, O extends OffsetContext> void send(Notification notification, Offsets<P, O> offsets) {
        Struct put = new Struct(this.keySchema).put("id", notification.getId());
        Struct put2 = new Struct(this.valueSchema).put("id", notification.getId()).put("type", notification.getType()).put(Notification.AGGREGATE_TYPE, notification.getAggregateType()).put(Notification.ADDITIONAL_DATA, notification.getAdditionalData()).put(Notification.TIMESTAMP, notification.getTimestamp());
        try {
            this.consumer.accept(new SourceRecord(offsets.getTheOnlyPartition().getSourcePartition(), offsets.getTheOnlyOffset() == null ? Map.of() : offsets.getTheOnlyOffset().getOffset(), this.topicName, this.keySchema, put, this.valueSchema, put2));
        } catch (InterruptedException e) {
            LOGGER.error("Notification {} not sent due to interrupt", notification);
        }
    }

    @Override // io.debezium.pipeline.notification.channels.NotificationChannel
    public void close() {
    }
}
