pins/mock.js

const Pin = require("./index.js").Pin;
const exc = require("../exc.js");
const assert = require('chai').assert;
const isclose = require('../compat.js').isclose;
const inherit = require('../tools.js').inherit;
let _PINS = [];

/**
 *  A mock pin used primarily for testing. This class does *not* support PWM.
 *
 * @param {int} number - GPIO Pin to simulate.
 * @returns {MockPin} - The pin that has been instatiated or the pin already associated with this pin number.
 * @class
 * @augments Pin
 */
function MockPin(number) {
    if (number < 0 || number > 54) {
        throw new Error('invalid pin ' + number.toString() + ' specified (must be 0..53)');
    }
    const old_pin = _PINS[number];
    if (old_pin === undefined) {
        Pin.call(this);
        _PINS[number] = this;
        this._number = number;
        this._function = 'input';
        this._state = false;
        this._pull = 'floating';
        this._bounce = null;
        this._edges = 'both';
        this._when_changed = null;
        this.clear_states();
        return this;
    }
    // Ensure the pin class expected supports PWM (or not)
    //if issubclass(cls, MockPWMPin) != isinstance(old_pin, MockPWMPin):
    //    raise ValueError('pin %d is already in use as a %s' % (number, old_pin.__class__.__name__))
    return old_pin;
}
MockPin.prototype = inherit(Pin.prototype);
MockPin.prototype.constructor = MockPin;

MockPin.prototype.when_changed = function(next) {
    this._when_changed = next;
}

MockPin.prototype.clear_states = function() {
    this._last_change = (new Date()).getTime();
    this.states = [{
        time: 0.0,
        state: this._state
    }];
};

MockPin.prototype.toString = function () {
    return 'MOCK'+this._number;
}

MockPin.prototype.state_history = function() {
    return this.states;
};

MockPin.prototype.state = function(value) {
    if (value === undefined) {
        return this._state;
    }
    if (this._function !== 'output') {
        throw new exc.PinSetInput('cannot set state of pin ' + this);
    }
    if (value !== '0' && value !== '1' && value !== true && value !== false) {
        throw new exc.PinSetInput('Invalid Value - must be 1 or 0 not ' + value.toString());
    }
    this._change_state(typeof(value) !== "boolean" ? Boolean(value) : value);
};

MockPin.prototype._change_state = function(value) {
    if (this._state !== value) {
        var t = (new Date()).getTime();
        this._state = value;
        this.states.push({
            time: t - this._last_change,
            state: value
        });
        this._last_change = t;
        return true;
    }
    return false;
};

MockPin.prototype.pin_function = function(value) {
    if (value === undefined) {
        return this._function;
    }
    if (value === 'input' || value === 'output') {
        this._function = value;
        if (value === 'input') {
            //# Drive the input to the pull
            //self._set_pull(self._get_pull())
        }
    } else {
        throw new exc.PinSetInput('Invalid Value - must be input or output not ' + value.toString());
    }
};

MockPin.prototype.close = function() {
    this.when_changed = undefined;
    this._function = 'input';

};

MockPin.prototype.frequency = function(value) {
    if (value === undefined) {
        return;
    }
    throw new exc.PinPWMUnsupported();
};

MockPin.prototype.number = function() {
    return this._number;
};

MockPin.prototype.assert_states = function(expected) {
    // Tests that the pin went through the expected states (a list of values)
    for (let i = 0, len = expected.length; i < len; i++) {
        const actual = this.state_history()[i].state;
        assert(isclose(actual, expected[i], undefined, 10), actual + " not equal to " + expected[i]);
    }
};

MockPin.prototype.assert_states_and_times = function(expected) {
    // Tests that the pin went through the expected states at the expected
    // times (times are compared with a tolerance of tens-of-milliseconds as
    // that's about all we can reasonably expect in a non-realtime
    // environment on a Pi 1)
    assert(expected.length <= this.state_history().length, 'Expected length:' + expected.length + ' Actual length:' + this.state_history().length);

    for (let i = 0; i < expected.length; i++) {
        let actual = this.state_history()[i].state;
        assert(isclose(actual, expected[i].state, 0.05, undefined), actual + " not equal to " + expected[i].state);

        if (expected[i].time === 0) {
            actual = this.state_history()[i].time;
            assert(actual === expected[i].time, "Times are not equal Expected:" + expected[i].time + " Actual:" + actual);
        } else if (expected[i].time !== 1) {
            actual = this.state_history()[i].time;
            assert(isclose(actual, expected[i].time, undefined, 10), actual + " not equal to " + expected[i].time);
        }
    }
};

