package org.eclipse.ditto.client.internal.bus;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/client/internal/bus/DefaultAdaptableBus.class */
final class DefaultAdaptableBus implements AdaptableBus {
    private static final String ACK_SUFFIX = ":ACK";
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAdaptableBus.class);
    private final ScheduledExecutorService scheduledExecutorService;
    private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor();
    private final Collection<Classifier<String>> stringClassifiers = new ConcurrentLinkedQueue();
    private final Collection<Classifier<Adaptable>> adaptableClassifiers = new ConcurrentLinkedQueue();
    private final Map<Classification, Set<Entry<Consumer<String>>>> oneTimeStringConsumers = new ConcurrentHashMap();
    private final Map<Classification, Set<Entry<Consumer<Adaptable>>>> oneTimeAdaptableConsumers = new ConcurrentHashMap();
    private final Map<Classification, Set<Entry<Consumer<Adaptable>>>> persistentAdaptableConsumers = new ConcurrentHashMap();
    private final Map<AdaptableBus.SubscriptionId, Future<?>> timeoutFutures = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/client/internal/bus/DefaultAdaptableBus$Entry.class */
    public static final class Entry<T> implements AdaptableBus.SubscriptionId {
        private final Classification key;
        private final T value;

        private Entry(Classification classification, T t) {
            this.key = classification;
            this.value = t;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultAdaptableBus(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public AdaptableBus addStringClassifier(Classifier<String> classifier) {
        this.stringClassifiers.add(classifier);
        return this;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public AdaptableBus addAdaptableClassifier(Classifier<Adaptable> classifier) {
        this.adaptableClassifiers.add(classifier);
        return this;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public CompletionStage<String> subscribeOnceForString(Classification classification, Duration duration) {
        return subscribeOnce(this.oneTimeStringConsumers, classification, duration);
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public CompletionStage<String> subscribeOnceForStringExclusively(Classification classification, Duration duration) {
        return subscribeOnce(this.oneTimeStringConsumers, classification, duration, true);
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public CompletionStage<Adaptable> subscribeOnceForAdaptable(Classification classification, Duration duration) {
        return subscribeOnce(this.oneTimeAdaptableConsumers, classification, duration);
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public AdaptableBus.SubscriptionId subscribeForAdaptable(Classification classification, Consumer<Adaptable> consumer) {
        Entry entry = new Entry(classification, consumer);
        addEntry(this.persistentAdaptableConsumers, entry);
        return entry;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public AdaptableBus.SubscriptionId subscribeForAdaptableExclusively(Classification classification, Consumer<Adaptable> consumer) {
        Entry entry = new Entry(classification, consumer);
        replaceEntry(this.persistentAdaptableConsumers, entry);
        return entry;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public AdaptableBus.SubscriptionId subscribeForAdaptableWithTimeout(Classification classification, Duration duration, Consumer<Adaptable> consumer, Predicate<Adaptable> predicate, Consumer<Throwable> consumer2) {
        CompletableFuture<Adaptable> completableFuture = new CompletableFuture<>();
        AtomicReference<Instant> atomicReference = new AtomicReference<>(Instant.now());
        Entry entry = new Entry(classification, withTermination(consumer, predicate, completableFuture, atomicReference));
        addEntry(this.persistentAdaptableConsumers, entry);
        removeAfterIdle(this.persistentAdaptableConsumers, entry, duration, completableFuture, atomicReference);
        completableFuture.thenAccept(adaptable -> {
            removeEntry(this.persistentAdaptableConsumers, entry, () -> {
            });
        }).exceptionally(th -> {
            consumer2.accept(th);
            return null;
        });
        return entry;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public boolean unsubscribe(@Nullable AdaptableBus.SubscriptionId subscriptionId) {
        if (subscriptionId == null) {
            return false;
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (subscriptionId instanceof Entry) {
            removeEntry(this.persistentAdaptableConsumers, (Entry) subscriptionId, () -> {
                atomicBoolean.set(true);
            });
        }
        return atomicBoolean.get();
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public ScheduledExecutorService getScheduledExecutor() {
        return this.scheduledExecutorService;
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public void publish(String str) {
        this.singleThreadedExecutorService.submit(() -> {
            doPublish(str);
        });
    }

    @Override // org.eclipse.ditto.client.internal.bus.AdaptableBus
    public void shutdownExecutor() {
        LOGGER.trace("Shutting down AdaptableBus Executors");
        try {
            this.singleThreadedExecutorService.shutdownNow();
            this.scheduledExecutorService.shutdownNow();
            this.singleThreadedExecutorService.awaitTermination(2L, TimeUnit.SECONDS);
            this.scheduledExecutorService.awaitTermination(2L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.info("Waiting for termination was interrupted.");
        }
    }

    private void doPublish(String str) {
        if (publishToOneTimeStringSubscribers(str)) {
            return;
        }
        if (str.endsWith(ACK_SUFFIX)) {
            LOGGER.trace("Client got acknowledgement for which there is no subscriber: {}", str);
            return;
        }
        Optional<Adaptable> parseAsAdaptable = parseAsAdaptable(str);
        if (parseAsAdaptable.isPresent()) {
            Adaptable adaptable = parseAsAdaptable.get();
            List<Classification> allAdaptableTags = getAllAdaptableTags(adaptable);
            if (publishToOneTimeAdaptableSubscribers(adaptable, allAdaptableTags) || publishToPersistentAdaptableSubscribers(adaptable, allAdaptableTags)) {
                return;
            }
        }
        LOGGER.trace("Client got unhandled message: {}", str);
    }

    private Consumer<Adaptable> withTermination(Consumer<Adaptable> consumer, Predicate<Adaptable> predicate, CompletableFuture<Adaptable> completableFuture, AtomicReference<Instant> atomicReference) {
        return adaptable -> {
            if (predicate.test(adaptable)) {
                completableFuture.complete(adaptable);
            } else {
                atomicReference.set(Instant.now());
            }
            consumer.accept(adaptable);
        };
    }

    private <T> CompletionStage<T> subscribeOnce(Map<Classification, Set<Entry<Consumer<T>>>> map, Classification classification, Duration duration) {
        return subscribeOnce(map, classification, duration, false);
    }

    private <T> CompletionStage<T> subscribeOnce(Map<Classification, Set<Entry<Consumer<T>>>> map, Classification classification, Duration duration, boolean z) {
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        Objects.requireNonNull(completableFuture);
        Entry<T> entry = new Entry<>(classification, completableFuture::complete);
        if (z) {
            replaceEntry(map, entry);
        } else {
            addEntry(map, entry);
        }
        removeAfter(map, entry, duration, completableFuture);
        return completableFuture;
    }

    private boolean publishToOneTimeStringSubscribers(String str) {
        Consumer consumer;
        Iterator<Classifier<String>> it = this.stringClassifiers.iterator();
        while (it.hasNext()) {
            Optional<Classification> classify = it.next().classify(str);
            if (classify.isPresent() && (consumer = (Consumer) removeOne(this.oneTimeStringConsumers, classify.get())) != null) {
                runConsumerAsync(consumer, str, classify.get());
                return true;
            }
        }
        return false;
    }

    private <T> void runConsumerAsync(Consumer<T> consumer, T t, Classification classification) {
        LOGGER.trace("publishing for {}: {}", classification, t);
        if (classification.mustBeSequential()) {
            consumer.accept(t);
        } else {
            this.scheduledExecutorService.submit(() -> {
                consumer.accept(t);
            });
        }
    }

    private boolean publishToOneTimeAdaptableSubscribers(Adaptable adaptable, List<Classification> list) {
        for (Classification classification : list) {
            Consumer consumer = (Consumer) removeOne(this.oneTimeAdaptableConsumers, classification);
            if (consumer != null) {
                runConsumerAsync(consumer, adaptable, classification);
                return true;
            }
        }
        return false;
    }

    private boolean publishToPersistentAdaptableSubscribers(Adaptable adaptable, List<Classification> list) {
        boolean z = false;
        for (Classification classification : list) {
            Set<Entry<Consumer<Adaptable>>> set = this.persistentAdaptableConsumers.get(classification);
            if (set != null && !set.isEmpty()) {
                z = true;
                Iterator<Entry<Consumer<Adaptable>>> it = set.iterator();
                while (it.hasNext()) {
                    runConsumerAsync((Consumer) ((Entry) it.next()).value, adaptable, classification);
                }
            }
        }
        return z;
    }

    private List<Classification> getAllAdaptableTags(Adaptable adaptable) {
        return (List) this.adaptableClassifiers.stream().flatMap(classifier -> {
            return (Stream) classifier.classify(adaptable).map((v0) -> {
                return Stream.of(v0);
            }).orElseGet(Stream::empty);
        }).collect(Collectors.toList());
    }

    private <T> void removeAfter(Map<Classification, Set<Entry<T>>> map, Entry<T> entry, Duration duration, CompletableFuture<?> completableFuture) {
        schedule(entry, () -> {
            removeEntry(map, entry, () -> {
                completableFuture.completeExceptionally(timeout(duration));
            });
        }, duration);
    }

    private void schedule(AdaptableBus.SubscriptionId subscriptionId, Runnable runnable, Duration duration) {
        this.timeoutFutures.compute(subscriptionId, (subscriptionId2, future) -> {
            if (future != null) {
                future.cancel(false);
            }
            return this.scheduledExecutorService.schedule(runnable, duration.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    private <T> void removeAfterIdle(Map<Classification, Set<Entry<T>>> map, Entry<T> entry, Duration duration, CompletableFuture<Adaptable> completableFuture, AtomicReference<Instant> atomicReference) {
        schedule(entry, () -> {
            if (duration.minus(Duration.between((Temporal) atomicReference.get(), Instant.now())).isNegative()) {
                removeEntry(map, entry, () -> {
                    completableFuture.completeExceptionally(timeout(duration));
                });
            } else {
                removeAfterIdle(map, entry, duration, completableFuture, atomicReference);
            }
        }, duration);
    }

    private static <T> void addEntry(Map<Classification, Set<Entry<T>>> map, Entry<T> entry) {
        map.compute(((Entry) entry).key, (classification, set) -> {
            Set newKeySet = set != null ? set : ConcurrentHashMap.newKeySet();
            newKeySet.add(entry);
            return newKeySet;
        });
    }

    private static <T> void replaceEntry(Map<Classification, Set<Entry<T>>> map, Entry<T> entry) {
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        newKeySet.add(entry);
        map.put(((Entry) entry).key, newKeySet);
    }

    private Optional<Adaptable> parseAsAdaptable(String str) {
        try {
            return Optional.of(ProtocolFactory.jsonifiableAdaptableFromJson(JsonObject.of(str)));
        } catch (JsonParseException e) {
            LOGGER.warn("Client got unknown non-JSON message: {}", str, e);
            return Optional.empty();
        } catch (JsonRuntimeException e2) {
            LOGGER.warn("Client could not understand incoming JSON due to: <{}>:\n  <{}>", e2.getMessage(), str);
            return Optional.empty();
        }
    }

    private <T> void removeEntry(Map<Classification, Set<Entry<T>>> map, Entry<?> entry, Runnable runnable) {
        map.computeIfPresent(((Entry) entry).key, (classification, set) -> {
            if (set.remove(entry)) {
                runnable.run();
            }
            if (set.isEmpty()) {
                return null;
            }
            return set;
        });
        this.timeoutFutures.computeIfPresent(entry, (subscriptionId, future) -> {
            future.cancel(false);
            return null;
        });
    }

    @Nullable
    private static <T> T removeOne(Map<Classification, Set<Entry<T>>> map, Classification classification) {
        AtomicReference atomicReference = new AtomicReference(null);
        map.computeIfPresent(classification, (classification2, set) -> {
            return (Set) set.stream().findAny().map(entry -> {
                if (set.remove(entry)) {
                    atomicReference.set(entry.value);
                }
                if (set.isEmpty()) {
                    return null;
                }
                return set;
            }).orElse(null);
        });
        return (T) atomicReference.get();
    }

    private static Throwable timeout(Duration duration) {
        return new TimeoutException("Timed out after " + duration);
    }
}
