package zio.pravega;

import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.Transaction;
import java.util.UUID;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Some;
import scala.Tuple2;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import zio.CanFail$;
import zio.Cause;
import zio.Chunk;
import zio.Chunk$;
import zio.Exit;
import zio.IsSubtypeOfError$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZIO$ZIOAutoCloseableOps$;
import zio.stream.ZChannel;
import zio.stream.ZSink;
import zio.stream.ZSink$;
import zio.stream.ZSink$UnwrapScopedPartiallyApplied$;
import zio.stream.ZStream;
import zio.stream.ZStream$;
import zio.stream.ZStream$UnwrapScopedPartiallyApplied$;

/* compiled from: PravegaStream.scala */
@ScalaSignature(bytes = "\u0006\u0005]4AAB\u0004\u0005\u0019!Aq\u0003\u0001B\u0001B\u0003%\u0001\u0004C\u0003\"\u0001\u0011\u0005!\u0005C\u0003&\u0001\u0011\u0005c\u0005C\u0003`\u0001\u0011\u0005\u0001\rC\u00038\u0001\u0011\u0005\u0013N\u0001\rQe\u00064XmZ1TiJ,\u0017-\\*feZL7-Z%na2T!\u0001C\u0005\u0002\u000fA\u0014\u0018M^3hC*\t!\"A\u0002{S>\u001c\u0001aE\u0002\u0001\u001bM\u0001\"AD\t\u000e\u0003=Q\u0011\u0001E\u0001\u0006g\u000e\fG.Y\u0005\u0003%=\u0011a!\u00118z%\u00164\u0007C\u0001\u000b\u0016\u001b\u00059\u0011B\u0001\f\b\u0005Q\u0001&/\u0019<fO\u0006\u001cFO]3b[N+'O^5dK\u0006ARM^3oiN#(/Z1n\u00072LWM\u001c;GC\u000e$xN]=\u0011\u0005eyR\"\u0001\u000e\u000b\u0005ma\u0012AB2mS\u0016tGO\u0003\u0002\t;)\ta$\u0001\u0002j_&\u0011\u0001E\u0007\u0002\u0019\u000bZ,g\u000e^*ue\u0016\fWn\u00117jK:$h)Y2u_JL\u0018A\u0002\u001fj]&$h\b\u0006\u0002$IA\u0011A\u0003\u0001\u0005\u0006/\t\u0001\r\u0001G\u0001\u0005g&t7.\u0006\u0002(\u000fR\u0019\u0001\u0006\u0015.\u0011\u0007%\nDG\u0004\u0002+_9\u00111FL\u0007\u0002Y)\u0011QfC\u0001\u0007yI|w\u000e\u001e 
\n\u0003)I!\u0001M\u0005\u0002\u000fA\f7m[1hK&\u0011!g\r\u0002\u0005)\u0006\u001c8N\u0003\u00021\u0013A9Q\u0007\u000f\u001e>\u000b*kU\"\u0001\u001c\u000b\u0005]J\u0011AB:ue\u0016\fW.\u0003\u0002:m\t)!lU5oWB\u0011abO\u0005\u0003y=\u00111!\u00118z!\tq$I\u0004\u0002@\u0003:\u00111\u0006Q\u0005\u0002!%\u0011\u0001gD\u0005\u0003\u0007\u0012\u0013\u0011\u0002\u00165s_^\f'\r\\3\u000b\u0005Az\u0001C\u0001$H\u0019\u0001!Q\u0001S\u0002C\u0002%\u0013\u0011!Q\t\u0003\u0015j\u0002\"AD&\n\u00051{!a\u0002(pi\"Lgn\u001a\t\u0003\u001d9K!aT\b\u0003\tUs\u0017\u000e\u001e\u0005\u0006#\u000e\u0001\rAU\u0001\u000bgR\u0014X-Y7OC6,\u0007CA*X\u001d\t!V\u000b\u0005\u0002,\u001f%\u0011akD\u0001\u0007!J,G-\u001a4\n\u0005aK&AB*ue&twM\u0003\u0002W\u001f!)1l\u0001a\u00019\u0006A1/\u001a;uS:<7\u000fE\u0002\u0015;\u0016K!AX\u0004\u0003\u001d]\u0013\u0018\u000e^3s'\u0016$H/\u001b8hg\u000611/\u001b8l)b,\"!Y3\u0015\u0007\t4w\rE\u0002*c\r\u0004r!\u000e\u001d;{\u0011TU\n\u0005\u0002GK\u0012)\u0001\n\u0002b\u0001\u0013\")\u0011\u000b\u0002a\u0001%\")1\f\u0002a\u0001QB\u0019A#\u00183\u0016\u0005)\u0004HcA6rgB\u0019\u0011&\r7\u0011\u000bUj'(P8\n\u000594$a\u0002.TiJ,\u0017-\u001c\t\u0003\rB$Q\u0001S\u0003C\u0002%CQA]\u0003A\u0002I\u000bqB]3bI\u0016\u0014xI]8va:\u000bW.\u001a\u0005\u00067\u0016\u0001\r\u0001\u001e\t\u0004)U|\u0017B\u0001<\b\u00059\u0011V-\u00193feN+G\u000f^5oON\u0004")
/* loaded from: input_file:zio/pravega/PravegaStreamServiceImpl.class */
/*
 * PravegaStreamService implementation that builds ZIO sinks and streams on top of a
 * Pravega EventStreamClientFactory.
 *
 * This file is decompiler output for a Scala class (the trace strings reference
 * PravegaStream.scala). Names such as `$anonfun$sink$2`, `writeEvent$1` and the
 * `MODULE$` singletons are compiler-generated; the string literal passed as the last
 * argument of most ZIO calls is a source-location trace and is part of the call.
 *
 * NOTE(review): as decompiled, several constructs are unlikely to be accepted by javac
 * (see the notes inside sinkTx); treat this file as a reading aid, not as buildable source.
 */
