package com.caucho.distcache.cluster;

import com.caucho.bam.BamError;
import com.caucho.bam.Message;
import com.caucho.bam.MessageError;
import com.caucho.bam.Query;
import com.caucho.bam.query.QueryCallback;
import com.caucho.cloud.bam.AbstractCloudActor;
import com.caucho.cloud.network.ClusterServer;
import com.caucho.cloud.topology.CloudCluster;
import com.caucho.cloud.topology.CloudServer;
import com.caucho.env.distcache.CacheDataBacking;
import com.caucho.server.distcache.CacheData;
import com.caucho.server.distcache.CacheMnodeListener;
import com.caucho.server.distcache.MnodeValue;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.HashKey;
import com.caucho.util.L10N;
import com.caucho.util.WeakAlarm;
import com.caucho.vfs.StreamSource;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/caucho/distcache/cluster/CacheReplicationActor.class */
public class CacheReplicationActor extends AbstractCloudActor {
    // Localization helper used to format the "unknown cluster" exception messages.
    private static final L10N L = new L10N(CacheReplicationActor.class);
    // Logger named after this class; handlers below log at FINER.
    private static final Logger log = Logger.getLogger(CacheReplicationActor.class.getName());
    // Name passed to the superclass constructor; the same text prefixes the target addresses built below.
    private static final String ACTOR_NAME = "cluster-cache-replication";
    // Alarm re-queue delay used once a replication ack has been seen (presumably milliseconds -- TODO confirm against Alarm.queue).
    private static final long ALARM_TIMEOUT = 120000;
    // Shorter alarm delay used at start() and while no ack has been seen / after a start error.
    private static final long ALARM_SHORT_TIMEOUT = 5000;
    // The server given to the constructor; used to look up clusters and this server's own cluster id.
    private CloudServer _self;
    // Maps a cluster id to the synchronized set of cache keys for which startReplication() has been called.
    private ConcurrentMap<String, Set<HashKey>> _sourceMap;
    // Cache manager given to the constructor; never null after construction.
    private ClusterCacheManagerImpl _cacheManager;
    // Listener created in start() and handed to the WeakAlarm.
    // NOTE(review): presumably kept in a field so the weakly-referenced listener stays reachable -- confirm against WeakAlarm.
    private ActorAlarm _actorAlarm;
    // Alarm created in start(); null until then.
    private Alarm _alarm;
    // Set to true by replicationAck() and to false by replicationStartError(); read by ActorAlarm to pick the re-queue delay.
    // NOTE(review): not volatile although it looks like it is written and read from different callbacks -- confirm the threading model.
    private boolean _isReplicationAck;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/caucho/distcache/cluster/CacheReplicationActor$ActorAlarm.class */
    /**
     * Alarm callback that re-sends the replication start requests and then
     * re-queues the alarm. Nothing is sent or queued once the cache manager
     * reports that it is closed.
     */
    public class ActorAlarm implements AlarmListener {
        ActorAlarm() {
        }

        /**
         * Sends the start requests and schedules the next run. The alarm is
         * re-queued even when sending fails; the failure is then rethrown.
         *
         * @param alarm the alarm that fired and that is re-queued
         */
        public void handleAlarm(Alarm alarm) {
            if (CacheReplicationActor.this._cacheManager.isClosed()) {
                return;
            }
            try {
                CacheReplicationActor.this.sendStartRequests();
                requeueUnlessClosed(alarm);
            } catch (Throwable failure) {
                // Keep the periodic retry alive before propagating the failure.
                requeueUnlessClosed(alarm);
                throw failure;
            }
        }

        /**
         * Re-queues the alarm unless the cache manager is closed. The long
         * timeout is used once an ack has been recorded, the short one otherwise.
         */
        private void requeueUnlessClosed(Alarm alarm) {
            if (CacheReplicationActor.this._cacheManager.isClosed()) {
                return;
            }
            long delay = CacheReplicationActor.this._isReplicationAck
                ? CacheReplicationActor.ALARM_TIMEOUT
                : CacheReplicationActor.ALARM_SHORT_TIMEOUT;
            alarm.queue(delay);
        }
    }

