package com.caucho.jms.cluster;

import com.caucho.cloud.topology.TriadOwner;
import com.caucho.config.ConfigException;
import com.caucho.jms.FileTopic;
import com.caucho.jms.queue.AbstractQueue;
import com.caucho.jms.queue.AbstractTopic;
import com.caucho.server.cluster.ProServer;
import com.caucho.server.cluster.Server;
import com.caucho.server.distcache.HashManager;
import com.caucho.util.Alarm;
import com.caucho.util.HashKey;
import com.caucho.util.L10N;
import com.caucho.vfs.Path;
import java.io.Serializable;
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/* loaded from: input_file:com/caucho/jms/cluster/ClusterTopicImpl.class */
public class ClusterTopicImpl<E extends Serializable> extends AbstractTopic<E> implements ServerDestination<E> {
    private static final L10N L = new L10N(ClusterTopicImpl.class);
    private static final Logger log = Logger.getLogger(ClusterTopicImpl.class.getName());
    private MessageSystem _messageSystem;
    private FileTopic<E> _fileTopic;
    private long _subscriberTimeout;
    private HashMap<String, ClusterTopicImpl<E>.Subscriber<E>> _subscriberMap;
    private TriadOwner _topicOwner;
    private ClusterQueueActor _messageActor;
    private ClusterQueueSender _messageSender;
    private HashKey _topicHash;
    private boolean _isSubscribed;
    private boolean _noLocal;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/caucho/jms/cluster/ClusterTopicImpl$Subscriber.class */
    public class Subscriber<E1> {
        private final String _name;
        private final AbstractQueue<E1> _queue;
        private long _expires;

        Subscriber(String str, AbstractQueue<E1> abstractQueue) {
            this._name = str;
            this._queue = abstractQueue;
            this._expires = Alarm.getCurrentTime() + ClusterTopicImpl.this._subscriberTimeout;
        }

        boolean isExpired() {
            return this._expires < Alarm.getCurrentTime();
        }

        void access() {
            this._expires = Alarm.getCurrentTime() + ClusterTopicImpl.this._subscriberTimeout;
        }

        AbstractQueue<E1> getQueue() {
            return this._queue;
        }

        public String toString() {
            return getClass().getSimpleName() + "[" + this._name + "]";
        }
    }

    public ClusterTopicImpl() {
        this._subscriberTimeout = 900000L;
        this._subscriberMap = new HashMap<>();
        this._messageSystem = MessageSystem.getCurrent();
        if (this._messageSystem == null) {
            throw new IllegalStateException(L.l("{0} requires an active {1}", getClass().getSimpleName(), MessageSystem.class.getSimpleName()));
        }
        this._fileTopic = new FileTopic<>();
    }

    public ClusterTopicImpl(String str) {
        this();
        setName(str);
    }

    public void setPath(Path path) {
        this._fileTopic.setPath(path);
    }

    public String getUrl() {
        return "cluster:name=" + getName();
    }

    public void setCluster(String str) {
    }

    @PostConstruct
    protected void init() {
        super.init();
        if (getName() == null || "".equals(getName())) {
            throw new ConfigException(L.l("ServerTopic requires a non-null name."));
        }
        this._fileTopic.setName(getName());
        this._fileTopic.init();
        ProServer proServer = (ProServer) Server.getCurrent();
        if (proServer == null) {
            throw new ConfigException(L.l("Cluster Topic must be in a Resin Server context"));
        }
        this._topicHash = new HashManager().generateHash(getName());
        this._messageActor = this._messageSystem.getActor();
        proServer.getPod();
        this._topicOwner = TriadOwner.getHashOwner(this._topicHash.getHash());
        this._messageSender = this._messageActor.createMessageSender(proServer.getPod(), this._topicHash.getHash());
        subscribeTopic();
        this._messageSystem.subscribeTopic(this._topicHash, this);
    }

    @PreDestroy
    protected void destroy() {
        this._messageSystem.unsubscribeTopic(this._topicHash);
    }

    public void subscribeTopic() {
        if (log.isLoggable(Level.FINE)) {
            log.fine(Server.getCurrent() + " is subscribing to " + getName() + "ClusterTopic");
        }
        this._isSubscribed = true;
        this._messageSender.sendSubscribeTopicMessage(new SubscribeTopic(this._topicHash.getHash()), this._topicOwner);
    }

    public void send(String str, E e, int i, long j, String str2) {
        this._messageSender.sendTriadTopicMessage(new TriadTopicMessage(str, this._topicHash.getHash(), e, i, j - Alarm.getCurrentTime(), str2));
    }

    @Override // com.caucho.jms.cluster.ServerDestination
    public void rollback(String str) {
        this._fileTopic.rollback(str);
    }

    @Override // com.caucho.jms.cluster.ServerDestination
    public void acknowledge(String str) {
        this._fileTopic.acknowledge(str);
    }

    @Override // com.caucho.jms.cluster.ServerDestination
    public void sendLocal(String str, E e, int i, long j, String str2) {
        this._fileTopic.send(str, e, i, j, str2);
        notifyMessageAvailable();
    }

    @Override // com.caucho.jms.cluster.ServerDestination
    public void sendLocalAsBackup(String str, Serializable serializable, long j) {
        throw new UnsupportedOperationException(getClass().getSimpleName());
    }

    @Override // com.caucho.jms.cluster.ServerDestination
    public E receiveLocal(String str, boolean z) {
        return (E) getSubscriber(str).getQueue().receive(0L, true);
    }

    @Override // com.caucho.jms.cluster.ServerDestination
    public boolean hasMessage(String str) {
        return getSubscriber(str).getQueue().hasMessage();
    }

    private ClusterTopicImpl<E>.Subscriber<E> getSubscriber(String str) {
        ClusterTopicImpl<E>.Subscriber<E> subscriber;
        synchronized (this._subscriberMap) {
            ClusterTopicImpl<E>.Subscriber<E> subscriber2 = this._subscriberMap.get(str);
            if (subscriber2 == null) {
                subscriber2 = new Subscriber<>(str, createSubscriber(null, str, false));
                this._subscriberMap.put(str, subscriber2);
            } else {
                subscriber2.access();
            }
            subscriber = subscriber2;
        }
        return subscriber;
    }

    public void removeCluster(String str) {
    }

    public void notifyMessageAvailable() {
    }

    public AbstractQueue<E> createSubscriber(String str, String str2, boolean z) {
        return this._fileTopic.createSubscriber(str, str2, z);
    }

    public void closeSubscriber(AbstractQueue<E> abstractQueue) {
        this._fileTopic.closeSubscriber(abstractQueue);
    }

    @PreDestroy
    public void close() {
    }
}