public class PravegaStreamServiceImpl implements PravegaStreamService {
    // Factory used by every method below to create writers, transactional writers and readers.
    private final EventStreamClientFactory eventStreamClientFactory;

    /**
     * Returns an effect that yields a sink writing each incoming element through an
     * event writer created for {@code str}.
     *
     * @param str            first argument handed to {@code createEventWriter}
     *                       (presumably the stream name -- confirm against the Pravega API)
     * @param writerSettings supplies the serializer, the writer config and the optional key extractor
     * @return a {@code ZIO.attempt} effect wrapping the sink built by {@code $anonfun$sink$6}
     */
    @Override // zio.pravega.PravegaStreamService
    public <A> ZIO<Object, Throwable, ZSink<Object, Throwable, A, Nothing$, BoxedUnit>> sink(String str, WriterSettings<A> writerSettings) {
        // `map` is only a description: create the writer inside attemptBlocking, pass it through
        // withFinalizerAuto (presumably registers the writer's close() as a finalizer -- confirm
        // against the ZIO docs), then turn the writer into a per-element sink.
        ZIO map = ZIO$ZIOAutoCloseableOps$.MODULE$.withFinalizerAuto$extension(ZIO$.MODULE$.ZIOAutoCloseableOps(ZIO$.MODULE$.attemptBlocking(() -> {
            return this.eventStreamClientFactory.createEventWriter(str, writerSettings.serializer(), writerSettings.eventWriterConfig());
        }, "zio.pravega.PravegaStreamServiceImpl.sink.acquireWriter(PravegaStream.scala:40)")), "zio.pravega.PravegaStreamServiceImpl.sink.acquireWriter(PravegaStream.scala:47)").map(eventStreamWriter -> {
            return new ZSink($anonfun$sink$2(writerSettings, eventStreamWriter));
        }, "zio.pravega.PravegaStreamServiceImpl.sink.acquireWriter(PravegaStream.scala:48)");
        // The returned effect wraps `map` with ZSink.unwrapScoped (see $anonfun$sink$6).
        return ZIO$.MODULE$.attempt(() -> {
            return new ZSink($anonfun$sink$6(map));
        }, "zio.pravega.PravegaStreamServiceImpl.sink(PravegaStream.scala:57)");
    }