    /* loaded from: input_file:com/caucho/distcache/cluster/CacheReplicationActor$CacheDataCallback.class */
    /**
     * Query callback used after a DataGet query: it stores the returned data
     * (on success) and then applies the pending put to the cache manager.
     */
    class CacheDataCallback implements QueryCallback {
        // The put that is applied once the query has completed, successfully or not.
        private CachePut _put;

        CacheDataCallback(CachePut cachePut) {
            _put = cachePut;
        }

        /**
         * Stores the received data under the key created from its value hash,
         * then applies the pending put.
         *
         * @param serializable the query result; cast to DataPut
         */
        public void onQueryResult(String str, String str2, Serializable serializable) {
            final CacheReplicationActor actor = CacheReplicationActor.this;
            final DataPut reply = (DataPut) serializable;

            actor._cacheManager.putData(HashKey.create(reply.getValue()), reply.getStreamSource());
            actor._cacheManager.putReplication(_put);
        }

        /**
         * Applies the pending put even though the data query failed.
         */
        public void onQueryError(String str, String str2, Serializable serializable, BamError bamError) {
            final CacheReplicationActor actor = CacheReplicationActor.this;

            actor._cacheManager.putReplication(_put);
        }
    }

    /* loaded from: input_file:com/caucho/distcache/cluster/CacheReplicationActor$ReplicationCacheListener.class */
    /**
     * Cache listener that forwards every put to the replication actor of
     * another cluster as a CachePut message.
     */
    class ReplicationCacheListener implements CacheMnodeListener {
        private CloudCluster _cluster;
        // Destination address, computed once from the first server of the cluster's first pod.
        private String _to;

        /**
         * @param cloudCluster the cluster that receives the forwarded puts
         * @throws NullPointerException if cloudCluster is null
         */
        ReplicationCacheListener(CloudCluster cloudCluster) {
            _cluster = cloudCluster;
            if (cloudCluster == null) {
                throw new NullPointerException();
            }
            ClusterServer firstServer =
                (ClusterServer) _cluster.getPodList()[0].getServer(0).getData(ClusterServer.class);
            _to = "cluster-cache-replication@" + firstServer.getBamAdminName();
        }

