import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class UdpModeledConnection {
    private static final double DROP_CERTAIN_MILLIS = 5;
    private static final double DROP_MIN_PROB = 0.1;
    private static final double DROP_LOG_KEEP_MILLIS = 200;
    private static final int DELAY_MEAN_MILLIS = 50;
    private static final int DELAY_DEVIATION_MILLIS = 10;

    private class ReceivePacket implements Runnable {
        private DatagramPacket packet;
        
        ReceivePacket(DatagramPacket pack) {
            packet = pack;
        }
        
        public void run() {
            try {
                receiveQueue.put(this);
            } catch (InterruptedException e) {
                System.err.println("*** unexpected interruption ***");
            }
        }
    }

    private class ReceiveError implements Runnable {
        private IOException e;
        
        ReceiveError(IOException e) {
            this.e = e;
        }
        
        public void run() {
            try {
                receiveQueue.put(this);
            } catch (InterruptedException e) {
                System.err.println("*** unexpected interruption ***");
            }
        }
    }
    
    private class PacketReceiver implements Runnable {
        public void run() {
            try {
                while (true) {
                    DatagramPacket packet = new DatagramPacket(new byte[600], 600);
                    connection.receive(packet);
                    packetCount.incrementAndGet();
                    int delay = getDelay();
                    receiveExecutor.schedule(new ReceivePacket(packet), delay, TimeUnit.MILLISECONDS);
                }
            } catch (IOException e) {
                int delay = getDelay();
                receiveExecutor.schedule(new ReceiveError(e), delay, TimeUnit.MILLISECONDS);
            }
        }
    }
    
    private class PacketSender implements Runnable {
        private DatagramPacket packet;
        
        PacketSender(DatagramPacket packet) {
            this.packet = packet;
        }
        
        public void run() {
            try {
                connection.send(packet);
            } catch (IOException e) {
                log("I/O exception during send");
            }
        }
    }
    
    private DatagramSocket connection;
    private AtomicInteger packetCount = new AtomicInteger();
    private AtomicInteger dropCount = new AtomicInteger();
    private Object lock = new Object();
    private Random randSequence = new Random();
    private LinkedList<Long> timeLog = new LinkedList<Long>();
    private ScheduledThreadPoolExecutor receiveExecutor = new ScheduledThreadPoolExecutor(1);
    private ScheduledThreadPoolExecutor sendExecutor = new ScheduledThreadPoolExecutor(1);
    private LinkedBlockingQueue<Runnable> receiveQueue = new LinkedBlockingQueue<Runnable>();
    
    public UdpModeledConnection() throws SocketException {
        connection = new DatagramSocket();
        new Thread(new PacketReceiver()).start();
    }
    
    public int getTotalPackets() {
        return packetCount.get();
    }
    
    public int getDroppedPackets() {
        return dropCount.get();
    }
    
    public void sendSafely(DatagramPacket packet) throws IOException {
        packetCount.incrementAndGet();
        computeDropProbability();
        sendExecutor.schedule(new PacketSender(packet), getDelay(), TimeUnit.MILLISECONDS);
    }
    
    public void send(DatagramPacket packet) throws IOException {
        packetCount.incrementAndGet();
        double dropProb = computeDropProbability();
        if (randSequence.nextDouble() < dropProb) {
            log("*** outgoing packet dropped [P=%4.1f%%] ***", 100.0 * dropProb);
            dropCount.incrementAndGet();
        } else {
            sendExecutor.schedule(new PacketSender(packet), getDelay(), TimeUnit.MILLISECONDS);
        }
    }
    
    public void setSoTimeout(int millis) throws SocketException {
        connection.setSoTimeout(millis);
    }
    
    public DatagramPacket receiveTimeout(int millis) throws IOException {
        while (true) {
            Runnable next;
            try {
                if (millis < 0) {
                    next = receiveQueue.take();
                } else {
                    next = receiveQueue.poll(millis, TimeUnit.MILLISECONDS);
                    if (next == null) {
                        return null;
                    }
                }
            } catch (InterruptedException e) {
                System.err.println("unexpected InterruptedException");
                return null;
            }
            if (next instanceof ReceiveError) {
                throw ((ReceiveError) next).e;
            }
            double dropProb = computeDropProbability();
            if (randSequence.nextDouble() < dropProb) {
                log("*** incoming packet dropped [P=%4.1f%%] ***", 100.0 * dropProb);
                dropCount.incrementAndGet();
            } else {
                return ((ReceivePacket) next).packet;
            }
        }
    }
    
    public DatagramPacket receive() throws IOException {
        return receiveTimeout(-1);
    }
    
    public void close() {
        synchronized (lock) {
            sendExecutor.shutdown();
            try {
                sendExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) { }
            connection.close();
        }
    }
    
    private void log(String message, Object... parms) {
        System.out.printf("UDP: %s\n", String.format(message, parms));
    }
    
    private int getDelay() {
        int delay = -1;
        while (delay <= 0) {
            delay = (int) Math.round(DELAY_MEAN_MILLIS + DELAY_DEVIATION_MILLIS * randSequence.nextGaussian());
        }
        return delay;
    }

    private double computeDropProbability() {
        long now = System.nanoTime();
        synchronized (lock) {
            timeLog.add(Long.valueOf(now));
            while (now - timeLog.peekFirst().longValue() > DROP_LOG_KEEP_MILLIS * 1000000) {
                timeLog.removeFirst();
            }
            double avgTime = (double) DROP_LOG_KEEP_MILLIS / timeLog.size();
            double dropProb = Math.min(1.0, DROP_CERTAIN_MILLIS / avgTime + DROP_MIN_PROB);
            return dropProb;
        }
    }
}
