package org.apache.spark.sql.kafka010;

import org.apache.kafka.common.TopicPartition;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.kafka010.KafkaSourceTest;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamTest;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.test.SharedSparkSession;
import org.apache.spark.sql.test.TestSparkSession;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaContinuousSourceSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001A2A\u0001B\u0003\u0001!!)\u0001\u0004\u0001C\u00013!91\u0004\u0001b\u0001\n\u0003b\u0002BB\u0018\u0001A\u0003%QDA\u0014LC\u001a\\\u0017mQ8oi&tWo\\;t'>,(oY3U_BL7\rR3mKRLwN\\*vSR,'B\u0001\u0004\b\u0003!Y\u0017MZ6baE\u0002$B\u0001\u0005\n\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0015-\tQa\u001d9be.T!\u0001D\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005q\u0011aA8sO\u000e\u00011c\u0001\u0001\u0012+A\u0011!cE\u0007\u0002\u000b%\u0011A#\u0002\u0002\u0010\u0017\u000647.Y*pkJ\u001cW\rV3tiB\u0011!CF\u0005\u0003/\u0015\u00111cS1gW\u0006\u001cuN\u001c;j]V|Wo\u001d+fgR\fa\u0001P5oSRtD#\u0001\u000e\u0011\u0005I\u0001\u0011a\u00032s_.,'\u000f\u0015:paN,\u0012!\b\t\u0005=\u0015:s%D\u0001 \u0015\t\u0001\u0013%A\u0005j[6,H/\u00192mK*\u0011!eI\u0001\u000bG>dG.Z2uS>t'\"\u0001\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019z\"aA'baB\u0011\u0001&L\u0007\u0002S)\u0011!fK\u0001\u0005Y\u0006twMC\u0001-\u0003\u0011Q\u0017M^1\n\u00059J#AB*ue&tw-\u0001\u0007ce>\\WM\u001d)s_B\u001c\b\u0005")
/* loaded from: input_file:org/apache/spark/sql/kafka010/KafkaContinuousSourceTopicDeletionSuite.class */
/**
 * Continuous-processing Kafka source suite that registers a single test,
 * "subscribing topic by pattern with topic deletions".
 *
 * <p>This class is the Java rendering of a Scala class that mixes in the
 * {@link KafkaContinuousTest} trait. Most members are therefore compiler-generated
 * plumbing: trait field storage, trait setters invoked from
 * {@code KafkaContinuousTest.$init$}, and forwarders to the trait's static
 * {@code name$(this, ...)} implementations.
 */
public class KafkaContinuousSourceTopicDeletionSuite extends KafkaSourceTest implements KafkaContinuousTest {
    /** Broker properties used by this suite; assigned once in the constructor. */
    private final Map<String, String> brokerProps;

    // The three fields below back vals declared by the KafkaContinuousTest trait.
    // They are written through the `_setter_` methods further down (outside the
    // constructor body), so they must not be declared `final` in Java source.
    private Trigger defaultTrigger;
    private boolean defaultUseV2Sink;
    private SparkListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;

    /** Super accessor for the trait: runs the {@code beforeEach} implementation of {@link SharedSparkSession}. */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$beforeEach() {
        SharedSparkSession.beforeEach$(this);
    }

    /** Super accessor for the trait: runs the {@code afterEach} implementation of {@link SharedSparkSession}. */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$afterEach() {
        SharedSparkSession.afterEach$(this);
    }

    /**
     * Forwards to the {@link KafkaContinuousTest} trait implementation.
     *
     * <p>Delegates to the trait's static forwarder rather than calling
     * {@code createSparkSession()} on {@code this}, which would recurse forever.
     *
     * @return the session produced by the trait implementation
     */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public TestSparkSession createSparkSession() {
        return KafkaContinuousTest.createSparkSession$(this);
    }

    /**
     * Forwards to the {@link KafkaContinuousTest} trait implementation.
     *
     * @param str             first argument, passed through unchanged (NOTE(review): presumably the topic name - confirm against the trait)
     * @param i               second argument, passed through unchanged (NOTE(review): presumably the partition count - confirm against the trait)
     * @param streamExecution the running query, passed through unchanged
     */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void setTopicPartitions(String str, int i, StreamExecution streamExecution) {
        // Static trait forwarder; a plain setTopicPartitions(...) call here would be self-recursive.
        KafkaContinuousTest.setTopicPartitions$(this, str, i, streamExecution);
    }

    /** Forwards to the {@link KafkaContinuousTest} trait implementation of {@code beforeEach}. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void beforeEach() {
        // Static trait forwarder; a plain beforeEach() call here would be self-recursive.
        KafkaContinuousTest.beforeEach$(this);
    }

    /** Forwards to the {@link KafkaContinuousTest} trait implementation of {@code afterEach}. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void afterEach() {
        // Static trait forwarder; a plain afterEach() call here would be self-recursive.
        KafkaContinuousTest.afterEach$(this);
    }

    /** @return the trigger stored by the trait setter */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public Trigger defaultTrigger() {
        return this.defaultTrigger;
    }

