Source code for catsim.simulation

"""Module containing functions relevant to the process of simulating the
application of adaptive tests. Most of this module is based on the work of
[Bar10]_."""

import time
from abc import ABCMeta, abstractmethod
from typing import List, Optional, Union

import numpy
import tqdm

from . import cat, irt


[docs] class Simulable(metaclass=ABCMeta): """Base class representing one of the Simulator components that will receive a reference back to it.""" def __init__(self): super(Simulable).__init__() self._simulator = None @property def simulator(self): if self._simulator is not None and not isinstance(self._simulator, Simulator): raise ValueError("simulator has to be of type catsim.simulation.Simulator") return self._simulator @simulator.setter def simulator(self, x: "Simulator"): if not isinstance(x, Simulator): raise ValueError("simulator has to be of type catsim.simulation.Simulator") self._simulator = x self.preprocess()
[docs] def preprocess(self): """Override this method to initialize any static values the `Simulable` might use for the duration of the simulation. `preprocess` is called after a value is set for the `simulator` property. If a new value if attributed to `simulator`, this method is called again, guaranteeing that internal properties of the `Simulable` are re-initialized as necessary."""
def _prepare_args( self, return_items=False, return_response_vector=False, return_est_theta=False, **kwargs ): using_simulator_props = kwargs.get("index") is not None and self.simulator is not None if not using_simulator_props and ( kwargs.get("items") is None or kwargs.get("administered_items") is None or (return_est_theta and kwargs.get("est_theta") is None) ): raise ValueError( "Either pass an index for the simulator or all of the other " "optional parameters to use this component independently." ) result = [] if using_simulator_props: index = kwargs["index"] if return_items: result.append(self.simulator.items) result.append(self.simulator.administered_items[index]) if return_response_vector: result.append(self.simulator.response_vectors[index]) if return_est_theta: result.append(self.simulator.latest_estimations[index]) else: if return_items: result.append(kwargs["items"]) result.append(kwargs["administered_items"]) if return_response_vector: result.append(kwargs["response_vector"]) if return_est_theta: result.append(kwargs["est_theta"]) return tuple(result)
[docs] class Initializer(Simulable, metaclass=ABCMeta): """Base class for CAT initializers""" def __init__(self): super().__init__()
[docs] @abstractmethod def initialize(self, index: int) -> float: """Selects an examinee's initial :math:`\\theta` value :param index: the index of the current examinee :returns: examinee's initial :math:`\\theta` value """
[docs] class Selector(Simulable, metaclass=ABCMeta): """Base class representing a CAT item selector.""" def __init__(self): super().__init__() @staticmethod def _get_non_administered( item_indices: List[int], administered_item_indices: List[int] ) -> list: """Gets a list of items that were not administered from a list of indices :param item_indices: a list of integers, corresponding to item indices :type item_indices: List[int] :param administered_item_indices: a list of integers, corresponding to the indices of items that were alredy administered to a given examinee :type administered_item_indices: List[int] :return: a list of items, corresponding to the indices that are in `item_indices` but not in `administered_item_indices`, in the same order they were passed in `item_indices` :rtype: List[int] """ return [x for x in item_indices if x not in administered_item_indices] @staticmethod def _sort_by_info(items: numpy.ndarray, est_theta: float) -> list: """Sort items by their information value, given a ability value :param items: an item parameter matrix :type items: numpy.ndarray :param est_theta: an examinee's ability :type est_theta: float :return: List[int] containing the indices of items, sorted in descending order by their information values (much like the return of `numpy.argsort`) :rtype: List[int] """ if irt.detect_model(items) == 1: # when the logistic model has the number of parameters <= 2, # all items have highest information where theta = b ordered_items = Selector._sort_by_b(items, est_theta) else: # else, sort item indexes by their information value descending and remove indexes of administered items ordered_items = [x for x in (-irt.inf_hpc(est_theta, items)).argsort()] return ordered_items @staticmethod def _sort_by_b(items: numpy.ndarray, est_theta: float) -> list: """Sort items by how close their difficulty parameter is in relaiton to an examinee's ability :param items: an item parameter matrix :type items: numpy.ndarray :param est_theta: an examinee's ability :type est_theta: float :return: list containing the indices of items, sorted by how close their difficulty parameter is in relaiton to :param:`est_theta` (much like the return of `numpy.argsort`) :rtype: List[int] """ return list(numpy.abs(items[:, 1] - est_theta).argsort())
[docs] @abstractmethod def select(self, index: int = None) -> Union[int, None]: """Returns the index of the next item to be administered. :param index: the index of the current examinee in the simulator. :returns: index of the next item to be applied or `None` if there are no more items to be presented. """
[docs] class FiniteSelector(Selector, metaclass=ABCMeta): """Base class representing a CAT item selector.""" def __init__(self, test_size): self._test_size = test_size self._overlap_rate = None super().__init__() @property def test_size(self) -> int: return self._test_size @property def overlap_rate(self) -> float: return self._overlap_rate
[docs] class Estimator(Simulable, metaclass=ABCMeta): """Base class for ability estimators""" def __init__(self, verbose: bool = False): super().__init__() self._calls = 0 self._evaluations = 0 self._verbose = verbose
[docs] @abstractmethod def estimate(self, index: int) -> float: """Returns the theta value that minimizes the negative log-likelihood function, given the current state of the test for the given examinee. :param index: index of the current examinee in the simulator :returns: the current :math:`\\hat\\theta` """
@property def calls(self): """How many times the estimator has been called to maximize/minimize the log-likelihood function :returns: number of times the estimator has been called to maximize/minimize the log-likelihood function """ return self._calls @property def evaluations(self): """Total number of times the estimator has evaluated the log-likelihood function during its existence :returns: number of function evaluations""" return self._evaluations @property def avg_evaluations(self): """Average number of function evaluations for all tests the estimator has been used :returns: average number of function evaluations""" return self._evaluations / self._calls
[docs] class Stopper(Simulable, metaclass=ABCMeta): """Base class for CAT stop criterion""" def __init__(self): super().__init__()
[docs] @abstractmethod def stop(self, index: int) -> bool: """Checks whether the test reached its stopping criterion for the given user :param index: the index of the current examinee :returns: `True` if the test met its stopping criterion, else `False`"""
[docs] class Simulator: """Class representing the simulator. It gathers several objects that describe the full simulation process and simulates one or more computerized adaptive tests :param items: a matrix containing item parameters :param examinees: an integer with the number of examinees, whose real :math:`\\theta` values will be sampled from a normal distribution; or a :py:type:list containing said :math:`\\theta_0` values """ def __init__( self, items: numpy.ndarray, examinees: Union[int, list, numpy.ndarray], initializer: Initializer = None, selector: Selector = None, estimator: Estimator = None, stopper: Stopper = None, ): irt.validate_item_bank(items) # adds a column for each item's exposure rate if items.shape[1] < 5: items = numpy.append(items, numpy.zeros([items.shape[0], 1]), axis=1) self._duration = 0.0 self._items = items self._bias = 0.0 self._mse = 0.0 self._rmse = 0.0 self._overlap_rate = 0.0 self._initializer = initializer self._selector = selector self._estimator = estimator self._stopper = stopper # `examinees` is passed to its special setter self._examinees = self._to_distribution(examinees) self._estimations = [[] for _ in range(self.examinees.shape[0])] # type: List[List[int]] self._administered_items = [ [] for _ in range(self.examinees.shape[0]) ] # type: List[List[int]] self._response_vectors = [ [] for _ in range(self.examinees.shape[0]) ] # type: List[List[bool]] @property def items(self) -> numpy.ndarray: """Item matrix used by the simulator. If the simulation already occurred, a column containing item exposure rates will be added to the matrix.""" return self._items @property def administered_items(self) -> list: """List of lists containing the indexes of items administered to each examinee during the simulation.""" return self._administered_items @property def estimations(self) -> list: """List of lists containing all estimated :math:`\\hat\\theta` values for all examinees during each step of the test.""" return self._estimations @property def response_vectors(self) -> list: """List of boolean lists containing the examinees answers to all items.""" return self._response_vectors @property def latest_estimations(self) -> list: """Final estimated :math:`\\hat\\theta` values for all examinees.""" return [ests[-1] if len(ests) > 0 else None for ests in self._estimations] @property def duration(self) -> float: """Duration of the simulation, in seconds.""" return self._duration @property def overlap_rate(self) -> float: """Overlap rate of the test, if it is of finite length.""" return self._overlap_rate @property def initializer(self) -> Optional[Initializer]: return self._initializer @property def selector(self) -> Optional[Selector]: return self._selector @property def estimator(self) -> Optional[Estimator]: return self._estimator @property def stopper(self) -> Optional[Stopper]: return self._stopper @property def bias(self) -> float: """Bias between the estimated and true abilities. This property is only available after :py:func:`simulate` has been successfully called. For more information on estimation bias, see :py:func:`catsim.cat.bias`""" return self._bias @property def mse(self) -> float: """Mean-squared error between the estimated and true abilities. This property is only available after :py:func:`simulate` has been successfully called. For more information on the mean-squared error of estimation, see :py:func:`catsim.cat.mse`""" return self._mse @property def rmse(self) -> float: """Root mean-squared error between the estimated and true abilities. This property is only available after :py:func:`simulate` has been successfully called. For more information on the root mean-squared error of estimation, see :py:func:`catsim.cat.rmse`""" return self._rmse @property def examinees(self) -> numpy.ndarray: """:py:type:numpy.ndarray containing examinees true ability values (:math:`\\theta`).""" return self._examinees @examinees.setter def examinees(self, x: Union[int, list, numpy.ndarray]): self._examinees = self._to_distribution(x) def _to_distribution(self, x): # generate examinees from a normal distribution # if an int was passed if isinstance(x, int): if self._items is not None: mean = numpy.mean(self._items[:, 1]) stddev = numpy.std(self._items[:, 1]) dist = numpy.random.normal(mean, stddev, x) else: dist = numpy.random.normal(0, 1, x) elif isinstance(x, list): dist = numpy.array(x) elif isinstance(x, numpy.ndarray) and x.ndim == 1: dist = x else: raise ValueError( "Examinees must be an int, list of floats or one-dimensional numpy array" ) return dist
[docs] def simulate( self, initializer: Initializer = None, selector: Selector = None, estimator: Estimator = None, stopper: Stopper = None, verbose: bool = False, ): """Simulates a computerized adaptive testing application to one or more examinees :param initializer: an initializer that selects examinees :math:`\\theta_0` :param selector: a selector that selects new items to be presented to examinees :param estimator: an estimator that reestimates examinees abilities after each item is applied :param stopper: an object with a stopping criteria for the test :param verbose: whether to periodically print a message regarding the progress of the simulation. Good for longer simulations. >>> from catsim.initialization import RandomInitializer >>> from catsim.selection import MaxInfoSelector >>> from catsim.estimation import NumericalSearchEstimator >>> from catsim.stopping import MaxItemStopper >>> from catsim.simulation import Simulator >>> from catsim.cat import generate_item_bank >>> initializer = RandomInitializer() >>> selector = MaxInfoSelector() >>> estimator = NumericalSearchEstimator() >>> stopper = MaxItemStopper(20) >>> Simulator(generate_item_bank(100), 10).simulate(initializer, selector, estimator, stopper) """ if initializer is not None: self._initializer = initializer if selector is not None: self._selector = selector if estimator is not None: self._estimator = estimator if stopper is not None: self._stopper = stopper assert self._initializer is not None assert self._selector is not None assert self._estimator is not None assert self._stopper is not None for s in [self._initializer, self._selector, self._estimator, self._stopper]: s.simulator = self if verbose: print( f"Starting simulation: {self._initializer} {self._selector} {self._estimator} {self._stopper} {self._items.shape[0]} items" ) pbar = tqdm.tqdm(total=len(self.examinees)) start_time = time.time() for current_examinee, true_theta in enumerate(self.examinees): if verbose: pbar.update() est_theta = self._initializer.initialize(current_examinee) self._estimations[current_examinee].append(est_theta) while not self._stopper.stop(current_examinee): selected_item = self._selector.select(current_examinee) # if the selector returns None, it means the selector and not the stopper, is asking the test to stop # this happens e.g. if the item bank or or the available strata end before the minimum error is achieved if selected_item is None: break # simulates the examinee's response via the four-parameter # logistic function response = ( irt.icc( true_theta, self.items[selected_item][0], self.items[selected_item][1], self.items[selected_item][2], self.items[selected_item][3], ) >= numpy.random.uniform() ) self._response_vectors[current_examinee].append(response) # adds the item selected by the selector to the pool of administered items self._administered_items[current_examinee].append(selected_item) # estimate the new theta using the given estimator est_theta = self._estimator.estimate(current_examinee) # count occurrences of this item in all tests item_occurrences = numpy.sum( [ selected_item in administered_list for administered_list in self._administered_items ] ) # update the exposure value for this item # r = number of tests item has been used on / total number of tests self.items[selected_item, 4] = item_occurrences / len(self.examinees) self._estimations[current_examinee].append(est_theta) self._duration = time.time() - start_time if verbose: pbar.close() print(f"Simulation took {self._duration} seconds") self._bias = cat.bias(self.examinees, self.latest_estimations) self._mse = cat.mse(self.examinees, self.latest_estimations) self._rmse = cat.rmse(self.examinees, self.latest_estimations) # overlap is computed only if all examinees answered the same amount of items # maybe there is a way to calculate it with tests of different lengths, # but I did not find it in the literature test_size = None len_first = len(self._administered_items[0]) if self._administered_items else None if isinstance(selector, FiniteSelector): test_size = selector.test_size elif all(len(i) == len_first for i in self._administered_items): test_size = len_first if test_size is not None: self._overlap_rate = cat.overlap_rate(self.items[:, 4], test_size)
if __name__ == "__main__": import doctest doctest.testmod()