package com.caucho.jms.cluster;

import com.caucho.bam.Message;
import com.caucho.bam.Query;
import com.caucho.cloud.topology.CloudServer;
import com.caucho.distcache.ClusterCache;
import com.caucho.jms.FileQueue;
import com.caucho.jms.file.FileQueueStore;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.queue.AbstractQueue;
import com.caucho.jms.queue.DestinationHandle;
import com.caucho.jms.queue.MessageCallback;
import com.caucho.jms.queue.QueueEntry;
import com.caucho.jms.queue.QueueEntrySelector;
import com.caucho.util.Alarm;
import com.caucho.util.HashKey;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/caucho/jms/cluster/ClusterQueueTriadActor.class */
public class ClusterQueueTriadActor extends ClusterQueueActor {
    private static final Logger log = Logger.getLogger(ClusterQueueTriadActor.class.getName());
    private static final String UID = "message";
    private MessageSystem _messageSystem;
    private ClusterCache _remoteConsumersCache;
    private FileQueueStore _store;
    private ClusterQueueSender _messageSender;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/caucho/jms/cluster/ClusterQueueTriadActor$PollListener.class */
    public class PollListener implements MessageCallback {
        private long _id;
        private String _to;
        private String _from;
        private long _timeout;
        private QueueEntrySelector _selector;

        PollListener(long j, String str, String str2, long j2, QueueEntrySelector queueEntrySelector) {
            this._id = j;
            this._to = str;
            this._from = str2;
            this._timeout = j2;
            this._selector = queueEntrySelector;
        }

        public void messageReceived(String str, Object obj) {
            ClusterQueueTriadActor.this.getBroker().queryResult(this._id, this._from, this._to, new PollResult(str, (Serializable) obj));
        }
    }

    public ClusterQueueTriadActor(CloudServer cloudServer) {
        super(cloudServer);
    }

    @Override // com.caucho.jms.cluster.ClusterQueueActor
    public void start() {
        super.start();
        this._messageSystem = MessageSystem.getCurrent();
        if (this._messageSystem == null) {
            throw new IllegalStateException();
        }
        this._remoteConsumersCache = new ClusterCache();
        this._remoteConsumersCache.setName("com.caucho.jms");
        this._remoteConsumersCache.init();
    }

    @Override // com.caucho.jms.cluster.ClusterQueueActor
    public void stop() {
    }

    protected FileQueueStore getStore() {
        if (this._store == null) {
            this._store = FileQueueStore.create();
        }
        return this._store;
    }

    @Query
    public void triadSendQueue(long j, String str, String str2, TriadSendMessage triadSendMessage) {
        if (log.isLoggable(Level.FINE)) {
            log.fine(this + " message " + triadSendMessage + " to=" + str + " from=" + str2);
        }
        this._messageSystem.getFileQueue(new HashKey(triadSendMessage.getQueue())).send(triadSendMessage.getMessageId(), triadSendMessage.getPayload(), triadSendMessage.getPriority(), Alarm.getCurrentTime() + triadSendMessage.getTimeout(), (String) null);
        getBroker().queryResult(j, str2, str, "ok");
    }

    @Query
    public void pollQuery(long j, String str, String str2, PollQuery pollQuery) {
        AbstractQueue<?> fileQueue = this._messageSystem.getFileQueue(new HashKey(pollQuery.getQueue()));
        QueueEntrySelector selector = pollQuery.getSelector();
        long timeout = pollQuery.getTimeout();
        long currentTime = timeout + Alarm.getCurrentTime();
        if (timeout > 1000) {
            scheduleListener(j, str, str2, fileQueue, timeout, selector);
            return;
        }
        Serializable serializable = (Serializable) fileQueue.receive(currentTime, true, selector);
        if (serializable != null) {
            getBroker().queryResult(j, str2, str, new PollResult(null, serializable));
        } else {
            getBroker().queryResult(j, str2, str, new PollResult(null, null));
        }
    }

