The following issues were found
gym/wrappers/monitoring/tests/test_video_recorder.py
30 issues
Line: 1
Column: 1
import json
import os
import shutil
import tempfile
import numpy as np
import gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
Reported by Pylint.
Line: 3
Column: 1
import json
import os
import shutil
import tempfile
import numpy as np
import gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
Reported by Pylint.
Line: 4
Column: 1
import json
import os
import shutil
import tempfile
import numpy as np
import gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
Reported by Pylint.
Line: 5
Column: 1
import os
import shutil
import tempfile
import numpy as np
import gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
Reported by Pylint.
Line: 1
Column: 1
import json
import os
import shutil
import tempfile
import numpy as np
import gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
Reported by Pylint.
Line: 11
Column: 1
from gym.wrappers.monitoring.video_recorder import VideoRecorder
class BrokenRecordableEnv(object):
metadata = {"render.modes": [None, "rgb_array"]}
def render(self, mode=None):
pass
Reported by Pylint.
Line: 11
Column: 1
from gym.wrappers.monitoring.video_recorder import VideoRecorder
class BrokenRecordableEnv(object):
metadata = {"render.modes": [None, "rgb_array"]}
def render(self, mode=None):
pass
Reported by Pylint.
Line: 11
Column: 1
from gym.wrappers.monitoring.video_recorder import VideoRecorder
class BrokenRecordableEnv(object):
metadata = {"render.modes": [None, "rgb_array"]}
def render(self, mode=None):
pass
Reported by Pylint.
Line: 14
Column: 5
class BrokenRecordableEnv(object):
metadata = {"render.modes": [None, "rgb_array"]}
def render(self, mode=None):
pass
class UnrecordableEnv(object):
metadata = {"render.modes": [None]}
Reported by Pylint.
Line: 18
Column: 1
pass
class UnrecordableEnv(object):
metadata = {"render.modes": [None]}
def render(self, mode=None):
pass
Reported by Pylint.
gym/vector/utils/numpy_utils.py
29 issues
Line: 55
Column: 34
)
def concatenate_base(items, out, space):
return np.stack(items, axis=0, out=out)
def concatenate_tuple(items, out, space):
return tuple(
Reported by Pylint.
Line: 75
Column: 36
)
def concatenate_custom(items, out, space):
return tuple(items)
def create_empty_array(space, n=1, fn=np.zeros):
"""Create an empty (possibly nested) numpy array.
Reported by Pylint.
Line: 75
Column: 31
)
def concatenate_custom(items, out, space):
return tuple(items)
def create_empty_array(space, n=1, fn=np.zeros):
"""Create an empty (possibly nested) numpy array.
Reported by Pylint.
Line: 145
Column: 38
)
def create_empty_array_custom(space, n=1, fn=np.zeros):
return None
Reported by Pylint.
Line: 145
Column: 43
)
def create_empty_array_custom(space, n=1, fn=np.zeros):
return None
Reported by Pylint.
Line: 145
Column: 31
)
def create_empty_array_custom(space, n=1, fn=np.zeros):
return None
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
from gym.spaces import Space, Tuple, Dict
from gym.vector.utils.spaces import _BaseGymSpaces
from collections import OrderedDict
__all__ = ["concatenate", "create_empty_array"]
Reported by Pylint.
Line: 5
Column: 1
from gym.spaces import Space, Tuple, Dict
from gym.vector.utils.spaces import _BaseGymSpaces
from collections import OrderedDict
__all__ = ["concatenate", "create_empty_array"]
def concatenate(items, out, space):
Reported by Pylint.
Line: 39
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
array([[0.6348213 , 0.28607962, 0.60760117],
[0.87383074, 0.192658 , 0.2148103 ]], dtype=float32)
"""
assert isinstance(items, (list, tuple))
if isinstance(space, _BaseGymSpaces):
return concatenate_base(items, out, space)
elif isinstance(space, Tuple):
return concatenate_tuple(items, out, space)
elif isinstance(space, Dict):
Reported by Bandit.
Line: 40
Column: 5
[0.87383074, 0.192658 , 0.2148103 ]], dtype=float32)
"""
assert isinstance(items, (list, tuple))
if isinstance(space, _BaseGymSpaces):
return concatenate_base(items, out, space)
elif isinstance(space, Tuple):
return concatenate_tuple(items, out, space)
elif isinstance(space, Dict):
return concatenate_dict(items, out, space)
Reported by Pylint.
gym/envs/tests/test_registration.py
28 issues
Line: 8
Column: 1
from gym.envs.classic_control import cartpole
class ArgumentEnv(gym.Env):
def __init__(self, arg1, arg2, arg3):
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
Reported by Pylint.
Line: 8
Column: 1
from gym.envs.classic_control import cartpole
class ArgumentEnv(gym.Env):
def __init__(self, arg1, arg2, arg3):
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
Reported by Pylint.
Line: 8
Column: 1
from gym.envs.classic_control import cartpole
class ArgumentEnv(gym.Env):
def __init__(self, arg1, arg2, arg3):
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
Reported by Pylint.
Line: 57
Column: 12
def test_spec_with_kwargs():
map_name_value = "8x8"
env = gym.make("FrozenLake-v1", map_name=map_name_value)
assert env.spec._kwargs["map_name"] == map_name_value
def test_missing_lookup():
registry = registration.EnvRegistry()
registry.register(id="Test-v0", entry_point=None)
Reported by Pylint.
Line: 1
Column: 1
# -*- coding: utf-8 -*-
import gym
from gym import error, envs
from gym.envs import registration
from gym.envs.classic_control import cartpole
class ArgumentEnv(gym.Env):
def __init__(self, arg1, arg2, arg3):
Reported by Pylint.
Line: 8
Column: 1
from gym.envs.classic_control import cartpole
class ArgumentEnv(gym.Env):
def __init__(self, arg1, arg2, arg3):
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
Reported by Pylint.
Line: 25
Column: 1
)
def test_make():
env = envs.make("CartPole-v0")
assert env.spec.id == "CartPole-v0"
assert isinstance(env.unwrapped, cartpole.CartPoleEnv)
Reported by Pylint.
Line: 27
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_make():
env = envs.make("CartPole-v0")
assert env.spec.id == "CartPole-v0"
assert isinstance(env.unwrapped, cartpole.CartPoleEnv)
def test_make_with_kwargs():
env = envs.make("test.ArgumentEnv-v0", arg2="override_arg2", arg3="override_arg3")
Reported by Bandit.
Line: 28
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_make():
env = envs.make("CartPole-v0")
assert env.spec.id == "CartPole-v0"
assert isinstance(env.unwrapped, cartpole.CartPoleEnv)
def test_make_with_kwargs():
env = envs.make("test.ArgumentEnv-v0", arg2="override_arg2", arg3="override_arg3")
assert env.spec.id == "test.ArgumentEnv-v0"
Reported by Bandit.
Line: 31
Column: 1
assert isinstance(env.unwrapped, cartpole.CartPoleEnv)
def test_make_with_kwargs():
env = envs.make("test.ArgumentEnv-v0", arg2="override_arg2", arg3="override_arg3")
assert env.spec.id == "test.ArgumentEnv-v0"
assert isinstance(env.unwrapped, ArgumentEnv)
assert env.arg1 == "arg1"
assert env.arg2 == "override_arg2"
Reported by Pylint.
gym/vector/async_vector_env.py
27 issues
Line: 35
Column: 1
WAITING_STEP = "step"
class AsyncVectorEnv(VectorEnv):
"""Vectorized environment that runs multiple environments in parallel. It
uses `multiprocessing` processes, and pipes for communication.
Parameters
----------
Reported by Pylint.
Line: 118
Column: 17
_obs_buffer, self.single_observation_space, n=self.num_envs
)
except CustomSpaceError:
raise ValueError(
"Using `shared_memory=True` in `AsyncVectorEnv` "
"is incompatible with non-standard Gym observation spaces "
"(i.e. custom spaces inheriting from `gym.Space`), and is "
"only compatible with default Gym spaces (e.g. `Box`, "
"`Tuple`, `Dict`) for batching. Set `shared_memory=False` "
Reported by Pylint.
Line: 195
Column: 5
pipe.send(("reset", None))
self._state = AsyncState.WAITING_RESET
def reset_wait(self, timeout=None):
"""
Parameters
----------
timeout : int or float, optional
Number of seconds before the call to `reset_wait` times out. If
Reported by Pylint.
Line: 252
Column: 5
pipe.send(("step", action))
self._state = AsyncState.WAITING_STEP
def step_wait(self, timeout=None):
"""
Parameters
----------
timeout : int or float, optional
Number of seconds before the call to `step_wait` times out. If
Reported by Pylint.
Line: 305
Column: 5
infos,
)
def close_extras(self, timeout=None, terminate=False):
"""
Parameters
----------
timeout : int or float, optional
Number of seconds before the call to `close` times out. If `None`,
Reported by Pylint.
Line: 428
Column: 21
pipe.send((data == env.observation_space, True))
else:
raise RuntimeError(
"Received unknown command `{0}`. Must "
"be one of {`reset`, `step`, `seed`, `close`, "
"`_check_observation_space`}.".format(command)
)
except (KeyboardInterrupt, Exception):
error_queue.put((index,) + sys.exc_info()[:2])
Reported by Pylint.
Line: 432
Column: 12
"be one of {`reset`, `step`, `seed`, `close`, "
"`_check_observation_space`}.".format(command)
)
except (KeyboardInterrupt, Exception):
error_queue.put((index,) + sys.exc_info()[:2])
pipe.send((None, False))
finally:
env.close()
Reported by Pylint.
Line: 471
Column: 21
pipe.send((data == observation_space, True))
else:
raise RuntimeError(
"Received unknown command `{0}`. Must "
"be one of {`reset`, `step`, `seed`, `close`, "
"`_check_observation_space`}.".format(command)
)
except (KeyboardInterrupt, Exception):
error_queue.put((index,) + sys.exc_info()[:2])
Reported by Pylint.
Line: 475
Column: 12
"be one of {`reset`, `step`, `seed`, `close`, "
"`_check_observation_space`}.".format(command)
)
except (KeyboardInterrupt, Exception):
error_queue.put((index,) + sys.exc_info()[:2])
pipe.send((None, False))
finally:
env.close()
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
import multiprocessing as mp
import time
import sys
from enum import Enum
from copy import deepcopy
from gym import logger
from gym.vector.vector_env import VectorEnv
Reported by Pylint.
gym/wrappers/test_atari_preprocessing.py
27 issues
Line: 4
Column: 1
import numpy as np
import gym
from gym.wrappers import AtariPreprocessing
import pytest
pytest.importorskip("atari_py")
@pytest.fixture(scope="module")
Reported by Pylint.
Line: 15
Column: 5
def test_atari_preprocessing_grayscale(env_fn):
import cv2
env1 = env_fn()
env2 = AtariPreprocessing(
env_fn(), screen_size=84, grayscale_obs=True, frame_skip=1, noop_max=0
)
Reported by Pylint.
Line: 14
Column: 40
return lambda: gym.make("PongNoFrameskip-v4")
def test_atari_preprocessing_grayscale(env_fn):
import cv2
env1 = env_fn()
env2 = AtariPreprocessing(
env_fn(), screen_size=84, grayscale_obs=True, frame_skip=1, noop_max=0
Reported by Pylint.
Line: 65
Column: 36
env4.close()
def test_atari_preprocessing_scale(env_fn):
# arbitrarily chosen number for stepping into env. and ensuring all observations are in the required range
max_test_steps = 10
for grayscale in [True, False]:
for scaled in [True, False]:
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
import gym
from gym.wrappers import AtariPreprocessing
import pytest
pytest.importorskip("atari_py")
@pytest.fixture(scope="module")
Reported by Pylint.
Line: 10
Column: 1
@pytest.fixture(scope="module")
def env_fn():
return lambda: gym.make("PongNoFrameskip-v4")
def test_atari_preprocessing_grayscale(env_fn):
import cv2
Reported by Pylint.
Line: 14
Column: 1
return lambda: gym.make("PongNoFrameskip-v4")
def test_atari_preprocessing_grayscale(env_fn):
import cv2
env1 = env_fn()
env2 = AtariPreprocessing(
env_fn(), screen_size=84, grayscale_obs=True, frame_skip=1, noop_max=0
Reported by Pylint.
Line: 15
Column: 5
def test_atari_preprocessing_grayscale(env_fn):
import cv2
env1 = env_fn()
env2 = AtariPreprocessing(
env_fn(), screen_size=84, grayscale_obs=True, frame_skip=1, noop_max=0
)
Reported by Pylint.
Line: 40
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
obs2 = env2.reset()
obs3 = env3.reset()
obs4 = env4.reset()
assert env1.observation_space.shape == (210, 160, 3)
assert env2.observation_space.shape == (84, 84)
assert env3.observation_space.shape == (84, 84, 3)
assert env4.observation_space.shape == (84, 84, 1)
assert obs1.shape == (210, 160, 3)
assert obs2.shape == (84, 84)
Reported by Bandit.
Line: 41
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
obs3 = env3.reset()
obs4 = env4.reset()
assert env1.observation_space.shape == (210, 160, 3)
assert env2.observation_space.shape == (84, 84)
assert env3.observation_space.shape == (84, 84, 3)
assert env4.observation_space.shape == (84, 84, 1)
assert obs1.shape == (210, 160, 3)
assert obs2.shape == (84, 84)
assert obs3.shape == (84, 84, 3)
Reported by Bandit.
gym/envs/classic_control/pendulum.py
25 issues
Line: 86
Column: 36
self.viewer.add_onetime(self.img)
self.pole_transform.set_rotation(self.state[0] + np.pi / 2)
if self.last_u:
self.imgtrans.scale = (-self.last_u / 2, np.abs(self.last_u) / 2)
return self.viewer.render(return_rgb_array=mode == "rgb_array")
def close(self):
if self.viewer:
Reported by Pylint.
Line: 32
Column: 5
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, u):
th, thdot = self.state # th := theta
g = self.g
m = self.m
l = self.l
Reported by Pylint.
Line: 41
Column: 9
dt = self.dt
u = np.clip(u, -self.max_torque, self.max_torque)[0]
self.last_u = u # for rendering
costs = angle_normalize(th) ** 2 + 0.1 * thdot ** 2 + 0.001 * (u ** 2)
newthdot = (
thdot
+ (-3 * g / (2 * l) * np.sin(th + np.pi) + 3.0 / (m * l ** 2) * u) * dt
Reported by Pylint.
Line: 51
Column: 9
newth = th + newthdot * dt
newthdot = np.clip(newthdot, -self.max_speed, self.max_speed)
self.state = np.array([newth, newthdot])
return self._get_obs(), -costs, False, {}
def reset(self):
high = np.array([np.pi, 1])
self.state = self.np_random.uniform(low=-high, high=high)
Reported by Pylint.
Line: 56
Column: 9
def reset(self):
high = np.array([np.pi, 1])
self.state = self.np_random.uniform(low=-high, high=high)
self.last_u = None
return self._get_obs()
def _get_obs(self):
theta, thetadot = self.state
Reported by Pylint.
Line: 57
Column: 9
def reset(self):
high = np.array([np.pi, 1])
self.state = self.np_random.uniform(low=-high, high=high)
self.last_u = None
return self._get_obs()
def _get_obs(self):
theta, thetadot = self.state
return np.array([np.cos(theta), np.sin(theta), thetadot])
Reported by Pylint.
Line: 72
Column: 13
self.viewer.set_bounds(-2.2, 2.2, -2.2, 2.2)
rod = rendering.make_capsule(1, 0.2)
rod.set_color(0.8, 0.3, 0.3)
self.pole_transform = rendering.Transform()
rod.add_attr(self.pole_transform)
self.viewer.add_geom(rod)
axle = rendering.make_circle(0.05)
axle.set_color(0, 0, 0)
self.viewer.add_geom(axle)
Reported by Pylint.
Line: 79
Column: 13
axle.set_color(0, 0, 0)
self.viewer.add_geom(axle)
fname = path.join(path.dirname(__file__), "assets/clockwise.png")
self.img = rendering.Image(fname, 1.0, 1.0)
self.imgtrans = rendering.Transform()
self.img.add_attr(self.imgtrans)
self.viewer.add_onetime(self.img)
self.pole_transform.set_rotation(self.state[0] + np.pi / 2)
Reported by Pylint.
Line: 80
Column: 13
self.viewer.add_geom(axle)
fname = path.join(path.dirname(__file__), "assets/clockwise.png")
self.img = rendering.Image(fname, 1.0, 1.0)
self.imgtrans = rendering.Transform()
self.img.add_attr(self.imgtrans)
self.viewer.add_onetime(self.img)
self.pole_transform.set_rotation(self.state[0] + np.pi / 2)
if self.last_u:
Reported by Pylint.
Line: 1
Column: 1
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
from os import path
class PendulumEnv(gym.Env):
metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 30}
Reported by Pylint.
gym/envs/classic_control/cartpole.py
25 issues
Line: 185
Column: 13
l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
axleoffset = cartheight / 4.0
cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
self.carttrans = rendering.Transform()
cart.add_attr(self.carttrans)
self.viewer.add_geom(cart)
l, r, t, b = (
-polewidth / 2,
polewidth / 2,
Reported by Pylint.
Line: 196
Column: 13
)
pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
pole.set_color(0.8, 0.6, 0.4)
self.poletrans = rendering.Transform(translation=(0, axleoffset))
pole.add_attr(self.poletrans)
pole.add_attr(self.carttrans)
self.viewer.add_geom(pole)
self.axle = rendering.make_circle(polewidth / 2)
self.axle.add_attr(self.poletrans)
Reported by Pylint.
Line: 200
Column: 13
pole.add_attr(self.poletrans)
pole.add_attr(self.carttrans)
self.viewer.add_geom(pole)
self.axle = rendering.make_circle(polewidth / 2)
self.axle.add_attr(self.poletrans)
self.axle.add_attr(self.carttrans)
self.axle.set_color(0.5, 0.5, 0.8)
self.viewer.add_geom(self.axle)
self.track = rendering.Line((0, carty), (screen_width, carty))
Reported by Pylint.
Line: 205
Column: 13
self.axle.add_attr(self.carttrans)
self.axle.set_color(0.5, 0.5, 0.8)
self.viewer.add_geom(self.axle)
self.track = rendering.Line((0, carty), (screen_width, carty))
self.track.set_color(0, 0, 0)
self.viewer.add_geom(self.track)
self._pole_geom = pole
Reported by Pylint.
Line: 209
Column: 13
self.track.set_color(0, 0, 0)
self.viewer.add_geom(self.track)
self._pole_geom = pole
if self.state is None:
return None
# Edit the pole polygon vertex
Reported by Pylint.
Line: 14
Column: 1
import numpy as np
class CartPoleEnv(gym.Env):
"""
Description:
A pole is attached by an un-actuated joint to a cart, which moves along
a frictionless track. The pendulum starts upright, and the goal is to
prevent it from falling over by increasing and reducing the cart's
Reported by Pylint.
Line: 105
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def step(self, action):
err_msg = "%r (%s) invalid" % (action, type(action))
assert self.action_space.contains(action), err_msg
x, x_dot, theta, theta_dot = self.state
force = self.force_mag if action == 1 else -self.force_mag
costheta = math.cos(theta)
sintheta = math.sin(theta)
Reported by Bandit.
Line: 107
Column: 9
err_msg = "%r (%s) invalid" % (action, type(action))
assert self.action_space.contains(action), err_msg
x, x_dot, theta, theta_dot = self.state
force = self.force_mag if action == 1 else -self.force_mag
costheta = math.cos(theta)
sintheta = math.sin(theta)
# For the interested reader:
Reported by Pylint.
Line: 123
Column: 13
xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
if self.kinematics_integrator == "euler":
x = x + self.tau * x_dot
x_dot = x_dot + self.tau * xacc
theta = theta + self.tau * theta_dot
theta_dot = theta_dot + self.tau * thetaacc
else: # semi-implicit euler
x_dot = x_dot + self.tau * xacc
Reported by Pylint.
Line: 129
Column: 13
theta_dot = theta_dot + self.tau * thetaacc
else: # semi-implicit euler
x_dot = x_dot + self.tau * xacc
x = x + self.tau * x_dot
theta_dot = theta_dot + self.tau * thetaacc
theta = theta + self.tau * theta_dot
self.state = (x, x_dot, theta, theta_dot)
Reported by Pylint.
gym/spaces/tests/test_utils.py
25 issues
Line: 4
Column: 1
from collections import OrderedDict
import numpy as np
import pytest
from gym.spaces import Box, Dict, Discrete, MultiBinary, MultiDiscrete, Tuple, utils
@pytest.mark.parametrize(
Reported by Pylint.
Line: 209
Column: 1
return left == right
"""
Expecteded flattened types are based off:
1. The type that the space is hardcoded as(ie. multi_discrete=np.int64, discrete=np.int64, multi_binary=np.int8)
2. The type that the space is instantiated with(ie. box=np.float32 by default unless instantiated with a different type)
3. The smallest type that the composite space(tuple, dict) can be represented as. In flatten, this is determined
internally by numpy when np.concatenate is called.
Reported by Pylint.
Line: 1
Column: 1
from collections import OrderedDict
import numpy as np
import pytest
from gym.spaces import Box, Dict, Discrete, MultiBinary, MultiDiscrete, Tuple, utils
@pytest.mark.parametrize(
Reported by Pylint.
Line: 37
Column: 1
}
),
7,
),
],
)
def test_flatdim(space, flatdim):
dim = utils.flatdim(space)
assert dim == flatdim, "Expected {} to equal {}".format(dim, flatdim)
Reported by Pylint.
Line: 42
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
)
def test_flatdim(space, flatdim):
dim = utils.flatdim(space)
assert dim == flatdim, "Expected {} to equal {}".format(dim, flatdim)
@pytest.mark.parametrize(
"space",
[
Reported by Bandit.
Line: 65
Column: 1
"position": Discrete(5),
"velocity": Box(
low=np.array([0, 0]), high=np.array([1, 5]), dtype=np.float32
),
}
),
],
)
def test_flatten_space_boxes(space):
Reported by Pylint.
Line: 72
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
)
def test_flatten_space_boxes(space):
flat_space = utils.flatten_space(space)
assert isinstance(flat_space, Box), "Expected {} to equal {}".format(
type(flat_space), Box
)
flatdim = utils.flatdim(space)
(single_dim,) = flat_space.shape
assert single_dim == flatdim, "Expected {} to equal {}".format(single_dim, flatdim)
Reported by Bandit.
Line: 77
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
)
flatdim = utils.flatdim(space)
(single_dim,) = flat_space.shape
assert single_dim == flatdim, "Expected {} to equal {}".format(single_dim, flatdim)
@pytest.mark.parametrize(
"space",
[
Reported by Bandit.
Line: 100
Column: 1
"position": Discrete(5),
"velocity": Box(
low=np.array([0, 0]), high=np.array([1, 5]), dtype=np.float32
),
}
),
],
)
def test_flat_space_contains_flat_points(space):
Reported by Pylint.
Line: 110
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
flattened_samples = [utils.flatten(space, sample) for sample in some_samples]
flat_space = utils.flatten_space(space)
for i, flat_sample in enumerate(flattened_samples):
assert flat_sample in flat_space, "Expected sample #{} {} to be in {}".format(
i, flat_sample, flat_space
)
@pytest.mark.parametrize(
Reported by Bandit.
gym/vector/utils/shared_memory.py
24 issues
Line: 6
Column: 1
from ctypes import c_bool
from collections import OrderedDict
from gym import logger
from gym.spaces import Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
__all__ = ["create_shared_memory", "read_from_shared_memory", "write_to_shared_memory"]
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
import multiprocessing as mp
from ctypes import c_bool
from collections import OrderedDict
from gym import logger
from gym.spaces import Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
Reported by Pylint.
Line: 2
Column: 1
import numpy as np
import multiprocessing as mp
from ctypes import c_bool
from collections import OrderedDict
from gym import logger
from gym.spaces import Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
Reported by Pylint.
Line: 3
Column: 1
import numpy as np
import multiprocessing as mp
from ctypes import c_bool
from collections import OrderedDict
from gym import logger
from gym.spaces import Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
Reported by Pylint.
Line: 4
Column: 1
import numpy as np
import multiprocessing as mp
from ctypes import c_bool
from collections import OrderedDict
from gym import logger
from gym.spaces import Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
Reported by Pylint.
Line: 14
Column: 1
__all__ = ["create_shared_memory", "read_from_shared_memory", "write_to_shared_memory"]
def create_shared_memory(space, n=1, ctx=mp):
"""Create a shared memory object, to be shared across processes. This
eventually contains the observations from the vectorized environment.
Parameters
----------
Reported by Pylint.
Line: 35
Column: 5
shared_memory : dict, tuple, or `multiprocessing.Array` instance
Shared object across processes.
"""
if isinstance(space, _BaseGymSpaces):
return create_base_shared_memory(space, n=n, ctx=ctx)
elif isinstance(space, Tuple):
return create_tuple_shared_memory(space, n=n, ctx=ctx)
elif isinstance(space, Dict):
return create_dict_shared_memory(space, n=n, ctx=ctx)
Reported by Pylint.
Line: 51
Column: 1
)
def create_base_shared_memory(space, n=1, ctx=mp):
dtype = space.dtype.char
if dtype in "?":
dtype = c_bool
return ctx.Array(dtype, n * int(np.prod(space.shape)))
Reported by Pylint.
Line: 51
Column: 1
)
def create_base_shared_memory(space, n=1, ctx=mp):
dtype = space.dtype.char
if dtype in "?":
dtype = c_bool
return ctx.Array(dtype, n * int(np.prod(space.shape)))
Reported by Pylint.
Line: 58
Column: 1
return ctx.Array(dtype, n * int(np.prod(space.shape)))
def create_tuple_shared_memory(space, n=1, ctx=mp):
return tuple(
create_shared_memory(subspace, n=n, ctx=ctx) for subspace in space.spaces
)
Reported by Pylint.
gym/wrappers/monitoring/stats_recorder.py
24 issues
Line: 38
Column: 20
return self._type
@type.setter
def type(self, type):
if type not in ["t", "e"]:
raise error.Error(
"Invalid episode type {}: must be t for training or e for evaluation",
type,
)
Reported by Pylint.
Line: 40
Column: 13
@type.setter
def type(self, type):
if type not in ["t", "e"]:
raise error.Error(
"Invalid episode type {}: must be t for training or e for evaluation",
type,
)
self._type = type
Reported by Pylint.
Line: 46
Column: 27
)
self._type = type
def before_step(self, action):
assert not self.closed
if self.done:
raise error.ResetNeeded(
"Trying to step environment which is currently done. While the monitor is active for {}, you cannot step beyond the end of an episode. Call 'env.reset()' to start the next episode.".format(
Reported by Pylint.
Line: 62
Column: 53
)
)
def after_step(self, observation, reward, done, info):
self.steps += 1
self.total_steps += 1
self.rewards += reward
self.done = done
Reported by Pylint.
Line: 90
Column: 27
if self.initial_reset_timestamp is None:
self.initial_reset_timestamp = time.time()
def after_reset(self, observation):
self.steps = 0
self.rewards = 0
# We write the type at the beginning of the episode. If a user
# changes the type, it's more natural for it to apply next
# time the user calls reset().
Reported by Pylint.
Line: 1
Column: 1
import json
import os
import time
from gym import error
from gym.utils import atomic_write
from gym.utils.json_utils import json_encode_np
Reported by Pylint.
Line: 10
Column: 1
from gym.utils.json_utils import json_encode_np
class StatsRecorder(object):
def __init__(self, directory, file_prefix, autoreset=False, env_id=None):
self.autoreset = autoreset
self.env_id = env_id
self.initial_reset_timestamp = None
Reported by Pylint.
Line: 10
Column: 1
from gym.utils.json_utils import json_encode_np
class StatsRecorder(object):
def __init__(self, directory, file_prefix, autoreset=False, env_id=None):
self.autoreset = autoreset
self.env_id = env_id
self.initial_reset_timestamp = None
Reported by Pylint.
Line: 10
Column: 1
from gym.utils.json_utils import json_encode_np
class StatsRecorder(object):
def __init__(self, directory, file_prefix, autoreset=False, env_id=None):
self.autoreset = autoreset
self.env_id = env_id
self.initial_reset_timestamp = None
Reported by Pylint.
Line: 34
Column: 5
self.path = os.path.join(self.directory, filename)
@property
def type(self):
return self._type
@type.setter
def type(self, type):
if type not in ["t", "e"]:
Reported by Pylint.