package tamer;

import java.time.Duration;
import log.effect.LogWriter;
import log.effect.LogWriter$;
import log.effect.LogWriterOps$;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.StrictOptimizedIterableOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.math.Ordering;
import scala.math.Ordering$;
import scala.math.Ordering$Int$;
import scala.math.Ordering$String$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import tamer.Tamer;
import zio.CanFail$;
import zio.Chunk;
import zio.Chunk$;
import zio.Dequeue;
import zio.DurationSyntax$;
import zio.Enqueue;
import zio.NonEmptyChunk;
import zio.Promise$;
import zio.Schedule;
import zio.Schedule$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZIO$ScopedPartiallyApplied$;
import zio.ZLayer;
import zio.Zippable$;
import zio.kafka.consumer.Consumer;
import zio.kafka.consumer.Offset;
import zio.kafka.consumer.Subscription;
import zio.kafka.consumer.Subscription$;
import zio.kafka.producer.Transaction;
import zio.kafka.producer.TransactionalProducer;
import zio.kafka.serde.Serializer;
import zio.package;
import zio.stream.ZStream;
import zio.stream.ZStream$;

/* compiled from: Tamer.scala */
/* loaded from: input_file:tamer/Tamer$.class */
public final class Tamer$ {
    public static final Tamer$ MODULE$ = new Tamer$();
    private static final Schedule<Object, Object, Tuple2<Object, Duration>> retries = Schedule$.MODULE$.recurs(10, "tamer.Tamer.retries(Tamer.scala:38)").$amp$amp(Schedule$.MODULE$.exponential(DurationSyntax$.MODULE$.milliseconds$extension(zio.package$.MODULE$.durationInt(100)), Schedule$.MODULE$.exponential$default$2(), "tamer.Tamer.retries(Tamer.scala:38)"), Zippable$.MODULE$.Zippable2());

    /* JADX INFO: Access modifiers changed from: private */
    public final Schedule<Object, Object, Tuple2<Object, Duration>> retries() {
        return retries;
    }

    private final Offset OffsetOps(Offset offset) {
        return offset;
    }