    /** @return the V2-sink flag stored by the trait setter */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public boolean defaultUseV2Sink() {
        return this.defaultUseV2Sink;
    }

    /** @return the listener stored by the trait setter */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public SparkListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener() {
        return this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;
    }

    /** Trait setter: stores the value later returned by {@link #defaultTrigger()}. */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultTrigger_$eq(Trigger trigger) {
        this.defaultTrigger = trigger;
    }

    /** Trait setter: stores the value later returned by {@link #defaultUseV2Sink()}. */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultUseV2Sink_$eq(boolean z) {
        this.defaultUseV2Sink = z;
    }

    /** Trait setter: stores the listener returned by the {@code tasksEndedListener} accessor. */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public final void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener_$eq(SparkListener sparkListener) {
        this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener = sparkListener;
    }

    /** @return the broker properties built in the constructor ({@code auto.create.topics.enable -> false}) */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest
    public Map<String, String> brokerProps() {
        return this.brokerProps;
    }

    /**
     * Mapping function applied to each (key, value) record of the test stream.
     *
     * @param tuple2 a pair whose second element is a string holding an integer
     * @return the second element parsed as an int, plus one
     */
    public static final /* synthetic */ int $anonfun$new$31(Tuple2 tuple2) {
        return new StringOps(Predef$.MODULE$.augmentString((String) tuple2._2())).toInt() + 1;
    }

    /**
     * Null-safe comparison of a partition's topic name with {@code str}.
     *
     * @param str            expected topic name
     * @param topicPartition partition whose topic is inspected
     * @return {@code true} when the partition's topic equals {@code str} (or both are {@code null})
     */
    public static final /* synthetic */ boolean $anonfun$new$35(String str, TopicPartition topicPartition) {
        String str2 = topicPartition.topic();
        return str2 != null ? str2.equals(str) : str == null;
    }

