package org.codehaus.wadi.group.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.group.Address;
import org.codehaus.wadi.group.Dispatcher;
import org.codehaus.wadi.group.Envelope;
import org.codehaus.wadi.group.MessageExchangeException;
import org.codehaus.wadi.group.Peer;
import org.codehaus.wadi.group.Quipu;
import org.codehaus.wadi.group.ServiceEndpoint;

/* loaded from: input_file:org/codehaus/wadi/group/impl/AbstractDispatcher.class */
public abstract class AbstractDispatcher implements Dispatcher {
    // Thread pool supplied to the constructor; also handed to the inbound message dispatcher manager.
    protected final ThreadPool _executor;
    // Logger named "org.codehaus.wadi.MESSAGES"; used by onMessage to trace incoming envelopes.
    protected final Log _messageLog;
    // Logger obtained for the runtime class of this dispatcher (getClass()).
    protected Log _log;
    // Outstanding rendezvous: correlation id (String) -> Quipu awaiting the correlated reply.
    protected final Map _rvMap;
    // Receives register/unregister calls and every envelope passed to onMessage.
    private final MessageDispatcherManager inboundMessageDispatcher;
    // Source of new correlation ids for rendezvous created by this dispatcher.
    protected final SimpleCorrelationIDFactory _factory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/codehaus/wadi/group/impl/AbstractDispatcher$SimpleCorrelationIDFactory.class */
    /**
     * Hands out correlation identifiers derived from a shared integer counter.
     * Each identifier is the decimal text of the value returned by incrementing
     * that counter (presumably the post-increment value - confirm against the
     * SynchronizedInt contract).
     */
    public class SimpleCorrelationIDFactory {
        private final AbstractDispatcher this$0;
        protected final SynchronizedInt _count = new SynchronizedInt(0);

        /**
         * @param abstractDispatcher the dispatcher that owns this factory
         */
        SimpleCorrelationIDFactory(AbstractDispatcher abstractDispatcher) {
            this$0 = abstractDispatcher;
        }

        /**
         * Advances the counter and renders the returned value as a string.
         *
         * @return the next correlation identifier
         */
        public String create() {
            final int next = _count.increment();
            return String.valueOf(next);
        }
    }

    /**
     * Creates a dispatcher that runs its inbound message handling on the given pool.
     * Sets up both loggers, the rendezvous map, the correlation id factory and the
     * inbound message dispatcher manager.
     *
     * @param threadPool the pool stored as this dispatcher's executor and passed to
     *                   the inbound message dispatcher manager
     */
    public AbstractDispatcher(ThreadPool threadPool) {
        _messageLog = LogFactory.getLog("org.codehaus.wadi.MESSAGES");
        _log = LogFactory.getLog(getClass());
        _rvMap = new ConcurrentHashMap();
        _factory = new SimpleCorrelationIDFactory(this);
        _executor = threadPool;
        // The manager shares the very same pool instance that getExecutor() exposes.
        inboundMessageDispatcher = new BasicMessageDispatcherManager(this, threadPool);
    }

    /**
     * Creates a dispatcher backed by a new {@code PooledExecutorAdapter(10)}.
     *
     * @param j not used by this constructor; the pool is always built with the
     *          fixed argument 10 (presumably a thread count - TODO confirm)
     */
    public AbstractDispatcher(long j) {
        this(new PooledExecutorAdapter(10));
    }

    /**
     * Registers a service endpoint with the inbound message dispatcher manager.
     *
     * @param serviceEndpoint the endpoint to register
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void register(ServiceEndpoint serviceEndpoint) {
        inboundMessageDispatcher.register(serviceEndpoint);
    }

    /**
     * Unregisters a service endpoint from the inbound message dispatcher manager.
     *
     * @param serviceEndpoint the endpoint to unregister
     * @param i               forwarded unchanged to the manager
     * @param j               forwarded unchanged to the manager
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void unregister(ServiceEndpoint serviceEndpoint, int i, long j) {
        inboundMessageDispatcher.unregister(serviceEndpoint, i, j);
    }

    /**
     * Entry point for incoming envelopes: optionally traces the envelope on the
     * message log, then hands it to the inbound message dispatcher manager.
     *
     * @param envelope the received envelope
     */
    @Override // org.codehaus.wadi.group.MessageListener
    public void onMessage(Envelope envelope) {
        if (_messageLog.isTraceEnabled()) {
            // Payload, route (replyTo->address), correlation ids and receiving thread.
            String description = "incoming: " + envelope.getPayload()
                    + " {" + envelope.getReplyTo() + "->" + envelope.getAddress() + "} - "
                    + envelope.getTargetCorrelationId() + "/" + envelope.getSourceCorrelationId()
                    + " on " + Thread.currentThread().getName();
            _messageLog.trace(description);
        }
        inboundMessageDispatcher.onMessage(envelope);
    }

