A common term in finance and reinforcement learning is the discounted cumulative reward C[i]
based on a time series of raw rewards R[i]
. Given an array R
, we'd like to calculate C[i]
satisfying the recurrence C[i] = R[i] + discount * C[i+1]
with C[-1] = R[-1]
(and return the full array C
).
A numerically stable way of calculating this in python with numpy arrays might be:
import numpy as np
def cumulative_discount(rewards, discount):
future_cumulative_reward = 0
assert np.issubdtype(rewards.dtype, np.floating), rewards.dtype
cumulative_rewards = np.empty_like(rewards)
for i in range(len(rewards) - 1, -1, -1):
cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
future_cumulative_reward = cumulative_rewards[i]
return cumulative_rewards
However, this relies on a python loop. Given that this is such a common calculation, surely there's an existing vectorized solution relying on some other standard functions without resorting to cythonization.
Note that any solution using something like np.power(discount, np.arange(len(rewards))
won't be stable.
You could use scipy.signal.lfilter to solve the recurrence relation:
def alt(rewards, discount):
"""
C[i] = R[i] + discount * C[i+1]
signal.lfilter(b, a, x, axis=-1, zi=None)
a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
- a[1]*y[n-1] - ... - a[N]*y[n-N]
"""
r = rewards[::-1]
a = [1, -discount]
b = [1]
y = signal.lfilter(b, a, x=r)
return y[::-1]
This script tests that the result is the same:
import scipy.signal as signal
import numpy as np
def orig(rewards, discount):
future_cumulative_reward = 0
cumulative_rewards = np.empty_like(rewards, dtype=np.float64)
for i in range(len(rewards) - 1, -1, -1):
cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
future_cumulative_reward = cumulative_rewards[i]
return cumulative_rewards
def alt(rewards, discount):
"""
C[i] = R[i] + discount * C[i+1]
signal.lfilter(b, a, x, axis=-1, zi=None)
a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
- a[1]*y[n-1] - ... - a[N]*y[n-N]
"""
r = rewards[::-1]
a = [1, -discount]
b = [1]
y = signal.lfilter(b, a, x=r)
return y[::-1]
# test that the result is the same
np.random.seed(2017)
for i in range(100):
rewards = np.random.random(10000)
discount = 1.01
expected = orig(rewards, discount)
result = alt(rewards, discount)
if not np.allclose(expected,result):
print('FAIL: {}({}, {})'.format('alt', rewards, discount))
break
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With