added deep deterministic policy gradient example (#531)
examples/deep_deterministic_policy_gradient.py (new file, 256 lines)
from typing import Optional, Tuple
from numpy.typing import NDArray

from tinygrad.tensor import Tensor
import tinygrad.nn.optim as optim
from tinygrad.helpers import getenv
from extra.utils import get_parameters

import numpy as np
import gym


DEVICE = "GPU" if getenv("GPU") else "CPU"


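# The Actor is the deterministic policy network: two ReLU hidden layers followed by a
# tanh output that forward() scales by the environment's action bound.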
class Actor:
    def __init__(self, num_actions: int, num_states: int, hidden_size: Tuple[int, int] = (400, 300)):
        self.l1 = Tensor.glorot_uniform(num_states, hidden_size[0])
        self.l2 = Tensor.glorot_uniform(hidden_size[0], hidden_size[1])
        self.mu = Tensor.glorot_uniform(hidden_size[1], num_actions)

    def forward(self, state: Tensor, upper_bound: float) -> Tensor:
        out = state.dot(self.l1).relu()
        out = out.dot(self.l2).relu()
        out = out.dot(self.mu).tanh()
        output = out * upper_bound

        return output


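# The Critic estimates Q(s, a); as noted in the DDPG class docstring, the action is
# concatenated with the state already at the first layer (unlike the original paper).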
class Critic:
    def __init__(self, num_inputs: int, hidden_size: Tuple[int, int] = (400, 300)):
        self.l1 = Tensor.glorot_uniform(num_inputs, hidden_size[0])
        self.l2 = Tensor.glorot_uniform(hidden_size[0], hidden_size[1])
        self.q = Tensor.glorot_uniform(hidden_size[1], 1)

    def forward(self, state: Tensor, action: Tensor) -> Tensor:
        inputs = state.cat(action, dim=1)
        out = inputs.dot(self.l1).relu()
        out = out.dot(self.l2).relu()
        q = out.dot(self.q)

        return q


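# Fixed-size replay buffer: record() overwrites the oldest transition once the capacity
# is reached, and sample() draws a uniformly random minibatch of stored transitions.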
class Buffer:
    def __init__(self, num_actions: int, num_states: int, buffer_capacity: int = 100000, batch_size: int = 64):
        self.buffer_capacity = buffer_capacity
        self.batch_size = batch_size

        self.buffer_counter = 0

        self.state_buffer = np.zeros((self.buffer_capacity, num_states))
        self.action_buffer = np.zeros((self.buffer_capacity, num_actions))
        self.reward_buffer = np.zeros((self.buffer_capacity, 1))
        self.next_state_buffer = np.zeros((self.buffer_capacity, num_states))
        self.done_buffer = np.zeros((self.buffer_capacity, 1))

    def record(
        self, observations: Tuple[Tensor, NDArray, float, NDArray, bool]
    ) -> None:
        index = self.buffer_counter % self.buffer_capacity

        self.state_buffer[index] = observations[0].detach().numpy()
        self.action_buffer[index] = observations[1]
        self.reward_buffer[index] = observations[2]
        self.next_state_buffer[index] = observations[3]
        self.done_buffer[index] = observations[4]

        self.buffer_counter += 1

    def sample(self) -> Tuple[Tensor, Tensor, Tensor, Tensor, Tensor]:
        record_range = min(self.buffer_counter, self.buffer_capacity)
        batch_indices = np.random.choice(record_range, self.batch_size)

        state_batch = Tensor(self.state_buffer[batch_indices], device=DEVICE, requires_grad=False)
        action_batch = Tensor(self.action_buffer[batch_indices], device=DEVICE, requires_grad=False)
        reward_batch = Tensor(self.reward_buffer[batch_indices], device=DEVICE, requires_grad=False)
        next_state_batch = Tensor(self.next_state_buffer[batch_indices], device=DEVICE, requires_grad=False)
        done_batch = Tensor(self.done_buffer[batch_indices], device=DEVICE, requires_grad=False)

        return state_batch, action_batch, reward_batch, next_state_batch, done_batch


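# Exploration noise: i.i.d. Gaussian samples with the given mean and standard deviation,
# used here instead of the Ornstein-Uhlenbeck process from the original paper.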
class GaussianActionNoise:
    def __init__(self, mean: NDArray, std_deviation: NDArray):
        self.mean = mean
        self.std_dev = std_deviation

    def __call__(self) -> Tensor:
        return Tensor(
            np.random.default_rng()
            .normal(self.mean, self.std_dev, size=self.mean.shape)
            .astype(np.float32),
            device=DEVICE,
            requires_grad=False,
        )


class DeepDeterministicPolicyGradient:
    """Deep Deterministic Policy Gradient (DDPG).

    https://arxiv.org/pdf/1509.02971.pdf

    Args:
        env: The environment to learn from.
        lr_actor: The learning rate of the actor.
        lr_critic: The learning rate of the critic.
        gamma: The discount factor.
        buffer_capacity: The size of the replay buffer.
        tau: The soft update coefficient.
        hidden_size: The number of neurons in the hidden layers of the actor and critic networks.
        batch_size: The minibatch size for each gradient update.
        noise_stddev: The standard deviation of the exploration noise.

    Note:
        In contrast to the original paper, actions are already included in the first layer
        of the Critic and we use a Gaussian distribution instead of an Ornstein-Uhlenbeck
        process for exploration noise.

    """

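    # Each call to learn() performs the three DDPG updates: regress the critic towards the
    # Bellman target built from the target networks, update the actor to maximize the
    # critic's value of its own actions, and soft-update both target networks.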
    def __init__(
        self,
        env: gym.Env,
        lr_actor: float = 0.001,
        lr_critic: float = 0.002,
        gamma: float = 0.99,
        buffer_capacity: int = 100000,
        tau: float = 0.005,
        hidden_size: Tuple[int, int] = (400, 300),
        batch_size: int = 64,
        noise_stddev: float = 0.1,
    ):
        self.num_states = env.observation_space.shape[0]
        self.num_actions = env.action_space.shape[0]
        self.max_action = env.action_space.high.item()
        self.min_action = env.action_space.low.item()
        self.gamma = gamma
        self.tau = tau
        self.memory = Buffer(
            self.num_actions, self.num_states, buffer_capacity, batch_size
        )
        self.batch_size = batch_size

        self.noise = GaussianActionNoise(
            mean=np.zeros(self.num_actions),
            std_deviation=noise_stddev * np.ones(self.num_actions),
        )

        self.actor = Actor(self.num_actions, self.num_states, hidden_size)
        self.critic = Critic(self.num_actions + self.num_states, hidden_size)
        self.target_actor = Actor(self.num_actions, self.num_states, hidden_size)
        self.target_critic = Critic(self.num_actions + self.num_states, hidden_size)

        actor_params = get_parameters(self.actor)
        critic_params = get_parameters(self.critic)
        target_actor_params = get_parameters(self.target_actor)
        target_critic_params = get_parameters(self.target_critic)

        if DEVICE == "GPU":
            [x.gpu_() for x in actor_params + critic_params + target_actor_params + target_critic_params]

        self.actor_optimizer = optim.Adam(actor_params, lr_actor)
        self.critic_optimizer = optim.Adam(critic_params, lr_critic)

        self.update_network_parameters(tau=1.0)

    def update_network_parameters(self, tau: Optional[float] = None) -> None:
        """Updates the parameters of the target networks via 'soft updates'."""
        if tau is None:
            tau = self.tau

        for param, target_param in zip(
            get_parameters(self.actor), get_parameters(self.target_actor)
        ):
            target_param.assign(param * tau + target_param * (1.0 - tau))

        for param, target_param in zip(
            get_parameters(self.critic), get_parameters(self.target_critic)
        ):
            target_param.assign(param * tau + target_param * (1.0 - tau))

    def choose_action(self, state: Tensor, evaluate: bool = False) -> NDArray:
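        # Deterministic action from the actor; during training (evaluate=False) Gaussian
        # exploration noise is added and the result is clipped to the valid action range.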
        mu = self.actor.forward(state, self.max_action)

        if not evaluate:
            mu = mu.add(self.noise())

        mu = mu.clip(self.min_action, self.max_action)

        return mu.detach().numpy()

    def learn(self) -> None:
        """Performs a learning step by sampling from replay buffer and updating networks."""
        if self.memory.buffer_counter < self.batch_size:
            return

        (
            state_batch,
            action_batch,
            reward_batch,
            next_state_batch,
            done_batch,
        ) = self.memory.sample()

        target_actions = self.target_actor.forward(next_state_batch, self.max_action)
        y = reward_batch + self.gamma * self.target_critic.forward(
            next_state_batch, target_actions.detach()
        ) * (Tensor.ones(*done_batch.shape, device=DEVICE, requires_grad=False) - done_batch)

        self.critic_optimizer.zero_grad()
        critic_value = self.critic.forward(state_batch, action_batch)
        critic_loss = y.detach().sub(critic_value).pow(2).mean()
        critic_loss.backward()
        self.critic_optimizer.step()

        self.actor_optimizer.zero_grad()
        actions = self.actor.forward(state_batch, self.max_action)
        critic_value = self.critic.forward(state_batch, actions)
        actor_loss = -critic_value.mean()
        actor_loss.backward()
        self.actor_optimizer.step()

        self.update_network_parameters()


if __name__ == "__main__":
    env = gym.make("Pendulum-v1")
    agent = DeepDeterministicPolicyGradient(env)
    num_episodes = 150

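    # Each episode: act with exploration noise, store the transition in the replay
    # buffer, and run one learning step per environment step.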
    for episode in range(1, num_episodes + 1):
        cumulative_reward = 0.0
        prev_state, info = env.reset()  # for older gym versions only the state is returned, so remove info
        done = False

        while not done:
            prev_state = Tensor(prev_state, device=DEVICE, requires_grad=False)
            action = agent.choose_action(prev_state)

            state, reward, done, _, info = env.step(action)  # for older gym versions there is only one bool, so remove _

            cumulative_reward += reward

            agent.memory.record((prev_state, action, reward, state, done))
            agent.learn()

            if done:
                break

            prev_state = state

        print(
            f"Episode {episode}/{num_episodes} - cumulative reward: {cumulative_reward}"
        )