    public final <K, V> ZStream<Object, Throwable, BoxedUnit> sinkStream(String str, Serializer<Object, K> serializer, Serializer<Object, V> serializer2, Dequeue<Tuple2<Tamer.TxInfo, Chunk<Record<K, V>>>> dequeue, LogWriter<ZIO> logWriter) {
        return ZStream$.MODULE$.fromQueueWithShutdown(() -> {
            return dequeue;
        }, () -> {
            return ZStream$.MODULE$.fromQueueWithShutdown$default$2();
        }, "tamer.Tamer.sinkStream(Tamer.scala:52)").mapZIO(tuple2 -> {
            if (tuple2 != null) {
                Tamer.TxInfo txInfo = (Tamer.TxInfo) tuple2._1();
                Chunk chunk = (Chunk) tuple2._2();
                if (txInfo instanceof Tamer.TxInfo.Context) {
                    Transaction transaction = ((Tamer.TxInfo.Context) txInfo).transaction();
                    if (chunk.nonEmpty()) {
                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return new StringBuilder(21).append("pushing ").append(chunk.size()).append(" messages to ").append(str).toString();
                        })).$times$greater(() -> {
                            return transaction.produceChunk(chunk.map(record -> {
                                return record.toKafkaProducerRecord(str);
                            }), serializer, serializer2, None$.MODULE$).tapError(th -> {
                                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return new StringBuilder(53).append("failed pushing ").append(chunk.size()).append(" messages to ").append(str).append(", will retry. Caused by: ").append(th.getMessage()).toString();
                                });
                            }, CanFail$.MODULE$.canFail(), "tamer.Tamer.sinkStream(Tamer.scala:58)").retry(() -> {
                                return MODULE$.retries();
                            }, CanFail$.MODULE$.canFail(), "tamer.Tamer.sinkStream(Tamer.scala:59)").tapError(th2 -> {
                                return (ZIO) LogWriterOps$.MODULE$.warn$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return new StringBuilder(61).append("finally failed pushing ").append(chunk.size()).append(" messages to ").append(str).append(", will abort. Caused by: ").append(th2.getMessage()).toString();
                                }, () -> {
                                    return th2;
                                });
                            }, CanFail$.MODULE$.canFail(), "tamer.Tamer.sinkStream(Tamer.scala:60)").unit("tamer.Tamer.sinkStream(Tamer.scala:61)");
                        }, "tamer.Tamer.sinkStream(Tamer.scala:55)").$times$greater(() -> {
                            return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                return new StringBuilder(33).append("successfully pushed ").append(chunk.size()).append(" messages to ").append(str).toString();
                            });
                        }, "tamer.Tamer.sinkStream(Tamer.scala:61)");
                    }
                }
            }
            if (tuple2 != null) {
                Tamer.TxInfo txInfo2 = (Tamer.TxInfo) tuple2._1();
                if (txInfo2 instanceof Tamer.TxInfo.Delimiter) {
                    return ((Tamer.TxInfo.Delimiter) txInfo2).promise().succeed(BoxedUnit.UNIT, "tamer.Tamer.sinkStream(Tamer.scala:64)").unit("tamer.Tamer.sinkStream(Tamer.scala:64)").$less$times(() -> {
                        return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return "user implicitly signalled end of data production";
                        });
                    }, "tamer.Tamer.sinkStream(Tamer.scala:64)");
                }
            }
            return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                return new StringBuilder(28).append("received an empty chunk for ").append(str).toString();
            });
        }, "tamer.Tamer.sinkStream(Tamer.scala:53)");
    }

    public final <K, V, SV> ZStream<Object, Throwable, BoxedUnit> sourceStream(String str, String str2, int i, zio.kafka.serde.Serde<Object, Tamer.StateKey> serde, zio.kafka.serde.Serde<Object, SV> serde2, SV sv, Consumer consumer, TransactionalProducer transactionalProducer, Enqueue<Tuple2<Tamer.TxInfo, Chunk<Record<K, V>>>> enqueue, Function2<SV, Enqueue<NonEmptyChunk<Record<K, V>>>, ZIO<Object, Throwable, SV>> function2, LogWriter<ZIO> logWriter) {
        Tamer.StateKey stateKey = new Tamer.StateKey(RichInt$.MODULE$.toHexString$extension(Predef$.MODULE$.intWrapper(i)), str2);
        Subscription subscription = Subscription$.MODULE$.topics(str, Nil$.MODULE$);
        ZIO map = consumer.partitionsFor(str, consumer.partitionsFor$default$2()).map(list -> {
            return list.map(partitionInfo -> {
                return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            }).toSet();
        }, "tamer.Tamer.sourceStream.partitionSet(Tamer.scala:86)");
        ZIO flatMap = ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
            return new StringBuilder(31).append("obtaining information on topic ").append(str).toString();
        })).flatMap(boxedUnit -> {
            return map.flatMap(set -> {
                return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return new StringBuilder(46).append("received the following information on topic ").append(str).append(": ").append(map).toString();
                })).flatMap(boxedUnit -> {
                    return consumer.committed(set, consumer.committed$default$2()).flatMap(map2 -> {
                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return new StringBuilder(76).append("received the following commited state information on topic ").append(str).append(" for the group ").append(str2).append(": ").append(map2).toString();
                        })).flatMap(boxedUnit -> {
                            return consumer.endOffsets(set, consumer.endOffsets$default$2()).flatMap(map2 -> {
                                return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return new StringBuilder(62).append("received the following end offsets information on the topic ").append(str).append(": ").append(map2).toString();
                                })).flatMap(boxedUnit -> {
                                    return decide$1(map2, map2).flatMap(startupDecision -> {
                                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                            return new StringBuilder(11).append("decided to ").append(startupDecision).toString();
                                        })).map(boxedUnit -> {
                                            return startupDecision;
                                        }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:113)");
                                    }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:112)");
                                }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:111)");
                            }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:110)");
                        }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:109)");
                    }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:108)");
                }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:107)");
            }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:106)");
        }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:105)").flatMap(startupDecision -> {
            if (Tamer$StartupDecision$Initialize$.MODULE$.equals(startupDecision)) {
                return ((ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return new StringBuilder(42).append("consumer group ").append(str2).append(" never consumed from topic ").append(str).toString();
                })).$times$greater(() -> {
                    return ZIO$ScopedPartiallyApplied$.MODULE$.apply$extension(ZIO$.MODULE$.scoped(), () -> {
                        return transactionalProducer.createTransaction().flatMap(transaction -> {
                            return transaction.produce(str, stateKey, sv, serde, serde2, None$.MODULE$).tap(recordMetadata -> {
                                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return new StringBuilder(25).append("pushed initial state ").append(sv).append(" to ").append(recordMetadata).toString();
                                });
                            }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:123)").unit("tamer.Tamer.sourceStream.initialize(Tamer.scala:124)");
                        }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:120)");
                    }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:119)");
                }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:118)");
            }
            if (Tamer$StartupDecision$Resume$.MODULE$.equals(startupDecision)) {
                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return new StringBuilder(48).append("consumer group ").append(str2).append(" resuming consumption from topic ").append(str).toString();
                });
            }
            throw new MatchError(startupDecision);
        }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:116)");
        ZStream mapZIO = consumer.plainStream(subscription, serde, serde2, consumer.plainStream$default$4()).mapZIO(committableRecord -> {
            Object key = committableRecord.record().key();
            if (key != null ? key.equals(stateKey) : stateKey == null) {
                return ZIO$ScopedPartiallyApplied$.MODULE$.apply$extension(ZIO$.MODULE$.scoped(), () -> {
                    Offset offset = committableRecord.offset();
                    return transactionalProducer.createTransaction().flatMap(transaction -> {
                        EnrichedBoundedEnqueue enrichedBoundedEnqueue = new EnrichedBoundedEnqueue(enqueue, nonEmptyChunk -> {
                            return new Tuple2(new Tamer.TxInfo.Context(transaction), nonEmptyChunk.toChunk());
                        });
                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return new StringBuilder(37).append("consumer group ").append(str2).append(" consumed state ").append(committableRecord.record().value()).append(" from ").append(Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset))).toString();
                        })).$times$greater(() -> {
                            return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                return new StringBuilder(38).append("invoking the iteration function under ").append(str2).toString();
                            });
                        }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:139)").$times$greater(() -> {
                            return ((ZIO) function2.apply(committableRecord.record().value(), enrichedBoundedEnqueue)).flatMap(obj -> {
                                return Promise$.MODULE$.make("tamer.Tamer.sourceStream.stateStream(Tamer.scala:146)").flatMap(promise -> {
                                    return enqueue.offer(new Tuple2(new Tamer.TxInfo.Delimiter(promise), Chunk$.MODULE$.empty()), "tamer.Tamer.sourceStream.stateStream(Tamer.scala:149)").$times$greater(() -> {
                                        return promise.await("tamer.Tamer.sourceStream.stateStream(Tamer.scala:152)");
                                    }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:149)");
                                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:146)").$times$greater(() -> {
                                    return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                        return new StringBuilder(35).append("consumer group ").append(str2).append(" will commit offset ").append(Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset))).toString();
                                    });
                                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:153)").$less$times(() -> {
                                    return transaction.produce(str, stateKey, obj, serde, serde2, new Some(offset)).tap(recordMetadata -> {
                                        return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                            return new StringBuilder(22).append("pushed state ").append(obj).append(" to ").append(recordMetadata).append(" for ").append(str2).toString();
                                        });
                                    }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:157)");
                                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:154)");
                            }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:141)");
                        }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:141)");
                    }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:136)");
                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:134)");
            }
            Offset offset = committableRecord.offset();
            return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                return new StringBuilder(53).append("consumer group ").append(str2).append(" ignored state (wrong key: ").append(committableRecord.record().key()).append(" != ").append(stateKey).append(") from ").append(Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset))).toString();
            })).$times$greater(() -> {
                return offset.commitOrRetry(MODULE$.retries());
            }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:163)").$less$times(() -> {
                return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return new StringBuilder(33).append("consumer group ").append(str2).append(" committed offset ").append(Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset))).toString();
                });
            }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:164)");
        }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:132)");
        return ZStream$.MODULE$.fromZIO(() -> {
            return flatMap;
        }, "tamer.Tamer.sourceStream(Tamer.scala:168)").drain("tamer.Tamer.sourceStream(Tamer.scala:168)").$plus$plus(() -> {
            return mapZIO;
        }, "tamer.Tamer.sourceStream(Tamer.scala:168)");
    }

    public final <R, K, V, SV> ZLayer<KafkaConfig, Throwable, Tamer> live(Setup<R, K, V, SV> setup, package.Tag<K> tag, package.Tag<V> tag2, package.Tag<SV> tag3) {
        return Tamer$LiveTamer$.MODULE$.getLayer(setup, tag, tag2, tag3);
    }

    public static final /* synthetic */ boolean $anonfun$sourceStream$6(Tuple2 tuple2, Tuple2 tuple22) {
        Tuple2 tuple23 = new Tuple2(tuple2, tuple22);
        if (tuple23 == null) {
            return false;
        }
        Tuple2 tuple24 = (Tuple2) tuple23._1();
        Tuple2 tuple25 = (Tuple2) tuple23._2();
        if (tuple24 == null) {
            return false;
        }
        TopicPartition topicPartition = (TopicPartition) tuple24._1();
        long _2$mcJ$sp = tuple24._2$mcJ$sp();
        if (tuple25 == null) {
            return false;
        }
        TopicPartition topicPartition2 = (TopicPartition) tuple25._1();
        Some some = (Option) tuple25._2();
        if (!(some instanceof Some)) {
            return false;
        }
        OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) some.value();
        if (topicPartition != null ? topicPartition.equals(topicPartition2) : topicPartition2 == null) {
            if (_2$mcJ$sp > offsetAndMetadata.offset()) {
                return true;
            }
        }
        return false;
    }

    private static final ZIO decide$1(Map map, Map map2) {
        Ordering by = scala.package$.MODULE$.Ordering().by(topicPartition -> {
            return new Tuple2(topicPartition.topic(), BoxesRunTime.boxToInteger(topicPartition.partition()));
        }, Ordering$.MODULE$.Tuple2(Ordering$String$.MODULE$, Ordering$Int$.MODULE$));
        Ordering on = by.on(tuple2 -> {
            return (TopicPartition) tuple2._1();
        });
        Ordering on2 = by.on(tuple22 -> {
            return (TopicPartition) tuple22._1();
        });
        Function2 function2 = (tuple23, tuple24) -> {
            return BoxesRunTime.boxToBoolean($anonfun$sourceStream$6(tuple23, tuple24));
        };
        Set keySet = map2.keySet();
        Set keySet2 = map.keySet();
        if (keySet != null ? keySet.equals(keySet2) : keySet2 == null) {
            if (map.values().forall(option -> {
                return BoxesRunTime.boxToBoolean(option.isEmpty());
            })) {
                return ZIO$.MODULE$.succeed(() -> {
                    return Tamer$StartupDecision$Initialize$.MODULE$;
                }, "tamer.Tamer.sourceStream.decide(Tamer.scala:99)");
            }
        }
        return ((List) ((StrictOptimizedIterableOps) map2.toList().sorted(on)).zip((IterableOnce) map.toList().sorted(on2))).forall(function2.tupled()) ? ZIO$.MODULE$.succeed(() -> {
            return Tamer$StartupDecision$Resume$.MODULE$;
        }, "tamer.Tamer.sourceStream.decide(Tamer.scala:100)") : ZIO$.MODULE$.fail(() -> {
            return TamerError$.MODULE$.apply("Tamer is stuck, it will not proceed unless state is restored manually");
        }, "tamer.Tamer.sourceStream.decide(Tamer.scala:101)");
    }

    private Tamer$() {
    }
}
