/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.test.framework;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;

@VisibleForTesting
class MessageStreamAssert<M> {
    private static final Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<String, CountDownLatch>();
    private static final CountDownLatch PLACE_HOLDER = new CountDownLatch(0);
    private final String id;
    private final MessageStream<M> messageStream;
    private final Serde<M> serde;
    private boolean checkEachTask = false;

    public static <M> MessageStreamAssert<M> that(String id, MessageStream<M> messageStream, Serde<M> serde) {
        return new MessageStreamAssert<M>(id, messageStream, serde);
    }

    private MessageStreamAssert(String id, MessageStream<M> messageStream, Serde<M> serde) {
        this.id = id;
        this.messageStream = messageStream;
        this.serde = serde;
    }

    public MessageStreamAssert forEachTask() {
        this.checkEachTask = true;
        return this;
    }

    public void containsInAnyOrder(Collection<M> expected) {
        LATCHES.putIfAbsent(this.id, PLACE_HOLDER);
        MessageStream streamToCheck = this.checkEachTask ? this.messageStream : this.messageStream.partitionBy((MapFunction & Serializable)m -> null, (MapFunction & Serializable)m -> m, KVSerde.of((Serde)new StringSerde(), this.serde), null).map((MapFunction & Serializable)kv -> kv.value);
        streamToCheck.sink(new CheckAgainstExpected<M>(this.id, expected, this.checkEachTask));
    }

    public static void waitForComplete() {
        try {
            while (!LATCHES.isEmpty()) {
                HashSet<String> ids = new HashSet<String>(LATCHES.keySet());
                for (String id : ids) {
                    while (LATCHES.get(id) == PLACE_HOLDER) {
                        Thread.sleep(100L);
                    }
                    CountDownLatch latch = LATCHES.get(id);
                    if (latch == null) continue;
                    latch.await();
                    LATCHES.remove(id);
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static final class CheckAgainstExpected<M>
    implements SinkFunction<M> {
        private static final long TIMEOUT = 5000L;
        private final String id;
        private final boolean checkEachTask;
        private final transient Collection<M> expected;
        private transient Timer timer = new Timer();
        private transient List<M> actual = Collections.synchronizedList(new ArrayList());
        private transient TimerTask timerTask = new TimerTask(){

            @Override
            public void run() {
                this.check();
            }
        };

        CheckAgainstExpected(String id, Collection<M> expected, boolean checkEachTask) {
            this.id = id;
            this.expected = expected;
            this.checkEachTask = checkEachTask;
        }

        public void init(Context context) {
            SystemStreamPartition ssp = (SystemStreamPartition)Iterables.getFirst((Iterable)context.getTaskContext().getTaskModel().getSystemStreamPartitions(), null);
            if (ssp != null || ssp.getPartition().getPartitionId() == 0) {
                int count = this.checkEachTask ? context.getContainerContext().getContainerModel().getTasks().keySet().size() : 1;
                LATCHES.put(this.id, new CountDownLatch(count));
                this.timer.schedule(this.timerTask, 5000L);
            }
        }

        public void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
            this.actual.add(message);
            if (this.actual.size() >= this.expected.size()) {
                this.timerTask.cancel();
                this.check();
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            this.timer = new Timer();
            this.actual = Collections.synchronizedList(new ArrayList());
            this.timerTask = new TimerTask(){

                @Override
                public void run() {
                    this.check();
                }
            };
        }

        private void check() {
            CountDownLatch latch = (CountDownLatch)LATCHES.get(this.id);
            try {
                Assert.assertThat(this.actual, (Matcher)Matchers.containsInAnyOrder((Object[])this.expected.toArray()));
                throw new IllegalArgumentException("asdas");
            }
            catch (Throwable throwable) {
                latch.countDown();
                throw throwable;
            }
        }
    }
}

