Source code for ivxj.ivxj

import numpy as np
import pandas as pd

from ivxj.split_mat_into_cells import split_mat_into_cells
from ivxj.xj import xj
from ivxj.delete_period_obs import delete_period_obs
from ivxj.gen_ivx import gen_ivx
from ivxj.within_trans import within_trans


def ivxj(data, rhoz, identity=None, time=None, y_name=None, x_name=None):
    """
    IVXJ Estimation for Unbalanced Panel Data (Univariate Case).

    Sorts the panel by entity and time, extracts the dependent and
    independent variables as float64 arrays, counts the observations per
    entity, and delegates the estimation to `raw_ivxj`.

    Parameters
    ----------
    data : pandas.DataFrame
        Unbalanced panel data containing an entity identifier, a time
        variable, a dependent variable and an independent variable.
    rhoz : float
        User-defined IVX parameter (rho_z), forwarded to `raw_ivxj`.
    identity : str, optional
        Column identifying the individual entity (cross-sectional unit).
        If None, the first column of `data` is used.
    time : str, optional
        Column holding the time dimension. If None, the second column of
        `data` is used.
    y_name : str, optional
        Column holding the dependent variable. If None, the third column
        of `data` is used.
    x_name : str, optional
        Column holding the independent variable. If None, the fourth
        column of `data` is used.

    Returns
    -------
    btaHat
        The IVX estimate of the coefficient.
    btaHatDebias
        The IVXJ (debiased) estimate of the coefficient.
    se
        The standard error reported by `raw_ivxj`.
    rhoHat
        The XJ estimate of rho.

    Raises
    ------
    KeyError
        If a requested column name does not exist in `data` (raised by
        pandas during sorting or column selection).
    ValueError
        If a column name is omitted and `data` has too few columns for
        the corresponding positional default.

    Examples
    --------
    >>> import pandas as pd
    >>> import numpy as np
    >>> data = pd.DataFrame({
    ...     'id': np.repeat([1, 2], 21),
    ...     'time': np.tile(np.arange(1, 22), 2),
    ...     'y': np.random.randint(0, 2, 42),
    ...     'x': np.round(np.random.uniform(1, 3, 42), 1)
    ... })
    >>> rhoz = 0.9
    >>> btaHat, btaHatDebias, se, rhoHat = ivxj(data, rhoz, 'id', 'time', 'y', 'x')

    Using the positional defaults for every column:

    >>> ivxj(data, rhoz)

    References
    ----------
    For more details on the IVXJ method, see the original paper by
    Liao, Mei and Shi (2024).
    """
    # Resolve every omitted name on its own. Previously the defaults were
    # applied only when *all four* names were None, so a partially specified
    # call passed None through to pandas.
    requested = (
        ("identity", identity, 0),
        ("time", time, 1),
        ("y_name", y_name, 2),
        ("x_name", x_name, 3),
    )
    n_columns = len(data.columns)
    resolved = []
    for arg_name, column, position in requested:
        if column is None:
            if position >= n_columns:
                raise ValueError(
                    f"`data` has {n_columns} column(s); cannot default "
                    f"`{arg_name}` to column index {position}."
                )
            column = data.columns[position]
        resolved.append(column)
    identity, time, y_name, x_name = resolved

    # Sort by entity, then time, so each entity's observations are contiguous
    # and chronologically ordered.
    data_sorted = data.sort_values(by=[identity, time])

    # Stacked dependent / independent variables as float64 arrays.
    y = data_sorted[y_name].to_numpy(dtype=np.float64)
    x = data_sorted[x_name].to_numpy(dtype=np.float64)

    # Number of observations per entity, ordered by the sorted group keys
    # (groupby sorts keys by default, matching the sort above).
    identity_counts = data_sorted.groupby(identity).size()
    Tlens = np.array(identity_counts.values, dtype=int)

    return raw_ivxj(y, x, rhoz, Tlens)