    /**
     * Checks whether the reader knows at least one partition of the given topic.
     *
     * @param str                   topic name to look for
     * @param kafkaContinuousReader reader whose {@code knownPartitions()} are searched
     * @return {@code true} when some known partition belongs to topic {@code str}
     */
    public static final /* synthetic */ boolean $anonfun$new$34(String str, KafkaContinuousReader kafkaContinuousReader) {
        return kafkaContinuousReader.knownPartitions().exists(topicPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$new$35(str, topicPartition));
        });
    }

    /**
     * Initialises the trait state, sets the broker properties and registers the
     * "subscribing topic by pattern with topic deletions" test.
     */
    public KafkaContinuousSourceTopicDeletionSuite() {
        // Runs the trait initialiser, which populates the trait-backed fields through the `_setter_` methods.
        KafkaContinuousTest.$init$(this);
        this.brokerProps = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("auto.create.topics.enable"), "false")}));
        test("subscribing topic by pattern with topic deletions", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // Two topic names sharing a fresh prefix: "<prefix>-seems" is created now, "<prefix>-bad" later.
            String newTopic = this.newTopic();
            String sb = new StringBuilder(6).append(newTopic).append("-seems").toString();
            String sb2 = new StringBuilder(4).append(newTopic).append("-bad").toString();
            KafkaTestUtils testUtils = this.testUtils();
            testUtils.createTopic(sb, 5, testUtils.createTopic$default$3());
            this.testUtils().sendMessages(sb, new String[]{"-1"});
            // Precondition: the latest-offsets map for the first topic must contain exactly 5 entries.
            Predef$.MODULE$.require(this.convertToEqualizer(BoxesRunTime.boxToInteger(this.testUtils().getLatestOffsets((Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{sb}))).size())).$eq$eq$eq(BoxesRunTime.boxToInteger(5), Equality$.MODULE$.default()));
            final KafkaContinuousSourceTopicDeletionSuite kafkaContinuousSourceTopicDeletionSuite = null;
            // Kafka stream subscribed by the pattern "<prefix>-.*" with failOnDataLoss=false; key and value are
            // cast to strings, decoded as (String, String) pairs and mapped to value + 1.
            Dataset<?> map = this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("kafka.metadata.max.age.ms", "1").option("kafka.default.api.timeout.ms", "3000").option("subscribePattern", new StringBuilder(3).append(newTopic).append("-.*").toString()).option("failOnDataLoss", "false").load().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(key AS STRING)", "CAST(value AS STRING)"})).as(this.testImplicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaContinuousSourceTopicDeletionSuite.class.getClassLoader()), new TypeCreator(kafkaContinuousSourceTopicDeletionSuite) { // from class: org.apache.spark.sql.kafka010.KafkaContinuousSourceTopicDeletionSuite$$typecreator4$2
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), Nil$.MODULE$)));
                }
            }))).map(tuple2 -> {
                return BoxesRunTime.boxToInteger($anonfun$new$31(tuple2));
            }, this.testImplicits().newIntEncoder());
            OutputMode testStream$default$2 = this.testStream$default$2();
            boolean testStream$default$3 = this.testStream$default$3();
            Predef$ predef$ = Predef$.MODULE$;
            Set<String> set = (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{sb}));
            Set<String> set2 = (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{sb2}));
            // Scenario: add 1,2,3 to the first topic and expect 2,3,4; delete it and create the second topic;
            // wait until the reader knows a partition of the second topic; add 4,5,6 and expect 2..7.
            this.testStream(map, testStream$default$2, testStream$default$3, predef$.wrapRefArray(new StreamTest.StreamAction[]{this.makeSureGetOffsetCalled(), new KafkaSourceTest.AddKafkaData(this, set, Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3}), this.AddKafkaData().apply$default$3(set, Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3})), this.AddKafkaData().apply$default$4(set, Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3})), this.AddKafkaData().apply$default$5(set, Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3})), this.AddKafkaData().apply$default$6(set, Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3}))), this.CheckAnswer().apply(Predef$.MODULE$.wrapIntArray(new int[]{2, 3, 4}), this.testImplicits().newIntEncoder()), this.Execute().apply(streamExecution -> {
                this.testUtils().deleteTopic(sb);
                KafkaTestUtils testUtils2 = this.testUtils();
                testUtils2.createTopic(sb2, 5, testUtils2.createTopic$default$3());
                // Retried within streamingTimeout until the assertion below holds.
                return (Assertion) this.eventually(this.timeout(this.streamingTimeout()), () -> {
                    return this.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamExecution.lastExecution().logical().collectFirst(new KafkaContinuousSourceTopicDeletionSuite$$anonfun$1(null)).exists(kafkaContinuousReader -> {
                        return BoxesRunTime.boxToBoolean($anonfun$new$34(sb2, kafkaContinuousReader));
                    }), "query.lastExecution.logical.collectFirst[org.apache.spark.sql.kafka010.KafkaContinuousReader](({\n  @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan,org.apache.spark.sql.kafka010.KafkaContinuousReader] with Serializable {\n    def <init>(): <$anon: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan => org.apache.spark.sql.kafka010.KafkaContinuousReader> = {\n      $anonfun.super.<init>();\n      ()\n    };\n    final override def applyOrElse[A1 <: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, B1 >: org.apache.spark.sql.kafka010.KafkaContinuousReader](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @unchecked) match {\n      case (output: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference], source: org.apache.spark.sql.sources.v2.DataSourceV2, options: Map[String,String], reader: org.apache.spark.sql.sources.v2.reader.DataSourceReader)org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation(_, _, _, (r @ (_: org.apache.spark.sql.kafka010.KafkaContinuousReader))) => r\n      case (defaultCase$ @ _) => default.apply(x1)\n    };\n    final def isDefinedAt(x1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): Boolean = ((x1.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @unchecked) match {\n      case (output: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference], source: org.apache.spark.sql.sources.v2.DataSourceV2, options: Map[String,String], reader: org.apache.spark.sql.sources.v2.reader.DataSourceReader)org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation(_, _, _, (r @ (_: org.apache.spark.sql.kafka010.KafkaContinuousReader))) => true\n      case (defaultCase$ @ _) => false\n    }\n  };\n  new $anonfun()\n}: PartialFunction[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan,org.apache.spark.sql.kafka010.KafkaContinuousReader])).exists(((r: org.apache.spark.sql.kafka010.KafkaContinuousReader) => r.knownPartitions.exists(((x$1: org.apache.kafka.common.TopicPartition) => x$1.topic().==(topic2)))))", Prettifier$.MODULE$.default()), new StringBuilder(38).append("query never reconfigured to new topic ").append(sb2).toString(), Prettifier$.MODULE$.default(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 209));
                }, this.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 208));
            }), new KafkaSourceTest.AddKafkaData(this, set2, Predef$.MODULE$.wrapIntArray(new int[]{4, 5, 6}), this.AddKafkaData().apply$default$3(set2, Predef$.MODULE$.wrapIntArray(new int[]{4, 5, 6})), this.AddKafkaData().apply$default$4(set2, Predef$.MODULE$.wrapIntArray(new int[]{4, 5, 6})), this.AddKafkaData().apply$default$5(set2, Predef$.MODULE$.wrapIntArray(new int[]{4, 5, 6})), this.AddKafkaData().apply$default$6(set2, Predef$.MODULE$.wrapIntArray(new int[]{4, 5, 6}))), this.CheckAnswer().apply(Predef$.MODULE$.wrapIntArray(new int[]{2, 3, 4, 5, 6, 7}), this.testImplicits().newIntEncoder())}));
        }, new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 179));
    }
}
