/*
 * Decompiled with CFR 0.152.
 */
package com.github.philippheuer.events4j.reactor;

import com.github.philippheuer.events4j.api.domain.IDisposable;
import com.github.philippheuer.events4j.api.service.IEventHandler;
import com.github.philippheuer.events4j.reactor.util.Events4JSubscriber;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.scheduler.forkjoin.ForkJoinPoolScheduler;

public class ReactorEventHandler
implements IEventHandler {
    private static final Logger log = LoggerFactory.getLogger(ReactorEventHandler.class);
    private final Scheduler scheduler;
    private final FluxProcessor<Object, Object> processor;
    private final FluxSink<Object> eventSink;

    public ReactorEventHandler() {
        this.scheduler = ForkJoinPoolScheduler.create((String)"events4j-events", (int)Math.max(Runtime.getRuntime().availableProcessors(), 4));
        this.processor = EmitterProcessor.create((int)8192, (boolean)true);
        this.eventSink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    public ReactorEventHandler(Scheduler scheduler, FluxProcessor<Object, Object> processor, FluxSink.OverflowStrategy overflowStrategy) {
        this.scheduler = scheduler;
        this.processor = processor;
        this.eventSink = processor.sink(overflowStrategy);
    }

    public void publish(Object event) {
        this.eventSink.next(event);
    }

    public <E> IDisposable onEvent(Class<E> eventClass, Consumer<E> consumer) {
        Flux flux = this.processor.publishOn(this.scheduler).ofType(eventClass);
        Events4JSubscriber<E> subscription = new Events4JSubscriber<E>(consumer);
        flux.subscribe(subscription);
        return subscription;
    }

    public void close() {
        this.eventSink.complete();
        try {
            Thread.sleep(1000L);
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
        if (!this.scheduler.isDisposed()) {
            this.scheduler.dispose();
        }
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    public FluxProcessor<Object, Object> getProcessor() {
        return this.processor;
    }

    public FluxSink<Object> getEventSink() {
        return this.eventSink;
    }
}

