/*
 * Decompiled with CFR 0.152.
 */
package com.influxdb.client.internal;

import com.influxdb.client.InfluxDBClientOptions;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WriteConsistency;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.internal.MeasurementMapper;
import com.influxdb.client.internal.RetryAttempt;
import com.influxdb.client.internal.flowable.BackpressureBatchesBufferStrategy;
import com.influxdb.client.internal.flowable.FlowableBufferTimedFlushable;
import com.influxdb.client.service.WriteService;
import com.influxdb.client.write.Point;
import com.influxdb.client.write.WriteParameters;
import com.influxdb.client.write.events.AbstractWriteEvent;
import com.influxdb.client.write.events.BackpressureEvent;
import com.influxdb.client.write.events.WriteErrorEvent;
import com.influxdb.client.write.events.WriteRetriableErrorEvent;
import com.influxdb.client.write.events.WriteSuccessEvent;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.internal.AbstractRestClient;
import com.influxdb.utils.Arguments;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableOperator;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeSource;
import io.reactivex.rxjava3.core.Notification;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.util.ArrayListSupplier;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import retrofit2.HttpException;
import retrofit2.Response;

public abstract class AbstractWriteClient
extends AbstractRestClient
implements AutoCloseable {
    /** Logger of the write client; nested types that declare no {@code LOG} of their own resolve to this one. */
    private static final Logger LOG = Logger.getLogger(AbstractWriteClient.class.getName());
    /** Message of the {@link InfluxException} thrown when data is written after the processor has completed. */
    private static final String CLOSED_EXCEPTION = "WriteApi is closed. Data should be written before calling InfluxDBClient.close or WriteApi.close.";
    /** Upper bound, in milliseconds, that {@link #close()} waits for the pipeline to finish (30 seconds). */
    private static final int DEFAULT_WAIT = 30000;
    /** Polling interval, in milliseconds, matching the sleep used by {@code waitToCondition}. */
    private static final int DEFAULT_SLEEP = 25;
    /** Batching, backpressure and retry configuration supplied to the constructor. */
    private final WriteOptions writeOptions;
    /** Client-wide options; used to resolve org/bucket/precision/consistency and point settings. */
    protected final InfluxDBClientOptions options;
    /** Entry point of the write pipeline: the write methods push items here and {@link #close()} completes it. */
    private final PublishProcessor<BatchWriteItem> processor;
    /** Receives {@code true} from {@link #flush()}; handed to the batching operator built in the constructor. */
    private final PublishProcessor<Boolean> flushPublisher;
    /** Subject through which every event passed to {@code publish} is emitted to listeners. */
    private final PublishSubject<AbstractWriteEvent> eventPublisher;
    /** Mapper instance available to subclasses (and to {@link BatchWriteDataMeasurement} when passed in). */
    protected final MeasurementMapper measurementMapper = new MeasurementMapper();
    /** Generated REST service used to post the line protocol. */
    private final WriteService service;
    /** Shared registry: this client adds itself in the constructor and removes itself in {@link #close()}. */
    private final Collection<AutoCloseable> autoCloseables;
    /** Set to {@code true} by the pipeline's {@code doFinally}; polled by {@link #close()}. */
    private AtomicBoolean finished = new AtomicBoolean(false);

    /**
     * Creates the client, wires the complete write pipeline and subscribes to it.
     *
     * <p>Only {@code options} is null-checked here; the remaining arguments are used as given.
     * As the last step the new instance registers itself in {@code autoCloseables}.
     *
     * @param writeOptions       buffer limit, backpressure strategy, flush interval, batch size and retry settings
     * @param options            client options, stored for later resolution of write parameters
     * @param processorScheduler scheduler handed to the batching operator and to {@link ToWritePointsMaybe}
     * @param service            REST service used to post the data
     * @param autoCloseables     registry to which this client adds itself
     */
    public AbstractWriteClient(@Nonnull WriteOptions writeOptions, @Nonnull InfluxDBClientOptions options, @Nonnull Scheduler processorScheduler, @Nonnull WriteService service, @Nonnull Collection<AutoCloseable> autoCloseables) {
        Arguments.checkNotNull((Object)options, (String)"options");
        this.writeOptions = writeOptions;
        this.options = options;
        this.service = service;
        this.autoCloseables = autoCloseables;
        this.flushPublisher = PublishProcessor.create();
        this.eventPublisher = PublishSubject.create();
        this.processor = PublishProcessor.create();
        // Pipeline, first half:
        //  1. onBackpressureBuffer - bounded by getBufferLimit(); on overflow a FAST_EMITTING BackpressureEvent is published.
        //  2. groupBy              - items are grouped by their WriteParameters.
        //  3. per group            - FlowableBufferTimedFlushable collects items into lists, configured with the flush
        //                            publisher, the flush interval (ms) and the batch size
        //                            (operator implementation not visible here - exact emission rules to be confirmed).
        //  4. map                  - each list is merged into a single BatchWriteDataGrouped.
        this.processor.onBackpressureBuffer((long)writeOptions.getBufferLimit(), () -> this.publish(new BackpressureEvent(BackpressureEvent.BackpressureReason.FAST_EMITTING)), writeOptions.getBackpressureStrategy()).groupBy(it -> ((BatchWriteItem)it).writeParameters).flatMap(group -> group.compose(source -> new FlowableBufferTimedFlushable(source, (Publisher<Boolean>)this.flushPublisher, writeOptions.getFlushInterval(), TimeUnit.MILLISECONDS, writeOptions.getBatchSize(), processorScheduler, ArrayListSupplier.asSupplier())).map(batchItems -> {
            BatchWriteDataGrouped batch = new BatchWriteDataGrouped((WriteParameters)group.getKey());
            for (BatchWriteItem item : batchItems) {
                try {
                    batch.append(item.data.toLineProtocol());
                }
                catch (Exception e) {
                    // A failing item is reported as an error event and skipped; the rest of the batch is kept.
                    this.publish(new WriteErrorEvent(e));
                }
            }
            return new BatchWriteItem(batch.group, batch);
        // Pipeline, second half:
        //  5. filter           - batches with length 0 are dropped.
        //  6. lift             - BackpressureBatchesBufferStrategy, configured with the buffer limit; its callback
        //                        publishes a TOO_MUCH_BATCHES BackpressureEvent.
        //  7. concatMapMaybe   - ToWritePointsMaybe turns each batch into an HTTP write, one after another.
        //  8. doFinally        - marks the client as finished, which close() waits for.
        //  9. subscribe        - error notifications and pipeline errors are published as WriteErrorEvent.
        })).filter(batch -> batch.length() > 0L).flatMap(Flowable::just, Integer.MAX_VALUE).lift((FlowableOperator)new BackpressureBatchesBufferStrategy(writeOptions.getBufferLimit(), () -> this.publish(new BackpressureEvent(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES)), writeOptions.getBackpressureStrategy())).concatMapMaybe((Function)new ToWritePointsMaybe(processorScheduler, writeOptions)).doFinally(() -> this.finished.set(true)).subscribe(responseNotification -> {
            if (responseNotification.isOnError()) {
                this.publish(new WriteErrorEvent(this.toInfluxException(responseNotification.getError())));
            }
        }, throwable -> this.publish(new WriteErrorEvent(this.toInfluxException((Throwable)throwable))));
        // Register last, once the pipeline is fully wired.
        autoCloseables.add(this);
    }

    /**
     * Returns a view of the event stream restricted to a single event type.
     *
     * @param eventType class of the events the caller wants to observe
     * @param <T>       concrete event type
     * @return observable emitting only the published events that are instances of {@code eventType}
     * @throws NullPointerException with the message "EventType is required" when {@code eventType} is {@code null}
     */
    @Nonnull
    protected <T extends AbstractWriteEvent> Observable<T> addEventListener(@Nonnull Class<T> eventType) {
        final Class<T> requestedType = Objects.requireNonNull(eventType, "EventType is required");
        final Observable<T> matchingEvents = this.eventPublisher.ofType(requestedType);
        return matchingEvents;
    }

    /**
     * Emits a flush signal ({@code true}) on the flush publisher.
     *
     * <p>The result of {@code offer} is ignored, so a signal that could not be delivered is dropped silently.
     */
    public void flush() {
        final Boolean flushSignal = Boolean.TRUE;
        this.flushPublisher.offer(flushSignal);
    }

    /**
     * Shuts the write pipeline down and waits for it to finish.
     *
     * <p>The client is removed from the shared registry, the item processor, the flush publisher and the
     * event publisher are completed (in that order), and the calling thread then polls the
     * {@code finished} flag for at most {@link #DEFAULT_WAIT} milliseconds.
     *
     * <p>NOTE(review): the event publisher is completed before the wait, so events produced while the
     * remaining batches are written presumably no longer reach listeners - confirm this is intended.
     */
    @Override
    public void close() {
        LOG.log(Level.FINE, "Flushing any cached BatchWrites before shutdown.");

        this.autoCloseables.remove(this);

        this.processor.onComplete();
        this.flushPublisher.onComplete();
        this.eventPublisher.onComplete();

        final Supplier<Boolean> pipelineFinished = this.finished::get;
        waitToCondition(pipelineFinished, DEFAULT_WAIT);
    }

    /**
     * Feeds a stream of points into the write pipeline.
     *
     * <p>Every point gets its own copy of {@code writeParameters} carrying the precision of that point,
     * so points with different precisions end up in different groups.
     *
     * @param writeParameters destination parameters used as the template for each point
     * @param stream          points to write
     * @throws NullPointerException when {@code writeParameters} or {@code stream} is {@code null}
     * @throws InfluxException      when the client has already been closed
     */
    public void writePoints(@Nonnull WriteParameters writeParameters, @Nonnull Flowable<BatchWriteDataPoint> stream) {
        // Fail fast like write(WriteParameters, Publisher) does, instead of surfacing a null
        // writeParameters later as an asynchronous error event from inside the mapping lambda.
        Objects.requireNonNull(writeParameters, "writeParameters");
        Objects.requireNonNull(stream, "data to write");
        if (this.processor.hasComplete()) {
            throw new InfluxException(CLOSED_EXCEPTION);
        }
        final Flowable<BatchWriteItem> items = stream.map(pointData ->
                new BatchWriteItem(writeParameters.copy(pointData.point.getPrecision(), this.options), pointData));
        this.write(items);
    }

    /**
     * Feeds a stream of already prepared data into the write pipeline.
     *
     * @param writeParameters destination parameters attached to every element of {@code stream}
     * @param stream          data to write
     * @throws InfluxException when the client has already been closed
     */
    public void write(@Nonnull WriteParameters writeParameters, @Nonnull Publisher<BatchWriteData> stream) {
        Arguments.checkNotNull(writeParameters, "writeParameters");
        Arguments.checkNotNull(stream, "data to write");

        final boolean closed = this.processor.hasComplete();
        if (closed) {
            throw new InfluxException(CLOSED_EXCEPTION);
        }

        final Flowable<BatchWriteData> source = Flowable.fromPublisher(stream);
        final Flowable<BatchWriteItem> items = source.map(data -> new BatchWriteItem(writeParameters, data));
        this.write(items);
    }

    /**
     * Subscribes to {@code stream} and forwards each item to the processor.
     *
     * <p>An error signalled by {@code stream} is published as a {@link WriteErrorEvent}; it is not rethrown.
     *
     * @param stream items to push into the pipeline
     * @throws InfluxException when the client has already been closed
     */
    private void write(@Nonnull Flowable<BatchWriteItem> stream) {
        final boolean closed = this.processor.hasComplete();
        if (closed) {
            throw new InfluxException(CLOSED_EXCEPTION);
        }
        stream.subscribe(
                this.processor::onNext,
                error -> this.publish(new WriteErrorEvent(error)));
    }

    /**
     * Logs an event and then emits it on the event publisher.
     *
     * @param event event to publish; must not be {@code null}
     * @param <T>   concrete event type
     */
    private <T extends AbstractWriteEvent> void publish(@Nonnull T event) {
        Arguments.checkNotNull(event, "event");

        // Logging happens unconditionally, before the event is handed to the subject.
        event.logEvent();

        final AbstractWriteEvent outgoing = event;
        this.eventPublisher.onNext(outgoing);
    }

    /**
     * Creates a transformer that delays every element by an individually generated jitter.
     *
     * <p>The jitter interval is read when the transformer is applied: if it is not positive, the source
     * is returned unchanged; otherwise each element is delayed by {@code RetryAttempt.jitterDelay(...)}
     * milliseconds on {@code scheduler}.
     *
     * @param scheduler    scheduler on which the delay timers run
     * @param retryOptions options providing the jitter interval
     * @param <T>          element type of the transformed flow
     * @return transformer applying the jitter delay
     */
    @Nonnull
    public static <T> FlowableTransformer<T, T> jitter(@Nonnull Scheduler scheduler, @Nonnull WriteApi.RetryOptions retryOptions) {
        Arguments.checkNotNull(retryOptions, "JitterOptions is required");
        Arguments.checkNotNull(scheduler, "Jitter scheduler is required");

        return upstream -> {
            final boolean jitterEnabled = retryOptions.getJitterInterval() > 0;
            if (!jitterEnabled) {
                return upstream;
            }
            return upstream.delay(element -> {
                // A fresh delay is generated for every element.
                final int jitterMillis = RetryAttempt.jitterDelay(retryOptions.getJitterInterval());
                LOG.log(Level.FINEST, "Generated Jitter dynamic delay: {0}", jitterMillis);
                return Flowable.timer(jitterMillis, TimeUnit.MILLISECONDS, scheduler);
            });
        };
    }

    /**
     * Creates the handler used with {@code retryWhen} to decide whether and when a failed write is retried.
     *
     * <p>Each error is paired with an attempt number from {@code 1} to {@code getMaxRetries() + 1} and
     * wrapped in a {@link RetryAttempt}. A retryable attempt calls {@code notify} with the error and the
     * retry interval, then emits a signal after that interval (milliseconds) on {@code retryScheduler};
     * any other attempt terminates the flow with the original error.
     *
     * @param retryScheduler scheduler on which the retry delay runs
     * @param retryOptions   options providing the maximum number of retries
     * @param notify         callback receiving the error and the delay before the next attempt
     * @return function suitable for {@code retryWhen}
     * @throws NullPointerException when {@code retryOptions} or {@code retryScheduler} is {@code null}
     */
    @Nonnull
    public static Function<Flowable<Throwable>, Publisher<?>> retry(@Nonnull Scheduler retryScheduler, @Nonnull WriteApi.RetryOptions retryOptions, @Nonnull BiConsumer<Throwable, Long> notify) {
        Objects.requireNonNull(retryOptions, "RetryOptions are required");
        Objects.requireNonNull(retryScheduler, "RetryScheduler is required");

        return errors -> {
            final Flowable<Integer> attemptNumbers = Flowable.range(1, retryOptions.getMaxRetries() + 1);
            return errors
                    .zipWith(attemptNumbers, (error, attemptNumber) -> new RetryAttempt(error, attemptNumber, retryOptions))
                    .flatMap(attempt -> {
                        final Throwable cause = attempt.getThrowable();
                        if (!attempt.isRetry()) {
                            // Not retryable: propagate the original error.
                            return Flowable.error(cause);
                        }
                        final long waitMillis = attempt.getRetryInterval();
                        notify.accept(cause, waitMillis);
                        return Flowable.just("notify").delay(waitMillis, TimeUnit.MILLISECONDS, retryScheduler);
                    });
        };
    }

    /**
     * Blocks the calling thread until {@code condition} yields {@code true} or the timeout elapses.
     *
     * <p>The condition is polled every {@link #DEFAULT_SLEEP} milliseconds. If the timeout elapses first,
     * a severe message is logged and the method returns. If the thread is interrupted while waiting,
     * the interruption is logged, the interrupt status is restored and the method returns immediately.
     *
     * @param condition supplier polled until it returns {@code true}
     * @param millis    maximum time to wait, in milliseconds
     */
    static void waitToCondition(Supplier<Boolean> condition, int millis) {
        final long start = System.currentTimeMillis();
        while (!condition.get()) {
            try {
                Thread.sleep(DEFAULT_SLEEP);
            }
            catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Interrupted during wait to dispose.", e);
                // Restore the flag for the caller and stop waiting: with the flag set, every further
                // sleep would throw immediately and turn this loop into a busy spin.
                Thread.currentThread().interrupt();
                return;
            }
            if (System.currentTimeMillis() - start > (long)millis) {
                LOG.severe("The WriteApi can't be gracefully dispose! - " + millis + "ms elapsed.");
                return;
            }
        }
    }

    /**
     * Turns one batch into an HTTP write and reports the outcome as a {@link Notification}.
     *
     * <p>Errors are not propagated downstream: the final {@code onErrorResumeNext} converts them into an
     * error notification, so the enclosing pipeline keeps running after a failed batch.
     */
    private final class ToWritePointsMaybe
    implements Function<BatchWriteItem, Maybe<Notification<Response>>> {
        /** Scheduler used for the jitter timer, the retry delays and the overall timeout. */
        private final Scheduler retryScheduler;
        /** Options from which the jitter interval is read. */
        private final WriteApi.RetryOptions retryOptions;

        /**
         * @param retryScheduler scheduler for jitter, retry and timeout timers
         * @param retryOptions   options providing the jitter interval
         */
        private ToWritePointsMaybe(@Nonnull Scheduler retryScheduler, WriteApi.RetryOptions retryOptions) {
            this.retryScheduler = retryScheduler;
            this.retryOptions = retryOptions;
        }

        /**
         * Builds the request for one batch.
         *
         * @param batchWrite batch to write
         * @return empty {@code Maybe} when the batch has no content; otherwise a {@code Maybe} emitting a
         *         success notification with the response, or an error notification with the failure
         */
        public Maybe<Notification<Response>> apply(BatchWriteItem batchWrite) {
            Maybe requestSource;
            String content = batchWrite.data.toLineProtocol();
            // Nothing to send.
            if (content == null || content.isEmpty()) {
                return Maybe.empty();
            }
            // Resolve the effective destination; each *Safe call receives the client options.
            String organization = batchWrite.writeParameters.orgSafe(AbstractWriteClient.this.options);
            String bucket = batchWrite.writeParameters.bucketSafe(AbstractWriteClient.this.options);
            WritePrecision precision = batchWrite.writeParameters.precisionSafe(AbstractWriteClient.this.options);
            WriteConsistency consistency = batchWrite.writeParameters.consistencySafe(AbstractWriteClient.this.options);
            Single<Response<Void>> postWriteRx = AbstractWriteClient.this.service.postWriteRx(organization, bucket, content, null, null, "text/plain; charset=utf-8", null, "application/json", null, precision, consistency);
            if (this.retryOptions.getJitterInterval() > 0) {
                // The jitter delay is generated once per apply() call, here, not once per subscription.
                int delay = RetryAttempt.jitterDelay(this.retryOptions.getJitterInterval());
                LOG.log(Level.FINEST, "Generated Jitter dynamic delay: {0}", delay);
                requestSource = Maybe.timer((long)delay, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)this.retryScheduler).flatMap(it -> postWriteRx.toMaybe());
            } else {
                requestSource = postWriteRx.toMaybe();
            }
            // An unsuccessful HTTP response is turned into an HttpException so that it reaches retryWhen.
            return requestSource.map(response -> {
                if (!response.isSuccessful()) {
                    throw new HttpException(response);
                }
                return response;
            // Chain on the next line:
            //  - retryWhen: uses the outer writeOptions as retry options and publishes a
            //    WriteRetriableErrorEvent before each scheduled retry;
            //  - timeout:   after getMaxRetryTime() ms switches to an error Maybe carrying
            //    TimeoutException("Max retry time exceeded.");
            //  - map:       wraps the response into a Notification.
            // NOTE(review): the unsuccessful branch of that last map looks unreachable, because the map
            // above already throws for unsuccessful responses - confirm before removing.
            }).retryWhen(AbstractWriteClient.retry(this.retryScheduler, AbstractWriteClient.this.writeOptions, (throwable, retryInterval) -> AbstractWriteClient.this.publish(new WriteRetriableErrorEvent(AbstractWriteClient.this.toInfluxException(throwable), (Long)retryInterval)))).timeout((long)AbstractWriteClient.this.writeOptions.getMaxRetryTime(), TimeUnit.MILLISECONDS, this.retryScheduler, (MaybeSource)Maybe.error((Throwable)new TimeoutException("Max retry time exceeded."))).map(response -> {
                if (response.isSuccessful()) {
                    return Notification.createOnNext((Object)response);
                }
                return Notification.createOnError((Throwable)new HttpException(response));
            }).doOnSuccess(responseNotification -> {
                // Announce the successful write to listeners.
                if (!responseNotification.isOnError()) {
                    AbstractWriteClient.this.publish(this.toSuccessEvent(batchWrite, content));
                }
            // Any remaining error becomes an error notification instead of terminating the pipeline.
            }).onErrorResumeNext(throwable -> Maybe.just((Object)Notification.createOnError((Throwable)throwable)));
        }

        /**
         * Creates the success event for a written batch.
         *
         * @param batchWrite   batch that was written
         * @param lineProtocol line protocol that was sent
         * @return event carrying the resolved organization, bucket, precision and the line protocol
         */
        @Nonnull
        private WriteSuccessEvent toSuccessEvent(@Nonnull BatchWriteItem batchWrite, String lineProtocol) {
            return new WriteSuccessEvent(batchWrite.writeParameters.orgSafe(AbstractWriteClient.this.options), batchWrite.writeParameters.bucketSafe(AbstractWriteClient.this.options), batchWrite.writeParameters.precisionSafe(AbstractWriteClient.this.options), lineProtocol);
        }
    }

    /**
     * Immutable pair of write parameters and the data to be written with them.
     *
     * <p>Both fields are {@code final}: they are never reassigned, and instances are handed between
     * threads by the reactive pipeline, so final-field semantics give safe publication.
     */
    public static final class BatchWriteItem {
        private final WriteParameters writeParameters;
        private final BatchWriteData data;

        /**
         * @param writeParameters destination parameters of the data; must not be {@code null}
         * @param data            data to write; must not be {@code null}
         */
        public BatchWriteItem(@Nonnull WriteParameters writeParameters, @Nonnull BatchWriteData data) {
            Arguments.checkNotNull(writeParameters, "writeParameters");
            Arguments.checkNotNull(data, "data");
            this.writeParameters = writeParameters;
            this.data = data;
        }

        /**
         * @return the length reported by the wrapped data
         */
        public long length() {
            return this.data.length();
        }

        /**
         * @return the line protocol of the wrapped data; may be {@code null}
         */
        @Nullable
        public String toLineProtocol() {
            return this.data.toLineProtocol();
        }
    }

    /**
     * Write data backed by an arbitrary measurement object that is mapped to a {@link Point} on demand.
     */
    public static final class BatchWriteDataMeasurement
    implements BatchWriteData {
        private final Object measurement;
        private final WritePrecision precision;
        private final InfluxDBClientOptions options;
        private final MeasurementMapper measurementMapper;

        /**
         * @param measurement       object to write; {@code null} yields no line protocol
         * @param precision         precision passed to the mapper
         * @param options           client options providing the point settings
         * @param measurementMapper mapper converting the measurement into a {@link Point}
         */
        public BatchWriteDataMeasurement(@Nullable Object measurement, @Nonnull WritePrecision precision, @Nonnull InfluxDBClientOptions options, @Nonnull MeasurementMapper measurementMapper) {
            this.measurement = measurement;
            this.precision = precision;
            this.options = options;
            this.measurementMapper = measurementMapper;
        }

        /**
         * Maps the measurement to a point and serializes it.
         *
         * @return the line protocol, or {@code null} when the measurement is {@code null} or the mapped
         *         point has no fields
         */
        @Override
        @Nullable
        public String toLineProtocol() {
            if (this.measurement == null) {
                return null;
            }
            Point point = this.measurementMapper.toPoint(this.measurement, this.precision);
            if (!point.hasFields()) {
                // The separating space keeps the message from being glued to the measurement text.
                LOG.warning("The measurement: " + this.measurement + " doesn't contain any fields, skipping");
                return null;
            }
            return point.toLineProtocol(this.options.getPointSettings());
        }

        /**
         * @return always {@code 1}: one measurement is one record
         */
        @Override
        @Nonnull
        public Long length() {
            return 1L;
        }
    }

    /**
     * Write data backed by a single {@link Point}.
     */
    public static final class BatchWriteDataPoint
    implements BatchWriteData {
        private static final Logger LOG = Logger.getLogger(BatchWriteDataPoint.class.getName());
        private final Point point;
        private final WritePrecision precision;
        private final InfluxDBClientOptions options;

        /**
         * Creates the data using the precision of the point itself.
         *
         * @param point   point to write
         * @param options client options providing the point settings
         */
        public BatchWriteDataPoint(@Nonnull Point point, @Nonnull InfluxDBClientOptions options) {
            this(point, point.getPrecision(), options);
        }

        /**
         * @param point     point to write
         * @param precision precision used when serializing the point
         * @param options   client options providing the point settings
         */
        public BatchWriteDataPoint(@Nonnull Point point, @Nonnull WritePrecision precision, @Nonnull InfluxDBClientOptions options) {
            this.point = point;
            this.precision = precision;
            this.options = options;
        }

        /**
         * Serializes the point.
         *
         * @return the line protocol, or {@code null} when the point has no fields
         */
        @Override
        @Nullable
        public String toLineProtocol() {
            if (!this.point.hasFields()) {
                // The separating space keeps the message from being glued to the point text.
                LOG.warning("The point: " + this.point + " doesn't contain any fields, skipping");
                return null;
            }
            return this.point.toLineProtocol(this.options.getPointSettings(), this.precision);
        }

        /**
         * @return always {@code 1}: one point is one record
         */
        @Override
        @Nonnull
        public Long length() {
            return 1L;
        }
    }

    /**
     * Accumulates the line protocol of several records that share the same write parameters.
     */
    public static final class BatchWriteDataGrouped
    implements BatchWriteData {
        private final WriteParameters group;
        private final StringBuilder sb = new StringBuilder();
        /** Number of records appended so far; primitive to avoid boxing on every append. */
        private long length = 0L;

        /**
         * @param group write parameters shared by all appended records
         */
        public BatchWriteDataGrouped(@Nonnull WriteParameters group) {
            this.group = group;
        }

        /**
         * @return all appended records joined by a newline; an empty string when nothing was appended
         */
        @Override
        public String toLineProtocol() {
            return this.sb.toString();
        }

        /**
         * @return number of records appended so far
         */
        @Override
        @Nonnull
        public Long length() {
            return this.length;
        }

        /**
         * Appends one record; a {@code null} record is ignored and does not count towards the length.
         *
         * @param lineProtocol record to append, may be {@code null}
         */
        public void append(@Nullable String lineProtocol) {
            if (lineProtocol == null) {
                return;
            }
            // Records are separated by, not terminated with, a newline.
            if (this.sb.length() > 0) {
                this.sb.append('\n');
            }
            this.sb.append(lineProtocol);
            this.length++;
        }
    }

    /**
     * Write data backed by a raw line-protocol string that is passed through unchanged.
     */
    public static final class BatchWriteDataRecord
    implements BatchWriteData {
        private final String lineProtocol;

        /**
         * @param record raw line protocol, may be {@code null}
         */
        public BatchWriteDataRecord(@Nullable String record) {
            this.lineProtocol = record;
        }

        /**
         * @return the string given to the constructor, possibly {@code null}
         */
        @Override
        @Nullable
        public String toLineProtocol() {
            return this.lineProtocol;
        }

        /**
         * @return always {@code 1}
         */
        @Override
        @Nonnull
        public Long length() {
            return Long.valueOf(1L);
        }
    }

    /**
     * A unit of data that can be serialized to line protocol and reports how many records it holds.
     */
    public interface BatchWriteData {
        /**
         * @return the line protocol of this data, or {@code null} when there is nothing to write
         */
        @Nullable
        String toLineProtocol();

        /**
         * @return number of records represented by this data
         */
        @Nonnull
        Long length();
    }
}

