package de.hallobtf.Kai.server;

import de.hallobtf.Basics.B2Parameter;
import de.hallobtf.Basics.B2Protocol;
import de.hallobtf.Kai.pojo.KaiSSE;
import j$.time.Duration;
import jakarta.annotation.PreDestroy;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * Publishes {@link KaiSSE} messages to Server-Sent-Event subscribers.
 *
 * <p>Each subscriber is identified by an id string and owns one {@link Sinks.Many} stored in
 * {@link #queueMap}. Messages are first placed into a {@link DelayQueue}; a single background task
 * takes them from the queue and emits them either to the sink of one subscriber or, when the message
 * has no guid, to every registered sink. In addition, every flux returned by {@link #generate(String)}
 * carries periodic "PING" events.
 *
 * <p>The class is also registered as a JCache entry listener; only removal events are forwarded.
 *
 * <p>NOTE: this source was reconstructed by a decompiler. Methods named {@code lambda$...} are former
 * lambda bodies (or renamed methods) whose names are kept so that existing references keep resolving.
 */
@Component
/* loaded from: classes.dex */
public class KaiSSEPublisher implements Serializable, CacheEntryCreatedListener<Object, Object>, CacheEntryUpdatedListener<Object, Object>, CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
    /** Period of the PING events in milliseconds, read once from parameter {@code SSE_PingInterval}. */
    private static int pingInterval;

    /** Single-thread executor running the dispatcher loop ({@link #lambda$new$3()}). */
    private final ExecutorService executor;

    /** Pending messages; when a message becomes available is decided by {@link KaiSSE} itself. */
    private final DelayQueue<KaiSSE> delayQueue = new DelayQueue<>();

    /** Registered sinks, keyed by subscriber id. */
    private final ConcurrentHashMap<String, Sinks.Many<ServerSentEvent<KaiSSE>>> queueMap = new ConcurrentHashMap<>();

    static {
        try {
            pingInterval = Integer.parseInt(B2Parameter.getInstance().get("SSE_PingInterval", "10000"));
        } catch (NumberFormatException unused) {
            B2Protocol.getInstance().severe("Parameter SSE_PingInterval ungültig: " + B2Parameter.getInstance().get("SSE_PingInterval"));
            // NOTE(review): the fallback (1000 ms) differs from the default used above (10000 ms) — confirm intent.
            pingInterval = 1000;
        }
    }

    /** Creates the publisher and starts the background dispatcher loop. */
    public KaiSSEPublisher() {
        this.executor = Executors.newSingleThreadExecutor();
        Runnable dispatcher = this::lambda$new$3;
        this.executor.submit(dispatcher);
    }

    /**
     * Wraps a message into a Server-Sent-Event named "message" whose id is the current
     * {@link System#nanoTime()} value.
     *
     * @param kaiSSE payload of the event
     * @return the event ready to be emitted
     */
    private ServerSentEvent<KaiSSE> createSSE(KaiSSE kaiSSE) {
        return ServerSentEvent.<KaiSSE>builder()
                .id(String.valueOf(System.nanoTime()))
                .event("message")
                .data(kaiSSE)
                .build();
    }

    /**
     * Forwards a batch of cache events to {@link #publishCacheEvent}; does nothing while no
     * subscriber is registered.
     *
     * @param iterable cache events delivered by the cache listener callback
     */
    private void handleCacheEvent(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) {
        if (this.queueMap.isEmpty()) {
            return;
        }
        iterable.forEach(this::lambda$handleCacheEvent$8);
    }

    /**
     * Former lambda body used by {@link #generate(String)}: creates the sink for a new subscriber.
     *
     * @param str  subscriber id captured by the lambda
     * @param str2 map key passed by {@code computeIfAbsent} (unused)
     * @return the newly created sink
     */
    public /* synthetic */ Sinks.Many<ServerSentEvent<KaiSSE>> lambda$generate$4(String str, String str2) {
        return registerObserver(str);
    }

    /**
     * Former lambda body used by {@link #generate(String)}: builds one PING event.
     *
     * @param str subscriber id
     * @param l   tick counter supplied by the interval flux
     * @return the PING event
     */
    public /* synthetic */ ServerSentEvent<KaiSSE> lambda$generate$7(String str, Long l) {
        return createSSE(new KaiSSE(str, null, null, "PING", "MESSAGE", l));
    }

    /**
     * Former lambda body used by {@link #handleCacheEvent}: logs one cache event and publishes it.
     *
     * @param cacheEntryEvent the cache event to forward
     */
    @SuppressWarnings("rawtypes") // raw parameter type kept from the decompiled signature
    public /* synthetic */ void lambda$handleCacheEvent$8(CacheEntryEvent cacheEntryEvent) {
        String cacheName = cacheEntryEvent.getSource().getName();
        B2Protocol.getInstance().info("[KaiSSEPublisher.handleCacheEvent]: " + cacheName + " " + cacheEntryEvent.getEventType() + "\t" + cacheEntryEvent.getKey());
        publishCacheEvent(cacheName, cacheEntryEvent.getEventType().name(), cacheEntryEvent.getKey().toString());
    }

    /**
     * Former lambda body of the dispatcher: sends a copy of a broadcast message, created for the
     * entry's subscriber id, to the entry's sink.
     *
     * @param kaiSSE broadcast message taken from the queue
     * @param entry  one {@link #queueMap} entry (subscriber id to sink)
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter type kept from the decompiled signature
    public /* synthetic */ void lambda$new$0(KaiSSE kaiSSE, Map.Entry entry) {
        KaiSSE addressed = new KaiSSE((String) entry.getKey(), kaiSSE);
        sendSSE((Sinks.Many) entry.getValue(), createSSE(addressed));
    }

    /**
     * Former lambda body of the dispatcher: sends an addressed message to its sink.
     *
     * @param kaiSSE message taken from the queue
     * @param many   sink registered for the message's guid
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter type kept from the decompiled signature
    public /* synthetic */ void lambda$new$1(KaiSSE kaiSSE, Sinks.Many many) {
        sendSSE(many, createSSE(kaiSSE));
    }

    /**
     * Former lambda body of the dispatcher: logs that no sink is registered for the message's guid.
     *
     * @param kaiSSE the message that could not be delivered
     */
    public static /* synthetic */ void lambda$new$2(KaiSSE kaiSSE) {
        B2Protocol.getInstance().severe("Message not sent, no sink for: " + kaiSSE.getGuid());
    }

    /**
     * Dispatcher loop: blocks on the delay queue and delivers every message that becomes available.
     * The loop ends when the thread is interrupted (see {@link #destroy()}).
     */
    public /* synthetic */ void lambda$new$3() {
        while (true) {
            KaiSSE event;
            try {
                event = this.delayQueue.take();
            } catch (InterruptedException unused) {
                B2Protocol.getInstance().severe("[KaiSSEPublisher.publishCacheEvent] delayed queue interrupted");
                // Restore the flag so the executor's worker thread still observes the interruption.
                Thread.currentThread().interrupt();
                return;
            }
            try {
                dispatch(event);
            } catch (RuntimeException e) {
                // The loop was started with submit(), so an escaping exception would end it silently
                // and no further message would ever be delivered.
                B2Protocol.getInstance().severe("[KaiSSEPublisher] dispatch failed for " + event + ": " + e);
            }
        }
    }

    /** Delivers one message: to all sinks when it has no guid, otherwise to the sink of that guid. */
    private void dispatch(KaiSSE event) {
        if (event.getGuid() == null) {
            this.queueMap.entrySet().forEach(entry -> lambda$new$0(event, entry));
            return;
        }
        Sinks.Many<ServerSentEvent<KaiSSE>> sink = this.queueMap.get(event.getGuid());
        if (sink != null) {
            lambda$new$1(event, sink);
        } else {
            lambda$new$2(event);
        }
    }

    /**
     * Turns a cache event into a broadcast message of type {@code "CACHE_" + eventType}.
     *
     * <p>The key string must contain a section starting with {@code "[et="} and ending at the first
     * {@code "]"} of the key, and that section must contain the event type; otherwise the event is
     * ignored. If a {@code "[m=...]"} section follows, the text between {@code "[m="} and the next
     * {@code "]"}, cut at the first {@code '|'}, is passed along as the message's last constructor
     * argument; a missing or unterminated section yields {@code null}.
     *
     * @param cacheName name of the cache the event came from
     * @param eventType name of the cache event type
     * @param key       string form of the cache key
     */
    private void publishCacheEvent(String cacheName, String eventType, String key) {
        int etStart = key.indexOf("[et=");
        // NOTE(review): this is the first "]" of the whole key, not the first one after "[et=" — confirm key layout.
        int etEnd = key.indexOf("]");
        if (etStart < 0 || etEnd <= etStart || !key.substring(etStart, etEnd).contains(eventType)) {
            return;
        }
        String mValue = null;
        String tail = key.substring(etEnd + 1);
        int mStart = tail.indexOf("[m=");
        if (mStart >= 0) {
            String afterMarker = tail.substring(mStart + 3);
            int mEnd = afterMarker.indexOf("]");
            // Without the closing bracket the section is malformed; treat it as absent instead of
            // failing with a StringIndexOutOfBoundsException.
            if (mEnd >= 0) {
                mValue = afterMarker.substring(0, mEnd).split("\\|", 2)[0];
            }
        }
        enqueue(new KaiSSE(null, null, null, "CACHE_" + eventType, cacheName, mValue), "publishCacheEvent");
    }

    /**
     * Puts a message into the delay queue. If the queue already holds an element that
     * {@link DelayQueue#remove(Object)} considers equal, that element is removed first and the
     * replacement is logged.
     *
     * @param event  message to enqueue
     * @param origin name of the calling method, used in the log line only
     */
    private void enqueue(KaiSSE event, String origin) {
        if (this.delayQueue.remove(event)) {
            B2Protocol.getInstance().info("[KaiSSEPublisher." + origin + "] refresh delayed Event: " + event.toString());
        }
        this.delayQueue.offer(event);
    }

    /**
     * Emits an event into a sink and logs the emit result when it is not successful.
     *
     * @param many            target sink
     * @param serverSentEvent event to emit
     */
    private void sendSSE(Sinks.Many<ServerSentEvent<KaiSSE>> many, ServerSentEvent<KaiSSE> serverSentEvent) {
        Sinks.EmitResult result = many.tryEmitNext(serverSentEvent);
        if (!result.isSuccess()) {
            B2Protocol.getInstance().severe("[KaiSSEPublisher.sendSSE FAIL] " + result);
        }
    }

    /**
     * Deregisters a subscriber (originally named {@code deregisterObserver}): completes its sink and
     * removes it from {@link #queueMap}. The sink is removed even when completing it fails, so that a
     * later {@link #generate(String)} with the same id gets a fresh sink instead of a stale one.
     *
     * @param str subscriber id
     */
    /* renamed from: deregisterObserver, reason: merged with bridge method [inline-methods] */
    public void lambda$generate$6(String str) {
        Sinks.Many<ServerSentEvent<KaiSSE>> sink = this.queueMap.get(str);
        if (sink == null) {
            B2Protocol.getInstance().severe("[KaiSSEPublisher.deregisterObserver] Observer not registered: " + str);
            return;
        }
        Sinks.EmitResult result = sink.tryEmitComplete();
        // Remove only the mapping to this very sink.
        this.queueMap.remove(str, sink);
        if (result.isSuccess()) {
            B2Protocol.getInstance().info("[KaiSSEPublisher.deregisterObserver] Observer deregistered: " + str + "->" + result);
        } else {
            B2Protocol.getInstance().severe("[KaiSSEPublisher.deregisterObserver] Error deregistering Observer: " + str + "->" + result);
        }
    }

    /** Stops the dispatcher by shutting the executor down; this interrupts the blocked queue read. */
    @PreDestroy
    public void destroy() {
        ExecutorService executorService = this.executor;
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    /**
     * Returns the event stream for a subscriber: the events of its sink (created on first use)
     * merged with a PING event every {@code pingInterval} milliseconds. Subscribing triggers
     * {@link #lambda$generate$5}, cancelling triggers {@link #lambda$generate$6}.
     *
     * @param str subscriber id
     * @return merged flux of sink events and PING events
     */
    public Flux<ServerSentEvent<KaiSSE>> generate(final String str) {
        Flux<ServerSentEvent<KaiSSE>> events = this.queueMap
                .computeIfAbsent(str, key -> lambda$generate$4(str, key))
                .asFlux()
                .doOnSubscribe(subscription -> lambda$generate$5(str, subscription))
                .doOnCancel(() -> lambda$generate$6(str));
        Flux<ServerSentEvent<KaiSSE>> pings = Flux.interval(Duration.ofMillis(pingInterval))
                .map(tick -> lambda$generate$7(str, tick));
        return Flux.merge(events, pings);
    }

    /** Cache listener callback for created entries; intentionally does nothing. */
    public void onCreated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) {
    }

    /** Cache listener callback for expired entries; intentionally does nothing. */
    public void onExpired(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) {
    }

    /** Cache listener callback for removed entries; forwards them to the subscribers. */
    public void onRemoved(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) {
        handleCacheEvent(iterable);
    }

    /** Cache listener callback for updated entries; intentionally does nothing. */
    public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) {
    }

    /**
     * Creates a message from the given values and enqueues it for delivery. All arguments are passed
     * unchanged, in this order, to the six-argument {@link KaiSSE} constructor. A message whose
     * {@code getGuid()} is {@code null} is delivered to every subscriber (presumably {@code str} is
     * that guid — confirm against {@link KaiSSE}).
     */
    public void publishEvent(String str, Long l, String str2, String str3, String str4, Object obj) {
        enqueue(new KaiSSE(str, l, str2, str3, str4, obj), "publishEvent");
    }

    /**
     * Logs the registration and creates a new multicast sink with a backpressure buffer. The sink is
     * not stored here; {@link #generate(String)} stores it in {@link #queueMap}.
     *
     * @param str subscriber id (used for logging only)
     * @return a new sink
     */
    public Sinks.Many<ServerSentEvent<KaiSSE>> registerObserver(String str) {
        B2Protocol.getInstance().info("[KaiSSEPublisher.registerObserver] " + str);
        return Sinks.many().multicast().onBackpressureBuffer();
    }

    /**
     * Subscription hook (originally named {@code subscribe}): logs the subscription and enqueues a
     * "MESSAGE"/"GUID" message addressed to the subscriber that carries its own id.
     *
     * @param str          subscriber id
     * @param subscription the subscription that was just established (logged only)
     */
    /* renamed from: subscribe, reason: merged with bridge method [inline-methods] */
    public void lambda$generate$5(String str, Subscription subscription) {
        B2Protocol.getInstance().info("[KaiSSEPublisher.subscribe] " + str + "->" + subscription);
        // toString() is kept on purpose: it fails fast on a null id instead of enqueueing a message
        // without guid, which the dispatcher would send to every subscriber.
        publishEvent(str.toString(), null, null, "MESSAGE", "GUID", str.toString());
    }
}
