/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import org.redisson.reactive.CommandReactiveExecutor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CommandReactiveService
extends CommandAsyncService
implements CommandReactiveExecutor {
    public CommandReactiveService(ConnectionManager connectionManager) {
        super(connectionManager);
    }

    @Override
    public <R> Mono<R> reactive(Callable<RFuture<R>> supplier) {
        return Flux.create(emitter -> emitter.onRequest(n -> {
            RFuture future;
            try {
                future = (RFuture)supplier.call();
            }
            catch (Exception e2) {
                emitter.error(e2);
                return;
            }
            emitter.onDispose(() -> future.cancel(true));
            future.onComplete((v, e) -> {
                if (e != null) {
                    emitter.error((Throwable)e);
                    return;
                }
                if (v != null) {
                    emitter.next(v);
                }
                emitter.complete();
            });
        })).next();
    }
}

