Skip to content

Commit

Permalink
REF: typing continuation
Browse files Browse the repository at this point in the history
  • Loading branch information
attack68 committed Jan 17, 2025
1 parent 04408ba commit 9bee848
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 30 deletions.
2 changes: 1 addition & 1 deletion python/rateslib/instruments/bonds/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def cms(
# build a curve for pricing
today = self.basket[0].leg1.schedule.calendar.lag(
settlement,
-self.basket[0].kwargs["settle"], # type: ignore[arg-type, operator]
-self.basket[0].kwargs["settle"],
False,
)
unsorted_nodes = {
Expand Down
65 changes: 36 additions & 29 deletions python/rateslib/instruments/bonds/securities.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BondMixin:

leg1: FixedLeg | FloatLeg | IndexFixedLeg
kwargs: dict[str, Any]
calc_mode: str | BondCalcMode
calc_mode: BondCalcMode

def _period_index(self, settlement: datetime) -> int:
"""
Expand Down Expand Up @@ -314,12 +314,13 @@ def _generic_price_from_ytm(
d += self._period_cashflow(self.leg1._regular_periods[p_idx], curve) * v2**i * v1

# Add the redemption payment discounted by relevant factors
redemption: Cashflow | IndexCashflow = self.leg1._exchange_periods[1] # type: ignore[assignment]
if i == 0: # only looped 1 period, only use the last discount
d += self._period_cashflow(self.leg1._exchange_periods[1], curve) * v1
d += self._period_cashflow(redemption, curve) * v1
elif i == 1: # only looped 2 periods, no need for v2
d += self._period_cashflow(self.leg1._exchange_periods[1], curve) * v3 * v1
d += self._period_cashflow(redemption, curve) * v3 * v1
else: # looped more than 2 periods, regular formula applied
d += self._period_cashflow(self.leg1._exchange_periods[1], curve) * v2 ** (i - 1) * v3 * v1
d += self._period_cashflow(redemption, curve) * v2 ** (i - 1) * v3 * v1

# discount all by the first period factor and scaled to price
p = d / -self.leg1.notional * 100
Expand Down Expand Up @@ -366,10 +367,10 @@ def fwd_from_repo(
Any intermediate (non ex-dividend) cashflows between ``settlement`` and
``forward_settlement`` will also be assumed to accrue at ``repo_rate``.
"""
convention = _drb(defaults.convention, convention)
dcf_ = dcf(settlement, forward_settlement, convention)
convention_ = _drb(defaults.convention, convention)
dcf_ = dcf(settlement, forward_settlement, convention_)
if not dirty:
d_price = price + self.accrued(settlement)
d_price = price + self._accrued(settlement, self.calc_mode._settle_acc_frac_func)
else:
d_price = price
if self.leg1.amortization != 0:
Expand Down Expand Up @@ -397,23 +398,26 @@ def fwd_from_repo(

for p_idx in range(settlement_idx, fwd_settlement_idx):
# deduct accrued coupon from dirty price
c_period = self.leg1._regular_periods[p_idx]
c_cashflow: DualTypes = c_period.cashflow # type: ignore[assignment]
# TODO handle FloatPeriod cashflow fetch if need a curve.
if method.lower() == "proceeds":
dcf_ = dcf(self.leg1.periods[p_idx].payment, forward_settlement, convention)
accrued_coup = self.leg1.periods[p_idx].cashflow * (1 + dcf_ * repo_rate / 100)
dcf_ = dcf(c_period.payment, forward_settlement, convention_)
accrued_coup = c_cashflow * (1 + dcf_ * repo_rate / 100)
total_rtn -= accrued_coup
elif method.lower() == "compounded":
r_bar, d, _ = average_rate(settlement, forward_settlement, convention, repo_rate)
n = (forward_settlement - self.leg1.periods[p_idx].payment).days
accrued_coup = self.leg1.periods[p_idx].cashflow * (1 + d * r_bar / 100) ** n
r_bar, d, _ = average_rate(settlement, forward_settlement, convention_, repo_rate)
n = (forward_settlement - c_period.payment).days
accrued_coup = c_cashflow * (1 + d * r_bar / 100) ** n
total_rtn -= accrued_coup
else:
raise ValueError("`method` must be in {'proceeds', 'compounded'}.")

forward_price = total_rtn / -self.leg1.notional * 100
forward_price: DualTypes = total_rtn / -self.leg1.notional * 100
if dirty:
return forward_price
else:
return forward_price - self.accrued(forward_settlement)
return forward_price - self._accrued(forward_settlement, self.calc_mode._settle_acc_frac_func)

def repo_from_fwd(
self,
Expand Down Expand Up @@ -452,15 +456,15 @@ def repo_from_fwd(
Any intermediate (non ex-dividend) cashflows between ``settlement`` and
``forward_settlement`` will also be assumed to accrue at ``repo_rate``.
"""
convention = _drb(defaults.convention, convention)
convention_ = _drb(defaults.convention, convention)
# forward price from repo is linear in repo_rate so reverse calculate with AD
if not dirty:
p_t = forward_price + self.accrued(forward_settlement)
p_0 = price + self.accrued(settlement)
p_t = forward_price + self._accrued(forward_settlement, self.calc_mode._settle_acc_frac_func)
p_0 = price + self._accrued(settlement, self.calc_mode._settle_acc_frac_func)
else:
p_t, p_0 = forward_price, price

dcf_ = dcf(settlement, forward_settlement, convention)
dcf_ = dcf(settlement, forward_settlement, convention_)
numerator = p_t - p_0
denominator = p_0 * dcf_

Expand All @@ -483,19 +487,22 @@ def repo_from_fwd(

for p_idx in range(settlement_idx, fwd_settlement_idx):
# deduct accrued coupon from dirty price
dcf_ = dcf(self.leg1.periods[p_idx].payment, forward_settlement, convention)
numerator += 100 * self.leg1.periods[p_idx].cashflow / -self.leg1.notional
denominator -= 100 * dcf_ * self.leg1.periods[p_idx].cashflow / -self.leg1.notional
c_period = self.leg1._regular_periods[p_idx]
c_cashflow: DualTypes = c_period.cashflow # type: ignore[assignment]
# TODO handle FloatPeriod if it needs a Curve to forecast cashflow
dcf_ = dcf(c_period.payment, forward_settlement, convention_)
numerator += 100 * c_cashflow / -self.leg1.notional
denominator -= 100 * dcf_ * c_cashflow / -self.leg1.notional

return numerator / denominator * 100

def _npv_local(
self,
curve: Curve | LineCurve,
curve: Curve,
disc_curve: Curve,
settlement: datetime,
projection: datetime,
):
) -> DualTypes:
"""
Return the NPV (local) of the security by summing cashflow valuations.
Expand Down Expand Up @@ -2805,7 +2812,7 @@ def rate(
float, Dual, Dual2
"""
curves, fx_, base_ = _get_curves_fx_and_base_maybe_from_solver(
curves_, fx_, base_ = _get_curves_fx_and_base_maybe_from_solver(
self.curves,
solver,
curves,
Expand All @@ -2818,27 +2825,27 @@ def rate(
if metric in ["clean_price", "dirty_price", "spread", "ytm"]:
if isinstance(forward_settlement, NoInput):
settlement = self.leg1.schedule.calendar.lag(
curves[1].node_dates[0], # discount curve
curves_[1].node_dates[0], # discount curve
self.kwargs["settle"],
True,
)
else:
settlement = forward_settlement
npv = self._npv_local(curves[0], curves[1], settlement, settlement)
npv = self._npv_local(curves_[0], curves_[1], settlement, settlement)
# scale price to par 100 (npv is already projected forward to settlement)
dirty_price = npv * 100 / -self.leg1.notional

if metric == "dirty_price":
return dirty_price
elif metric == "clean_price":
return dirty_price - self.accrued(settlement, curve=curves[0])
return dirty_price - self.accrued(settlement, curve=curves_[0])
elif metric == "spread":
_ = self.leg1._spread(-(npv + self.leg1.notional), curves[0], curves[1])
_ = self.leg1._spread(-(npv + self.leg1.notional), curves_[0], curves_[1])
z = _drb(0.0, self.float_spread)
return _ + z
elif metric == "ytm":
return self.ytm(
price=dirty_price, settlement=settlement, dirty=True, curve=curves[0]
price=dirty_price, settlement=settlement, dirty=True, curve=curves_[0]
)

raise ValueError("`metric` must be in {'dirty_price', 'clean_price', 'spread', 'ytm'}.")
Expand Down

0 comments on commit 9bee848

Please sign in to comment.