The following issues were found:
torch/utils/data/distributed.py
11 issues
Line: 5
Column: 1
from typing import TypeVar, Optional, Iterator
import torch
from . import Sampler, Dataset
import torch.distributed as dist
T_co = TypeVar('T_co', covariant=True)
Reported by Pylint.
Line: 97
Column: 17
def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
Reported by Pylint.
Line: 99
Column: 23
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
if not self.drop_last:
# add extra samples to make it evenly divisible
Reported by Pylint.
Line: 1
Column: 1
import math
from typing import TypeVar, Optional, Iterator
import torch
from . import Sampler, Dataset
import torch.distributed as dist
T_co = TypeVar('T_co', covariant=True)
Reported by Pylint.
Line: 6
Column: 1
import torch
from . import Sampler, Dataset
import torch.distributed as dist
T_co = TypeVar('T_co', covariant=True)
Reported by Pylint.
Line: 9
Column: 1
import torch.distributed as dist
T_co = TypeVar('T_co', covariant=True)
class DistributedSampler(Sampler[T_co]):
r"""Sampler that restricts data loading to a subset of the dataset.
Reported by Pylint.
Line: 12
Column: 1
T_co = TypeVar('T_co', covariant=True)
class DistributedSampler(Sampler[T_co]):
r"""Sampler that restricts data loading to a subset of the dataset.
It is especially useful in conjunction with
:class:`torch.nn.parallel.DistributedDataParallel`. In such a case, each
process can pass a :class:`~torch.utils.data.DistributedSampler` instance as a
Reported by Pylint.
Line: 59
Column: 5
... train(loader)
"""
def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
rank: Optional[int] = None, shuffle: bool = True,
seed: int = 0, drop_last: bool = False) -> None:
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
Reported by Pylint.
Line: 97
Column: 13
def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
Reported by Pylint.
Line: 113
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
else:
# remove tail of data to make it evenly divisible.
indices = indices[:self.total_size]
assert len(indices) == self.total_size
# subsample
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
Reported by Bandit.
torch/testing/_internal/dist_utils.py
11 issues
Line: 60
Column: 9
# in tests.
import torch.distributed.rpc.api as api
api._ignore_rref_leak = False
self.worker_id = self.rank
self.setup_fault_injection(faulty_messages, messages_to_delay)
if setup_rpc:
rpc.init_rpc(
Reported by Pylint.
Line: 100
Column: 16
try:
rpc.rpc_sync("worker{}".format(rank), noop, args=())
time.sleep(0.1)
except Exception as e:
if re.search(pattern=expected_error_regex, string=str(e)):
return str(e)
def wait_until_pending_futures_and_users_flushed(timeout: int = 20) -> None:
Reported by Pylint.
Line: 1
Column: 1
import re
import sys
import time
from functools import partial, wraps
from typing import Tuple
import torch.distributed as dist
import torch.distributed.rpc as rpc
from torch.distributed.rpc import _rref_context_get_debug_info
Reported by Pylint.
Line: 58
Column: 9
def new_test_method(self, *arg, **kwargs):
# Setting _ignore_rref_leak to make sure OwnerRRefs are properly deleted
# in tests.
import torch.distributed.rpc.api as api
api._ignore_rref_leak = False
self.worker_id = self.rank
self.setup_fault_injection(faulty_messages, messages_to_delay)
Reported by Pylint.
Line: 83
Column: 1
return new_test_method
def noop() -> None:
pass
def wait_until_node_failure(rank: int, expected_error_regex: str = ".*") -> str:
"""
Reported by Pylint.
Line: 93
Column: 1
indicate that the node has failed in unit tests.
Args:
rank (int): Rank of the node expected to fail
expected_error_regex (optional, str): Regex of exception message expected. Useful to ensure a specific failure
occurs, not just any.
"""
while True:
try:
rpc.rpc_sync("worker{}".format(rank), noop, args=())
Reported by Pylint.
Line: 100
Column: 9
try:
rpc.rpc_sync("worker{}".format(rank), noop, args=())
time.sleep(0.1)
except Exception as e:
if re.search(pattern=expected_error_regex, string=str(e)):
return str(e)
def wait_until_pending_futures_and_users_flushed(timeout: int = 20) -> None:
Reported by Pylint.
Line: 126
Column: 1
time.sleep(0.1)
if time.time() - start > timeout:
raise ValueError(
"Timed out waiting to flush pending futures and users, had {} pending futures and {} pending users".format(
num_pending_futures, num_pending_users
)
)
Reported by Pylint.
Line: 162
Column: 1
time.sleep(1)
if time.time() - start > timeout:
raise ValueError(
"Timed out waiting {} sec for {} owners and {} forks on rank, had {} owners and {} forks".format(
timeout,
num_owners,
num_forks,
num_owners_on_rank,
num_forks_on_rank,
Reported by Pylint.
Line: 172
Column: 1
)
def initialize_pg(init_method, rank: int, world_size: int) -> None:
# This is for tests using `dist.barrier`.
# For `RpcAgent` other than `ProcessGroupAgent`,
# no `_default_pg` is initialized.
if not dist.is_initialized():
dist.init_process_group(
Reported by Pylint.
torch/utils/data/datapipes/iter/readfilesfromtar.py
11 issues
Line: 1
Column: 1
from torch.utils.data import IterDataPipe
from torch.utils.data.datapipes.utils.common import validate_pathname_binary_tuple
from typing import Iterable, Iterator, Tuple, Optional, IO, cast
from io import BufferedIOBase
import os
import tarfile
import warnings
Reported by Pylint.
Line: 3
Column: 1
from torch.utils.data import IterDataPipe
from torch.utils.data.datapipes.utils.common import validate_pathname_binary_tuple
from typing import Iterable, Iterator, Tuple, Optional, IO, cast
from io import BufferedIOBase
import os
import tarfile
import warnings
Reported by Pylint.
Line: 4
Column: 1
from torch.utils.data import IterDataPipe
from torch.utils.data.datapipes.utils.common import validate_pathname_binary_tuple
from typing import Iterable, Iterator, Tuple, Optional, IO, cast
from io import BufferedIOBase
import os
import tarfile
import warnings
Reported by Pylint.
Line: 6
Column: 1
from typing import Iterable, Iterator, Tuple, Optional, IO, cast
from io import BufferedIOBase
import os
import tarfile
import warnings
class ReadFilesFromTarIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
r""":class:`ReadFilesFromTarIterDataPipe`.
Reported by Pylint.
Line: 7
Column: 1
from io import BufferedIOBase
import os
import tarfile
import warnings
class ReadFilesFromTarIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
r""":class:`ReadFilesFromTarIterDataPipe`.
Reported by Pylint.
Line: 8
Column: 1
import os
import tarfile
import warnings
class ReadFilesFromTarIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
r""":class:`ReadFilesFromTarIterDataPipe`.
Iterable datapipe to extract tar binary streams from input iterable which contains tuples of
Reported by Pylint.
Line: 17
Column: 1
pathname and tar binary stream, yields pathname and extracted binary stream in a tuple.
args:
datapipe: Iterable datapipe that provides pathname and tar binary stream in tuples
mode: File mode used by `tarfile.open` to read file object. Mode has to be a string of the form 'filemode[:compression]'
length: a nominal length of the datapipe
Note:
The opened file handles will be closed automatically if the default DecoderDataPipe
is attached. Otherwise, user should be responsible to close file handles explicitly
Reported by Pylint.
Line: 50
Column: 1
continue
extracted_fobj = tar.extractfile(tarinfo)
if extracted_fobj is None:
warnings.warn("failed to extract file {} from source tarfile {}".format(tarinfo.name, pathname))
raise tarfile.ExtractError
inner_pathname = os.path.normpath(os.path.join(pathname, tarinfo.name))
# Add a reference of the source tarfile into extracted_fobj, so the source
# tarfile handle won't be released until all the extracted file objs are destroyed.
extracted_fobj.source_ref = tar # type: ignore[attr-defined]
Reported by Pylint.
Line: 54
Column: 1
raise tarfile.ExtractError
inner_pathname = os.path.normpath(os.path.join(pathname, tarinfo.name))
# Add a reference of the source tarfile into extracted_fobj, so the source
# tarfile handle won't be released until all the extracted file objs are destroyed.
extracted_fobj.source_ref = tar # type: ignore[attr-defined]
# typing.cast is used here to silence mypy's type checker
yield (inner_pathname, cast(BufferedIOBase, extracted_fobj))
except Exception as e:
warnings.warn(
Reported by Pylint.
Line: 58
Column: 13
extracted_fobj.source_ref = tar # type: ignore[attr-defined]
# typing.cast is used here to silence mypy's type checker
yield (inner_pathname, cast(BufferedIOBase, extracted_fobj))
except Exception as e:
warnings.warn(
"Unable to extract files from corrupted tarfile stream {} due to: {}, abort!".format(pathname, e))
raise e
def __len__(self):
Reported by Pylint.
torch/fx/tensor_type.py
11 issues
Line: 1
Column: 1
from torch.fx.experimental.unification import Var # type: ignore[attr-defined]
class TensorType:
"""
TensorType defines a type for tensors, which consists of a list of dimensions.
Example:
class M(torch.nn.Module):
def forward(self, x:TensorType((1,2,3, Dyn)), y:TensorType((1,2,3, Dyn))):
Reported by Pylint.
Line: 21
Column: 9
return f'TensorType[{self.__args__}]'
def __eq__(self, other):
if isinstance(other, self.__class__):
return list(self.__args__) == list(other.__args__)
else:
return False
@staticmethod
Reported by Pylint.
Line: 28
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
@staticmethod
def __class_getitem__(*args):
assert isinstance(args[0], tuple)
return TensorType(args[0])
class _DynType:
"""
Reported by Bandit.
Line: 52
Column: 1
Dyn = _DynType()
def is_consistent(t1, t2):
"""
A binary relation denoted by ~ that determines if t1 is consistent with t2.
The relation is reflexive, semmetric but not transitive.
returns True if t1 and t2 are consistent and False otherwise.
Example:
Reported by Pylint.
Line: 52
Column: 1
Dyn = _DynType()
def is_consistent(t1, t2):
"""
A binary relation denoted by ~ that determines if t1 is consistent with t2.
The relation is reflexive, semmetric but not transitive.
returns True if t1 and t2 are consistent and False otherwise.
Example:
Reported by Pylint.
Line: 70
Column: 5
if t1 == Dyn or t2 == Dyn or isinstance(t1, Var) or isinstance(t2, Var):
return True
if isinstance(t1, TensorType) and isinstance(t2, TensorType):
return len(t1.__args__) == len(t2.__args__) and \
all([is_consistent(elem1, elem2) for elem1, elem2 in zip(t1.__args__, t2.__args__)])
else:
return False
Reported by Pylint.
Line: 72
Column: 13
if isinstance(t1, TensorType) and isinstance(t2, TensorType):
return len(t1.__args__) == len(t2.__args__) and \
all([is_consistent(elem1, elem2) for elem1, elem2 in zip(t1.__args__, t2.__args__)])
else:
return False
def is_more_precise(t1, t2):
Reported by Pylint.
Line: 77
Column: 1
return False
def is_more_precise(t1, t2):
"""
A binary relation denoted by <= that determines if t1 is more precise than t2.
The relation is reflexive and transitive.
returns True if t1 is more precise than t2 and False otherwise.
Example:
Reported by Pylint.
Line: 77
Column: 1
return False
def is_more_precise(t1, t2):
"""
A binary relation denoted by <= that determines if t1 is more precise than t2.
The relation is reflexive and transitive.
returns True if t1 is more precise than t2 and False otherwise.
Example:
Reported by Pylint.
Line: 94
Column: 5
if isinstance(t2, _DynType):
return True
if isinstance(t1, TensorType) and isinstance(t2, TensorType):
return len(t1.__args__) == len(t2.__args__) and \
all([is_more_precise(elem1, elem2) for elem1, elem2 in zip(t1.__args__, t2.__args__)])
else:
return False
Reported by Pylint.
torch/fx/experimental/unify_refinements.py
11 issues
Line: 45
Column: 12
equality constraints
"""
lhs, rhs = convert_eq(list_of_eq)
return unify(lhs, rhs)
def substitute_solution_one_type(mapping, t):
"""
Apply the most general unifier to a type
Reported by Pylint.
Line: 1
Column: 1
from torch.fx.experimental.graph_gradual_typechecker import Refine
from torch.fx.tensor_type import TensorType
from torch.fx.experimental.unification import Var, unify # type: ignore[attr-defined]
def infer_symbolic_types_single_pass(traced):
"""
Generate constraints over types,
solve constraints with unification,
apply solution back to the types
Reported by Pylint.
Line: 11
Column: 5
solve constraints with unification,
apply solution back to the types
"""
r = Refine(traced)
r.refine()
mgu = unify_eq(r.constraints)
substitute_all_types(traced.graph, mgu)
def infer_symbolic_types(traced):
Reported by Pylint.
Line: 33
Column: 9
"""
lhs = []
rhs = []
for eq in list_of_eq:
lhs.append(eq.lhs)
rhs.append(eq.rhs)
return tuple(lhs), tuple(rhs)
Reported by Pylint.
Line: 48
Column: 1
return unify(lhs, rhs)
def substitute_solution_one_type(mapping, t):
"""
Apply the most general unifier to a type
"""
if isinstance(t, Var):
if t in mapping.keys():
Reported by Pylint.
Line: 48
Column: 1
return unify(lhs, rhs)
def substitute_solution_one_type(mapping, t):
"""
Apply the most general unifier to a type
"""
if isinstance(t, Var):
if t in mapping.keys():
Reported by Pylint.
Line: 83
Column: 9
if old_mapping_val != mapping[k]:
flag = True
for n in graph.nodes:
n.type = substitute_solution_one_type(mapping, n.type)
def check_for_type_equality(g1, g2):
"""
A check equality to be used in fixed points.
Reported by Pylint.
Line: 86
Column: 1
for n in graph.nodes:
n.type = substitute_solution_one_type(mapping, n.type)
def check_for_type_equality(g1, g2):
"""
A check equality to be used in fixed points.
We do not use graph equality but instead type
equality.
"""
Reported by Pylint.
Line: 86
Column: 1
for n in graph.nodes:
n.type = substitute_solution_one_type(mapping, n.type)
def check_for_type_equality(g1, g2):
"""
A check equality to be used in fixed points.
We do not use graph equality but instead type
equality.
"""
Reported by Pylint.
Line: 92
Column: 9
We do not use graph equality but instead type
equality.
"""
for n, m in zip(g1.nodes, g2.nodes):
if n.type != m.type:
return False
return True
Reported by Pylint.
torch/optim/rprop.py
11 issues
Line: 2
Column: 1
import torch
from . import _functional as F
from .optimizer import Optimizer
class Rprop(Optimizer):
"""Implements the resilient backpropagation algorithm.
Args:
Reported by Pylint.
Line: 3
Column: 1
import torch
from . import _functional as F
from .optimizer import Optimizer
class Rprop(Optimizer):
"""Implements the resilient backpropagation algorithm.
Args:
Reported by Pylint.
Line: 62
Column: 37
# State initialization
if len(state) == 0:
state['step'] = 0
state['prev'] = torch.zeros_like(p, memory_format=torch.preserve_format)
state['step_size'] = grad.new().resize_as_(grad).fill_(group['lr'])
prevs.append(state['prev'])
step_sizes.append(state['step_size'])
Reported by Pylint.
Line: 62
Column: 71
# State initialization
if len(state) == 0:
state['step'] = 0
state['prev'] = torch.zeros_like(p, memory_format=torch.preserve_format)
state['step_size'] = grad.new().resize_as_(grad).fill_(group['lr'])
prevs.append(state['prev'])
step_sizes.append(state['step_size'])
Reported by Pylint.
Line: 1
Column: 1
import torch
from . import _functional as F
from .optimizer import Optimizer
class Rprop(Optimizer):
"""Implements the resilient backpropagation algorithm.
Args:
Reported by Pylint.
Line: 6
Column: 1
from .optimizer import Optimizer
class Rprop(Optimizer):
"""Implements the resilient backpropagation algorithm.
Args:
params (iterable): iterable of parameters to optimize or dicts defining
parameter groups
Reported by Pylint.
Line: 20
Column: 5
maximal allowed step sizes (default: (1e-6, 50))
"""
def __init__(self, params, lr=1e-2, etas=(0.5, 1.2), step_sizes=(1e-6, 50)):
if not 0.0 <= lr:
raise ValueError("Invalid learning rate: {}".format(lr))
if not 0.0 < etas[0] < 1.0 < etas[1]:
raise ValueError("Invalid eta values: {}, {}".format(etas[0], etas[1]))
Reported by Pylint.
Line: 21
Column: 16
"""
def __init__(self, params, lr=1e-2, etas=(0.5, 1.2), step_sizes=(1e-6, 50)):
if not 0.0 <= lr:
raise ValueError("Invalid learning rate: {}".format(lr))
if not 0.0 < etas[0] < 1.0 < etas[1]:
raise ValueError("Invalid eta values: {}, {}".format(etas[0], etas[1]))
defaults = dict(lr=lr, etas=etas, step_sizes=step_sizes)
Reported by Pylint.
Line: 21
Column: 12
"""
def __init__(self, params, lr=1e-2, etas=(0.5, 1.2), step_sizes=(1e-6, 50)):
if not 0.0 <= lr:
raise ValueError("Invalid learning rate: {}".format(lr))
if not 0.0 < etas[0] < 1.0 < etas[1]:
raise ValueError("Invalid eta values: {}, {}".format(etas[0], etas[1]))
defaults = dict(lr=lr, etas=etas, step_sizes=step_sizes)
Reported by Pylint.
Line: 27
Column: 9
raise ValueError("Invalid eta values: {}, {}".format(etas[0], etas[1]))
defaults = dict(lr=lr, etas=etas, step_sizes=step_sizes)
super(Rprop, self).__init__(params, defaults)
@torch.no_grad()
def step(self, closure=None):
"""Performs a single optimization step.
Reported by Pylint.
caffe2/python/operator_test/percentile_op_test.py
11 issues
Line: 48
Column: 9
original_inp,
value_to_pct_map,
dist_lengths,
expected_values
):
net = core.Net('test_shape_inference')
result = net.Percentile(
['original_values', 'value_to_pct_map', 'dist_lengths'],
['percentile_values']
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import numpy as np
Reported by Pylint.
Line: 11
Column: 1
import numpy as np
class TestPercentileOp(hu.HypothesisTestCase):
def _test_percentile_op(
self,
original_inp,
value_to_pct_map,
dist_lengths,
Reported by Pylint.
Line: 19
Column: 9
dist_lengths,
expected_values
):
op = core.CreateOperator(
'Percentile',
['original_values', 'value_to_pct_map', 'dist_lengths'],
['percentile_values']
)
workspace.FeedBlob('original_values', np.array(
Reported by Pylint.
Line: 67
Column: 5
self.assertEqual(shapes[result], list(workspace.blobs['original_values'].shape))
self.assertEqual(types[result], core.DataType.FLOAT)
def test_percentile_op_with_only_one_dist(self):
self._test_percentile_op(
original_inp=[[5]],
value_to_pct_map=[[5, 0.4]],
dist_lengths=[1],
expected_values=[[0.4]]
Reported by Pylint.
Line: 75
Column: 5
expected_values=[[0.4]]
)
def test_percentile_op_with_all_elements_in_map(self):
self._test_percentile_op(
original_inp=[[3, 4], [10, 4]],
value_to_pct_map=[[3, 0.3], [4, 0.6],
[10, 0.8], [4, 0.5], [5, 0.6]],
dist_lengths=[3, 2],
Reported by Pylint.
Line: 84
Column: 5
expected_values=[[0.3, 0.5], [0.8, 0.5]],
)
def test_percentile_op_with_same_value(self):
self._test_percentile_op(
original_inp=[[1, 1], [1, 2]],
value_to_pct_map=[[1, 0.1], [4, 0.4], [2, 0.5]],
dist_lengths=[2, 1],
expected_values=[[0.1, 0.0], [0.1, 0.5]]
Reported by Pylint.
Line: 92
Column: 5
expected_values=[[0.1, 0.0], [0.1, 0.5]]
)
def test_percentile_op_with_elements_bigger_than_map_range(self):
self._test_percentile_op(
original_inp=[[1, 5], [3, 4]],
value_to_pct_map=[[1, 0.1], [4, 0.4], [2, 0.1], [3, 0.3]],
dist_lengths=[2, 2],
expected_values=[[0.1, 1.], [0.3, 1.0]]
Reported by Pylint.
Line: 100
Column: 5
expected_values=[[0.1, 1.], [0.3, 1.0]]
)
def test_percentile_op_with_elements_smaller_than_map_range(self):
self._test_percentile_op(
original_inp=[[1], [5], [6]],
value_to_pct_map=[[2, 0.2], [5, 0.5], [7, 0.5]],
dist_lengths=[3],
expected_values=[[0.0], [0.5], [0.5]]
Reported by Pylint.
Line: 108
Column: 5
expected_values=[[0.0], [0.5], [0.5]]
)
def test_percentile_op_with_interpolation(self):
self._test_percentile_op(
original_inp=[[3, 2, 5], [6, 7, 8]],
value_to_pct_map=[[1, 0.1], [4, 0.7], [4.5, 0.8],
[6, 0.5], [8, 0.9],
[8, 0.6]],
Reported by Pylint.
caffe2/python/sparse_to_dense_test.py
11 issues
Line: 1
Column: 1
from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase
import numpy as np
Reported by Pylint.
Line: 11
Column: 1
import numpy as np
class TestSparseToDense(TestCase):
def test_sparse_to_dense(self):
op = core.CreateOperator(
'SparseToDense',
['indices', 'values'],
['output'])
Reported by Pylint.
Line: 12
Column: 5
class TestSparseToDense(TestCase):
def test_sparse_to_dense(self):
op = core.CreateOperator(
'SparseToDense',
['indices', 'values'],
['output'])
workspace.FeedBlob(
Reported by Pylint.
Line: 13
Column: 9
class TestSparseToDense(TestCase):
def test_sparse_to_dense(self):
op = core.CreateOperator(
'SparseToDense',
['indices', 'values'],
['output'])
workspace.FeedBlob(
'indices',
Reported by Pylint.
Line: 36
Column: 5
self.assertEqual(output.shape, expected.shape)
np.testing.assert_array_equal(output, expected)
def test_sparse_to_dense_shape_inference(self):
indices = np.array([2, 4, 999, 2], dtype=np.int32)
values = np.array([[1, 2], [2, 4], [6, 7], [7, 8]], dtype=np.int32)
data_to_infer_dim = np.array(np.zeros(1500, ), dtype=np.int32)
op = core.CreateOperator(
'SparseToDense',
Reported by Pylint.
Line: 40
Column: 9
indices = np.array([2, 4, 999, 2], dtype=np.int32)
values = np.array([[1, 2], [2, 4], [6, 7], [7, 8]], dtype=np.int32)
data_to_infer_dim = np.array(np.zeros(1500, ), dtype=np.int32)
op = core.CreateOperator(
'SparseToDense',
['indices', 'values', 'data_to_infer_dim'],
['output'])
workspace.FeedBlob('indices', indices)
workspace.FeedBlob('values', values)
Reported by Pylint.
Line: 63
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
"data_to_infer_dim": core.DataType.INT32,
},
)
assert (
"output" in shapes and "output" in types
), "Failed to infer the shape or type of output"
self.assertEqual(shapes["output"], [1500, 2])
self.assertEqual(types["output"], core.DataType.INT32)
Reported by Bandit.
Line: 70
Column: 5
self.assertEqual(types["output"], core.DataType.INT32)
def test_sparse_to_dense_invalid_inputs(self):
op = core.CreateOperator(
'SparseToDense',
['indices', 'values'],
['output'])
workspace.FeedBlob(
Reported by Pylint.
Line: 71
Column: 9
def test_sparse_to_dense_invalid_inputs(self):
op = core.CreateOperator(
'SparseToDense',
['indices', 'values'],
['output'])
workspace.FeedBlob(
'indices',
Reported by Pylint.
Line: 85
Column: 5
with self.assertRaises(RuntimeError):
workspace.RunOperatorOnce(op)
def test_sparse_to_dense_with_data_to_infer_dim(self):
op = core.CreateOperator(
'SparseToDense',
['indices', 'values', 'data_to_infer_dim'],
['output'])
workspace.FeedBlob(
Reported by Pylint.
test/distributed/elastic/rendezvous/static_rendezvous_test.py
11 issues
Line: 9
Column: 1
import unittest
from contextlib import closing
from torch.distributed.elastic.rendezvous import RendezvousParameters
from torch.distributed.elastic.rendezvous.static_tcp_rendezvous import (
create_rdzv_handler,
)
from torch.distributed.elastic.utils import get_socket_with_port
Reported by Pylint.
Line: 10
Column: 1
from contextlib import closing
from torch.distributed.elastic.rendezvous import RendezvousParameters
from torch.distributed.elastic.rendezvous.static_tcp_rendezvous import (
create_rdzv_handler,
)
from torch.distributed.elastic.utils import get_socket_with_port
Reported by Pylint.
Line: 13
Column: 1
from torch.distributed.elastic.rendezvous.static_tcp_rendezvous import (
create_rdzv_handler,
)
from torch.distributed.elastic.utils import get_socket_with_port
class StaticTCPRendezvousTest(unittest.TestCase):
def test_missing_port(self):
rdzv_params = RendezvousParameters(
Reported by Pylint.
Line: 1
Column: 1
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import unittest
from contextlib import closing
from torch.distributed.elastic.rendezvous import RendezvousParameters
Reported by Pylint.
Line: 16
Column: 1
from torch.distributed.elastic.utils import get_socket_with_port
class StaticTCPRendezvousTest(unittest.TestCase):
def test_missing_port(self):
rdzv_params = RendezvousParameters(
backend="static",
endpoint="localhost",
run_id="test_id",
Reported by Pylint.
Line: 17
Column: 5
class StaticTCPRendezvousTest(unittest.TestCase):
def test_missing_port(self):
rdzv_params = RendezvousParameters(
backend="static",
endpoint="localhost",
run_id="test_id",
min_nodes=1,
Reported by Pylint.
Line: 28
Column: 5
with self.assertRaises(ValueError):
create_rdzv_handler(rdzv_params)
def test_empty_endpoint(self):
rdzv_params = RendezvousParameters(
backend="static",
endpoint="",
run_id="test_id",
min_nodes=1,
Reported by Pylint.
Line: 39
Column: 5
with self.assertRaises(ValueError):
create_rdzv_handler(rdzv_params)
def test_ipv6_addr(self):
rdzv_params = RendezvousParameters(
backend="static",
endpoint="[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:90",
run_id="test_id",
min_nodes=1,
Reported by Pylint.
Line: 50
Column: 5
with self.assertRaises(ValueError):
create_rdzv_handler(rdzv_params)
def test_ipv6_addr_localhost(self):
rdzv_params = RendezvousParameters(
backend="static",
endpoint="[::1]:90",
run_id="test_id",
min_nodes=1,
Reported by Pylint.
Line: 61
Column: 5
with self.assertRaises(ValueError):
create_rdzv_handler(rdzv_params)
def test_get_backend(self):
rdzv_params = RendezvousParameters(
backend="static",
endpoint="localhost:123",
run_id="test",
min_nodes=1,
Reported by Pylint.
test/onnx/model_defs/emb_seq.py
11 issues
Line: 2
Column: 1
import torch.nn as nn
class EmbeddingNetwork1(nn.Module):
def __init__(self, dim=5):
super(EmbeddingNetwork1, self).__init__()
self.emb = nn.Embedding(10, dim)
self.lin1 = nn.Linear(dim, 1)
Reported by Pylint.
Line: 15
Column: 23
self.lin1,
)
def forward(self, input):
return self.seq(input)
class EmbeddingNetwork2(nn.Module):
Reported by Pylint.
Line: 1
Column: 1
import torch.nn as nn
class EmbeddingNetwork1(nn.Module):
def __init__(self, dim=5):
super(EmbeddingNetwork1, self).__init__()
self.emb = nn.Embedding(10, dim)
self.lin1 = nn.Linear(dim, 1)
Reported by Pylint.
Line: 5
Column: 1
import torch.nn as nn
class EmbeddingNetwork1(nn.Module):
def __init__(self, dim=5):
super(EmbeddingNetwork1, self).__init__()
self.emb = nn.Embedding(10, dim)
self.lin1 = nn.Linear(dim, 1)
self.seq = nn.Sequential(
Reported by Pylint.
Line: 5
Column: 1
import torch.nn as nn
class EmbeddingNetwork1(nn.Module):
def __init__(self, dim=5):
super(EmbeddingNetwork1, self).__init__()
self.emb = nn.Embedding(10, dim)
self.lin1 = nn.Linear(dim, 1)
self.seq = nn.Sequential(
Reported by Pylint.
Line: 7
Column: 9
class EmbeddingNetwork1(nn.Module):
def __init__(self, dim=5):
super(EmbeddingNetwork1, self).__init__()
self.emb = nn.Embedding(10, dim)
self.lin1 = nn.Linear(dim, 1)
self.seq = nn.Sequential(
self.emb,
self.lin1,
Reported by Pylint.
Line: 15
Column: 5
self.lin1,
)
def forward(self, input):
return self.seq(input)
class EmbeddingNetwork2(nn.Module):
Reported by Pylint.
Line: 19
Column: 1
return self.seq(input)
class EmbeddingNetwork2(nn.Module):
def __init__(self, in_space=10, dim=3):
super(EmbeddingNetwork2, self).__init__()
self.embedding = nn.Embedding(in_space, dim)
self.seq = nn.Sequential(
Reported by Pylint.
Line: 19
Column: 1
return self.seq(input)
class EmbeddingNetwork2(nn.Module):
def __init__(self, in_space=10, dim=3):
super(EmbeddingNetwork2, self).__init__()
self.embedding = nn.Embedding(in_space, dim)
self.seq = nn.Sequential(
Reported by Pylint.
Line: 22
Column: 9
class EmbeddingNetwork2(nn.Module):
def __init__(self, in_space=10, dim=3):
super(EmbeddingNetwork2, self).__init__()
self.embedding = nn.Embedding(in_space, dim)
self.seq = nn.Sequential(
self.embedding,
nn.Linear(dim, 1),
nn.Sigmoid()
Reported by Pylint.