/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.performanceanalyzer.rca.net.handler;

import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.grpc.PublishResponse;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import org.opensearch.performanceanalyzer.rca.net.NodeStateManager;
import org.opensearch.performanceanalyzer.rca.net.ReceivedFlowUnitStore;
import org.opensearch.performanceanalyzer.rca.net.tasks.FlowUnitRxTask;

public class PublishRequestHandler {
    private static final Logger LOG = LogManager.getLogger(PublishRequestHandler.class);
    private final AtomicReference<ExecutorService> executorReference;
    private final NodeStateManager nodeStateManager;
    private final ReceivedFlowUnitStore receivedFlowUnitStore;
    private List<SendDataClientStreamUpdateConsumer> dataClientStreamList = Collections.synchronizedList(new ArrayList());

    public PublishRequestHandler(NodeStateManager nodeStateManager, ReceivedFlowUnitStore receivedFlowUnitStore, AtomicReference<ExecutorService> executorReference) {
        this.executorReference = executorReference;
        this.nodeStateManager = nodeStateManager;
        this.receivedFlowUnitStore = receivedFlowUnitStore;
    }

    public StreamObserver<FlowUnitMessage> getClientStream(StreamObserver<PublishResponse> serviceResponse) {
        SendDataClientStreamUpdateConsumer streamUpdateConsumer = new SendDataClientStreamUpdateConsumer(serviceResponse);
        this.dataClientStreamList.add(streamUpdateConsumer);
        return streamUpdateConsumer;
    }

    public void terminateUpstreamConnections() {
        for (SendDataClientStreamUpdateConsumer streamUpdateConsumer : this.dataClientStreamList) {
            StreamObserver<PublishResponse> responseStream = streamUpdateConsumer.getServiceResponse();
            if (streamUpdateConsumer.isCompleted()) continue;
            responseStream.onNext((Object)PublishResponse.newBuilder().setDataStatus(PublishResponse.PublishResponseStatus.NODE_SHUTDOWN).build());
            responseStream.onCompleted();
        }
        this.dataClientStreamList.clear();
    }

    protected List<SendDataClientStreamUpdateConsumer> getDataClientStreamList() {
        return this.dataClientStreamList;
    }

    protected class SendDataClientStreamUpdateConsumer
    implements StreamObserver<FlowUnitMessage> {
        private final StreamObserver<PublishResponse> serviceResponse;
        private boolean isCompleted;

        SendDataClientStreamUpdateConsumer(StreamObserver<PublishResponse> serviceResponse) {
            this.serviceResponse = serviceResponse;
        }

        public StreamObserver<PublishResponse> getServiceResponse() {
            return this.serviceResponse;
        }

        public void onNext(FlowUnitMessage flowUnitMessage) {
            ExecutorService executorService = PublishRequestHandler.this.executorReference.get();
            if (executorService != null) {
                try {
                    executorService.execute(new FlowUnitRxTask(PublishRequestHandler.this.nodeStateManager, PublishRequestHandler.this.receivedFlowUnitStore, flowUnitMessage));
                    ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR.updateStat((MeasurementSet)RcaGraphMetrics.NET_BYTES_IN, flowUnitMessage.getGraphNode(), (Number)flowUnitMessage.getSerializedSize());
                }
                catch (RejectedExecutionException ree) {
                    LOG.warn("Dropped handling received flow unit because the netwwork threadpool queue is full");
                    StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
                }
            }
        }

        public void onError(Throwable throwable) {
            LOG.error("Client ran into an error while streaming flow units:", throwable);
        }

        public void onCompleted() {
            LOG.debug("Client finished streaming flow units");
            this.serviceResponse.onNext((Object)this.buildDataResponse(PublishResponse.PublishResponseStatus.SUCCESS));
            this.serviceResponse.onCompleted();
            this.isCompleted = true;
        }

        public boolean isCompleted() {
            return this.isCompleted;
        }

        private PublishResponse buildDataResponse(PublishResponse.PublishResponseStatus status) {
            return PublishResponse.newBuilder().setDataStatus(status).build();
        }
    }
}

