package io.activej.csp.consumer;

import io.activej.async.function.AsyncConsumer;
import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.function.ConsumerEx;
import io.activej.common.recycle.Recyclable;
import io.activej.csp.consumer.impl.OfAnotherReactor;
import io.activej.csp.consumer.impl.OfAsyncConsumer;
import io.activej.csp.consumer.impl.OfConsumer;
import io.activej.csp.consumer.impl.OfException;
import io.activej.csp.consumer.impl.OfLazyProvider;
import io.activej.csp.consumer.impl.OfOutputStream;
import io.activej.csp.consumer.impl.OfPromise;
import io.activej.csp.consumer.impl.Recycling;
import io.activej.csp.queue.ChannelQueue;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.net.socket.tcp.ITcpSocket;
import io.activej.promise.Promise;
import io.activej.reactor.Reactor;
import java.io.OutputStream;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/csp/consumer/ChannelConsumers.class */
public class ChannelConsumers {
    public static <T> ChannelConsumer<T> ofAsyncConsumer(AsyncConsumer<T> asyncConsumer) {
        return ofAsyncConsumer(asyncConsumer, AsyncCloseable.of(exc -> {
        }));
    }

    public static <T> ChannelConsumer<T> ofAsyncConsumer(AsyncConsumer<T> asyncConsumer, @Nullable AsyncCloseable asyncCloseable) {
        return new OfAsyncConsumer(asyncConsumer, asyncCloseable);
    }

    public static <T> ChannelConsumer<T> ofConsumer(ConsumerEx<T> consumerEx) {
        return ofConsumer(consumerEx, null);
    }

    public static <T> ChannelConsumer<T> ofConsumer(ConsumerEx<T> consumerEx, @Nullable AsyncCloseable asyncCloseable) {
        return new OfConsumer(consumerEx, asyncCloseable);
    }

    public static <T> ChannelConsumer<T> ofException(Exception exc) {
        return new OfException(exc);
    }

    public static <T> ChannelConsumer<T> ofSupplier(AsyncConsumer<ChannelSupplier<T>> asyncConsumer) {
        return ofSupplier(asyncConsumer, new ChannelZeroBuffer());
    }

    public static <T> ChannelConsumer<T> ofSupplier(AsyncConsumer<ChannelSupplier<T>> asyncConsumer, ChannelQueue<T> channelQueue) {
        Promise accept = asyncConsumer.accept(channelQueue.getSupplier());
        ChannelConsumer<T> consumer = channelQueue.getConsumer();
        return accept == Promise.complete() ? consumer : consumer.withAcknowledgement(promise -> {
            return promise.both(accept);
        });
    }

    public static <T> ChannelConsumer<T> ofPromise(Promise<? extends ChannelConsumer<T>> promise) {
        return promise.isResult() ? (ChannelConsumer) promise.getResult() : new OfPromise(promise);
    }

    public static <T> ChannelConsumer<T> ofAnotherReactor(Reactor reactor, ChannelConsumer<T> channelConsumer) {
        return Reactor.getCurrentReactor() == reactor ? channelConsumer : new OfAnotherReactor(reactor, channelConsumer);
    }

    public static <T> ChannelConsumer<T> ofLazyProvider(Supplier<? extends ChannelConsumer<T>> supplier) {
        return new OfLazyProvider(supplier);
    }

    public static ChannelConsumer<ByteBuf> ofSocket(ITcpSocket iTcpSocket) {
        Objects.requireNonNull(iTcpSocket);
        return ofAsyncConsumer(iTcpSocket::write, iTcpSocket).withAcknowledgement(promise -> {
            return promise.then(() -> {
                return iTcpSocket.write((ByteBuf) null);
            });
        });
    }

    public static <T extends Recyclable> ChannelConsumer<T> recycling() {
        return new Recycling();
    }

    public static ChannelConsumer<ByteBuf> ofOutputStream(Executor executor, OutputStream outputStream) {
        return new OfOutputStream(executor, outputStream);
    }
}