    /**
     * Delivers a correlated envelope to the rendezvous registered under the
     * envelope's target correlation id. If no rendezvous is registered under that
     * id the envelope is dropped (only a trace message is written).
     *
     * @param envelope the envelope carrying a target correlation id
     * @throws IllegalStateException if the envelope has no target correlation id
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void addRendezVousEnvelope(Envelope envelope) {
        final String correlationId = envelope.getTargetCorrelationId();
        if (correlationId == null) {
            throw new IllegalStateException("No targetCorrelationId");
        }

        final Quipu waiting = (Quipu) _rvMap.get(correlationId);
        final boolean tracing = _log.isTraceEnabled();

        if (waiting == null) {
            if (tracing) {
                _log.trace("no one waiting for [" + correlationId + "]");
            }
            return;
        }

        if (tracing) {
            _log.trace("successful correlation [" + correlationId + "]");
        }
        waiting.putResult(envelope);
    }

    /**
     * Creates a rendezvous for the given correlation id and registers it in the
     * rendezvous map, replacing any entry already stored under that id.
     *
     * @param str the correlation id used as the map key
     * @param i   passed as the first constructor argument of the new {@code Quipu}
     *            (presumably the number of expected results - TODO confirm)
     * @return the newly registered rendezvous
     */
    public Quipu setRendezVous(String str, int i) {
        final Quipu rendezVous = new Quipu(i, str);
        _rvMap.put(str, rendezVous);
        return rendezVous;
    }

    /**
     * Registers a rendezvous under a freshly generated correlation id.
     *
     * @param i forwarded to {@link #setRendezVous(String, int)}
     * @return the newly registered rendezvous
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Quipu newRendezVous(int i) {
        final String correlationId = _factory.create();
        return setRendezVous(correlationId, i);
    }

    /**
     * Waits on the rendezvous and returns its single correlated envelope.
     *
     * @param quipu the rendezvous to wait on
     * @param j     the timeout in milliseconds, forwarded to
     *              {@link #attemptMultiRendezVous(Quipu, long)}
     * @return the one envelope that was correlated with the rendezvous
     * @throws MessageExchangeException if the wait fails, or if the number of
     *                                  correlated envelopes is not exactly one
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope attemptRendezVous(Quipu quipu, long j) throws MessageExchangeException {
        Collection results = attemptMultiRendezVous(quipu, j);
        int count = results.size();
        // Reject an empty collection too: otherwise iterator().next() would escape
        // as an unchecked NoSuchElementException instead of the declared exception.
        if (count != 1) {
            throw new MessageExchangeException("[" + count + "] result messages. Expected only one.");
        }
        return (Envelope) results.iterator().next();
    }

    /**
     * Waits on the rendezvous for up to the given time and returns every envelope
     * that was correlated with it.
     *
     * <p>If the calling thread's interrupt flag is found set after an attempt, the
     * flag is cleared and the wait is repeated. The rendezvous stays registered in
     * the rendezvous map across such repeats, so replies can still be correlated,
     * and it is removed exactly once when this method exits (normally or not).
     *
     * @param quipu the rendezvous to wait on
     * @param j     the timeout in milliseconds passed to each wait attempt
     * @return the results collected by the rendezvous
     * @throws MessageExchangeException if no results were obtained
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Collection attemptMultiRendezVous(Quipu quipu, long j) throws MessageExchangeException {
        Collection results = null;
        try {
            do {
                try {
                    long started = System.currentTimeMillis();
                    if (quipu.waitFor(j)) {
                        results = quipu.getResults();
                        long elapsed = System.currentTimeMillis() - started;
                        if (this._log.isTraceEnabled()) {
                            this._log.trace("successful message exchange within timeframe (" + elapsed + "<" + j + " millis) {" + quipu + "}");
                        }
                    } else {
                        // The throwaway Exception only supplies a stack trace for the debug log.
                        this._log.debug("unsuccessful message exchange within timeframe (" + j + " millis) {" + quipu + "}", new Exception());
                    }
                } catch (TimeoutException e) {
                    this._log.debug("no response to request within timeout (" + j + " millis)");
                } catch (InterruptedException e2) {
                    this._log.debug("waiting for response - interruption ignored");
                }
                // Thread.interrupted() clears the flag, so a repeat does not fail immediately.
            } while (Thread.interrupted());
        } finally {
            // Deregister only after all attempts: removing inside the loop would leave a
            // repeated wait with no way of ever receiving its correlated reply.
            this._rvMap.remove(quipu.getCorrelationId());
        }
        if (null == results) {
            throw new MessageExchangeException("No correlated messages received within [" + j + "]ms");
        }
        return results;
    }

    /**
     * Convenience overload that accepts an untyped payload and forwards it to the
     * {@code Serializable} variant.
     *
     * @param address the destination address
     * @param obj     the payload; it is cast to {@code Serializable}, so a
     *                non-serializable object results in a {@code ClassCastException}
     * @param j       the timeout in milliseconds
     * @return the correlated reply
     * @throws MessageExchangeException if the exchange fails
     */
    public Envelope exchangeSend(Address address, Object obj, long j) throws MessageExchangeException {
        final Serializable payload = (Serializable) obj;
        return exchangeSend(address, payload, j);
    }

