The following issues were found
gym/envs/classic_control/acrobot.py
47 issues
Line: 108
Column: 5
self.state = self.np_random.uniform(low=-0.1, high=0.1, size=(4,))
return self._get_ob()
def step(self, a):
s = self.state
torque = self.AVAIL_TORQUE[a]
# Add noise to the force action
if self.torque_noise_max > 0:
Reported by Pylint.
Line: 148
Column: 34
s = self.state
return bool(-cos(s[0]) - cos(s[1] + s[0]) > 1.0)
def _dsdt(self, s_augmented, t):
m1 = self.LINK_MASS_1
m2 = self.LINK_MASS_2
l1 = self.LINK_LENGTH_1
lc1 = self.LINK_COM_POS_1
lc2 = self.LINK_COM_POS_2
Reported by Pylint.
Line: 197
Column: 13
if self.viewer is None:
self.viewer = rendering.Viewer(500, 500)
bound = self.LINK_LENGTH_1 + self.LINK_LENGTH_2 + 0.2 # 2.2 for default
self.viewer.set_bounds(-bound, bound, -bound, bound)
if s is None:
return None
Reported by Pylint.
Line: 109
Column: 9
return self._get_ob()
def step(self, a):
s = self.state
torque = self.AVAIL_TORQUE[a]
# Add noise to the force action
if self.torque_noise_max > 0:
torque += self.np_random.uniform(
Reported by Pylint.
Line: 122
Column: 9
# _dsdt
s_augmented = np.append(s, torque)
ns = rk4(self._dsdt, s_augmented, [0, self.dt])
# only care about final timestep of integration returned by integrator
ns = ns[-1]
ns = ns[:4] # omit action
# ODEINT IS TOO SLOW!
# ns_continuous = integrate.odeint(self._dsdt, self.s_continuous, [0, self.dt])
Reported by Pylint.
Line: 124
Column: 9
ns = rk4(self._dsdt, s_augmented, [0, self.dt])
# only care about final timestep of integration returned by integrator
ns = ns[-1]
ns = ns[:4] # omit action
# ODEINT IS TOO SLOW!
# ns_continuous = integrate.odeint(self._dsdt, self.s_continuous, [0, self.dt])
# self.s_continuous = ns_continuous[-1] # We only care about the state
# at the ''final timestep'', self.dt
Reported by Pylint.
Line: 125
Column: 9
ns = rk4(self._dsdt, s_augmented, [0, self.dt])
# only care about final timestep of integration returned by integrator
ns = ns[-1]
ns = ns[:4] # omit action
# ODEINT IS TOO SLOW!
# ns_continuous = integrate.odeint(self._dsdt, self.s_continuous, [0, self.dt])
# self.s_continuous = ns_continuous[-1] # We only care about the state
# at the ''final timestep'', self.dt
Reported by Pylint.
Line: 141
Column: 9
return (self._get_ob(), reward, terminal, {})
def _get_ob(self):
s = self.state
return np.array([cos(s[0]), sin(s[0]), cos(s[1]), sin(s[1]), s[2], s[3]])
def _terminal(self):
s = self.state
return bool(-cos(s[0]) - cos(s[1] + s[0]) > 1.0)
Reported by Pylint.
Line: 145
Column: 9
return np.array([cos(s[0]), sin(s[0]), cos(s[1]), sin(s[1]), s[2], s[3]])
def _terminal(self):
s = self.state
return bool(-cos(s[0]) - cos(s[1] + s[0]) > 1.0)
def _dsdt(self, s_augmented, t):
m1 = self.LINK_MASS_1
m2 = self.LINK_MASS_2
Reported by Pylint.
Line: 148
Column: 5
s = self.state
return bool(-cos(s[0]) - cos(s[1] + s[0]) > 1.0)
def _dsdt(self, s_augmented, t):
m1 = self.LINK_MASS_1
m2 = self.LINK_MASS_2
l1 = self.LINK_LENGTH_1
lc1 = self.LINK_COM_POS_1
lc2 = self.LINK_COM_POS_2
Reported by Pylint.
gym/wrappers/monitoring/video_recorder.py
44 issues
Line: 399
Column: 25
logger.debug('Starting %s with "%s"', self.backend, " ".join(self.cmdline))
if hasattr(os, "setsid"): # setsid not present on Windows
self.proc = subprocess.Popen(
self.cmdline, stdin=subprocess.PIPE, preexec_fn=os.setsid
)
else:
self.proc = subprocess.Popen(self.cmdline, stdin=subprocess.PIPE)
Reported by Pylint.
Line: 432
Column: 9
self.proc.stdin.write(frame.tobytes())
else:
self.proc.stdin.write(frame.tostring())
except Exception as e:
stdout, stderr = self.proc.communicate()
logger.error("VideoRecorder encoder failed: %s", stderr)
def close(self):
self.proc.stdin.close()
Reported by Pylint.
Line: 432
Column: 16
self.proc.stdin.write(frame.tobytes())
else:
self.proc.stdin.write(frame.tostring())
except Exception as e:
stdout, stderr = self.proc.communicate()
logger.error("VideoRecorder encoder failed: %s", stderr)
def close(self):
self.proc.stdin.close()
Reported by Pylint.
Line: 433
Column: 13
else:
self.proc.stdin.write(frame.tostring())
except Exception as e:
stdout, stderr = self.proc.communicate()
logger.error("VideoRecorder encoder failed: %s", stderr)
def close(self):
self.proc.stdin.close()
ret = self.proc.wait()
Reported by Pylint.
Line: 1
Column: 1
import json
import os
import os.path
import subprocess
import tempfile
from io import StringIO
import distutils.spawn
import distutils.version
Reported by Pylint.
Line: 4
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import json
import os
import os.path
import subprocess
import tempfile
from io import StringIO
import distutils.spawn
import distutils.version
Reported by Bandit.
Line: 15
Column: 1
from gym import error, logger
def touch(path):
open(path, "a").close()
class VideoRecorder(object):
"""VideoRecorder renders a nice movie of a rollout, frame by frame. It
Reported by Pylint.
Line: 19
Column: 1
open(path, "a").close()
class VideoRecorder(object):
"""VideoRecorder renders a nice movie of a rollout, frame by frame. It
comes with an `enabled` option so you can still use the same code
on episodes where you don't want to record video.
Note:
Reported by Pylint.
Line: 19
Column: 1
open(path, "a").close()
class VideoRecorder(object):
"""VideoRecorder renders a nice movie of a rollout, frame by frame. It
comes with an `enabled` option so you can still use the same code
on episodes where you don't want to record video.
Note:
Reported by Pylint.
Line: 31
Column: 1
Args:
env (Env): Environment to take video of.
path (Optional[str]): Path to the video file; will be randomly chosen if omitted.
base_path (Optional[str]): Alternatively, path to the video file without extension, which will be added.
metadata (Optional[dict]): Contents to save to the metadata file.
enabled (bool): Whether to actually record video, or just no-op (for convenience)
"""
def __init__(self, env, path=None, metadata=None, enabled=True, base_path=None):
Reported by Pylint.
gym/envs/algorithmic/tests/test_algorithmic.py
43 issues
Line: 34
Column: 19
CANNED_INPUT = [0, 1]
ENV_KLS = alg.copy_.CopyEnv
LEFT, RIGHT = ENV_KLS._movement_idx("left"), ENV_KLS._movement_idx("right")
def setUp(self):
self.env = self.ENV_KLS(base=2, chars=True)
imprint(self.env, self.CANNED_INPUT)
Reported by Pylint.
Line: 34
Column: 50
CANNED_INPUT = [0, 1]
ENV_KLS = alg.copy_.CopyEnv
LEFT, RIGHT = ENV_KLS._movement_idx("left"), ENV_KLS._movement_idx("right")
def setUp(self):
self.env = self.ENV_KLS(base=2, chars=True)
imprint(self.env, self.CANNED_INPUT)
Reported by Pylint.
Line: 52
Column: 9
self.assertGreater(reward, 0)
def test_bad_output_fail_fast(self):
obs = self.env.reset()
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 1])
self.assertTrue(done)
self.assertLess(reward, 0)
def test_levelup(self):
Reported by Pylint.
Line: 58
Column: 9
self.assertLess(reward, 0)
def test_levelup(self):
obs = self.env.reset()
# Kind of a hack
self.env.reward_shortfalls = []
min_length = self.env.min_length
for i in range(self.env.last):
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 0])
Reported by Pylint.
Line: 63
Column: 18
self.env.reward_shortfalls = []
min_length = self.env.min_length
for i in range(self.env.last):
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 0])
self.assertFalse(done)
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 1])
self.assertTrue(done)
self.env.reset()
if i < self.env.last - 1:
Reported by Pylint.
Line: 96
Column: 13
def test_grid_naviation(self):
env = alg.reversed_addition.ReversedAdditionEnv(rows=2, base=6)
N, S, E, W = [
env._movement_idx(named_dir)
for named_dir in ["up", "down", "right", "left"]
]
# Corresponds to a grid that looks like...
# 0 1 2
# 3 4 5
Reported by Pylint.
Line: 137
Column: 9
self.assertEqual(done, i == len(target) - 1)
def test_sane_time_limit(self):
obs = self.env.reset()
self.assertLess(self.env.time_limit, 100)
for _ in range(100):
obs, r, done, _ = self.env.step([self.LEFT, 0, 0])
if done:
return
Reported by Pylint.
Line: 140
Column: 18
obs = self.env.reset()
self.assertLess(self.env.time_limit, 100)
for _ in range(100):
obs, r, done, _ = self.env.step([self.LEFT, 0, 0])
if done:
return
self.fail("Time limit wasn't enforced")
def test_rendering(self):
Reported by Pylint.
Line: 148
Column: 26
def test_rendering(self):
env = self.env
env.reset()
self.assertEqual(env._get_str_obs(), "A")
self.assertEqual(env._get_str_obs(1), "B")
self.assertEqual(env._get_str_obs(-1), " ")
self.assertEqual(env._get_str_obs(2), " ")
self.assertEqual(env._get_str_target(0), "A")
self.assertEqual(env._get_str_target(1), "B")
Reported by Pylint.
Line: 149
Column: 26
env = self.env
env.reset()
self.assertEqual(env._get_str_obs(), "A")
self.assertEqual(env._get_str_obs(1), "B")
self.assertEqual(env._get_str_obs(-1), " ")
self.assertEqual(env._get_str_obs(2), " ")
self.assertEqual(env._get_str_target(0), "A")
self.assertEqual(env._get_str_target(1), "B")
# Test numerical alphabet rendering
Reported by Pylint.
gym/envs/robotics/hand/manipulate.py
40 issues
Line: 138
Column: 5
# GoalEnv methods
# ----------------------------
def compute_reward(self, achieved_goal, goal, info):
if self.reward_type == "sparse":
success = self._is_success(achieved_goal, goal).astype(np.float32)
return success - 1.0
else:
d_pos, d_rot = self._goal_distance(achieved_goal, goal)
Reported by Pylint.
Line: 1
Column: 1
import os
import numpy as np
from gym import utils, error
from gym.envs.robotics import rotations, hand_env
from gym.envs.robotics.utils import robot_get_obs
try:
import mujoco_py
Reported by Pylint.
Line: 10
Column: 1
try:
import mujoco_py
except ImportError as e:
raise error.DependencyNotInstalled(
"{}. (HINT: you need to install mujoco_py, and also perform the setup instructions here: https://github.com/openai/mujoco-py/.)".format(
e
)
)
Reported by Pylint.
Line: 12
Column: 1
import mujoco_py
except ImportError as e:
raise error.DependencyNotInstalled(
"{}. (HINT: you need to install mujoco_py, and also perform the setup instructions here: https://github.com/openai/mujoco-py/.)".format(
e
)
)
Reported by Pylint.
Line: 18
Column: 1
)
def quat_from_angle_and_axis(angle, axis):
assert axis.shape == (3,)
axis /= np.linalg.norm(axis)
quat = np.concatenate([[np.cos(angle / 2.0)], np.sin(angle / 2.0) * axis])
quat /= np.linalg.norm(quat)
return quat
Reported by Pylint.
Line: 19
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def quat_from_angle_and_axis(angle, axis):
assert axis.shape == (3,)
axis /= np.linalg.norm(axis)
quat = np.concatenate([[np.cos(angle / 2.0)], np.sin(angle / 2.0) * axis])
quat /= np.linalg.norm(quat)
return quat
Reported by Bandit.
Line: 32
Column: 1
MANIPULATE_PEN_XML = os.path.join("hand", "manipulate_pen.xml")
class ManipulateEnv(hand_env.HandEnv):
def __init__(
self,
model_path,
target_position,
target_rotation,
Reported by Pylint.
Line: 32
Column: 1
MANIPULATE_PEN_XML = os.path.join("hand", "manipulate_pen.xml")
class ManipulateEnv(hand_env.HandEnv):
def __init__(
self,
model_path,
target_position,
target_rotation,
Reported by Pylint.
Line: 33
Column: 5
class ManipulateEnv(hand_env.HandEnv):
def __init__(
self,
model_path,
target_position,
target_rotation,
target_position_range,
Reported by Pylint.
Line: 54
Column: 1
Args:
model_path (string): path to the environments XML file
target_position (string): the type of target position:
- ignore: target position is fully ignored, i.e. the object can be positioned arbitrarily
- fixed: target position is set to the initial position of the object
- random: target position is fully randomized according to target_position_range
target_rotation (string): the type of target rotation:
- ignore: target rotation is fully ignored, i.e. the object can be rotated arbitrarily
- fixed: target rotation is set to the initial rotation of the object
Reported by Pylint.
gym/envs/algorithmic/algorithmic_env.py
35 issues
Line: 77
Column: 27
# 2. Write or not
# 3. Which character to write. (Ignored if should_write=0)
self.action_space = Tuple(
[Discrete(len(self.MOVEMENTS)), Discrete(2), Discrete(self.base)]
)
# Can see just what is on the input tape (one of n characters, or
# nothing)
self.observation_space = Discrete(self.base + 1)
self.seed()
Reported by Pylint.
Line: 87
Column: 16
@classmethod
def _movement_idx(kls, movement_name):
return kls.MOVEMENTS.index(movement_name)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
Reported by Pylint.
Line: 147
Column: 20
if action is not None:
outfile.write("Current reward : %.3f\n" % self.last_reward)
outfile.write("Cumulative reward : %.3f\n" % self.episode_total_reward)
move = self.MOVEMENTS[inp_act]
outfile.write("Action : Tuple(move over input: %s,\n" % move)
out_act = out_act == 1
outfile.write(
" write to the output tape: %s,\n"
% out_act
Reported by Pylint.
Line: 230
Column: 35
self._check_levelup()
self.last_action = None
self.last_reward = 0
self.read_head_position = self.READ_HEAD_START
self.write_head_position = 0
self.episode_total_reward = 0.0
self.time = 0
length = self.np_random.randint(3) + self.min_length
self.input_data = self.generate_input_data(length)
Reported by Pylint.
Line: 168
Column: 9
def step(self, action):
assert self.action_space.contains(action)
self.last_action = action
inp_act, out_act, pred = action
done = False
reward = 0.0
self.time += 1
assert 0 <= self.write_head_position
Reported by Pylint.
Line: 199
Column: 9
reward = -1.0
done = True
obs = self._get_obs()
self.last_reward = reward
self.episode_total_reward += reward
return (obs, reward, done, {})
@property
def time_limit(self):
Reported by Pylint.
Line: 249
Column: 1
raise NotImplementedError
class TapeAlgorithmicEnv(AlgorithmicEnv):
"""An algorithmic env with a 1-d input tape."""
MOVEMENTS = ["left", "right"]
READ_HEAD_START = 0
Reported by Pylint.
Line: 288
Column: 1
return x_str
class GridAlgorithmicEnv(AlgorithmicEnv):
"""An algorithmic env with a 2-d input grid."""
MOVEMENTS = ["left", "right", "up", "down"]
READ_HEAD_START = (0, 0)
Reported by Pylint.
Line: 311
Column: 9
y += 1
else:
raise ValueError("Unrecognized direction: {}".format(named))
self.read_head_position = x, y
def generate_input_data(self, size):
return [
[self.np_random.randint(self.base) for _ in range(self.rows)]
for __ in range(size)
Reported by Pylint.
Line: 37
Column: 1
from gym import Env, logger
from gym.spaces import Discrete, Tuple
from gym.utils import colorize, seeding
import sys
from contextlib import closing
import numpy as np
from io import StringIO
Reported by Pylint.
gym/vector/tests/test_sync_vector_env.py
34 issues
Line: 1
Column: 1
import pytest
import numpy as np
from gym.spaces import Box, Tuple
from gym.vector.tests.utils import CustomSpace, make_env, make_custom_space_env
from gym.vector.sync_vector_env import SyncVectorEnv
Reported by Pylint.
Line: 82
Column: 28
env = SyncVectorEnv(env_fns)
reset_observations = env.reset()
actions = ("action-2", "action-3", "action-5", "action-7")
step_observations, rewards, dones, _ = env.step(actions)
finally:
env.close()
assert isinstance(env.single_observation_space, CustomSpace)
assert isinstance(env.observation_space, Tuple)
Reported by Pylint.
Line: 82
Column: 37
env = SyncVectorEnv(env_fns)
reset_observations = env.reset()
actions = ("action-2", "action-3", "action-5", "action-7")
step_observations, rewards, dones, _ = env.step(actions)
finally:
env.close()
assert isinstance(env.single_observation_space, CustomSpace)
assert isinstance(env.observation_space, Tuple)
Reported by Pylint.
Line: 1
Column: 1
import pytest
import numpy as np
from gym.spaces import Box, Tuple
from gym.vector.tests.utils import CustomSpace, make_env, make_custom_space_env
from gym.vector.sync_vector_env import SyncVectorEnv
Reported by Pylint.
Line: 10
Column: 1
from gym.vector.sync_vector_env import SyncVectorEnv
def test_create_sync_vector_env():
env_fns = [make_env("CubeCrash-v0", i) for i in range(8)]
try:
env = SyncVectorEnv(env_fns)
finally:
env.close()
Reported by Pylint.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
finally:
env.close()
assert env.num_envs == 8
def test_reset_sync_vector_env():
env_fns = [make_env("CubeCrash-v0", i) for i in range(8)]
try:
Reported by Bandit.
Line: 20
Column: 1
assert env.num_envs == 8
def test_reset_sync_vector_env():
env_fns = [make_env("CubeCrash-v0", i) for i in range(8)]
try:
env = SyncVectorEnv(env_fns)
observations = env.reset()
finally:
Reported by Pylint.
Line: 28
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
finally:
env.close()
assert isinstance(env.observation_space, Box)
assert isinstance(observations, np.ndarray)
assert observations.dtype == env.observation_space.dtype
assert observations.shape == (8,) + env.single_observation_space.shape
assert observations.shape == env.observation_space.shape
Reported by Bandit.
Line: 29
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
env.close()
assert isinstance(env.observation_space, Box)
assert isinstance(observations, np.ndarray)
assert observations.dtype == env.observation_space.dtype
assert observations.shape == (8,) + env.single_observation_space.shape
assert observations.shape == env.observation_space.shape
Reported by Bandit.
Line: 30
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert isinstance(env.observation_space, Box)
assert isinstance(observations, np.ndarray)
assert observations.dtype == env.observation_space.dtype
assert observations.shape == (8,) + env.single_observation_space.shape
assert observations.shape == env.observation_space.shape
@pytest.mark.parametrize("use_single_action_space", [True, False])
Reported by Bandit.
gym/vector/tests/test_shared_memory.py
34 issues
Line: 1
Column: 1
import pytest
import numpy as np
import multiprocessing as mp
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
Reported by Pylint.
Line: 9
Column: 1
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
from gym.vector.tests.utils import spaces, custom_spaces
from gym.vector.utils.shared_memory import (
Reported by Pylint.
Line: 87
Column: 9
def test_create_shared_memory_custom_space(n, ctx, space):
ctx = mp if (ctx is None) else mp.get_context(ctx)
with pytest.raises(CustomSpaceError):
shared_memory = create_shared_memory(space, n=n, ctx=ctx)
@pytest.mark.parametrize(
"space", spaces, ids=[space.__class__.__name__ for space in spaces]
)
Reported by Pylint.
Line: 1
Column: 1
import pytest
import numpy as np
import multiprocessing as mp
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
Reported by Pylint.
Line: 4
Column: 1
import pytest
import numpy as np
import multiprocessing as mp
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
Reported by Pylint.
Line: 5
Column: 1
import numpy as np
import multiprocessing as mp
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
from gym.error import CustomSpaceError
Reported by Pylint.
Line: 6
Column: 1
import multiprocessing as mp
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
Reported by Pylint.
Line: 7
Column: 1
import multiprocessing as mp
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing import Array, Process
from collections import OrderedDict
from gym.spaces import Box, Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
from gym.vector.tests.utils import spaces, custom_spaces
Reported by Pylint.
Line: 50
Column: 1
ids=[space.__class__.__name__ for space in spaces],
)
@pytest.mark.parametrize(
"ctx", [None, "fork", "spawn"], ids=["default", "fork", "spawn"]
)
def test_create_shared_memory(space, expected_type, n, ctx):
def assert_nested_type(lhs, rhs, n):
assert type(lhs) == type(rhs)
if isinstance(lhs, (list, tuple)):
Reported by Pylint.
Line: 50
Column: 1
ids=[space.__class__.__name__ for space in spaces],
)
@pytest.mark.parametrize(
"ctx", [None, "fork", "spawn"], ids=["default", "fork", "spawn"]
)
def test_create_shared_memory(space, expected_type, n, ctx):
def assert_nested_type(lhs, rhs, n):
assert type(lhs) == type(rhs)
if isinstance(lhs, (list, tuple)):
Reported by Pylint.
gym/error.py
33 issues
Line: 1
Column: 1
import sys
class Error(Exception):
pass
# Local errors
Reported by Pylint.
Line: 16
Column: 5
not actually exist.
"""
pass
class UnregisteredEnv(Unregistered):
"""Raised when the user requests an env from the registry that does
not actually exist.
Reported by Pylint.
Line: 24
Column: 5
not actually exist.
"""
pass
class UnregisteredBenchmark(Unregistered):
"""Raised when the user requests an env from the registry that does
not actually exist.
Reported by Pylint.
Line: 32
Column: 5
not actually exist.
"""
pass
class DeprecatedEnv(Error):
"""Raised when the user requests an env from the registry with an
older version number than the latest env with the same name.
Reported by Pylint.
Line: 40
Column: 5
older version number than the latest env with the same name.
"""
pass
class UnseedableEnv(Error):
"""Raised when the user tries to seed an env that does not support
seeding.
Reported by Pylint.
Line: 48
Column: 5
seeding.
"""
pass
class DependencyNotInstalled(Error):
pass
Reported by Pylint.
Line: 60
Column: 5
environment.
"""
pass
class ResetNeeded(Exception):
"""When the monitor is active, raised when the user tries to step an
environment that's already done.
Reported by Pylint.
Line: 68
Column: 5
environment that's already done.
"""
pass
class ResetNotAllowed(Exception):
"""When the monitor is active, raised when the user tries to step an
environment that's not yet done.
Reported by Pylint.
Line: 76
Column: 5
environment that's not yet done.
"""
pass
class InvalidAction(Exception):
"""Raised when the user performs an action not contained within the
action space
Reported by Pylint.
Line: 84
Column: 5
action space
"""
pass
# API errors
Reported by Pylint.
gym/wrappers/test_pixel_observation.py
31 issues
Line: 4
Column: 1
"""Tests for the pixel observation wrapper."""
import pytest
import numpy as np
import gym
from gym import spaces
from gym.wrappers.pixel_observation import PixelObservationWrapper, STATE_KEY
Reported by Pylint.
Line: 16
Column: 5
def __init__(self):
self.action_space = spaces.Box(shape=(1,), low=-1, high=1, dtype=np.float32)
def render(self, width=32, height=32, *args, **kwargs):
del args
del kwargs
image_shape = (height, width, 3)
return np.zeros(image_shape, dtype=np.uint8)
Reported by Pylint.
Line: 16
Column: 5
def __init__(self):
self.action_space = spaces.Box(shape=(1,), low=-1, high=1, dtype=np.float32)
def render(self, width=32, height=32, *args, **kwargs):
del args
del kwargs
image_shape = (height, width, 3)
return np.zeros(image_shape, dtype=np.uint8)
Reported by Pylint.
Line: 12
Column: 1
from gym.wrappers.pixel_observation import PixelObservationWrapper, STATE_KEY
class FakeEnvironment(gym.Env):
def __init__(self):
self.action_space = spaces.Box(shape=(1,), low=-1, high=1, dtype=np.float32)
def render(self, width=32, height=32, *args, **kwargs):
del args
Reported by Pylint.
Line: 33
Column: 1
return observation, reward, terminal, info
class FakeArrayObservationEnvironment(FakeEnvironment):
def __init__(self, *args, **kwargs):
self.observation_space = spaces.Box(
shape=(2,), low=-1, high=1, dtype=np.float32
)
super(FakeArrayObservationEnvironment, self).__init__(*args, **kwargs)
Reported by Pylint.
Line: 38
Column: 9
self.observation_space = spaces.Box(
shape=(2,), low=-1, high=1, dtype=np.float32
)
super(FakeArrayObservationEnvironment, self).__init__(*args, **kwargs)
class FakeDictObservationEnvironment(FakeEnvironment):
def __init__(self, *args, **kwargs):
self.observation_space = spaces.Dict(
Reported by Pylint.
Line: 41
Column: 1
super(FakeArrayObservationEnvironment, self).__init__(*args, **kwargs)
class FakeDictObservationEnvironment(FakeEnvironment):
def __init__(self, *args, **kwargs):
self.observation_space = spaces.Dict(
{
"state": spaces.Box(shape=(2,), low=-1, high=1, dtype=np.float32),
}
Reported by Pylint.
Line: 48
Column: 9
"state": spaces.Box(shape=(2,), low=-1, high=1, dtype=np.float32),
}
)
super(FakeDictObservationEnvironment, self).__init__(*args, **kwargs)
class TestPixelObservationWrapper(object):
@pytest.mark.parametrize("pixels_only", (True, False))
def test_dict_observation(self, pixels_only):
Reported by Pylint.
Line: 51
Column: 1
super(FakeDictObservationEnvironment, self).__init__(*args, **kwargs)
class TestPixelObservationWrapper(object):
@pytest.mark.parametrize("pixels_only", (True, False))
def test_dict_observation(self, pixels_only):
pixel_key = "rgb"
env = FakeDictObservationEnvironment()
Reported by Pylint.
Line: 51
Column: 1
super(FakeDictObservationEnvironment, self).__init__(*args, **kwargs)
class TestPixelObservationWrapper(object):
@pytest.mark.parametrize("pixels_only", (True, False))
def test_dict_observation(self, pixels_only):
pixel_key = "rgb"
env = FakeDictObservationEnvironment()
Reported by Pylint.
gym/utils/env_checker.py
30 issues
Line: 185
Column: 17
try:
_check_obs(obs[key], observation_space.spaces[key], "reset")
except AssertionError as e:
raise AssertionError(f"Error while checking key={key}: " + str(e))
else:
_check_obs(obs, observation_space, "reset")
# Sample a random action
action = action_space.sample()
Reported by Pylint.
Line: 208
Column: 17
try:
_check_obs(obs[key], observation_space.spaces[key], "step")
except AssertionError as e:
raise AssertionError(f"Error while checking key={key}: " + str(e))
else:
_check_obs(obs, observation_space, "step")
# We also allow int because the reward will be cast to float
Reported by Pylint.
Line: 77
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
correspond to the declared one.
"""
if not isinstance(observation_space, spaces.Tuple):
assert not isinstance(
obs, tuple
), f"The observation returned by the `{method_name}()` method should be a single value, not a tuple"
# The check for a GoalEnv is done by the base class
if isinstance(observation_space, spaces.Discrete):
Reported by Bandit.
Line: 79
Column: 1
if not isinstance(observation_space, spaces.Tuple):
assert not isinstance(
obs, tuple
), f"The observation returned by the `{method_name}()` method should be a single value, not a tuple"
# The check for a GoalEnv is done by the base class
if isinstance(observation_space, spaces.Discrete):
assert isinstance(
obs, int
Reported by Pylint.
Line: 83
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# The check for a GoalEnv is done by the base class
if isinstance(observation_space, spaces.Discrete):
assert isinstance(
obs, int
), f"The observation returned by `{method_name}()` method must be an int"
elif _is_numpy_array_space(observation_space):
assert isinstance(
obs, np.ndarray
Reported by Bandit.
Line: 87
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
obs, int
), f"The observation returned by `{method_name}()` method must be an int"
elif _is_numpy_array_space(observation_space):
assert isinstance(
obs, np.ndarray
), f"The observation returned by `{method_name}()` method must be a numpy array"
assert observation_space.contains(
obs
Reported by Bandit.
Line: 91
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
obs, np.ndarray
), f"The observation returned by `{method_name}()` method must be a numpy array"
assert observation_space.contains(
obs
), f"The observation returned by the `{method_name}()` method does not match the given observation space"
def _check_box_obs(observation_space: spaces.Box, key: str = "") -> None:
Reported by Bandit.
Line: 93
Column: 1
assert observation_space.contains(
obs
), f"The observation returned by the `{method_name}()` method does not match the given observation space"
def _check_box_obs(observation_space: spaces.Box, key: str = "") -> None:
"""
Check that the observation space is correctly formatted
Reported by Pylint.
Line: 110
Column: 1
if len(observation_space.shape) not in [1, 3]:
warnings.warn(
f"Your observation {key} has an unconventional shape (neither an image, nor a 1D vector). "
"We recommend you to flatten the observation "
"to have only a 1D vector or use a custom policy to properly process the data."
)
if np.any(np.equal(observation_space.low, -np.inf)):
Reported by Pylint.
Line: 126
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if np.any(np.equal(observation_space.low, observation_space.high)):
warnings.warn("Agent's maximum and minimum observation space values are equal")
if np.any(np.greater(observation_space.low, observation_space.high)):
assert False, "Agent's minimum observation value is greater than it's maximum"
if observation_space.low.shape != observation_space.shape:
assert (
False
), "Agent's observation_space.low and observation_space have different shapes"
if observation_space.high.shape != observation_space.shape:
Reported by Bandit.