# mypy: ignore-errors
# pylint: skip-file
"""
A Box2D environment for Gymnasium which emulates the Falcon 9 barge landing
Original environment created by Reuben Ferrante
Modifications by Dylan Vogel and Gerasimos Maltezos for the 2023 Computational Control course offered at ETH Zurich
"""
import copy
import warnings
from typing import Dict, List, Tuple
import Box2D
import gymnasium as gym
import numpy as np
import pygame
from Box2D.b2 import (
circleShape,
contactListener,
edgeShape,
fixtureDef,
polygonShape,
revoluteJointDef,
)
from gymnasium import spaces
from do_dpc.environments.legacy_coco_rocket.env_cfg import State, EnvConfig, UserArgs
# for warnings
YELLOW = "\x1b[33;20m"
ENDL = "\x1b[0m"
# for referencing different parts of the state
DEGTORAD = np.pi / 180
XX = State.x.value
YY = State.y.value
X_DOT = State.x_dot.value
Y_DOT = State.y_dot.value
THETA = State.theta.value
THETA_DOT = State.theta_dot.value
LEFT_GROUND_CONTACT = State.left_ground_contact.value
RIGHT_GROUND_CONTACT = State.right_ground_contact.value
## CONTACT DETECTOR
## GYMNASIUM METHODS
[docs]
class RocketLander(gym.Env):
"""Gymnasum environment which models a Falcon 9 barge landing"""
# required by gymnasium
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 60}
def __init__(self, args: Dict, render_mode=None):
"""Constructor method"""
# load the environment configuration
self.cfg = EnvConfig()
self.args = UserArgs(**args)
# environment
self.world = Box2D.b2World(gravity=(0, self.cfg.gravity))
# set gymnasium action and observation spaces
self.action_space = spaces.Box(
low=np.array(
[
self.cfg.main_engine_thrust_limits[0],
self.cfg.side_engine_thrust_limits[0],
self.cfg.nozzle_angle_limits[0],
]
),
high=np.array(
[
self.cfg.main_engine_thrust_limits[1],
self.cfg.side_engine_thrust_limits[1],
self.cfg.nozzle_angle_limits[1],
]
),
dtype=np.float64,
)
self.observation_space = spaces.Box(
low=np.array([0, 0, -np.inf, -np.inf, -self.cfg.theta_limit, -np.inf, 0, 0]),
high=np.array(
[
self.cfg.width,
self.cfg.height,
np.inf,
np.inf,
self.cfg.theta_limit,
-np.inf,
1,
1,
]
),
dtype=np.float64,
)
# bodies
self.lander = None
self.legs = []
self.nozzle = None
self.barge = None
self.particles = []
self.lander_drawlist = []
# polygons
self.sea = None
self.sea_polys = []
self.sky_polys = []
self.clouds = []
# rendering
assert render_mode is None or render_mode in self.metadata["render_modes"]
self.render_mode = render_mode
self.window = None
self.clock = None
self.canvas = None
# state variables
self.state = []
self.previous_state = None
self.game_over = False
self.contact_flag = False
self.prev_shaping = None
self.wind_idx = None
self.barge_idx = None
self.initial_barge_position = None
self.reset()
def reset(self, seed=None, options=None):
"""Reset the environment"""
super().reset(seed=seed)
np.random.seed(seed=seed)
self._destroy()
self.world.contactListener = ContactDetector(self)
# state variables
self.state = []
self.game_over = False
self.contact_flag = False
self.prev_shaping = None
self.wind_idx = np.random.randint(low=-9999, high=9999)
self.barge_idx = np.random.randint(low=-9999, high=9999)
# rendering
self.lander_drawlist = []
# set rocket initial position
new_state = None
# NOTE:
# - Directly setting the position of dynamic box2-py objects results in strange behaviour
# - We simply create the rocket at the correct initial position
# - Other state values can be set relatively error-free
pos_x, pos_y = 0.5 * self.cfg.width, 0.9 * self.cfg.height
if self.args.initial_state is not None:
assert len(self.args.initial_state) == 6, "Initial state is of incorrect length, should be length 6"
if self.args.random_initial_position:
warnings.warn(YELLOW + "WARN: initial_state is set but the initial position will be randomized" + ENDL)
if self.args.initial_position:
warnings.warn(YELLOW + "WARN: ignoring the initial_position setting since initial_state is set" + ENDL)
pos_x = self.args.initial_state[0] * self.cfg.width
pos_y = self.args.initial_state[1] * self.cfg.height
new_state = {
"x_dot": self.args.initial_state[2],
"y_dot": self.args.initial_state[3],
"theta": self.args.initial_state[4],
"theta_dot": self.args.initial_state[5],
}
elif self.args.initial_position is not None:
assert (
len(self.args.initial_position) == 3
), "Initial position is of incorrect length, should be length 3 (x, y, theta)"
if self.args.random_initial_position:
warnings.warn(YELLOW + "WARN: initial_state is set but the initial position will be randomized" + ENDL)
pos_x = self.args.initial_position[0] * self.cfg.width
pos_y = self.args.initial_position[1] * self.cfg.height
new_state = {
"theta": self.args.initial_position[2],
}
if self.args.random_initial_position:
pos_x = np.random.uniform(0, 1) * self.cfg.width
pos_y = np.random.uniform(0.4, 1) * self.cfg.height
new_state = {"theta": np.random.uniform(-1, 1) * (self.cfg.theta_limit / 2)}
# create dynamic bodies
self._create_rocket(pos_x, pos_y)
self.lander_drawlist = self.legs + [self.nozzle] + [self.lander]
# create environment objects
sea_y_values, sea_x_values = self._create_sea_height(self.cfg.sea_chunks_x)
self._create_environment(sea_y_values, sea_x_values)
self._create_clouds()
self._create_barge()
if self.args.initial_barge_position is not None:
self.barge.position = (
self.args.initial_barge_position[0] * self.cfg.width,
self.barge.position[1],
)
self.barge.angle = self.args.initial_barge_position[1]
self.initial_barge_position = list(self.barge.position) + [float(self.barge.angle)]
if new_state is not None:
self.adjust_dynamics(self.lander, **new_state)
obs, _, _, _, info = self.step(np.array([0, 0, 0]))
return obs, info
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, dict]:
"""Simulate the environment forward by one step
Args:
action (np.ndarray): actions to apply
Returns:
Tuple[np.ndarray, float, bool, bool, dict]: observation, reward, done, _, info
"""
if isinstance(action, list):
action = np.array(action)
assert (
action.shape == self.action_space.shape
), f"Incorrect action shape, expected: {self.action_space.shape} but got: {action.shape}"
# adjust nozzle angle
nozzle_angle = float(action[2])
nozzle_angle = (
np.clip(
nozzle_angle,
a_min=self.cfg.nozzle_angle_limits[0],
a_max=self.cfg.nozzle_angle_limits[1],
)
* self.cfg.max_nozzle_angle
)
self.nozzle.angle = self.lander.angle + nozzle_angle
# apply forces
_ = self.__main_engines_force_computation(action)
_, _ = self.__side_engines_force_computation(action)
# apply disturbances
self.__apply_wind_disturbance()
self.__apply_barge_disturbance()
# perform simulation step
self.previous_state = self.state
self.state = self.__generate_state()
self._update_particles() # delete old particles; aesthetic
reward = self.__compute_reward(self.state, action)
# check ground contact
if (self.legs[0].ground_contact or self.legs[1].ground_contact) and self.contact_flag is False:
self.contact_flag = True
# check termination conditions
# add a small factor in case the controller goes right to the edge
done = False
if any(
[
self.game_over,
abs(self.state[THETA]) > self.cfg.theta_limit + 0.1,
self.state[XX] > self.cfg.width + 0.1,
self.state[XX] < 0 - 0.1,
self.state[YY] > self.cfg.height + 0.1,
]
):
done = True
reward = -100
# check if simulation finished in general
if not self.lander.awake:
done = True
reward = +100
# render if necessary
if self.render_mode == "human":
self._render_frame()
return (
np.array(self.state),
reward,
done,
False,
{},
)
def close(self):
super().close()
self._destroy()
if self.window is not None:
pygame.display.quit()
## QUERY FUNCTIONS
def get_mass_properties(self) -> Tuple[float, float]:
"""Get approximate mass and inertia of rocket body
We assume that most of the inertia is in the main body (rectangle) and neglect
contributions of the lander legs and nozzle
Returns:
Tuple[float, float]: mass, inertia
"""
return (
self.lander.mass + self.nozzle.mass + self.legs[0].mass + self.legs[1].mass,
self.lander.inertia,
)
def get_dimensional_properties(self) -> Tuple[float, float]:
"""Get the l1 and l2 dimensions of the rocket
- l1: longitudinal distance from rocket center to where the nozzle force is applied
- l2: longitudinal distance from rocket center to height at which the side engine force is applied
l1 and l2 are measured along the centerline of the rocket
Returns:
Tuple[float, float]: l1, l2
"""
l1 = (self.cfg.lander_length / 2) / self.cfg.scale
l2 = (self.cfg.side_engine_y_offset) / self.cfg.scale
return l1, l2
def get_landing_position(self) -> Tuple[float, float, float]:
"""Get the landing point at the center of the barge, including tilt
Returns:
Tuple[float, float, float]: x, y, theta position of the barge
"""
assert self.barge is not None, "Please call reset() first!"
pos = list(self.barge.position)
angle = copy.deepcopy(self.barge.angle) # prevent users from changing the barge angle
pos[0] -= np.sin(angle) * self.cfg.barge_height / 2
pos[1] += np.cos(angle) * self.cfg.barge_height / 2
rocket_base_to_center_len = (
np.cos(self.cfg.leg_initial_angle * DEGTORAD) * (self.cfg.leg_height / 2) + (self.cfg.lander_length / 2)
) / self.cfg.scale
rocket_base_to_center_len += self.cfg.landing_vertical_calibration
rocket_base_to_center = [
-np.sin(angle) * rocket_base_to_center_len,
np.cos(angle) * rocket_base_to_center_len,
]
pos[0] += rocket_base_to_center[0]
pos[1] += rocket_base_to_center[1]
pos.append(angle)
return pos
## ENVIRONMENT HELPER FUNCTIONS
def _destroy(self):
if self.barge is None:
# nothing has been created yet
return
# clean up environment
self.world.contactListener = None
self._clean_particles(True)
self.world.DestroyBody(self.barge)
self.barge = None
self.world.DestroyBody(self.lander)
self.lander = None
self.world.DestroyBody(self.nozzle)
self.nozzle = None
self.world.DestroyBody(self.legs[0])
self.world.DestroyBody(self.legs[1])
self.world.DestroyBody(self.sea)
self.sea = None
self.lander_drawlist = []
def __compute_reward(self, state: list, action: np.ndarray) -> float:
"""Generate an environment reward based on the current state and actions
Our reward is based on the lunar lander continuous reward
Args:
state (list): current state
action (np.ndarray): current action
Returns:
float: reward
"""
landing_pos = self.get_landing_position()
x_pos_norm = (state[0] - landing_pos[0]) / self.cfg.width
y_pos_norm = (state[1] - landing_pos[1]) / self.cfg.height
x_vel_norm = state[2] / self.cfg.width
y_vel_norm = state[3] / self.cfg.height
reward = 0
shaping = (
-100 * np.sqrt(x_pos_norm**2 + y_pos_norm**2)
- 10 * np.sqrt(x_vel_norm**2 + y_vel_norm**2)
- 100 * abs(state[4])
+ 20 * state[6]
+ 20 * state[7]
)
if self.prev_shaping is not None:
reward = shaping - self.prev_shaping
self.prev_shaping = shaping
reward -= action[0] * 0.30
reward -= action[1] * 0.03
return reward
def __main_engines_force_computation(self, action):
cmd_engine_thrust = 0
if action[0] > 0:
sin = np.sin(self.nozzle.angle)
cos = np.cos(self.nozzle.angle)
clipped_action = np.clip(
action[0],
a_min=self.cfg.main_engine_thrust_limits[0],
a_max=self.cfg.main_engine_thrust_limits[1],
)
cmd_engine_thrust = clipped_action * self.cfg.main_engine_thrust # inertial forces scale with the cubic
# apply at the base of the lander, not on the nozzle
force_pos = (
self.lander.position[0] + np.sin(self.lander.angle) * (self.cfg.lander_length / 2) / self.cfg.scale,
self.lander.position[1] - np.cos(self.lander.angle) * (self.cfg.lander_length / 2) / self.cfg.scale,
)
force_vector = (-sin * cmd_engine_thrust, cos * cmd_engine_thrust)
self.lander.ApplyForce(force_vector, force_pos, True)
# visual particle effects
# particle dispersion
angle_dispersion = self.np_random.uniform(-1.0, 1.0) * 15 * DEGTORAD
force_dispersion = self.np_random.uniform(0.7, 1.0)
particle_force_vector = (
np.sin(self.nozzle.angle + angle_dispersion) * cmd_engine_thrust * force_dispersion,
-np.cos(self.nozzle.angle + angle_dispersion) * cmd_engine_thrust * force_dispersion,
)
# scale with the lander_density
particle = self._create_particle(
self.cfg.lander_density
* np.pi
* (30 / self.cfg.lander_scaling) ** 2
/ self.cfg.scale**2, # 30 is a 'look good' factor, chosen qualitatively for the mass scaling
force_pos[0],
force_pos[1]
- np.cos(self.lander.angle)
* (self.cfg.lander_length / 24)
/ self.cfg.scale, # offset to bottom of nozzle
1.5, # particle life decrements by 0.1 every interval, dies at 0
radius=1.5 * self.cfg.lander_scaling,
)
particle.ApplyForce(particle_force_vector, force_pos, True)
return cmd_engine_thrust
def __side_engines_force_computation(self, action):
cmd_side_engine_thrust = 0
side_engine_dir = 1
if np.abs(action[1]) > 0:
clipped_action = np.clip(
action[1],
a_min=self.cfg.side_engine_thrust_limits[0],
a_max=self.cfg.side_engine_thrust_limits[1],
)
cmd_side_engine_thrust = (
clipped_action * self.cfg.side_engine_thrust
) # inertial forces scale with the cubic
# convention is that Fs = Fl - Fr
# Therefore if sign is positive we fire the left thruster
side_engine_dir = np.sign(cmd_side_engine_thrust)
cmd_side_engine_thrust = np.abs(cmd_side_engine_thrust)
# shorten some equations
sin = np.sin(self.lander.angle)
cos = np.cos(self.lander.angle)
# apply at side_engine_y_offset up the lander from the center, with some left/right offset
force_pos = (
self.lander.position[0]
+ ((-sin * self.cfg.side_engine_y_offset) - (side_engine_dir * cos * self.cfg.side_engine_x_offset))
/ self.cfg.scale,
self.lander.position[1]
+ ((cos * self.cfg.side_engine_y_offset) - (side_engine_dir * sin * self.cfg.side_engine_x_offset))
/ self.cfg.scale,
)
force_vector = (
side_engine_dir * cos * cmd_side_engine_thrust,
side_engine_dir * sin * cmd_side_engine_thrust,
)
self.lander.ApplyForce(force_vector, force_pos, True)
# visual particle effects
# particle dispersion
angle_dispersion = self.np_random.uniform(-1.0, 1.0) * 10 * DEGTORAD
force_dispersion = self.np_random.uniform(0.4, 1.0)
# opposite direction to force applied on the lander
particle_force_vector = (
-side_engine_dir
* np.cos(self.lander.angle + angle_dispersion)
* cmd_side_engine_thrust
* force_dispersion,
-side_engine_dir
* np.sin(self.lander.angle + angle_dispersion)
* cmd_side_engine_thrust
* force_dispersion,
)
# scale with the lander_density
particle = self._create_particle(
self.cfg.lander_density
* np.pi
* (20 / self.cfg.lander_scaling) ** 2
/ self.cfg.scale**2, # 20 is a 'look good' factor, chosen qualitatively for the mass scaling
force_pos[0],
force_pos[1],
0.5, # particle 'life' decrements by 0.1 each interval, expires at 0
radius=0.75 * self.cfg.lander_scaling,
)
particle.ApplyForce(particle_force_vector, force_pos, True)
return cmd_side_engine_thrust, side_engine_dir
def __generate_state(self):
"""Take one simulation step forward and return the new state
Returns:
list: x, y, x_dot, y_dot, theta, theta_dot, left_contact, right_contact
"""
self.world.Step(1.0 / self.cfg.fps, 6 * 30, 6 * 30)
pos = self.lander.position
vel = self.lander.linearVelocity
state = [
pos.x,
pos.y,
vel.x,
vel.y,
self.lander.angle,
self.lander.angularVelocity,
1 if self.legs[0].ground_contact else 0,
1 if self.legs[1].ground_contact else 0,
]
return state
def __apply_wind_disturbance(self):
"""Apply a wind force to the lander along the x-axis"""
if self.args.enable_wind and not (self.legs[0].ground_contact or self.legs[1].ground_contact):
wind_mag = (
np.tanh(np.sin(0.02 * self.wind_idx) + (np.sin(np.pi * 0.01 * self.wind_idx))) * self.cfg.wind_force
)
self.wind_idx += 1
self.lander.ApplyForceToCenter(
(wind_mag, 0.0),
True,
)
def __apply_barge_disturbance(self):
"""Apply x, y, theta offsets to the barge from the initial position"""
if self.args.enable_moving_barge:
x_rate = 0.01
y_rate = 0.006
theta_rate = 0.004
barge_x = (
np.tanh(np.sin(x_rate * self.barge_idx) + (np.sin(np.pi * (x_rate / 2) * self.barge_idx)))
* self.cfg.barge_width
/ 16
) + self.initial_barge_position[0]
barge_y = (
np.tanh(np.sin(y_rate * self.barge_idx) + (np.sin(np.pi * (y_rate / 2) * self.barge_idx)))
* self.cfg.barge_height
/ 4
) + self.initial_barge_position[1]
barge_theta = (
np.tanh(np.sin(theta_rate * self.barge_idx) + (np.sin(np.pi * (theta_rate / 2) * self.barge_idx)))
* 5
* DEGTORAD
) + self.initial_barge_position[2]
self.barge_idx += 1
self.barge.position = (barge_x, barge_y)
self.barge.angle = barge_theta
## RENDERING HELPERS
def _create_rocket(self, pos_x: float, pos_y: float):
"""Create the dynamic rocket object"""
# variables
lander_body_color = (255, 255, 255)
# lander body
self.lander = self.world.CreateDynamicBody(
position=(pos_x, pos_y),
angle=0.0,
fixtures=fixtureDef(
shape=polygonShape(
vertices=[(x / self.cfg.scale, y / self.cfg.scale) for x, y in self.cfg.lander_poly]
),
density=self.cfg.lander_density,
friction=0.1,
categoryBits=0x0010,
maskBits=0x0003, # contact ground and sea
restitution=0.0,
),
)
self.lander.color1 = lander_body_color
self.lander.color2 = (0, 0, 0)
# lander legs
self.legs = []
for i in [-1, +1]:
leg = self.world.CreateDynamicBody(
# will move when we attach the revolute joint
position=(pos_x, pos_y),
angle=0,
fixtures=fixtureDef(
# box assumes half-width and half-lengths
shape=polygonShape(
box=(
self.cfg.leg_width / (2 * self.cfg.scale),
self.cfg.leg_height / (2 * self.cfg.scale),
)
),
density=self.cfg.lander_density, # cannot assume massless because for some reason we need density to apply torque?
friction=0.1,
restitution=0.0,
categoryBits=0x0020,
maskBits=0x0003, # contact ground and sea
),
)
leg.ground_contact = False
leg.color1 = lander_body_color
leg.color2 = (0, 0, 0)
# join legs to lander body
revolute_joint = revoluteJointDef(
bodyA=self.lander,
bodyB=leg,
# basically, put the legs at 1/4 ond 3/4 across the body (radius/2), and align the center
localAnchorA=(
(i * self.cfg.lander_radius / 2) / self.cfg.scale,
(-self.cfg.lander_length / 2 + self.cfg.leg_height / 2) / self.cfg.scale,
),
localAnchorB=(0, (self.cfg.leg_height / 2) / self.cfg.scale),
enableMotor=True,
enableLimit=True,
maxMotorTorque=self.cfg.leg_spring_torque,
motorSpeed=1.0 * i,
)
# set angle limits
if i == -1:
revolute_joint.lowerAngle = -(self.cfg.leg_initial_angle + 2) * DEGTORAD
revolute_joint.upperAngle = -self.cfg.leg_initial_angle * DEGTORAD
else:
revolute_joint.lowerAngle = self.cfg.leg_initial_angle * DEGTORAD
revolute_joint.upperAngle = (self.cfg.leg_initial_angle + 2) * DEGTORAD
leg.joint = self.world.CreateJoint(revolute_joint)
self.legs.append(leg)
# nozzle
self.nozzle = self.world.CreateDynamicBody(
position=(pos_x, pos_y),
angle=0.0,
fixtures=fixtureDef(
shape=polygonShape(
vertices=[(x / self.cfg.scale, y / self.cfg.scale) for x, y in self.cfg.nozzle_poly]
),
density=self.cfg.lander_density,
friction=0.1,
categoryBits=0x0040,
maskBits=0x0003, # contact sea and ground
restitution=0.0,
),
)
self.nozzle.color1 = (0, 0, 0)
self.nozzle.color2 = (0, 0, 0)
# join nozzle to lander body
revolute_joint = revoluteJointDef(
bodyA=self.lander,
bodyB=self.nozzle,
localAnchorA=(0, -(self.cfg.lander_length / 2) / self.cfg.scale),
localAnchorB=(0, 0),
enableMotor=True,
enableLimit=False,
maxMotorTorque=self.cfg.nozzle_torque,
motorSpeed=0,
referenceAngle=0,
lowerAngle=self.cfg.max_nozzle_angle * self.cfg.nozzle_angle_limits[0],
upperAngle=self.cfg.max_nozzle_angle * self.cfg.nozzle_angle_limits[1],
)
self.nozzle.joint = self.world.CreateJoint(revolute_joint)
return
def _create_barge(self):
"""Create the landing barge"""
# start it in the center of the screen
barge_center = (0.5 * self.cfg.width, self.cfg.barge_y_pos)
self.barge = self.world.CreateStaticBody(
position=barge_center,
angle=0.0,
fixtures=fixtureDef(
shape=polygonShape(box=(self.cfg.barge_width / 2, self.cfg.barge_height / 2)),
categoryBits=0x0001,
maskBits=0xFFFF,
friction=self.cfg.barge_friction,
),
)
self.barge.color1 = (40, 40, 40)
self.barge.color2 = (25, 25, 25)
def _create_sea_height(self, chunks: int) -> Tuple[list, list]:
"""Create the sea height
Args:
chunks (int): number of chunks to divide the sea into
Returns:
_type_: _description_
"""
# ocean height
sea_height = np.random.normal(self.cfg.barge_y_pos, 0.5, size=(chunks + 1,))
sea_x = [self.cfg.width / (chunks - 1) * i for i in range(chunks)]
# set the height of the sea near the barge to be the barge height
sea_height[chunks // 2 - 2] = self.cfg.barge_y_pos
sea_height[chunks // 2 - 1] = self.cfg.barge_y_pos
sea_height[chunks // 2 + 0] = self.cfg.barge_y_pos
sea_height[chunks // 2 + 1] = self.cfg.barge_y_pos
sea_height[chunks // 2 + 2] = self.cfg.barge_y_pos
smoothed_sea_height = [
0.33 * (sea_height[i - 1] + sea_height[i + 0] + sea_height[i + 1]) for i in range(chunks)
]
return smoothed_sea_height, sea_x
def _create_environment(self, sea_y_values, sea_x_values):
"""Create the environment polygons"""
assert len(sea_y_values) == self.cfg.sea_chunks_x and len(sea_x_values) == self.cfg.sea_chunks_x
num_segments = self.cfg.sea_chunks_x
self.sea = self.world.CreateStaticBody(
# create bottom edge of the sea
shapes=edgeShape(vertices=[(0, 0), (self.cfg.width, 0)])
)
self.sky_polys = []
self.sea_polys = [[] for _ in range(self.cfg.sea_chunks_y)] # purely aesthetic
for i in range(num_segments - 1):
# p1 and p2 define the top edge of the sea chunk
p1 = (sea_x_values[i], sea_y_values[i])
p2 = (sea_x_values[i + 1], sea_y_values[i + 1])
# create collisions, add sky
self.sea.CreateEdgeFixture(
vertices=[p1, p2],
density=0,
friction=0.1,
categoryBits=0x0002,
maskBits=0x00FF,
) # create sea collisions
self.sky_polys.append([p1, p2, (p2[0], self.cfg.height), (p1[0], self.cfg.height)]) # draw sky above sea
# make sea aesthetic by adding gradient polygons
for j in range(self.cfg.sea_chunks_y):
# draw from top to bottom
k = 1 - j / self.cfg.sea_chunks_y
self.sea_polys[j].append([(p1[0], p1[1] * k), (p2[0], p2[1] * k), (p2[0], 0), (p1[0], 0)])
def _create_clouds(self):
self.clouds = []
for _ in range(10):
self.clouds.append(self._create_cloud([0.2, 0.4], [0.65, 0.7], 1))
self.clouds.append(self._create_cloud([0.7, 0.8], [0.75, 0.8], 1))
def _create_cloud(self, x_range, y_range, y_variance=0.1):
cloud_poly = []
numberofdiscretepoints = 3
initial_y = (self.cfg.viewport_height * np.random.uniform(y_range[0], y_range[1])) / self.cfg.scale
initial_x = (self.cfg.viewport_width * np.random.uniform(x_range[0], x_range[1])) / self.cfg.scale
y_coordinates = np.random.normal(0, y_variance, numberofdiscretepoints)
x_step = np.linspace(initial_x, initial_x + np.random.uniform(1, 6), numberofdiscretepoints + 1)
for i in range(0, numberofdiscretepoints):
cloud_poly.append((x_step[i], initial_y + np.sin(3.14 * 2 * i / 50) * y_coordinates[i]))
return cloud_poly
def _create_particle(self, mass, x, y, ttl, radius=3):
"""
Used for both the Main Engine and Side Engines
:param mass: Different mass to represent different forces
:param x: x position
:param y: y position
:param ttl:
:param radius:
:return:
"""
p = self.world.CreateDynamicBody(
position=(x, y),
angle=0.0,
fixtures=fixtureDef(
shape=circleShape(radius=radius / self.cfg.scale, pos=(0, 0)),
density=mass,
friction=0.1,
categoryBits=0x0100,
maskBits=0x0001, # collide only with ground, not with the sea
restitution=0.3,
),
)
p.ttl = ttl # ttl is decreased with every time step to determine if the particle should be destroyed
self.particles.append(p)
# Check if some particles need cleaning
self._clean_particles(False)
return p
def _update_particles(self):
for obj in self.particles:
obj.ttl -= 0.1
color_red = min(255, max(50, 50 + 255 * obj.ttl))
color_green_blue = min(255, max(50, 0.5 * 255 * obj.ttl))
obj.color1 = self._cast_tuple_to_int((color_red, color_green_blue, color_green_blue))
obj.color2 = self._cast_tuple_to_int((color_red, color_green_blue, color_green_blue))
self._clean_particles(False)
def _clean_particles(self, all_particles):
while self.particles and (all_particles or self.particles[0].ttl < 0):
self.world.DestroyBody(self.particles.pop(0))
def render(self):
if self.render_mode == "rgb_array":
return self._render_frame()
def _render_frame(self):
"""Render a single frame"""
if self.window is None and self.render_mode == "human":
pygame.display.init()
self.window = pygame.display.set_mode((self.cfg.viewport_width, self.cfg.viewport_height))
if self.clock is None and self.render_mode == "human":
self.clock = pygame.time.Clock()
self.canvas = pygame.Surface((self.cfg.viewport_width, self.cfg.viewport_height))
pygame.transform.scale(self.canvas, (self.cfg.scale, self.cfg.scale))
self.canvas.fill((255, 255, 255))
self._render_sky()
self._render_lander()
self._render_environment()
if self.args.render_lander_center_position:
self._draw_marker(
x=self.lander.position.x,
y=self.lander.position.y,
theta=self.lander.angle,
color=(0, 0, 0),
)
if self.args.render_landing_position:
landing_pos = self.get_landing_position()
self._draw_marker(
x=landing_pos[0],
y=landing_pos[1],
theta=landing_pos[2],
color=(11, 218, 81),
)
self.canvas = pygame.transform.flip(self.canvas, False, True)
if self.render_mode == "human":
# copies our drawings from canvas to visible window
self.window.blit(self.canvas, self.canvas.get_rect())
pygame.event.pump()
pygame.display.update()
# to ensure that human rendering occurs at a predefined framerate
self.clock.tick(self.metadata["render_fps"])
else: # rgb_array
return np.transpose(np.array(pygame.surfarray.pixels3d(self.canvas)), axes=(1, 0, 2))
def _render_lander(self):
for obj in self.particles + self.lander_drawlist:
for f in obj.fixtures:
trans = f.body.transform
if type(f.shape) is circleShape:
pygame.draw.circle(
self.canvas,
color=obj.color1,
center=trans * f.shape.pos * self.cfg.scale,
radius=f.shape.radius * self.cfg.scale,
)
pygame.draw.circle(
self.canvas,
color=obj.color2,
center=trans * f.shape.pos * self.cfg.scale,
radius=f.shape.radius * self.cfg.scale,
)
else:
# Lander
path = [trans * v for v in f.shape.vertices]
pygame.draw.polygon(
self.canvas,
color=obj.color1,
points=self._scale_list_of_tuples(path, self.cfg.scale),
)
pygame.draw.aalines(
self.canvas,
color=obj.color2,
points=self._scale_list_of_tuples(path, self.cfg.scale),
closed=True,
)
def _render_sky(self):
for p in self.sky_polys:
pygame.draw.polygon(
self.canvas,
color=(212, 234, 255),
points=self._scale_list_of_tuples(p, self.cfg.scale),
)
if self.cfg.clouds:
for x in self.clouds:
pygame.draw.polygon(
self.canvas,
color=(255, 255, 255),
points=self._scale_list_of_tuples(x, self.cfg.scale),
)
def _render_environment(self):
"""Render the sea and barge"""
for i, s in enumerate(self.sea_polys):
k = 1 - (i + 1) / self.cfg.sea_chunks_y
for poly in s:
pygame.draw.polygon(
self.canvas,
color=self._cast_tuple_to_int((0, 0.5 * 255 * k, min(255, 255 * k + 128))),
points=self._scale_list_of_tuples(poly, self.cfg.scale),
)
# barge
for f in self.barge.fixtures:
trans = f.body.transform
path = [trans * v for v in f.shape.vertices]
pygame.draw.polygon(
self.canvas,
color=self.barge.color1,
points=self._scale_list_of_tuples(path, self.cfg.scale),
)
pygame.draw.aalines(
self.canvas,
color=self.barge.color2,
points=self._scale_list_of_tuples(path, self.cfg.scale),
closed=True,
)
# landing flags
trans = self.barge.fixtures[0].body.transform # should only be one
for ind in [-1, +1]:
# set some x and y points for the flag
flag_x = ind * (3 / 4) * self.cfg.barge_width / 2
flag_y = self.cfg.barge_height / 2
# these units are meters
flag_pole_length = 0.8
flag_triangle_height = 0.3
flag_triangle_width = 0.8
flag_triangle = [
(flag_x, flag_y + flag_pole_length),
(flag_x, flag_y + flag_pole_length + flag_triangle_height),
(
flag_x + flag_triangle_width,
flag_y + flag_pole_length + flag_triangle_height / 2,
),
]
# flag_triangle = polygonShape(vertices=flag_triangle)
flag_triangle = [trans * v for v in flag_triangle]
pygame.draw.polygon(
self.canvas,
color=(255, 0, 0),
points=self._scale_list_of_tuples(flag_triangle, self.cfg.scale),
)
pygame.draw.lines(
self.canvas,
color=(0, 0, 0),
closed=True,
points=self._scale_list_of_tuples(flag_triangle, self.cfg.scale),
)
flag_pole = [trans * v for v in [(flag_x, flag_y), (flag_x, flag_y + flag_pole_length)]]
pygame.draw.lines(
self.canvas,
color=(128, 128, 128),
closed=True,
points=self._scale_list_of_tuples(flag_pole, self.cfg.scale),
)
def _draw_marker(self, x, y, theta, color: Tuple[int, ...] = (0, 0, 0)):
"""Draw a marker
Args:
x (_type_): _description_
y (_type_): _description_
theta (_type_): _description_
"""
offset = 0.2
cross_vert = [
(x + offset * np.sin(theta), y - offset * np.cos(theta)),
(x - offset * np.sin(theta), y + offset * np.cos(theta)),
]
cross_horiz = [
(x - offset * np.cos(theta), y - offset * np.sin(theta)),
(x + offset * np.cos(theta), y + offset * np.sin(theta)),
]
pygame.draw.lines(
self.canvas,
color=color,
closed=False,
points=self._scale_list_of_tuples(cross_horiz, self.cfg.scale),
width=2,
)
pygame.draw.lines(
self.canvas,
color=color,
closed=False,
points=self._scale_list_of_tuples(cross_vert, self.cfg.scale),
width=2,
)
## MISC. HELPERS
@staticmethod
def _cast_tuple_to_int(t: Tuple) -> Tuple:
return tuple([int(item) for item in t])
@staticmethod
def _scale_list_of_tuples(l: List[Tuple], scale: int) -> List[Tuple]:
out = []
for t in l:
out.append(tuple([item * scale for item in t]))
return out
def adjust_dynamics(self, body, **kwargs):
"""Adjust dynamic parameters of a body"""
# position is weird, only doing velocities and angles
if kwargs.get("x_dot"):
body.linearVelocity.x = kwargs["x_dot"]
if kwargs.get("y_dot"):
body.linearVelocity.y = kwargs["y_dot"]
if kwargs.get("theta"):
body.angle = kwargs["theta"]
if kwargs.get("theta_dot"):
body.angularVelocity = kwargs["theta_dot"]
self.state = self.__generate_state()