package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import io.vertx.redis.client.impl.types.ErrorType;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:io/vertx/redis/client/impl/RedisClient.class */
public class RedisClient implements Redis, ParserHandler {
    private static final Logger LOG = LoggerFactory.getLogger(RedisClient.class);
    private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
    private final ArrayQueue waiting;
    private final NetClient netClient;
    private final SocketAddress socketAddress;
    private final RedisOptions options;
    private Handler<Void> onEnd;
    private Handler<Response> onMessage;
    private NetSocket netSocket;
    private Handler<Throwable> onException = th -> {
        LOG.error("Unhandled Error", th);
    };
    private boolean connected = false;

    public static Redis create(Vertx vertx, RedisOptions redisOptions) {
        return create(vertx, redisOptions, redisOptions.getEndpoint());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Redis create(Vertx vertx, RedisOptions redisOptions, SocketAddress socketAddress) {
        return new RedisClient(vertx, redisOptions, socketAddress);
    }

    private static void authenticate(Redis redis, RedisOptions redisOptions, Handler<AsyncResult<Void>> handler) {
        if (redisOptions.getPassword() == null) {
            handler.handle(Future.succeededFuture());
        } else {
            redis.send(Request.cmd(Command.AUTH).arg(redisOptions.getPassword()), asyncResult -> {
                if (asyncResult.failed()) {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                } else {
                    handler.handle(Future.succeededFuture());
                }
            });
        }
    }

    private static void select(Redis redis, RedisOptions redisOptions, Handler<AsyncResult<Void>> handler) {
        if (redisOptions.getSelect() == null) {
            handler.handle(Future.succeededFuture());
        } else {
            redis.send(Request.cmd(Command.SELECT).arg(redisOptions.getSelect().intValue()), asyncResult -> {
                if (asyncResult.failed()) {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                } else {
                    handler.handle(Future.succeededFuture());
                }
            });
        }
    }

    private RedisClient(Vertx vertx, RedisOptions redisOptions, SocketAddress socketAddress) {
        this.netClient = vertx.createNetClient(redisOptions.getNetClientOptions());
        this.waiting = new ArrayQueue(redisOptions.getMaxWaitingHandlers());
        this.socketAddress = socketAddress;
        this.options = redisOptions;
    }

    @Override // io.vertx.redis.client.Redis
    public Redis connect(Handler<AsyncResult<Redis>> handler) {
        if (this.connected) {
            handler.handle(Future.succeededFuture(this));
            return this;
        }
        this.netClient.connect(this.socketAddress, asyncResult -> {
            if (asyncResult.failed()) {
                this.netClient.close();
                handler.handle(Future.failedFuture(asyncResult.cause()));
            } else {
                this.netSocket = (NetSocket) asyncResult.result();
                this.netSocket.handler(new RESPParser(this, this.options.getMaxNestedArrays())).closeHandler(r4 -> {
                    this.netClient.close();
                    cleanupQueue(CONNECTION_CLOSED);
                    this.connected = false;
                    if (this.onEnd != null) {
                        this.onEnd.handle(r4);
                    }
                }).exceptionHandler(th -> {
                    this.netSocket.close();
                    this.netClient.close();
                    cleanupQueue(th);
                    this.connected = false;
                    if (this.onException != null) {
                        this.onException.handle(th);
                    }
                });
                authenticate(this, this.options, asyncResult -> {
                    if (asyncResult.failed()) {
                        handler.handle(Future.failedFuture(asyncResult.cause()));
                    } else {
                        select(this, this.options, asyncResult -> {
                            if (asyncResult.failed()) {
                                handler.handle(Future.failedFuture(asyncResult.cause()));
                            } else {
                                this.connected = true;
                                handler.handle(Future.succeededFuture(this));
                            }
                        });
                    }
                });
            }
        });
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    public void close() {
        if (this.netSocket != null) {
            this.netSocket.close();
        }
        this.connected = false;
    }

    @Override // io.vertx.redis.client.Redis
    public Redis exceptionHandler(Handler<Throwable> handler) {
        this.onException = handler;
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    public Redis endHandler(Handler<Void> handler) {
        this.onEnd = handler;
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    public Redis handler(Handler<Response> handler) {
        this.onMessage = handler;
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: pause */
    public Redis mo4pause() {
        this.netSocket.pause();
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: resume */
    public Redis mo3resume() {
        this.netSocket.resume();
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: fetch */
    public Redis mo2fetch(long j) {
        return this;
    }

    private void cleanupQueue(Throwable th) {
        while (true) {
            Handler handler = (Handler) this.waiting.poll();
            if (handler == null) {
                return;
            }
            if (th != null) {
                try {
                    handler.handle(Future.failedFuture(th));
                } catch (RuntimeException e) {
                    LOG.warn("Exception during cleanup", e);
                }
            }
        }
    }

    @Override // io.vertx.redis.client.Redis
    public Redis send(Request request, Handler<AsyncResult<Response>> handler) {
        if (!this.connected) {
            handler.handle(Future.failedFuture("Redis connection is broken."));
            return this;
        }
        if (this.waiting.isFull()) {
            handler.handle(Future.failedFuture("Redis waiting Queue is full"));
            return this;
        }
        try {
            Buffer encode = ((RequestImpl) request).encode();
            this.waiting.offer(handler);
            this.netSocket.write(encode);
        } catch (RuntimeException e) {
            this.onException.handle(e);
        }
        return this;
    }

    @Override // io.vertx.redis.client.Redis
    public Redis batch(List<Request> list, Handler<AsyncResult<List<Response>>> handler) {
        if (this.waiting.freeSlots() < list.size()) {
            handler.handle(Future.failedFuture("Redis waiting Queue is full"));
            return this;
        }
        ArrayList arrayList = new ArrayList(list.size());
        ArrayList arrayList2 = new ArrayList(list.size());
        AtomicInteger atomicInteger = new AtomicInteger(list.size());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Buffer buffer = Buffer.buffer();
        for (int i = 0; i < list.size(); i++) {
            int i2 = i;
            ((RequestImpl) list.get(i2)).encode(buffer);
            arrayList.add(i2, asyncResult -> {
                if (atomicBoolean.get()) {
                    return;
                }
                if (asyncResult.failed()) {
                    atomicBoolean.set(true);
                    if (handler != null) {
                        handler.handle(Future.failedFuture(asyncResult.cause()));
                        return;
                    }
                    return;
                }
                arrayList2.add(i2, asyncResult.result());
                if (atomicInteger.decrementAndGet() != 0 || handler == null) {
                    return;
                }
                handler.handle(Future.succeededFuture(arrayList2));
            });
        }
        this.netSocket.write(buffer, asyncResult2 -> {
            if (asyncResult2.succeeded()) {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    this.waiting.offer((Handler) it.next());
                }
                return;
            }
            try {
                handler.handle(Future.failedFuture(asyncResult2.cause()));
            } catch (Throwable th) {
                fail(th);
            }
        });
        return this;
    }

    @Override // io.vertx.redis.client.impl.ParserHandler
    public void handle(Response response) {
        if (this.waiting.isEmpty()) {
            if (this.onMessage != null) {
                this.onMessage.handle(response);
                return;
            } else {
                LOG.warn("No handler waiting for message: " + response);
                return;
            }
        }
        Handler handler = (Handler) this.waiting.poll();
        if (handler == null) {
            LOG.error("No handler waiting for message: " + response);
            return;
        }
        if (response == null) {
            try {
                handler.handle(Future.succeededFuture());
                return;
            } catch (RuntimeException e) {
                this.onException.handle(e);
                return;
            }
        }
        if (response.type() == ResponseType.ERROR) {
            try {
                handler.handle(Future.failedFuture((ErrorType) response));
                return;
            } catch (RuntimeException e2) {
                this.onException.handle(e2);
                return;
            }
        }
        try {
            handler.handle(Future.succeededFuture(response));
        } catch (RuntimeException e3) {
            this.onException.handle(e3);
        }
    }

    @Override // io.vertx.redis.client.Redis
    public SocketAddress socketAddress() {
        return this.socketAddress;
    }

    @Override // io.vertx.redis.client.impl.ParserHandler
    public void fail(Throwable th) {
        if (this.onException != null) {
            this.onException.handle(th);
        } else {
            LOG.error("External failure", th);
        }
    }

    @Override // io.vertx.redis.client.impl.ParserHandler
    public void fatal(Throwable th) {
        if (this.onException != null) {
            this.onException.handle(th);
        } else {
            LOG.error("External failure", th);
        }
        close();
        cleanupQueue(th);
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo1endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo5handler(Handler handler) {
        return handler((Handler<Response>) handler);
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo6exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.redis.client.Redis
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo7exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
