/*
 * Decompiled with CFR 0.152.
 */
package org.rzo.netty.ahessian.io;

import java.io.InputStream;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.rzo.netty.ahessian.Constants;
import org.rzo.netty.ahessian.io.InputStreamBuffer;
import org.rzo.netty.ahessian.io.InputStreamConsumer;
import org.rzo.netty.ahessian.stopable.StopableHandler;

/**
 * Upstream handler that hands incoming input streams to an
 * {@link InputStreamConsumer} on a separate worker thread.
 *
 * <p>The constructor submits a long-running task to the supplied executor. That
 * task repeatedly waits until the consumer reports a non-empty buffer on a
 * connected channel and then calls
 * {@code InputStreamConsumer.consume(ctx, inputStream)}. The loop ends once
 * {@link #stop()} has been called or the worker thread is interrupted.
 *
 * <p>Threading: {@code messageReceived}, {@code channelConnected} and
 * {@code stop} may be called from threads other than the worker. State shared
 * with the worker is either {@code volatile} or guarded by {@code _lock}.
 * The lock is only ever held for short, self-contained sections; neither the
 * consumer's readiness check nor {@code consume} runs while it is held.
 */
public class PullInputStreamConsumer
extends SimpleChannelUpstreamHandler
implements StopableHandler {
    /** Upper bound for a single wait, so state changes nobody signals are still noticed. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    final InputStreamConsumer _consumer;
    final Executor _executor;
    final Lock _lock = new ReentrantLock();
    final Condition _hasData = this._lock.newCondition();
    volatile boolean _stop = false;
    volatile ChannelHandlerContext _ctx;
    volatile InputStream _inputStream;
    /** True while the worker is blocked in {@code _hasData.await(...)}. */
    volatile boolean _waiting = false;
    static AtomicInteger _threadCounter = new AtomicInteger(0);
    private volatile boolean _stopEnabled = true;
    /**
     * Set when a wake-up was requested and not yet observed by the worker.
     * Guarded by {@code _lock}. It prevents a wake-up from being lost when it
     * arrives between the worker's readiness check and its call to await.
     */
    private boolean _wakeupPending = false;

    /**
     * Creates the handler and immediately submits the consume loop to
     * {@code executor}.
     *
     * @param consumer the consumer that is asked whether its buffer is empty and
     *                 that receives the input stream
     * @param executor the executor that runs the consume loop; the task occupies
     *                 one executor thread until the handler is stopped
     */
    public PullInputStreamConsumer(InputStreamConsumer consumer, Executor executor) {
        this._consumer = consumer;
        this._executor = executor;
        // NOTE: "this" becomes visible to the worker before the constructor returns.
        // All fields the worker reads are initialised above or by field initialisers.
        this._executor.execute(new Runnable(){

            public void run() {
                PullInputStreamConsumer.this.runConsumeLoop();
            }
        });
    }

    /**
     * Body of the worker task: renames the current thread for the duration of the
     * loop, then alternates between waiting for data and consuming it until a
     * stop is requested. The original thread name is restored on exit.
     */
    private void runConsumeLoop() {
        Thread worker = Thread.currentThread();
        String originalName = worker.getName();
        worker.setName("ahessian-PullInputStreamConsumer-#" + _threadCounter.incrementAndGet());
        try {
            this.waitForData();
            while (!this._stop) {
                this._consumer.consume(this._ctx, this._inputStream);
                this.waitForData();
            }
        }
        finally {
            worker.setName(originalName);
            _threadCounter.decrementAndGet();
        }
    }

    /**
     * Tells whether the worker may call {@code consume}: a consumer is present,
     * its buffer is not empty, and a context with a connected channel is known.
     */
    private boolean isReadyToConsume() {
        ChannelHandlerContext ctx = this._ctx;
        return this._consumer != null
                && !this._consumer.isBufferEmpty()
                && ctx != null
                && ctx.getChannel().isConnected();
    }

    /**
     * Blocks the worker until a stop was requested or data is ready to consume.
     *
     * <p>Readiness is evaluated outside the lock. Inside the lock the worker only
     * waits if no wake-up is pending, so a signal sent after the readiness check
     * but before the wait is not lost. Each wait is bounded by
     * {@link #POLL_INTERVAL_MILLIS}.
     *
     * <p>If the thread is interrupted, its interrupt status is restored and the
     * handler is marked as stopped so that the consume loop terminates.
     */
    private void waitForData() {
        while (!this._stop && !this.isReadyToConsume()) {
            this._lock.lock();
            try {
                if (!this._wakeupPending && !this._stop) {
                    this._waiting = true;
                    this._hasData.await(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                }
                this._wakeupPending = false;
            }
            catch (InterruptedException e) {
                Constants.ahessianLogger.warn("PullInputStreamConsumer interrupted while waiting for data; stopping", (Throwable)e);
                // Stop instead of looping: with the interrupt status restored,
                // every further await would throw again immediately.
                this._stop = true;
                Thread.currentThread().interrupt();
            }
            finally {
                this._waiting = false;
                this._lock.unlock();
            }
        }
    }

    /**
     * Records a wake-up request and signals the worker. Safe to call whether or
     * not the worker is currently waiting.
     */
    private void wakeConsumer() {
        this._lock.lock();
        try {
            this._wakeupPending = true;
            this._hasData.signal();
        }
        finally {
            this._lock.unlock();
        }
    }

    /**
     * Remembers the context and the input stream carried by the event and wakes
     * the worker. The event is not forwarded upstream.
     *
     * @param ctx  the context the event was received on
     * @param evnt the event; its message is cast to {@code InputStream} and
     *             {@code InputStreamBuffer}
     * @throws Exception declared by the overridden handler method
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evnt) throws Exception {
        if (this._ctx != ctx) {
            this._ctx = ctx;
        }
        Object message = evnt.getMessage();
        if (this._inputStream != message) {
            InputStream stream = (InputStream)message;
            // Configure the stream before publishing it to the worker thread.
            ((InputStreamBuffer)stream).setReadTimeout(-1L);
            this._inputStream = stream;
        }
        this.wakeConsumer();
    }

    /**
     * Passes the context to the consumer, remembers it for the worker and forwards
     * the event upstream.
     *
     * @param ctx the context of the connected channel
     * @param e   the state event, forwarded unchanged
     * @throws Exception declared by the overridden handler method
     */
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this._lock.lock();
        try {
            this._consumer.setContext(ctx);
            this._ctx = ctx;
        }
        finally {
            // Released even if setContext throws; otherwise the worker would block forever.
            this._lock.unlock();
        }
        ctx.sendUpstream((ChannelEvent)e);
    }

    /**
     * @return the current value of the stop-enabled flag (initially {@code true})
     */
    public boolean isStopEnabled() {
        return this._stopEnabled;
    }

    /**
     * Stores the stop-enabled flag. This class itself does not evaluate the flag.
     *
     * @param stopEnabled the new flag value
     */
    public void setStopEnabled(boolean stopEnabled) {
        this._stopEnabled = stopEnabled;
    }

    /**
     * Requests termination of the consume loop and wakes the worker so that it
     * notices the request without waiting for the poll interval to elapse. A
     * {@code consume} call that is already running is not interrupted.
     */
    public void stop() {
        this._stop = true;
        this.wakeConsumer();
    }
}