        /**
         * Builds a CachePut from the entry's metadata plus a stream over its
         * value data and sends it to the target address.
         */
        public void onPut(HashKey hashKey, MnodeValue mnodeValue) {
            final CacheReplicationActor actor = CacheReplicationActor.this;

            CachePut put = new CachePut(
                hashKey.getHash(),
                mnodeValue.getValueHash(),
                mnodeValue.getCacheHash(),
                mnodeValue.getFlags(),
                mnodeValue.getVersion(),
                mnodeValue.getExpireTimeout(),
                mnodeValue.getIdleTimeout(),
                mnodeValue.getLeaseTimeout(),
                mnodeValue.getLocalReadTimeout(),
                0,
                new StreamSource(actor._cacheManager.createDataSource(mnodeValue.getValueHashKey())),
                false);

            actor.getBroker().message(_to, actor.getAddress(), put);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the replication actor for the given server.
     *
     * @param cloudServer the local server; its pod is passed to the superclass
     * @param clusterCacheManagerImpl the cache manager, must not be null
     * @param cacheMnodeManager not used by this constructor
     * @throws NullPointerException if clusterCacheManagerImpl is null
     */
    public CacheReplicationActor(CloudServer cloudServer, ClusterCacheManagerImpl clusterCacheManagerImpl, CacheMnodeManager cacheMnodeManager) {
        super(ACTOR_NAME, cloudServer.getPod());

        _sourceMap = new ConcurrentHashMap<String, Set<HashKey>>();
        _self = cloudServer;
        _cacheManager = clusterCacheManagerImpl;

        if (_cacheManager == null) {
            throw new NullPointerException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the alarm listener and its alarm, queues the alarm with the
     * short timeout, and requests startup updates for every key that is
     * already registered.
     */
    public void start() {
        ActorAlarm listener = new ActorAlarm();
        _actorAlarm = listener;

        Alarm alarm = new WeakAlarm(listener);
        _alarm = alarm;
        alarm.queue(ALARM_SHORT_TIMEOUT);

        requestStartupUpdates();
    }

    /**
     * Returns the data backing owned by the cache manager.
     */
    private CacheDataBacking getDataBacking() {
        final CacheDataBacking backing = _cacheManager.getDataBacking();
        return backing;
    }

    /**
     * Removes the alarm from the queue. Does nothing when the alarm was never
     * created, i.e. when stop() is called before start().
     */
    void stop() {
        // _alarm is only assigned in start(); guard against an early stop().
        Alarm alarm = _alarm;
        if (alarm != null) {
            alarm.dequeue();
        }
    }

    /**
     * Registers a cache key for replication from the named cluster and, the
     * first time the key is registered, sends the start message and requests
     * the startup updates for it.
     *
     * @param hashKey the cache key to replicate
     * @param str the id of the source cluster
     * @throws IllegalStateException if no cluster with that id is known
     */
    public void startReplication(HashKey hashKey, String str) {
        if (_self.getSystem().findCluster(str) == null) {
            throw new IllegalStateException(L.l("{0}: '{1}' is an unknown cluster", this, str));
        }

        Set<HashKey> keys = _sourceMap.get(str);
        if (keys == null) {
            Set<HashKey> created = Collections.synchronizedSet(new HashSet<HashKey>());
            // putIfAbsent returns the set another thread registered first, or null if ours won.
            Set<HashKey> existing = _sourceMap.putIfAbsent(str, created);
            keys = existing != null ? existing : created;
        }

        // add() on the synchronized set is atomic, so only one caller per key
        // gets past this point; a separate contains()/add() pair would let two
        // concurrent callers both send the start message.
        if (!keys.add(hashKey)) {
            return;
        }

        sendStartMessage(hashKey, str);
        requestStartupUpdates(hashKey, str);
    }

    /**
     * Sends a ReplicationStart message, carrying this server's cluster id and
     * the cache key hash, to the replication actor on the first server of the
     * first pod of the named cluster.
     *
     * @param hashKey the cache key to replicate
     * @param str the id of the target cluster
     * @throws IllegalStateException if no cluster with that id is known
     */
    public void sendStartMessage(HashKey hashKey, String str) {
        final CloudCluster target = _self.getSystem().findCluster(str);

        if (target != null) {
            ClusterServer firstServer =
                (ClusterServer) target.getPodList()[0].getServerList()[0].getData(ClusterServer.class);
            String to = "cluster-cache-replication@" + firstServer.getBamAdminName();

            getSender().message(to, new ReplicationStart(_self.getCluster().getId(), hashKey.getHash()));
            return;
        }

        throw new IllegalStateException(L.l("{0}: '{1}' is an unknown cluster", this, str));
    }

    /**
     * Builds the address of the replication actor on the first server of the
     * first pod of the named cluster.
     *
     * @param str the cluster id
     * @return the actor name, an '@', and the server's BAM admin name
     * @throws IllegalStateException if no cluster with that id is known
     */
    private String getTargetAddress(String str) {
        CloudCluster cluster = _self.getSystem().findCluster(str);

        if (cluster == null) {
            String message = L.l("{0}: '{1}' is an unknown cluster", this, str);
            throw new IllegalStateException(message);
        }

        ClusterServer firstServer =
            (ClusterServer) cluster.getPodList()[0].getServerList()[0].getData(ClusterServer.class);

        return "cluster-cache-replication@" + firstServer.getBamAdminName();
    }

    /**
     * Requests startup updates for every registered cache key of every
     * registered cluster.
     */
    private void requestStartupUpdates() {
        if (_sourceMap.isEmpty()) {
            return;
        }

        ArrayList<String> clusterIds = new ArrayList<String>(_sourceMap.keySet());

        for (String clusterId : clusterIds) {
            Set<HashKey> keys = _sourceMap.get(clusterId);
            if (keys == null) {
                continue;
            }

            // The per-cluster sets are Collections.synchronizedSet wrappers, which
            // must be locked while iterating. Copy under the lock and send
            // afterwards so that no message is sent while the lock is held.
            ArrayList<HashKey> snapshot;
            synchronized (keys) {
                snapshot = new ArrayList<HashKey>(keys);
            }

            for (HashKey key : snapshot) {
                requestStartupUpdates(key, clusterId);
            }
        }
    }

    /**
     * Asks the named cluster to send the updates for one cache key. The
     * message carries this actor's address, the key hash, and a negative time
     * delta: the time elapsed since the data backing's startup last-update
     * time for the key, plus ALARM_TIMEOUT, negated.
     *
     * @param hashKey the cache key
     * @param str the id of the cluster that is asked for updates
     */
    private void requestStartupUpdates(HashKey hashKey, String str) {
        String replyTo = getAddress();
        byte[] keyHash = hashKey.getHash();

        long elapsed = Alarm.getCurrentTime() - getDataBacking().getStartupLastUpdateTime(hashKey);
        long updateDelta = -(elapsed + ALARM_TIMEOUT);

        ReplicationStartUpdates request = new ReplicationStartUpdates(replyTo, keyHash, updateDelta);

        getSender().message(getTargetAddress(str), request);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a replication start message for every registered cache key of
     * every registered cluster. Called from the alarm handler.
     */
    public void sendStartRequests() {
        if (_sourceMap.isEmpty()) {
            return;
        }

        ArrayList<String> clusterIds = new ArrayList<String>(_sourceMap.keySet());

        for (String clusterId : clusterIds) {
            Set<HashKey> keys = _sourceMap.get(clusterId);
            if (keys == null) {
                continue;
            }

            // The per-cluster sets are Collections.synchronizedSet wrappers, which
            // must be locked while iterating. Copy under the lock and send
            // afterwards so that no message is sent while the lock is held.
            ArrayList<HashKey> snapshot;
            synchronized (keys) {
                snapshot = new ArrayList<HashKey>(keys);
            }

            for (HashKey key : snapshot) {
                sendStartMessage(key, clusterId);
            }
        }
    }

    /**
     * Handles a ReplicationStart message by registering a cache listener that
     * forwards puts for the given cache key to the cluster named in the message.
     *
     * If the cluster named in the message is unknown, the lookup yields null
     * and the listener's constructor throws a NullPointerException.
     *
     * @param replicationStart carries the cache key and the requesting cluster id
     */
    @Message
    public void replicationStart(String str, String str2, ReplicationStart replicationStart) {
        if (log.isLoggable(Level.FINER)) {
            String text = dbgId() + " replicationStart " + replicationStart;
            log.finer(text);
        }

        HashKey cacheKey = new HashKey(replicationStart.getCacheKey());
        CloudCluster requester = _self.getSystem().findCluster(replicationStart.getCluster());

        _cacheManager.addCacheListener(cacheKey, new ReplicationCacheListener(requester));
    }

    /**
     * Handles a ReplicationAck message: records that an ack was received and,
     * unless the cache manager is closed, re-queues the alarm with the long
     * timeout.
     */
    @Message
    public void replicationAck(String str, String str2, ReplicationAck replicationAck) {
        if (log.isLoggable(Level.FINER)) {
            String text = dbgId() + " message " + replicationAck;
            log.finer(text);
        }

        _isReplicationAck = true;

        if (!_cacheManager.isClosed()) {
            _alarm.queue(ALARM_TIMEOUT);
        }
    }

    /**
     * Handles a failed ReplicationStart message: clears the ack flag and,
     * unless the cache manager is closed, re-queues the alarm with the short
     * timeout.
     */
    @MessageError
    public void replicationStartError(String str, String str2, ReplicationStart replicationStart, BamError bamError) {
        if (log.isLoggable(Level.FINER)) {
            String text = dbgId() + " replicationStartError " + replicationStart + "\n  " + bamError;
            log.finer(text);
        }

        _isReplicationAck = false;

        if (!_cacheManager.isClosed()) {
            _alarm.queue(ALARM_SHORT_TIMEOUT);
        }
    }

    /**
     * Handles a ReplicationStartUpdates message by reading the updates for the
     * requested cache key from the data backing, batch by batch, and sending
     * each entry to the address carried in the message.
     *
     * The time passed to the backing is the current time plus the message's
     * update delta. Reading stops when the backing returns null or an empty
     * batch.
     *
     * @param replicationStartUpdates carries the cache key, the reply address
     *        and the update delta
     */
    @Message
    public void replicationUpdate(String str, String str2, ReplicationStartUpdates replicationStartUpdates) {
        if (log.isLoggable(Level.FINER)) {
            log.finer(dbgId() + " message " + replicationStartUpdates);
        }

        HashKey cacheKey = HashKey.create(replicationStartUpdates.getCacheKey());
        // getCluster() is used as the message destination in sendStartupPut().
        String replyTo = replicationStartUpdates.getCluster();
        long since = Alarm.getCurrentTime() + replicationStartUpdates.getUpdateDelta();

        int offset = 0;
        ArrayList<?> batch = getDataBacking().getUpdates(cacheKey, since, offset);

        // An empty batch would never advance the offset, so it must end the
        // loop just like null does; otherwise the same query repeats forever.
        while (batch != null && !batch.isEmpty()) {
            offset += batch.size();

            for (Object entry : batch) {
                sendStartupPut(replyTo, (CacheData) entry);
            }

            batch = getDataBacking().getUpdates(cacheKey, since, offset);
        }
    }

    /**
     * Sends one cache entry to the given address as a StartupCachePut. The
     * value hash and cache-key hash are null when the entry has no value or
     * cache key; the last argument is the entry's access time relative to the
     * current time.
     *
     * @param str the destination address
     * @param cacheData the entry to send
     */
    private void sendStartupPut(String str, CacheData cacheData) {
        byte[] keyHash = cacheData.getKey().getHash();

        byte[] valueHash = null;
        if (cacheData.getValue() != null) {
            valueHash = cacheData.getValue().getHash();
        }

        byte[] cacheHash = null;
        if (cacheData.getCacheKey() != null) {
            cacheHash = cacheData.getCacheKey().getHash();
        }

        StartupCachePut put = new StartupCachePut(
            keyHash,
            valueHash,
            cacheHash,
            cacheData.getFlags(),
            cacheData.getVersion(),
            cacheData.getExpireTimeout(),
            cacheData.getIdleTimeout(),
            cacheData.getLeaseTimeout(),
            cacheData.getLocalReadTimeout(),
            -1,
            cacheData.getAccessTime() - Alarm.getCurrentTime());

        getBroker().message(str, getAddress(), put);
    }

    /**
     * Handles a CachePut message by passing it to the cache manager as a
     * replicated put.
     */
    @Message
    public void cachePut(String str, String str2, CachePut cachePut) {
        final boolean traceEnabled = log.isLoggable(Level.FINER);
        if (traceEnabled) {
            String text = dbgId() + " message " + cachePut;
            log.finer(text);
        }

        _cacheManager.putReplication(cachePut);
    }

    /**
     * Handles a StartupCachePut message. The put is applied right away when
     * the value data is already available locally or the message carries its
     * own stream source; otherwise the data is first queried from the sender
     * and the put is applied by the query callback.
     *
     * @param str2 the sender's address, used as the target of the data query
     * @param startupCachePut the put to apply
     */
    @Message
    public void startupCachePut(String str, String str2, StartupCachePut startupCachePut) {
        if (log.isLoggable(Level.FINER)) {
            String text = dbgId() + " message " + startupCachePut;
            log.finer(text);
        }

        HashKey valueKey = HashKey.create(startupCachePut.getValueHash());

        // The stream source is only inspected when the data is not available locally.
        if (!_cacheManager.isDataAvailable(valueKey) && startupCachePut.getStreamSource() == null) {
            DataGet request = new DataGet(getAddress(), valueKey.getHash());
            getSender().query(str2, request, new CacheDataCallback(startupCachePut));
            return;
        }

        _cacheManager.putReplication(startupCachePut);
    }

    /**
     * Answers a DataGet query with a DataPut that carries the requested value
     * hash and a stream source over the cache manager's data for that hash.
     *
     * @param dataGet the query; its value is the hash of the requested data
     * @return the reply holding the hash and the data stream
     */
    @Query
    public DataPut dataGet(DataGet dataGet) {
        if (log.isLoggable(Level.FINER)) {
            String text = dbgId() + " query " + dataGet;
            log.finer(text);
        }

        DataPut reply = new DataPut(
            dataGet.getValue(),
            new StreamSource(_cacheManager.createDataSource(HashKey.create(dataGet.getValue()))));

        return reply;
    }
}
