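"""A k-armed bandit testbed driven by an epsilon-greedy agent.

Runs the agent for many independent runs, averages the per-step reward and
the fraction of optimal actions, and saves the resulting plots to
exercise_2-5.png.
"""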
import random

import matplotlib.pyplot as plt
import numpy as np


class KArmedBandit:
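    """A k-armed bandit testbed with pre-generated rewards.

    If `random_walk` is True, all true action values start at zero and take
    independent small random walks, making the problem nonstationary;
    otherwise they are drawn once from a standard normal distribution.
    """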
    def __init__(self,
                 average_rewards=None,
                 arms=10,
                 steps=1000,
                 random_walk=False):
        self.average_rewards = average_rewards
        self.arms = arms
        self.steps = steps
        self.random_walk = random_walk
        if self.average_rewards is None:
            self.generate_average_rewards()
        self.generate_rewards()

    def generate_average_rewards(self):
        if self.random_walk:
            self.average_rewards = np.zeros(self.arms)
        else:
            self.average_rewards = np.random.normal(size=self.arms)

    def generate_rewards(self):
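        """Pre-generate the reward for every (arm, step) pair.

        For the random-walk bandit, the true action values receive a small
        Gaussian increment before each step's rewards are drawn.
        """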
        if self.random_walk:
            # Build each arm's reward list independently; multiplying the outer
            # list would make every arm alias the same inner list.
            self.rewards = [[0.0] * self.steps for _ in range(self.arms)]
            for step in range(self.steps):
                # Slightly move all the averages
                moves = np.random.normal(loc=0.0, scale=0.01, size=self.arms)
                self.average_rewards = np.array(self.average_rewards) + moves
                for action, average in enumerate(self.average_rewards):
                    self.rewards[action][step] = np.random.normal(loc=average, scale=1)
        else:
            self.rewards = self.sample_rewards(self.steps)

    def sample_rewards(self, steps):
        return [np.random.normal(loc=x, size=steps) for x in self.average_rewards]
    def get_reward(self, action: int, step: int):
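        """Return the pre-generated reward for `action` at `step`.

        Raises ValueError if the action or step index is out of bounds.
        """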
        if action >= self.arms or action < 0:
            raise ValueError("Action {} out of bounds. Actions can go from 0 to {}.".format(action, self.arms - 1))
        if step >= self.steps or step < 0:
            raise ValueError("Step {} out of bounds. Current steps: {}".format(step, self.steps))

        return self.rewards[action][step]

    def get_max_reward_action(self):
        return (max(self.average_rewards), np.argmax(self.average_rewards))


class EpsilonGreedy:
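    """Epsilon-greedy action selection over `arms` actions.

    With probability epsilon a random action is explored; otherwise the
    action with the highest estimated value is exploited (ties broken
    randomly).
    """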
    def __init__(self, epsilon: float, arms: int):
        self.epsilon = epsilon
        self.arms = arms
        self.actions = range(self.arms)
        self.reset()

    def reset(self):
        self.estimated_values = [0.0] * self.arms

    def update(self, action, reward, alpha):
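        """Move the estimate for `action` towards `reward` with step size `alpha`.

        A constant alpha gives exponential recency-weighting, which suits the
        nonstationary (random-walk) bandit; alpha = 1/n would recover the
        sample-average estimate instead.
        """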
        self.estimated_values[action] = self.estimated_values[action] + alpha * (reward - self.estimated_values[action])

    def choose_action(self) -> int:
        explore = bool(np.random.binomial(1, self.epsilon))
        if explore:
            return random.choice(self.actions)
        else:
            # Select action with highest estimated reward. If multiple highest exist: select a random one.
            max_estimate = max(self.estimated_values)
            return np.random.choice([index for index, x in enumerate(self.estimated_values) if x == max_estimate])


class Average:
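    """Incremental running mean: mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n."""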
    def __init__(self):
        self.value = 0.0
        self.count = 1

    def update(self, value):
        self.value = self.value + (1 / self.count) * (value - self.value)
        self.count += 1


class Plotter:
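    """Run an agent on a bandit repeatedly and plot averaged results.

    Tracks, per step and averaged over runs, the reward obtained and the
    fraction of runs in which the optimal action was chosen.
    """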
    def __init__(self, karmedbandit: KArmedBandit, epsilongreedy: EpsilonGreedy, steps: int):
        self.karmedbandit = karmedbandit
        self.agent = epsilongreedy
        self.steps = steps
        self.reset()

    def reset(self):
        self.average_rewards = [Average() for _ in range(self.steps)]
        self.actions_counter = [0] * self.karmedbandit.arms
        self.total_steps = 0
        self.optimal_action_counter = 0
        self.optimal_action_fraction = [Average() for _ in range(self.steps)]

    def count_action(self, action):
        self.actions_counter[action] += 1
        self.total_steps += 1
    def count_optimal_action(self, is_optimal: bool, step):
        if is_optimal:
            self.optimal_action_counter += 1
            self.optimal_action_fraction[step].update(1)
        else:
            self.optimal_action_fraction[step].update(0)
    def update_optimal_action(self, optimal_action, step):
        # self.optimal_action_fraction[step].update(self.actions_counter[optimal_action] / (step+1))
        self.optimal_action_fraction[step].update(self.optimal_action_counter / (step + 1))

    def run(self, runs=1):
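        """Play the bandit for `self.steps` steps, repeated `runs` times.

        Rewards are regenerated and the agent is reset before every run, and
        the per-step statistics are averaged across runs.
        """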
        for i in range(runs):
            self.karmedbandit.generate_rewards()
            # optimal_action = self.karmedbandit.get_max_reward_action()[1]
            self.agent.reset()
            self.actions_counter = [0] * self.karmedbandit.arms
            self.optimal_action_counter = 0
            for step in range(self.steps):
                optimal_action = self.karmedbandit.get_max_reward_action()[1]
                action = self.agent.choose_action()
                reward = self.karmedbandit.get_reward(action, step)
                # self.agent.update(action, reward, 1/(self.actions_counter[action]+1))
                self.agent.update(action, reward, 0.1)
                self.average_rewards[step].update(reward)
                self.count_action(action)
                if action == optimal_action:
                    self.count_optimal_action(True, step)
                else:
                    self.count_optimal_action(False, step)
                # self.update_optimal_action(optimal_action, step)

    def plot(self):
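        """Plot results in three panels and save the figure.

        Top: reward distribution of each arm (violin plot). Middle: average
        reward per step. Bottom: fraction of runs that chose the optimal
        action. The figure is saved to exercise_2-5.png.
        """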
        fig, (ax1, ax2, ax3) = plt.subplots(nrows=3)

        violin_data = self.karmedbandit.sample_rewards(steps=10000)
        ax1.violinplot(violin_data, showmeans=True, showmedians=False, showextrema=False)
        ax1.set_ylim([-4, 4])

        ax2.plot(range(self.steps), [x.value for x in self.average_rewards])
        ylabel = "Average reward. Max average reward: {}".format(self.karmedbandit.get_max_reward_action()[0])
        ax2.set(ylabel=ylabel, title='')

        ax3.plot(range(self.steps), [x.value for x in self.optimal_action_fraction])
        ax3.set_ylim([0, 1])

        plt.xlabel("Steps")
        fig.savefig("exercise_2-5.png")
        plt.show()


if __name__ == "__main__":
    arms = 10
    steps = 1000
    armedbandit = KArmedBandit(arms=arms, steps=steps, random_walk=False)
    greedy = EpsilonGreedy(0.1, arms)

    plotter = Plotter(armedbandit, greedy, steps)
    plotter.run(runs=100)
    plotter.plot()