    /**
     * Sends a payload and waits for the correlated reply, without setting a
     * target correlation id on the outgoing message.
     *
     * @param address      the destination address
     * @param serializable the payload to send
     * @param j            the timeout in milliseconds
     * @return the correlated reply
     * @throws MessageExchangeException if the exchange fails
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope exchangeSend(Address address, Serializable serializable, long j) throws MessageExchangeException {
        final String noTargetCorrelationId = null;
        return exchangeSend(address, serializable, j, noTargetCorrelationId);
    }

    /**
     * Repeats a request/reply exchange with a peer until a reply arrives or the
     * attempt budget is used up. Every attempt reuses the same correlation id.
     *
     * @param peer         the peer whose address receives the request
     * @param serializable the payload to send
     * @param j            the timeout in milliseconds for each attempt
     * @param i            the maximum number of attempts
     * @return the first non-null reply
     * @throws MessageExchangeException if no attempt produced a reply
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope exchangeSendLink(Peer peer, Serializable serializable, long j, int i) throws MessageExchangeException {
        final String correlationId = _factory.create();
        int attempt = 0;
        while (attempt < i) {
            final Envelope reply = exchangeSend(peer.getAddress(), correlationId, serializable, j);
            if (reply != null) {
                return reply;
            }
            attempt++;
        }
        throw new MessageExchangeException("no reply to repeated message");
    }

    /**
     * Wraps the payload in a new message and sends it as the reply to a request.
     *
     * @param envelope     the request being answered
     * @param serializable the reply payload
     * @throws MessageExchangeException if the reply cannot be sent
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void reply(Envelope envelope, Serializable serializable) throws MessageExchangeException {
        final Envelope response = createMessage();
        response.setPayload(serializable);
        reply(envelope, response);
    }

    /**
     * Addresses a prepared reply back to the sender of a request and sends it.
     * The reply's reply-to becomes the local peer's address, its destination the
     * request's reply-to, and its target correlation id the request's source
     * correlation id.
     *
     * @param envelope  the request being answered
     * @param envelope2 the reply to address and send
     * @throws MessageExchangeException if the reply cannot be sent
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void reply(Envelope envelope, Envelope envelope2) throws MessageExchangeException {
        // Statement order is kept deliberately in case both arguments are the same object.
        envelope2.setReplyTo(getCluster().getLocalPeer().getAddress());
        final Address requester = envelope.getReplyTo();
        envelope2.setAddress(requester);
        envelope2.setTargetCorrelationId(envelope.getSourceCorrelationId());

        if (_log.isTraceEnabled()) {
            _log.trace("reply [" + envelope2 + "]");
        }
        send(requester, envelope2);
    }

    /**
     * Sends a payload to the given address in a new message whose reply-to is the
     * local peer's address. Any exception raised while building or sending the
     * message is logged at error level and not propagated to the caller.
     *
     * @param address      the destination address
     * @param serializable the payload to send
     * @throws MessageExchangeException declared by the interface; this
     *                                  implementation logs failures instead
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void send(Address address, Serializable serializable) throws MessageExchangeException {
        try {
            final Envelope outgoing = createMessage();
            outgoing.setReplyTo(getCluster().getLocalPeer().getAddress());
            outgoing.setAddress(address);
            outgoing.setPayload(serializable);
            send(address, outgoing);
        } catch (Exception e) {
            _log.error("problem sending " + serializable, e);
        }
    }

    /**
     * Sends a payload to the given address in a new message that carries the given
     * source correlation id and whose reply-to is the local peer's address. Any
     * exception raised while building or sending the message is logged at error
     * level and not propagated to the caller.
     *
     * @param address      the destination address
     * @param str          the source correlation id to set on the message
     * @param serializable the payload to send
     * @throws MessageExchangeException declared by the interface; this
     *                                  implementation logs failures instead
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void send(Address address, String str, Serializable serializable) throws MessageExchangeException {
        try {
            final Envelope outgoing = createMessage();
            outgoing.setReplyTo(getCluster().getLocalPeer().getAddress());
            outgoing.setAddress(address);
            outgoing.setPayload(serializable);
            outgoing.setSourceCorrelationId(str);
            send(address, outgoing);
        } catch (Exception e) {
            _log.error("problem sending " + serializable, e);
        }
    }

    /**
     * Sends a payload in a new message with an explicit reply-to address and
     * source correlation id.
     *
     * @param address      the address set as the message's reply-to
     * @param address2     the destination address
     * @param str          the source correlation id to set on the message
     * @param serializable the payload to send
     * @throws MessageExchangeException if the message cannot be sent
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void send(Address address, Address address2, String str, Serializable serializable) throws MessageExchangeException {
        final Envelope outgoing = createMessage();
        outgoing.setReplyTo(address);
        outgoing.setAddress(address2);
        outgoing.setSourceCorrelationId(str);
        outgoing.setPayload(serializable);

        if (_log.isTraceEnabled()) {
            _log.trace("send {" + str + "}: " + getPeerName(address) + " -> " + getPeerName(address2) + " : " + serializable);
        }
        send(address2, outgoing);
    }

    /**
     * Wraps the payload in a new message, sends it and waits for the correlated reply.
     *
     * @param address      the destination address
     * @param serializable the payload to send
     * @param j            the timeout in milliseconds
     * @param str          the target correlation id for the outgoing message, or
     *                     {@code null} for none
     * @return the correlated reply
     * @throws MessageExchangeException if the exchange fails
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope exchangeSend(Address address, Serializable serializable, long j, String str) throws MessageExchangeException {
        final Envelope request = createMessage();
        request.setPayload(serializable);
        return exchangeSend(address, request, j, str);
    }

    /**
     * Sends a prepared envelope and waits for the correlated reply, without
     * setting a target correlation id on it.
     *
     * @param address  the destination address
     * @param envelope the envelope to send
     * @param j        the timeout in milliseconds
     * @return the correlated reply
     * @throws MessageExchangeException if the exchange fails
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope exchangeSend(Address address, Envelope envelope, long j) throws MessageExchangeException {
        final String noTargetCorrelationId = null;
        return exchangeSend(address, envelope, j, noTargetCorrelationId);
    }

    /**
     * Sends a prepared envelope and waits for the single correlated reply.
     * The envelope's reply-to is set to the local peer's address, its destination
     * to the given address, and its source correlation id to that of a newly
     * registered rendezvous.
     *
     * @param address  the destination address
     * @param envelope the envelope to address and send
     * @param j        the timeout in milliseconds for the reply
     * @param str      the target correlation id to set on the envelope, or
     *                 {@code null} to leave it untouched
     * @return the correlated reply
     * @throws MessageExchangeException if sending fails or no single reply arrives
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope exchangeSend(Address address, Envelope envelope, long j, String str) throws MessageExchangeException {
        envelope.setReplyTo(getCluster().getLocalPeer().getAddress());
        envelope.setAddress(address);
        Quipu rendezVous = newRendezVous(1);
        boolean sent = false;
        try {
            envelope.setSourceCorrelationId(rendezVous.getCorrelationId());
            if (str != null) {
                envelope.setTargetCorrelationId(str);
            }
            if (this._log.isTraceEnabled()) {
                this._log.trace("exchangeSend [" + envelope + "]");
            }
            send(address, envelope);
            sent = true;
        } finally {
            // attemptRendezVous deregisters the rendezvous itself; if we never get that
            // far, remove it here so a failed send does not leak an entry in the map.
            if (!sent) {
                this._rvMap.remove(rendezVous.getCorrelationId());
            }
        }
        return attemptRendezVous(rendezVous, j);
    }

    /**
     * Sends a payload under a caller-chosen correlation id and waits for the
     * single correlated reply. Failures are reported by returning {@code null}
     * rather than by throwing.
     *
     * @param address      the destination address
     * @param str          the correlation id used both to register the rendezvous
     *                     and as the message's source correlation id
     * @param serializable the payload to send
     * @param j            the timeout in milliseconds for the reply
     * @return the correlated reply, or {@code null} if the exchange failed with a
     *         {@code MessageExchangeException}
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public Envelope exchangeSend(Address address, String str, Serializable serializable, long j) {
        Quipu rendezVous = setRendezVous(str, 1);
        boolean sent = false;
        try {
            send(getCluster().getLocalPeer().getAddress(), address, str, serializable);
            sent = true;
            return attemptRendezVous(rendezVous, j);
        } catch (MessageExchangeException e) {
            // Callers treat null as "no reply"; keep that contract but leave a trace.
            if (this._log.isTraceEnabled()) {
                this._log.trace("message exchange failed for correlation id [" + str + "]", e);
            }
            return null;
        } finally {
            // attemptRendezVous deregisters the rendezvous itself; if the send failed
            // first, remove it here so the map does not keep a dead entry.
            if (!sent) {
                this._rvMap.remove(str);
            }
        }
    }

    /**
     * Sends a payload as a reply in a new message with explicit addressing and
     * target correlation id.
     *
     * @param address      the address set as the message's reply-to
     * @param address2     the destination address
     * @param str          the target correlation id to set on the message
     * @param serializable the reply payload
     * @throws MessageExchangeException if the message cannot be sent
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void reply(Address address, Address address2, String str, Serializable serializable) throws MessageExchangeException {
        final Envelope response = createMessage();
        response.setReplyTo(address);
        response.setAddress(address2);
        response.setTargetCorrelationId(str);
        response.setPayload(serializable);

        if (_log.isTraceEnabled()) {
            _log.trace("reply: " + getPeerName(address) + " -> " + getPeerName(address2) + " {" + str + "} : " + serializable);
        }
        send(address2, response);
    }

    /**
     * Forwards an envelope's own payload to another address.
     *
     * @param envelope the envelope being forwarded
     * @param address  the new destination address
     * @throws MessageExchangeException if the message cannot be sent
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void forward(Envelope envelope, Address address) throws MessageExchangeException {
        final Serializable payload = envelope.getPayload();
        forward(envelope, address, payload);
    }

    /**
     * Sends a payload to another address on behalf of the original sender: the new
     * message keeps the envelope's reply-to and source correlation id.
     *
     * @param envelope     the envelope supplying reply-to and source correlation id
     * @param address      the new destination address
     * @param serializable the payload to send
     * @throws MessageExchangeException if the message cannot be sent
     */
    @Override // org.codehaus.wadi.group.Dispatcher
    public void forward(Envelope envelope, Address address, Serializable serializable) throws MessageExchangeException {
        final Address originalSender = envelope.getReplyTo();
        final String correlationId = envelope.getSourceCorrelationId();
        send(originalSender, address, correlationId, serializable);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores this dispatcher's cluster in {@code AbstractCluster._cluster} by
     * calling its {@code set} method.
     * NOTE(review): {@code _cluster} looks like a per-thread holder - confirm
     * against AbstractCluster.
     */
    public void hook() {
        AbstractCluster._cluster.set(getCluster());
    }

    /**
     * @return the thread pool this dispatcher was constructed with
     */
    public ThreadPool getExecutor() {
        return _executor;
    }
}