    private void scheduleListener(long j, String str, String str2, AbstractQueue abstractQueue, long j2, QueueEntrySelector queueEntrySelector) {
        abstractQueue.receive(Alarm.getCurrentTime() + j2, true, queueEntrySelector, new PollListener(j, str, str2, j2, queueEntrySelector));
    }

    @Query
    public void triadRequest(long j, String str, String str2, TriadReceiveQuery triadReceiveQuery) {
        MessageImpl messageImpl;
        FileQueue fileQueue = null;
        QueueEntry queueEntry = null;
        if (this._messageSystem.getFileQueue(new HashKey(triadReceiveQuery.getQueue())) != null) {
            queueEntry = fileQueue.receiveEntry(triadReceiveQuery.getTimeout(), triadReceiveQuery.isAck(), triadReceiveQuery.getQueueEntrySelector());
        }
        ReceiveQuery receiveQuery = null;
        if (queueEntry != null && (messageImpl = (MessageImpl) queueEntry.getPayload()) != null) {
            try {
                messageImpl.setJMSDestination(new DestinationHandle(str));
            } catch (Exception e) {
                log.log(Level.WARNING, "Failed to set Destination on " + messageImpl + " " + e.toString(), (Throwable) e);
            }
            receiveQuery = new ReceiveQuery(queueEntry.getMsgId(), queueEntry.getLeaseExpires(), queueEntry.getPriority(), queueEntry.getExpiresTime(), messageImpl);
        }
        if (receiveQuery != null) {
            getBroker().queryResult(j, str2, str, receiveQuery);
        } else {
            getBroker().queryResult(j, str2, str, new ReceiveQuery());
        }
    }

    @Query
    public void processSubscribeTopic(long j, String str, String str2, SubscribeTopic subscribeTopic) {
        HashKey create = HashKey.create(subscribeTopic.getTopicId());
        List<RemoteConsumer> remoteConsumers = getRemoteConsumers(create);
        if (remoteConsumers == null) {
            remoteConsumers = new ArrayList();
        }
        RemoteConsumer remoteConsumer = new RemoteConsumer(str2, subscribeTopic.isNoLocal());
        if (!remoteConsumers.contains(remoteConsumer)) {
            remoteConsumers.add(remoteConsumer);
            addRemoteConsumers(create, remoteConsumers);
        }
        getBroker().queryResult(j, str2, str, "ok");
    }

    @Message
    public void processTriadTopicMessage(String str, String str2, TriadTopicMessage triadTopicMessage) {
        HashKey create = HashKey.create(triadTopicMessage.getQueue());
        List<RemoteConsumer> remoteConsumers = getRemoteConsumers(create);
        if (remoteConsumers == null) {
            return;
        }
        String messageId = triadTopicMessage.getMessageId();
        Serializable payload = triadTopicMessage.getPayload();
        int priority = triadTopicMessage.getPriority();
        long timeout = triadTopicMessage.getTimeout();
        String publisherId = triadTopicMessage.getPublisherId();
        TopicMessage topicMessage = new TopicMessage(messageId, triadTopicMessage.getQueue(), payload, priority, timeout, publisherId);
        for (RemoteConsumer remoteConsumer : remoteConsumers) {
            if (remoteConsumer.getId().equals(getAddress())) {
                this._messageSystem.onTopicMessage(create, messageId, payload, priority, timeout, publisherId);
            } else {
                getBroker().message(remoteConsumer.getId(), getAddress(), topicMessage);
            }
        }
    }

    private List<RemoteConsumer> getRemoteConsumers(HashKey hashKey) {
        return (List) this._remoteConsumersCache.get(hashKey);
    }

    private void addRemoteConsumers(HashKey hashKey, List<RemoteConsumer> list) {
        this._remoteConsumersCache.put(hashKey, list);
    }
}