def raw_ivxj(y, x, rhoz, Tlens):
    """
    Compute IVXJ estimates for unbalanced panel data in the univariate case.

    The estimation is assembled from the package helpers `xj`,
    `delete_period_obs`, `gen_ivx` and `within_trans`, plus
    `sum_of_mean_sq` from this module.

    Parameters
    ----------
    y : array-like of shape (n_total,)
        Dependent variable, stacked over all individuals: (y_1, ..., y_n)'.
    x : array-like of shape (n_total,)
        Regressor, stacked over all individuals: (x_1, ..., x_n)'.
    rhoz : float
        User-defined IVX parameter (rho_z) for IVX generation.
    Tlens : array-like of shape (n,)
        Individual time lengths (T_1, ..., T_n). Any array-like is accepted;
        it is copied to an integer array, so the caller's object is never
        modified.

    Returns
    -------
    btaHat
        IVX estimate of the coefficient beta.
    btaHatDebias
        Debiased IVXJ estimate of the coefficient beta.
    se
        Standard error of the estimate.
    rhoHat
        XJ estimate of rho.

    Notes
    -----
    The bias term divides by ``1 - rhoz``, ``1 - rhoHat`` and
    ``rhoz - rhoHat``; it is undefined when any of these is zero.
    """
    # Coerce inputs. np.array (rather than Tlens.astype) also accepts plain
    # lists/tuples and always returns a fresh copy.
    y = np.array(y, dtype=np.float64)
    x = np.array(x, dtype=np.float64)
    rhoz = np.float64(rhoz)
    Tlens = np.array(Tlens, dtype=int)

    # Estimate rho from the full-length regressor.
    rhoHat = xj(x, Tlens)

    # One period per individual is lost to the lag. These calls must use the
    # original (un-decremented) Tlens.
    y = delete_period_obs(y, Tlens, 1)
    xLag = delete_period_obs(x, Tlens, 1, False)
    x = delete_period_obs(x, Tlens, 1)

    # Time lengths after dropping one period.
    Tlens = Tlens - 1
    obs_total = np.sum(Tlens)

    # Self-generated instrument.
    zLag = gen_ivx(xLag, rhoz, Tlens)

    # Within transformation.
    xTilde = within_trans(x, Tlens)
    xLagTilde = within_trans(xLag, Tlens)
    zLagTilde = within_trans(zLag, Tlens)
    yTilde = within_trans(y, Tlens)

    # Denominator ZX.
    ZX = np.dot(zLagTilde.flatten(), xLag.flatten())

    # Estimate of beta.
    btaHat = np.dot(zLagTilde.flatten(), y.flatten()) / ZX

    # Residuals.
    uTilde = yTilde - btaHat * xLagTilde
    vTilde = xTilde - rhoHat * xLagTilde

    # Estimates of omega11 and omega12.
    omg11Hat = np.dot(uTilde.flatten(), uTilde.flatten()) / obs_total
    omg12Hat = np.dot(vTilde.flatten(), uTilde.flatten()) / obs_total

    # Estimate of Nickell bias (one lam_seq entry per individual).
    lam_seq = (
        (rhoz - rhoz**Tlens) / (1 - rhoz)
        - (rhoHat - rhoHat**Tlens) / (1 - rhoHat)
    ) / (rhoz - rhoHat)
    b = omg12Hat * np.sum(lam_seq / Tlens) / ZX

    # Standard error of betaHat. The sum_of_mean_sq correction is only
    # subtracted when rhoHat >= 1 (the boolean acts as a 0/1 multiplier).
    se = np.sqrt(
        omg11Hat
        * (
            np.dot(zLag.flatten(), zLag.flatten())
            - (rhoHat >= 1) * sum_of_mean_sq(zLag, Tlens)
        )
    ) / abs(ZX)

    # Debiased beta.
    btaHatDebias = btaHat + b

    return btaHat, btaHatDebias, se, rhoHat
def sum_of_mean_sq(A, Tlens):
    """
    Weighted sum of squared per-individual means for an unbalanced panel.

    `A` is split into per-individual pieces with `split_mat_into_cells`;
    the mean of each piece is squared, multiplied by the matching entry of
    ``Tlens ** 0.95``, and the products are summed.

    Parameters
    ----------
    A : array-like
        Stacked panel data, passed unchanged to `split_mat_into_cells`.
    Tlens : numpy.ndarray
        Individual time lengths, also used to build the weights.

    Returns
    -------
    The weighted sum described above.
    """
    # NOTE(review): the 0.95 exponent is hard-coded; its rationale is not
    # visible from this module.
    pieces = split_mat_into_cells(A, Tlens)
    squared_means = [np.mean(piece) ** 2 for piece in pieces]
    weights = Tlens**0.95
    total = np.sum(squared_means * weights)
    return total