    /**
     * Returns an effect that yields a sink writing each incoming element into a single
     * transaction begun on a transactional writer created for {@code str}.
     *
     * <p>The transaction is acquired with {@code acquireReleaseExit}: the release action
     * aborts it (after logging the cause) when the exit is a failure and commits it when
     * the exit is a success.
     *
     * @param str            first argument handed to {@code createTransactionalEventWriter}
     *                       (presumably the stream name -- confirm against the Pravega API)
     * @param writerSettings supplies the serializer, the writer config and the optional key extractor
     * @return a {@code ZIO.attempt} effect wrapping the sink built by {@code $anonfun$sinkTx$14}
     */
    @Override // zio.pravega.PravegaStreamService
    public <A> ZIO<Object, Throwable, ZSink<Object, Throwable, A, Nothing$, BoxedUnit>> sinkTx(String str, WriterSettings<A> writerSettings) {
        // Step 1: create the transactional writer (same attemptBlocking + withFinalizerAuto shape as sink).
        ZIO map = ZIO$ZIOAutoCloseableOps$.MODULE$.withFinalizerAuto$extension(ZIO$.MODULE$.ZIOAutoCloseableOps(ZIO$.MODULE$.attemptBlocking(() -> {
            return this.eventStreamClientFactory.createTransactionalEventWriter(str, writerSettings.serializer(), writerSettings.eventWriterConfig());
        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:68)")), "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:75)").flatMap(transactionalEventStreamWriter -> {
            // Step 2: acquire = beginTxn(); release = abort or commit depending on the Exit.
            return ZIO$.MODULE$.acquireReleaseExit(() -> {
                return ZIO$.MODULE$.attemptBlocking(() -> {
                    return transactionalEventStreamWriter.beginTxn();
                }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:77)");
            }, (transaction, exit) -> {
                ZIO orDie;
                // Residue of a Scala pattern match on the (transaction, exit) pair; the freshly
                // constructed tuple can never be null, so the MatchError below is unreachable.
                Tuple2 tuple2 = new Tuple2(transaction, exit);
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                // NOTE(review): decompiler artifact -- this local reuses the lambda parameter name
                // `transaction`, and the next line assigns an `Exit` to an `Exit.Failure` variable;
                // neither looks like valid Java as written.
                Transaction transaction = (Transaction) tuple2._1();
                Exit.Failure failure = (Exit) tuple2._2();
                if (failure instanceof Exit.Failure) {
                    // Failure: log the cause, then abort the transaction; the abort effect is
                    // followed by orDie.
                    Cause cause = failure.cause();
                    orDie = ZIO$.MODULE$.logCause(() -> {
                        return cause;
                    }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:81)").$times$greater(() -> {
                        return ZIO$.MODULE$.attemptBlocking(() -> {
                            transaction.abort();
                        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:83)").orDie(IsSubtypeOfError$.MODULE$.impl($less$colon$less$.MODULE$.refl()), CanFail$.MODULE$.canFail(), "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:84)");
                    }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:81)");
                } else {
                    // Anything that is neither Failure nor Success is reported as a MatchError.
                    if (!(failure instanceof Exit.Success)) {
                        throw new MatchError(failure);
                    }
                    // Success: commit the transaction; the commit effect is followed by orDie.
                    orDie = ZIO$.MODULE$.attemptBlocking(() -> {
                        transaction.commit();
                    }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:86)").orDie(IsSubtypeOfError$.MODULE$.impl($less$colon$less$.MODULE$.refl()), CanFail$.MODULE$.canFail(), "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:86)");
                }
                return orDie;
            }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:77)");
        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:76)").map(transaction -> {
            // Step 3: turn the open transaction into a per-element sink.
            return new ZSink($anonfun$sinkTx$10(writerSettings, transaction));
        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:90)");
        // The returned effect wraps `map` with ZSink.unwrapScoped (see $anonfun$sinkTx$14).
        return ZIO$.MODULE$.attempt(() -> {
            return new ZSink($anonfun$sinkTx$14(map));
        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx(PravegaStream.scala:99)");
    }

    /**
     * Returns an effect that yields a stream of events read through a reader created
     * from the factory.
     *
     * @param str            second argument handed to {@code createReader}
     *                       (presumably the reader-group name -- confirm against the Pravega API)
     * @param readerSettings supplies the optional reader id, the serializer, the reader config
     *                       and the timeout passed to {@code readNextEvent}
     * @return a {@code ZIO.attempt} effect wrapping a {@code ZStream.unwrapScoped} stream
     */
    @Override // zio.pravega.PravegaStreamService
    public <A> ZIO<Object, Throwable, ZStream<Object, Throwable, A>> stream(String str, ReaderSettings<A> readerSettings) {
        return ZIO$.MODULE$.attempt(() -> {
            return ZStream$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZStream$.MODULE$.unwrapScoped(), () -> {
                return ZIO$ZIOAutoCloseableOps$.MODULE$.withFinalizerAuto$extension(ZIO$.MODULE$.ZIOAutoCloseableOps(ZIO$.MODULE$.attemptBlocking(() -> {
                    // Use the configured reader id, or a random UUID string when none is set.
                    return this.eventStreamClientFactory.createReader((String) readerSettings.readerId().getOrElse(() -> {
                        return UUID.randomUUID().toString();
                    }), str, readerSettings.serializer(), readerSettings.readerConfig());
                }, "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:111)")), "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:120)").map(eventStreamReader -> {
                    // Each repetition performs one readNextEvent call and emits zero or one element.
                    return ZStream$.MODULE$.repeatZIOChunk(() -> {
                        return ZIO$.MODULE$.attemptBlocking(() -> {
                            Chunk empty;
                            EventRead readNextEvent = eventStreamReader.readNextEvent(readerSettings.timeout());
                            if (readNextEvent.isCheckpoint()) {
                                // Checkpoint reads produce no element.
                                empty = Chunk$.MODULE$.empty();
                            } else {
                                // A null event also produces no element (presumably a read that
                                // timed out -- confirm against the Pravega API).
                                Object event = readNextEvent.getEvent();
                                empty = event == null ? Chunk$.MODULE$.empty() : Chunk$.MODULE$.single(event);
                            }
                            return empty;
                        }, "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:123)");
                    }, "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:122)");
                }, "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:121)");
            }, "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:109)");
        }, "zio.pravega.PravegaStreamServiceImpl.stream(PravegaStream.scala:108)");
    }

    /**
     * Builds the effect that writes one element through a non-transactional writer.
     * Without a key extractor it calls {@code writeEvent(obj)}; with one it calls
     * {@code writeEvent(key, obj)} where the key is the extractor applied to {@code obj}.
     * Both calls are wrapped in {@code fromCompletableFuture}.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final ZIO writeEvent$1(Object obj, WriterSettings writerSettings, EventStreamWriter eventStreamWriter) {
        ZIO fromCompletableFuture;
        Some keyExtractor = writerSettings.keyExtractor();
        if (None$.MODULE$.equals(keyExtractor)) {
            fromCompletableFuture = ZIO$.MODULE$.fromCompletableFuture(() -> {
                return eventStreamWriter.writeEvent(obj);
            }, "zio.pravega.PravegaStreamServiceImpl.sink.acquireWriter.writeEvent(PravegaStream.scala:50)");
        } else {
            // Residue of a Scala Option match: anything other than None or Some is a MatchError.
            if (!(keyExtractor instanceof Some)) {
                throw new MatchError(keyExtractor);
            }
            Function1 function1 = (Function1) keyExtractor.value();
            fromCompletableFuture = ZIO$.MODULE$.fromCompletableFuture(() -> {
                return eventStreamWriter.writeEvent((String) function1.apply(obj), obj);
            }, "zio.pravega.PravegaStreamServiceImpl.sink.acquireWriter.writeEvent(PravegaStream.scala:52)");
        }
        return fromCompletableFuture;
    }

    // Synthetic helper for sink: a ZSink.foreach that runs writeEvent$1 on every element.
    public static final /* synthetic */ ZChannel $anonfun$sink$2(WriterSettings writerSettings, EventStreamWriter eventStreamWriter) {
        return ZSink$.MODULE$.foreach(obj -> {
            return writeEvent$1(obj, writerSettings, eventStreamWriter);
        }, "zio.pravega.PravegaStreamServiceImpl.sink.acquireWriter(PravegaStream.scala:54)");
    }

    // Synthetic helper for sink: applies ZSink.unwrapScoped to the given effect.
    public static final /* synthetic */ ZChannel $anonfun$sink$6(ZIO zio2) {
        return ZSink$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZSink$.MODULE$.unwrapScoped(), () -> {
            return zio2;
        }, "zio.pravega.PravegaStreamServiceImpl.sink(PravegaStream.scala:58)");
    }

    /**
     * Builds the effect that writes one element into an open transaction.
     * Without a key extractor it calls {@code transaction.writeEvent(obj)}; with one it
     * calls {@code transaction.writeEvent(key, obj)} where the key is the extractor
     * applied to {@code obj}. Both calls are wrapped in {@code attemptBlocking}.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final ZIO writeEvent$2(Object obj, WriterSettings writerSettings, Transaction transaction) {
        ZIO attemptBlocking;
        Some keyExtractor = writerSettings.keyExtractor();
        if (None$.MODULE$.equals(keyExtractor)) {
            attemptBlocking = ZIO$.MODULE$.attemptBlocking(() -> {
                transaction.writeEvent(obj);
            }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter.writeEvent(PravegaStream.scala:92)");
        } else {
            // Residue of a Scala Option match: anything other than None or Some is a MatchError.
            if (!(keyExtractor instanceof Some)) {
                throw new MatchError(keyExtractor);
            }
            Function1 function1 = (Function1) keyExtractor.value();
            attemptBlocking = ZIO$.MODULE$.attemptBlocking(() -> {
                transaction.writeEvent((String) function1.apply(obj), obj);
            }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter.writeEvent(PravegaStream.scala:94)");
        }
        return attemptBlocking;
    }

    // Synthetic helper for sinkTx: a ZSink.foreach that runs writeEvent$2 on every element.
    public static final /* synthetic */ ZChannel $anonfun$sinkTx$10(WriterSettings writerSettings, Transaction transaction) {
        return ZSink$.MODULE$.foreach(obj -> {
            return writeEvent$2(obj, writerSettings, transaction);
        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx.acquireWriter(PravegaStream.scala:96)");
    }

    // Synthetic helper for sinkTx: applies ZSink.unwrapScoped to the given effect.
    public static final /* synthetic */ ZChannel $anonfun$sinkTx$14(ZIO zio2) {
        return ZSink$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZSink$.MODULE$.unwrapScoped(), () -> {
            return zio2;
        }, "zio.pravega.PravegaStreamServiceImpl.sinkTx(PravegaStream.scala:100)");
    }

    /**
     * Creates the service around the given client factory.
     *
     * @param eventStreamClientFactory factory stored and used to create writers and readers
     */
    public PravegaStreamServiceImpl(EventStreamClientFactory eventStreamClientFactory) {
        this.eventStreamClientFactory = eventStreamClientFactory;
    }
}