MockPin.prototype.pull = function (value) {
  if (value === undefined) {
      return this._pull;
  }
  assert(this._function === 'input', 'Pin function is not set to input');
  assert(['floating', 'up', 'down'].indexOf(value) !== -1, 'Invalid pull value');
  this._pull = value;
  if (value === 'up') {
      this.drive_high();
  } else if (value === 'down') {
      this.drive_low();
  }
};

MockPin.prototype.drive_high = function () {
    assert(this._function === 'input', 'Pin function is not set to input');
    if (this._change_state(true)) {
        if (['both', 'rising'].indexOf(this._edges) !== -1) {
            if(this._when_changed !== null) {
                this._when_changed();
            }
        }
    }
};

MockPin.prototype.drive_low = function () {
    assert(this._function === 'input', 'Pin function is not set to input');
    if (this._change_state(false)) {
        if (['both', 'rising'].indexOf(this._edges) !== -1) {
            if(this._when_changed !== null) {
                this._when_changed();
            }
        }
    }
};

exports.MockPin = MockPin;


function clear_pins() {
    _PINS = {};
}

exports.clear_pins = clear_pins;


function MockPWMPin(number) {
    MockPin.call(this, number);
    this._state = 0.0;
    this.clear_states();
    this._frequency = undefined;
}

MockPWMPin.prototype = inherit(MockPin.prototype);
MockPWMPin.prototype.constructor = MockPWMPin;

exports.MockPWMPin = MockPWMPin;

MockPWMPin.prototype.close = function() {
    this.frequency(undefined);
    MockPin.prototype.close.call(this);
};

MockPWMPin.prototype.frequency = function(value) {
    if (value === undefined) {
        return this._frequency;
    }
    if (this._function !== 'output') {
        throw new exc.PinSetInput("Pin is not set for output function");
    }
    if (value === -1) {
        this._frequency = undefined;
        this._change_state(0.0);
    } else {
        this._frequency = value;
    }
};

MockPWMPin.prototype.state = function(value) {
    if (value === undefined) {
        return this._state;
    }
    if (this._function !== 'output') {
        throw new exc.PinSetInput('cannot set state of pin ' + this);
    }
    if (value < 0 || value > 1) {
        throw new exc.OutputDeviceBadValue("initial_value must be between 0 and 1, actual=:" + value);
    }
    this._change_state(parseFloat(value));
};

MockPWMPin.prototype.blink = function(on_time, off_time, fade_in_time, fade_out_time, n, fps, callback) {
    this.on_time = (on_time === undefined ? 1 : on_time);
    this.off_time = (off_time === undefined ? 1 : off_time);
    this.fade_in_time = (fade_in_time === undefined ? 0 : fade_in_time);
    this.fade_out_time = (fade_out_time === undefined ? 0 : fade_out_time);
    this.fps = (fps === undefined ? 50 : fps);
    this.n = (n === undefined ? 0 : n);
    this.sequence = [];
    this.blink_callback = callback;
    var i = 0;

    if (this.fade_in_time > 0) {
        for (i = 0; i < this.fps * this.fade_in_time; i++) {
            this.sequence.push({
                value: i * (1 / this.fps) / this.fade_in_time,
                delay: 1 / this.fps
            });
        }
    }
    this.sequence.push({
        value: 1,
        delay: this.on_time
    });

    if (this.fade_out_time > 0) {
        for (i = 0; i < this.fps * this.fade_out_time; i++) {
            this.sequence.push({
                value: 1 - (i * (1 / this.fps)) / this.fade_out_time,
                delay: 1 / this.fps
            });
        }
    }
    this.sequence.push({
        value: 0,
        delay: this.off_time
    });

    if (this.n > 0) {
        for (i = 0; i < (this.n - 1); i++) {
            this.sequence = this.sequence.concat(this.sequence);
        }

        var nextStep = this.sequence.pop();
        this.state(nextStep.value);
        var that = this;
        this._blink_timer = setTimeout(that._run_blink, nextStep.delay, this.sequence, this);
    }
};

MockPWMPin.prototype._run_blink = function(sequence, that) {
    if (sequence.length > 0) {
        var nextStep = sequence.pop();
        that.state(nextStep.value);
        that._blink_timer = setTimeout(that._run_blink, nextStep.delay, sequence, that);
    } else if (that.blink_callback !== undefined) {
        that.blink_callback();
    }
};

/**
 * This derivative of {@link MockPin} emulates a pin with a physical pull-up resistor.
 *
 * @param {int} number - GPIO Pin number that is being simulated.
 * @class
 * @augments MockPin
 */
