package com.caucho.cloud.bam;

import com.caucho.bam.BamError;
import com.caucho.bam.BamException;
import com.caucho.bam.broker.Broker;
import com.caucho.bam.mailbox.MultiworkerMailbox;
import com.caucho.bam.packet.Packet;
import com.caucho.bam.stream.MessageStream;
import com.caucho.bam.stream.NullMessageStream;
import com.caucho.cloud.network.ClusterServer;
import com.caucho.cloud.security.SecurityService;
import com.caucho.hmtp.HmtpWebSocketWriter;
import com.caucho.network.balance.ClientSocket;
import com.caucho.network.balance.ClientSocketFactory;
import com.caucho.util.Alarm;
import com.caucho.util.L10N;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/caucho/cloud/bam/HmuxQueue.class */
public class HmuxQueue extends MultiworkerMailbox {
    private static final Logger log = Logger.getLogger(HmuxQueue.class.getName());
    private static final L10N L = new L10N(HmuxQueue.class);
    private Broker _broker;
    private SecurityService _security;
    private ClusterServer _clusterServer;
    private ClientSocketFactory _pool;
    private HmtpPool _hmtpPool;
    private final AtomicInteger _writeCount;
    private MessageStream _connErrorStream;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HmuxQueue(String str, Broker broker, ClusterServer clusterServer) {
        super(str, new NullMessageStream(str, broker), broker, Alarm.isTest() ? 1 : 5);
        this._writeCount = new AtomicInteger();
        this._broker = broker;
        this._security = SecurityService.getCurrent();
        this._clusterServer = clusterServer;
        this._pool = this._clusterServer.getClusterSocketPool();
        this._connErrorStream = new ConnectErrorBamStream(broker.getAddress(), broker, this._pool.getDebugId());
        this._hmtpPool = HmtpPool.create(this._clusterServer, getThreadMax());
    }

    public void dispatch(Packet packet) {
        long currentTime;
        Packet dequeue;
        HmtpStream hmtpStream = null;
        try {
            try {
                try {
                    try {
                        HmtpStream openStream = openStream();
                        if (openStream == null) {
                            packet.dispatch(this._connErrorStream, getBroker());
                            if (1 == 0) {
                                packet.dispatchError(this._connErrorStream, getBroker(), new BamError(L.l("{0}: unknown error sending hmux message", this)));
                            }
                            if (openStream != null) {
                                openStream.close();
                                return;
                            }
                            return;
                        }
                        HmtpWebSocketWriter hmtpWriter = openStream.getHmtpWriter();
                        do {
                            this._writeCount.incrementAndGet();
                            try {
                                currentTime = Alarm.getCurrentTime();
                                packet.dispatch(hmtpWriter, getBroker());
                                this._writeCount.decrementAndGet();
                                dequeue = dequeue();
                                packet = dequeue;
                            } finally {
                            }
                        } while (dequeue != null);
                        this._writeCount.incrementAndGet();
                        try {
                            hmtpWriter.flush();
                            this._writeCount.decrementAndGet();
                            this._hmtpPool.free(openStream, currentTime);
                            HmtpStream hmtpStream2 = null;
                            if (1 == 0) {
                                packet.dispatchError(this._connErrorStream, getBroker(), new BamError(L.l("{0}: unknown error sending hmux message", this)));
                            }
                            if (0 != 0) {
                                hmtpStream2.close();
                            }
                        } finally {
                        }
                    } catch (IOException e) {
                        log.log(Level.INFO, e.toString(), (Throwable) e);
                        packet.dispatchError(this._connErrorStream, getBroker(), BamError.create(e));
                        if (1 == 0) {
                            packet.dispatchError(this._connErrorStream, getBroker(), new BamError(L.l("{0}: unknown error sending hmux message", this)));
                        }
                        if (0 != 0) {
                            hmtpStream.close();
                        }
                    }
                } catch (Exception e2) {
                    log.log(Level.WARNING, e2.toString(), (Throwable) e2);
                    packet.dispatchError(this._connErrorStream, getBroker(), BamError.create(e2));
                    if (1 == 0) {
                        packet.dispatchError(this._connErrorStream, getBroker(), new BamError(L.l("{0}: unknown error sending hmux message", this)));
                    }
                    if (0 != 0) {
                        hmtpStream.close();
                    }
                }
            } catch (BamException e3) {
                log.log(Level.FINER, e3.toString(), e3);
                packet.dispatchError(this._connErrorStream, getBroker(), e3.createActorError());
                if (1 == 0) {
                    packet.dispatchError(this._connErrorStream, getBroker(), new BamError(L.l("{0}: unknown error sending hmux message", this)));
                }
                if (0 != 0) {
                    hmtpStream.close();
                }
            }
        } catch (Throwable th) {
            if (0 == 0) {
                packet.dispatchError(this._connErrorStream, getBroker(), new BamError(L.l("{0}: unknown error sending hmux message", this)));
            }
            if (0 != 0) {
                hmtpStream.close();
            }
            throw th;
        }
    }

    private HmtpStream openStream() throws IOException {
        HmtpStream allocate = this._hmtpPool.allocate();
        if (allocate == null) {
            ClientSocket openIfHeartbeatActive = this._pool.openIfHeartbeatActive();
            if (openIfHeartbeatActive == null) {
                if (!log.isLoggable(Level.FINE)) {
                    return null;
                }
                log.fine(this + " cannot connect to " + this._pool + "\n  heartbeat: " + this._pool.isHeartbeatActive());
                return null;
            }
            this._pool.success();
            allocate = new HmtpStream(openIfHeartbeatActive, this._broker, this._security);
        }
        return allocate;
    }

    public void close() {
        super.close();
        this._hmtpPool.close();
    }
}