function MockPulledUpPin(number) {
    MockPin.call(this, number);
}
MockPulledUpPin.prototype = inherit (MockPin.prototype);
MockPulledUpPin.prototype.constructor = MockPulledUpPin;

MockPulledUpPin.prototype.pull = function (value) {
    if (value !== 'up') {
        throw new exc.PinFixedPull('pin has a physical pull-up resistor');
    }
}

exports.MockPulledUpPin = MockPulledUpPin;

/*    
    def pi_info(cls):
        return pi_info('a21041') # Pretend we're a Pi 2B       

    def __repr__(self):
        return 'MOCK%d' % self._number

    @property
    def number(self):
        return self._number

    def close(self):
        self.when_changed = None
        self.function = 'input'

    def _get_frequency(self):
        return None

    def _set_frequency(self, value):
        if value is not None:
            raise PinPWMUnsupported()

    def _get_pull(self):
        return self._pull

    def _set_pull(self, value):
        assert self._function == 'input'
        assert value in ('floating', 'up', 'down')
        self._pull = value
        if value == 'up':
            self.drive_high()
        elif value == 'down':
            self.drive_low()

    def _get_bounce(self):
        return self._bounce

    def _set_bounce(self, value):
        # XXX Need to implement this
        self._bounce = value

    def _get_edges(self):
        return self._edges

    def _set_edges(self, value):
        assert value in ('none', 'falling', 'rising', 'both')
        self._edges = value

    def _get_when_changed(self):
        return self._when_changed

    def _set_when_changed(self, value):
        self._when_changed = value

    def drive_high(self):
        assert self._function == 'input'
        if self._change_state(True):
            if self._edges in ('both', 'rising') and self._when_changed is not None:
                self._when_changed()

    def drive_low(self):
        assert self._function == 'input'
        if self._change_state(False):
            if self._edges in ('both', 'falling') and self._when_changed is not None:
                self._when_changed()

    def clear_states(self):
        self._last_change = time()
        self.states = [PinState(0.0, self._state)]

    def assert_states(self, expected_states):
        # Tests that the pin went through the expected states (a list of values)
        for actual, expected in zip(self.states, expected_states):
            assert actual.state == expected

    def assert_states_and_times(self, expected_states):
        # Tests that the pin went through the expected states at the expected
        # times (times are compared with a tolerance of tens-of-milliseconds as
        # that's about all we can reasonably expect in a non-realtime
        # environment on a Pi 1)
        for actual, expected in zip(self.states, expected_states):
            assert isclose(actual.timestamp, expected[0], rel_tol=0.05, abs_tol=0.05)
            assert isclose(actual.state, expected[1])

class MockChargingPin(MockPin):
    """
    This derivative of :class:`MockPin` emulates a pin which, when set to
    input, waits a predetermined length of time and then drives itself high
    (as if attached to, e.g. a typical circuit using an LDR and a capacitor
    to time the charging rate).
    """
    def __init__(self, number):
        super(MockChargingPin, self).__init__()
        self.charge_time = 0.01 # dark charging time
        self._charge_stop = Event()
        self._charge_thread = None

    def _set_function(self, value):
        super(MockChargingPin, self)._set_function(value)
        if value == 'input':
            if self._charge_thread:
                self._charge_stop.set()
                self._charge_thread.join()
            self._charge_stop.clear()
            self._charge_thread = Thread(target=self._charge)
            self._charge_thread.start()
        elif value == 'output':
            if self._charge_thread:
                self._charge_stop.set()
                self._charge_thread.join()

    def _charge(self):
        if not self._charge_stop.wait(self.charge_time):
            try:
                self.drive_high()
            except AssertionError:
                # Charging pins are typically flipped between input and output
                # repeatedly; if another thread has already flipped us to
                # output ignore the assertion-error resulting from attempting
                # to drive the pin high
                pass


class MockTriggerPin(MockPin):
    """
    This derivative of :class:`MockPin` is intended to be used with another
    :class:`MockPin` to emulate a distance sensor. Set :attr:`echo_pin` to the
    corresponding pin instance. When this pin is driven high it will trigger
    the echo pin to drive high for the echo time.
    """
    def __init__(self, number):
        super(MockTriggerPin, self).__init__()
        self.echo_pin = None
        self.echo_time = 0.04 # longest echo time
        self._echo_thread = None

    def _set_state(self, value):
        super(MockTriggerPin, self)._set_state(value)
        if value:
            if self._echo_thread:
                self._echo_thread.join()
            self._echo_thread = Thread(target=self._echo)
            self._echo_thread.start()

    def _echo(self):
        sleep(0.001)
        self.echo_pin.drive_high()
        sleep(self.echo_time)
        self.echo_pin.drive_low()





class MockSPIClockPin(MockPin):
    """
    This derivative of :class:`MockPin` is intended to be used as the clock pin
    of a mock SPI device. It is not intended for direct construction in tests;
    rather, construct a :class:`MockSPIDevice` with various pin numbers, and
    this class will be used for the clock pin.
    """
    def __init__(self, number):
        super(MockSPIClockPin, self).__init__()
        if not hasattr(self, 'spi_devices'):
            self.spi_devices = []

    def _set_state(self, value):
        super(MockSPIClockPin, self)._set_state(value)
        for dev in self.spi_devices:
            dev.on_clock()


class MockSPISelectPin(MockPin):
    """
    This derivative of :class:`MockPin` is intended to be used as the select
    pin of a mock SPI device. It is not intended for direct construction in
    tests; rather, construct a :class:`MockSPIDevice` with various pin numbers,
    and this class will be used for the select pin.
    """
    def __init__(self, number):
        super(MockSPISelectPin, self).__init__()
        if not hasattr(self, 'spi_device'):
            self.spi_device = None

    def _set_state(self, value):
        super(MockSPISelectPin, self)._set_state(value)
        if self.spi_device:
            self.spi_device.on_select()


class MockSPIDevice(object):
    def __init__(
            self, clock_pin, mosi_pin, miso_pin, select_pin=None,
            clock_polarity=False, clock_phase=False, lsb_first=False,
            bits_per_word=8, select_high=False):
        self.clock_pin = MockSPIClockPin(clock_pin)
        self.mosi_pin = None if mosi_pin is None else MockPin(mosi_pin)
        self.miso_pin = None if miso_pin is None else MockPin(miso_pin)
        self.select_pin = None if select_pin is None else MockSPISelectPin(select_pin)
        self.clock_polarity = clock_polarity
        self.clock_phase = clock_phase
        self.lsb_first = lsb_first
        self.bits_per_word = bits_per_word
        self.select_high = select_high
        self.rx_bit = 0
        self.rx_buf = []
        self.tx_buf = []
        self.clock_pin.spi_devices.append(self)
        self.select_pin.spi_device = self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def close(self):
        if self in self.clock_pin.spi_devices:
            self.clock_pin.spi_devices.remove(self)
        if self.select_pin is not None:
            self.select_pin.spi_device = None

    def on_select(self):
        if self.select_pin.state == self.select_high:
            self.on_start()

    def on_clock(self):
        # Don't do anything if this SPI device isn't currently selected
        if self.select_pin is None or self.select_pin.state == self.select_high:
            # The XOR of the clock pin's values, polarity and phase indicates
            # whether we're meant to be acting on this edge
            if self.clock_pin.state ^ self.clock_polarity ^ self.clock_phase:
                self.rx_bit += 1
                if self.mosi_pin is not None:
                    self.rx_buf.append(self.mosi_pin.state)
                if self.miso_pin is not None:
                    try:
                        tx_value = self.tx_buf.pop(0)
                    except IndexError:
                        tx_value = 0
                    if tx_value:
                        self.miso_pin.drive_high()
                    else:
                        self.miso_pin.drive_low()
                self.on_bit()

    def on_start(self):
        """
        Override this in descendents to detect when the mock SPI device's
        select line is activated.
        """
        self.rx_bit = 0
        self.rx_buf = []
        self.tx_buf = []

    def on_bit(self):
        """
        Override this in descendents to react to receiving a bit.

        The :attr:`rx_bit` attribute gives the index of the bit received (this
        is reset to 0 by default by :meth:`on_select`). The :attr:`rx_buf`
        sequence gives the sequence of 1s and 0s that have been recevied so
        far. The :attr:`tx_buf` sequence gives the sequence of 1s and 0s to
        transmit on the next clock pulses. All these attributes can be modified
        within this method.

        The :meth:`rx_word` and :meth:`tx_word` methods can also be used to
        read and append to the buffers using integers instead of bool bits.
        """
        pass

    def rx_word(self):
        result = 0
        bits = reversed(self.rx_buf) if self.lsb_first else self.rx_buf
        for bit in bits:
            result <<= 1
            result |= bit
        return result

    def tx_word(self, value, bits_per_word=None):
        if bits_per_word is None:
            bits_per_word = self.bits_per_word
        bits = [0] * bits_per_word
        for bit in range(bits_per_word):
            bits[bit] = value & 1
            value >>= 1
        assert not value
        if not self.lsb_first:
            bits = reversed(bits)
        self.tx_buf.extend(bits)

*/