The following issues were found
torch/distributed/rpc/options.py
22 issues
Line: 1
Column: 1
from torch._C._distributed_rpc import _TensorPipeRpcBackendOptionsBase
from . import constants as rpc_contants
import torch
from typing import Dict, List, Optional, Union
DeviceType = Union[int, str, torch.device]
Reported by Pylint.
Line: 2
Column: 1
from torch._C._distributed_rpc import _TensorPipeRpcBackendOptionsBase
from . import constants as rpc_contants
import torch
from typing import Dict, List, Optional, Union
DeviceType = Union[int, str, torch.device]
Reported by Pylint.
Line: 9
Column: 30
from typing import Dict, List, Optional, Union
DeviceType = Union[int, str, torch.device]
def _to_device(device: DeviceType) -> torch.device:
device = torch.device(device)
if device.type != "cuda":
Reported by Pylint.
Line: 12
Column: 39
DeviceType = Union[int, str, torch.device]
def _to_device(device: DeviceType) -> torch.device:
device = torch.device(device)
if device.type != "cuda":
raise ValueError(
"`set_devices` expect a list of CUDA devices, but got "
f"device type {device.type}."
Reported by Pylint.
Line: 13
Column: 14
def _to_device(device: DeviceType) -> torch.device:
device = torch.device(device)
if device.type != "cuda":
raise ValueError(
"`set_devices` expect a list of CUDA devices, but got "
f"device type {device.type}."
)
Reported by Pylint.
Line: 22
Column: 84
return device
def _to_device_map(device_map: Dict[DeviceType, DeviceType]) -> Dict[torch.device, torch.device]:
full_device_map : Dict[torch.device, torch.device] = {}
reverse_map : Dict[torch.device, torch.device] = {}
for k in device_map:
v = device_map[k]
k, v = torch.device(k), torch.device(v)
Reported by Pylint.
Line: 22
Column: 70
return device
def _to_device_map(device_map: Dict[DeviceType, DeviceType]) -> Dict[torch.device, torch.device]:
full_device_map : Dict[torch.device, torch.device] = {}
reverse_map : Dict[torch.device, torch.device] = {}
for k in device_map:
v = device_map[k]
k, v = torch.device(k), torch.device(v)
Reported by Pylint.
Line: 23
Column: 28
def _to_device_map(device_map: Dict[DeviceType, DeviceType]) -> Dict[torch.device, torch.device]:
full_device_map : Dict[torch.device, torch.device] = {}
reverse_map : Dict[torch.device, torch.device] = {}
for k in device_map:
v = device_map[k]
k, v = torch.device(k), torch.device(v)
if v in reverse_map:
Reported by Pylint.
Line: 23
Column: 42
def _to_device_map(device_map: Dict[DeviceType, DeviceType]) -> Dict[torch.device, torch.device]:
full_device_map : Dict[torch.device, torch.device] = {}
reverse_map : Dict[torch.device, torch.device] = {}
for k in device_map:
v = device_map[k]
k, v = torch.device(k), torch.device(v)
if v in reverse_map:
Reported by Pylint.
Line: 24
Column: 24
def _to_device_map(device_map: Dict[DeviceType, DeviceType]) -> Dict[torch.device, torch.device]:
full_device_map : Dict[torch.device, torch.device] = {}
reverse_map : Dict[torch.device, torch.device] = {}
for k in device_map:
v = device_map[k]
k, v = torch.device(k), torch.device(v)
if v in reverse_map:
raise ValueError(
Reported by Pylint.
test/onnx/test_pytorch_common.py
22 issues
Line: 5
Column: 1
import os
import unittest
import sys
import torch
import torch.autograd.function as function
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(-1, pytorch_test_dir)
Reported by Pylint.
Line: 6
Column: 1
import unittest
import sys
import torch
import torch.autograd.function as function
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(-1, pytorch_test_dir)
from torch.testing._internal.common_utils import * # noqa: F401,F403
Reported by Pylint.
Line: 11
Column: 1
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(-1, pytorch_test_dir)
from torch.testing._internal.common_utils import * # noqa: F401,F403
torch.set_default_tensor_type("torch.FloatTensor")
BATCH_SIZE = 2
Reported by Pylint.
Line: 11
Column: 1
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(-1, pytorch_test_dir)
from torch.testing._internal.common_utils import * # noqa: F401,F403
torch.set_default_tensor_type("torch.FloatTensor")
BATCH_SIZE = 2
Reported by Pylint.
Line: 105
Column: 18
return skip_dec
def flatten(x):
return tuple(function._iter_filter(lambda o: isinstance(o, torch.Tensor))(x))
Reported by Pylint.
Line: 1
Column: 1
import functools
import os
import unittest
import sys
import torch
import torch.autograd.function as function
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(-1, pytorch_test_dir)
Reported by Pylint.
Line: 11
Column: 1
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(-1, pytorch_test_dir)
from torch.testing._internal.common_utils import * # noqa: F401,F403
torch.set_default_tensor_type("torch.FloatTensor")
BATCH_SIZE = 2
Reported by Pylint.
Line: 24
Column: 5
def _skipper(condition, reason):
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if condition():
raise unittest.SkipTest(reason)
return f(*args, **kwargs)
Reported by Pylint.
Line: 44
Column: 1
# if exporting the op is only supported after a specific version,
# add this wrapper to prevent running the test for opset_versions
# smaller than the currently tested opset_version
def skipIfUnsupportedMinOpsetVersion(min_opset_version):
def skip_dec(func):
def wrapper(self):
if self.opset_version < min_opset_version:
raise unittest.SkipTest("Skip verify test for unsupported opset_version")
return func(self)
Reported by Pylint.
Line: 44
Column: 1
# if exporting the op is only supported after a specific version,
# add this wrapper to prevent running the test for opset_versions
# smaller than the currently tested opset_version
def skipIfUnsupportedMinOpsetVersion(min_opset_version):
def skip_dec(func):
def wrapper(self):
if self.opset_version < min_opset_version:
raise unittest.SkipTest("Skip verify test for unsupported opset_version")
return func(self)
Reported by Pylint.
torch/distributions/multinomial.py
22 issues
Line: 68
Column: 23
def expand(self, batch_shape, _instance=None):
new = self._get_checked_instance(Multinomial, _instance)
batch_shape = torch.Size(batch_shape)
new.total_count = self.total_count
new._categorical = self._categorical.expand(batch_shape)
super(Multinomial, new).__init__(batch_shape, self.event_shape, validate_args=False)
new._validate_args = self._validate_args
return new
Reported by Pylint.
Line: 94
Column: 35
def param_shape(self):
return self._categorical.param_shape
def sample(self, sample_shape=torch.Size()):
sample_shape = torch.Size(sample_shape)
samples = self._categorical.sample(torch.Size((self.total_count,)) + sample_shape)
# samples.shape is (total_count, sample_shape, batch_shape), need to change it to
# (sample_shape, batch_shape, total_count)
shifted_idx = list(range(samples.dim()))
Reported by Pylint.
Line: 95
Column: 24
return self._categorical.param_shape
def sample(self, sample_shape=torch.Size()):
sample_shape = torch.Size(sample_shape)
samples = self._categorical.sample(torch.Size((self.total_count,)) + sample_shape)
# samples.shape is (total_count, sample_shape, batch_shape), need to change it to
# (sample_shape, batch_shape, total_count)
shifted_idx = list(range(samples.dim()))
shifted_idx.append(shifted_idx.pop(0))
Reported by Pylint.
Line: 96
Column: 44
def sample(self, sample_shape=torch.Size()):
sample_shape = torch.Size(sample_shape)
samples = self._categorical.sample(torch.Size((self.total_count,)) + sample_shape)
# samples.shape is (total_count, sample_shape, batch_shape), need to change it to
# (sample_shape, batch_shape, total_count)
shifted_idx = list(range(samples.dim()))
shifted_idx.append(shifted_idx.pop(0))
samples = samples.permute(*shifted_idx)
Reported by Pylint.
Line: 103
Column: 42
shifted_idx.append(shifted_idx.pop(0))
samples = samples.permute(*shifted_idx)
counts = samples.new(self._extended_shape(sample_shape)).zero_()
counts.scatter_add_(-1, samples, torch.ones_like(samples))
return counts.type_as(self.probs)
def log_prob(self, value):
if self._validate_args:
self._validate_sample(value)
Reported by Pylint.
Line: 110
Column: 45
if self._validate_args:
self._validate_sample(value)
logits, value = broadcast_all(self.logits, value)
logits = logits.clone(memory_format=torch.contiguous_format)
log_factorial_n = torch.lgamma(value.sum(-1) + 1)
log_factorial_xs = torch.lgamma(value + 1).sum(-1)
logits[(value == 0) & (logits == -inf)] = 0
log_powers = (logits * value).sum(-1)
return log_factorial_n - log_factorial_xs + log_powers
Reported by Pylint.
Line: 111
Column: 27
self._validate_sample(value)
logits, value = broadcast_all(self.logits, value)
logits = logits.clone(memory_format=torch.contiguous_format)
log_factorial_n = torch.lgamma(value.sum(-1) + 1)
log_factorial_xs = torch.lgamma(value + 1).sum(-1)
logits[(value == 0) & (logits == -inf)] = 0
log_powers = (logits * value).sum(-1)
return log_factorial_n - log_factorial_xs + log_powers
Reported by Pylint.
Line: 112
Column: 28
logits, value = broadcast_all(self.logits, value)
logits = logits.clone(memory_format=torch.contiguous_format)
log_factorial_n = torch.lgamma(value.sum(-1) + 1)
log_factorial_xs = torch.lgamma(value + 1).sum(-1)
logits[(value == 0) & (logits == -inf)] = 0
log_powers = (logits * value).sum(-1)
return log_factorial_n - log_factorial_xs + log_powers
Reported by Pylint.
Line: 9
Column: 1
from torch.distributions.utils import broadcast_all
class Multinomial(Distribution):
r"""
Creates a Multinomial distribution parameterized by :attr:`total_count` and
either :attr:`probs` or :attr:`logits` (but not both). The innermost dimension of
:attr:`probs` indexes over categories. All other dimensions index over batches.
Reported by Pylint.
Line: 9
Column: 1
from torch.distributions.utils import broadcast_all
class Multinomial(Distribution):
r"""
Creates a Multinomial distribution parameterized by :attr:`total_count` and
either :attr:`probs` or :attr:`logits` (but not both). The innermost dimension of
:attr:`probs` indexes over categories. All other dimensions index over batches.
Reported by Pylint.
torch/distributed/pipeline/sync/worker.py
21 issues
Line: 17
Column: 1
import torch
from .microbatch import Batch
from .stream import AbstractStream, use_device, use_stream
__all__: List[str] = []
Reported by Pylint.
Line: 18
Column: 1
import torch
from .microbatch import Batch
from .stream import AbstractStream, use_device, use_stream
__all__: List[str] = []
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
Reported by Pylint.
Line: 28
Column: 15
# Queue is generic only in stubs.
# https://mypy.readthedocs.io/en/latest/common_issues.html#using-classes-that-are-generic-in-stubs-but-not-at-runtime
if TYPE_CHECKING:
InQueue = Queue[Optional["Task"]]
OutQueue = Queue[Tuple[bool, Union[Tuple["Task", Batch], ExcInfo, None]]]
else:
InQueue = Queue
OutQueue = Queue
Reported by Pylint.
Line: 29
Column: 16
# https://mypy.readthedocs.io/en/latest/common_issues.html#using-classes-that-are-generic-in-stubs-but-not-at-runtime
if TYPE_CHECKING:
InQueue = Queue[Optional["Task"]]
OutQueue = Queue[Tuple[bool, Union[Tuple["Task", Batch], ExcInfo, None]]]
else:
InQueue = Queue
OutQueue = Queue
Reported by Pylint.
Line: 56
Column: 30
self.stream = stream
self._compute = compute
self._finalize = finalize
self._grad_enabled = torch.is_grad_enabled()
def compute(self) -> Batch:
with use_stream(self.stream), torch.set_grad_enabled(self._grad_enabled):
return self._compute()
Reported by Pylint.
Line: 69
Column: 60
self._finalize(batch)
def worker(in_queue: InQueue, out_queue: OutQueue, device: torch.device) -> None:
"""The main loop of a worker thread."""
with use_device(device):
while True:
task = in_queue.get()
Reported by Pylint.
Line: 91
Column: 34
out_queue.put(done)
def create_workers(devices: List[torch.device],) -> Tuple[List[InQueue], List[OutQueue]]:
"""Spawns worker threads. A worker thread is bound to a device."""
in_queues: List[InQueue] = []
out_queues: List[OutQueue] = []
# Spawn workers.
Reported by Pylint.
Line: 97
Column: 19
out_queues: List[OutQueue] = []
# Spawn workers.
workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}
def normalize_device(device: torch.device) -> torch.device:
if device.type == "cuda" and device.index is None:
return torch.device("cuda", index=torch.cuda.current_device())
Reported by Pylint.
Line: 99
Column: 34
# Spawn workers.
workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}
def normalize_device(device: torch.device) -> torch.device:
if device.type == "cuda" and device.index is None:
return torch.device("cuda", index=torch.cuda.current_device())
if device.type == "cpu" and device.index is not None:
return torch.device("cpu")
Reported by Pylint.
Line: 99
Column: 51
# Spawn workers.
workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}
def normalize_device(device: torch.device) -> torch.device:
if device.type == "cuda" and device.index is None:
return torch.device("cuda", index=torch.cuda.current_device())
if device.type == "cpu" and device.index is not None:
return torch.device("cpu")
Reported by Pylint.
torch/distributed/optim/optimizer.py
21 issues
Line: 18
Column: 3
logger = logging.getLogger(__name__)
# XXX: we define a _ScriptModuleOptimizer here to explicitly
# compile the FunctionalOptimizer class into TorchScript
# This is because ScriptClass instance still lives in
# python unless you explicitly compile it as an attribute
# in ScriptModule or pass it to a ScriptFunction
# _ScriptLocalOptimizerInterface serves as a common
Reported by Pylint.
Line: 26
Column: 3
# _ScriptLocalOptimizerInterface serves as a common
# interface type for Optimizer ScriptModules.
#
# TODO (wanchaol): remove this once we added TorchScript
# class reference semantics
@jit.interface
class _ScriptLocalOptimizerInterface(object):
def step(self, autograd_ctx_id: int) -> None:
pass
Reported by Pylint.
Line: 33
Column: 1
def step(self, autograd_ctx_id: int) -> None:
pass
class _ScriptLocalOptimizer(nn.Module):
# TorchScript does not support multithread concurrent compiling.
# request_callback might invoke concurrent compiling, so we
# serialize the compiling with a lock
compile_lock = Lock()
Reported by Pylint.
Line: 59
Column: 3
self.optim.step(grads)
# TODO (wanchaol): remove/merge this with ScriptLocalOptimizer once
# we have converted all to functional optimizer in distributed.optim
class _LocalOptimizer(object):
# Ideally we would only need to share a lock for instances of
# _LocalOptimizer that deal with the same parameters. We are
# making a simplifying assumption here that if there is more
Reported by Pylint.
Line: 115
Column: 3
local_optim.step(autograd_ctx_id)
def _wait_for_all(rpc_futs):
# TODO: improve error propagation
exception = None
results = []
for fut in rpc_futs:
try:
results.append(fut.wait())
Reported by Pylint.
Line: 121
Column: 16
for fut in rpc_futs:
try:
results.append(fut.wait())
except Exception as e:
results.append(e)
exception = e
if exception is not None:
raise exception
return results
Reported by Pylint.
Line: 202
Column: 13
if self.is_functional_optim:
optimizer_new_func = _new_script_local_optimizer
else:
logger.warn(
f"Creating the optimizer {optimizer_class} without TorchScript support, "
"this might result in slow computation time in multithreading environment"
"(i.e. Distributed Model Parallel training on CPU) due to the Python's "
"Global Interpreter Lock (GIL). Please file an issue if you need this "
"optimizer in TorchScript. "
Reported by Pylint.
Line: 202
Column: 13
if self.is_functional_optim:
optimizer_new_func = _new_script_local_optimizer
else:
logger.warn(
f"Creating the optimizer {optimizer_class} without TorchScript support, "
"this might result in slow computation time in multithreading environment"
"(i.e. Distributed Model Parallel training on CPU) due to the Python's "
"Global Interpreter Lock (GIL). Please file an issue if you need this "
"optimizer in TorchScript. "
Reported by Pylint.
Line: 237
Column: 9
context_id: the autograd context id for which we should run the
optimizer step.
"""
dist_autograd._is_valid_context(context_id)
if self.is_functional_optim:
optimizer_step_func = _script_local_optimizer_step
else:
optimizer_step_func = _local_optimizer_step
Reported by Pylint.
Line: 1
Column: 1
from typing import List, Optional
import logging
import torch.distributed.rpc as rpc
import torch.jit as jit
import torch.nn as nn
from torch import Tensor
from torch.distributed.rpc import RRef
from torch.distributed.optim import functional_optim_map
Reported by Pylint.
torch/autograd/grad_mode.py
21 issues
Line: 125
Column: 21
self.prev = False
def __enter__(self):
self.prev = torch.is_grad_enabled()
torch.set_grad_enabled(False)
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
torch.set_grad_enabled(self.prev)
Reported by Pylint.
Line: 168
Column: 21
"""
def __enter__(self) -> None:
self.prev = torch.is_grad_enabled()
torch._C._set_grad_enabled(True)
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
torch._C._set_grad_enabled(self.prev)
Reported by Pylint.
Line: 214
Column: 21
"""
def __init__(self, mode: bool) -> None:
self.prev = torch.is_grad_enabled()
torch._C._set_grad_enabled(mode)
def __enter__(self) -> None:
pass
Reported by Pylint.
Line: 58
Column: 28
gen.close()
raise
except BaseException:
# Propagate the exception thrown at us by the caller
with cls():
response = gen.throw(*sys.exc_info())
else:
Reported by Pylint.
Line: 168
Column: 9
"""
def __enter__(self) -> None:
self.prev = torch.is_grad_enabled()
torch._C._set_grad_enabled(True)
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
torch._C._set_grad_enabled(self.prev)
Reported by Pylint.
Line: 1
Column: 1
import sys
import torch
import functools
import inspect
from typing import Any, Callable, TypeVar, cast
__all__ = ['no_grad', 'enable_grad', 'set_grad_enabled',
'inference_mode']
Reported by Pylint.
Line: 3
Column: 1
import sys
import torch
import functools
import inspect
from typing import Any, Callable, TypeVar, cast
__all__ = ['no_grad', 'enable_grad', 'set_grad_enabled',
'inference_mode']
Reported by Pylint.
Line: 4
Column: 1
import sys
import torch
import functools
import inspect
from typing import Any, Callable, TypeVar, cast
__all__ = ['no_grad', 'enable_grad', 'set_grad_enabled',
'inference_mode']
Reported by Pylint.
Line: 5
Column: 1
import torch
import functools
import inspect
from typing import Any, Callable, TypeVar, cast
__all__ = ['no_grad', 'enable_grad', 'set_grad_enabled',
'inference_mode']
Reported by Pylint.
Line: 15
Column: 1
# Used for annotating the decorator usage of 'no_grad' and 'enable_grad'.
# See https://mypy.readthedocs.io/en/latest/generics.html#declaring-decorators
FuncType = Callable[..., Any]
F = TypeVar('F', bound=FuncType)
class _DecoratorContextManager:
"""Allow a context manager to be used as a decorator"""
Reported by Pylint.
torch/_vmap_internals.py
21 issues
Line: 83
Column: 23
batch_size = _validate_and_get_batch_size(flat_in_dims, flat_args)
# See NOTE [Ignored _remove_batch_dim, _add_batch_dim]
batched_inputs = [arg if in_dim is None else
torch._add_batch_dim(arg, in_dim, vmap_level)
for in_dim, arg in zip(flat_in_dims, flat_args)]
return tree_unflatten(batched_inputs, args_spec), batch_size
# Undos the batching (and any batch dimensions) associated with the `vmap_level`.
def _unwrap_batched(
Reported by Pylint.
Line: 103
Column: 16
# with '_', see #40397.
if isinstance(batched_outputs, Tensor):
out_dim = out_dims_as_tuple[0]
return torch._remove_batch_dim(batched_outputs, vmap_level, batch_size, out_dim) # type: ignore[return-value]
return tuple(torch._remove_batch_dim(out, vmap_level, batch_size, out_dim)
for out, out_dim in zip(batched_outputs, out_dims_as_tuple))
# Checks that `fn` returned one or more Tensors and nothing else.
# NB: A python function that return multiple arguments returns a single tuple,
Reported by Pylint.
Line: 104
Column: 18
if isinstance(batched_outputs, Tensor):
out_dim = out_dims_as_tuple[0]
return torch._remove_batch_dim(batched_outputs, vmap_level, batch_size, out_dim) # type: ignore[return-value]
return tuple(torch._remove_batch_dim(out, vmap_level, batch_size, out_dim)
for out, out_dim in zip(batched_outputs, out_dims_as_tuple))
# Checks that `fn` returned one or more Tensors and nothing else.
# NB: A python function that return multiple arguments returns a single tuple,
# so we are effectively checking that `outputs` is a single Tensor or a tuple of
Reported by Pylint.
Line: 83
Column: 23
batch_size = _validate_and_get_batch_size(flat_in_dims, flat_args)
# See NOTE [Ignored _remove_batch_dim, _add_batch_dim]
batched_inputs = [arg if in_dim is None else
torch._add_batch_dim(arg, in_dim, vmap_level)
for in_dim, arg in zip(flat_in_dims, flat_args)]
return tree_unflatten(batched_inputs, args_spec), batch_size
# Undos the batching (and any batch dimensions) associated with the `vmap_level`.
def _unwrap_batched(
Reported by Pylint.
Line: 103
Column: 16
# with '_', see #40397.
if isinstance(batched_outputs, Tensor):
out_dim = out_dims_as_tuple[0]
return torch._remove_batch_dim(batched_outputs, vmap_level, batch_size, out_dim) # type: ignore[return-value]
return tuple(torch._remove_batch_dim(out, vmap_level, batch_size, out_dim)
for out, out_dim in zip(batched_outputs, out_dims_as_tuple))
# Checks that `fn` returned one or more Tensors and nothing else.
# NB: A python function that return multiple arguments returns a single tuple,
Reported by Pylint.
Line: 104
Column: 18
if isinstance(batched_outputs, Tensor):
out_dim = out_dims_as_tuple[0]
return torch._remove_batch_dim(batched_outputs, vmap_level, batch_size, out_dim) # type: ignore[return-value]
return tuple(torch._remove_batch_dim(out, vmap_level, batch_size, out_dim)
for out, out_dim in zip(batched_outputs, out_dims_as_tuple))
# Checks that `fn` returned one or more Tensors and nothing else.
# NB: A python function that return multiple arguments returns a single tuple,
# so we are effectively checking that `outputs` is a single Tensor or a tuple of
Reported by Pylint.
Line: 260
Column: 22
@functools.wraps(func)
def wrapped(*args):
_check_out_dims_is_int_or_int_tuple(out_dims, func)
vmap_level = torch._C._vmapmode_increment_nesting()
try:
batched_inputs, batch_size = _create_batched_inputs(in_dims, args, vmap_level, func)
batched_outputs = func(*batched_inputs)
_validate_outputs(batched_outputs, func)
return _unwrap_batched(batched_outputs, out_dims, vmap_level, batch_size, func)
Reported by Pylint.
Line: 260
Column: 22
@functools.wraps(func)
def wrapped(*args):
_check_out_dims_is_int_or_int_tuple(out_dims, func)
vmap_level = torch._C._vmapmode_increment_nesting()
try:
batched_inputs, batch_size = _create_batched_inputs(in_dims, args, vmap_level, func)
batched_outputs = func(*batched_inputs)
_validate_outputs(batched_outputs, func)
return _unwrap_batched(batched_outputs, out_dims, vmap_level, batch_size, func)
Reported by Pylint.
Line: 267
Column: 13
_validate_outputs(batched_outputs, func)
return _unwrap_batched(batched_outputs, out_dims, vmap_level, batch_size, func)
finally:
torch._C._vmapmode_decrement_nesting()
return wrapped
Reported by Pylint.
Line: 267
Column: 13
_validate_outputs(batched_outputs, func)
return _unwrap_batched(batched_outputs, out_dims, vmap_level, batch_size, func)
finally:
torch._C._vmapmode_decrement_nesting()
return wrapped
Reported by Pylint.
torch/fx/experimental/fx_acc/acc_normalizer.py
21 issues
Line: 1
Column: 1
# type: ignore[]
import inspect
import re
from typing import NamedTuple, Optional, Callable, Dict, List, Tuple, Union, Any, Set
import torch.fx.experimental.fx_acc.acc_utils as acc_utils
import torch
import torch.fx
from torch.fx.node import _get_qualified_name
Reported by Pylint.
Line: 73
Column: 1
_acc_ops: Set[Callable] = set()
def _insert_fun(
op_and_target: Tuple[str, Union[str, Callable]],
arg_replacement_tuples: List[Tuple],
new_fn_target: Optional[Callable] = None,
custom_mapping_fn: Optional[Callable] = None,
kwargs_to_move_to_acc_out_ty: Optional[Optional[List[Tuple[str, str]]]] = None,
Reported by Pylint.
Line: 73
Column: 1
_acc_ops: Set[Callable] = set()
def _insert_fun(
op_and_target: Tuple[str, Union[str, Callable]],
arg_replacement_tuples: List[Tuple],
new_fn_target: Optional[Callable] = None,
custom_mapping_fn: Optional[Callable] = None,
kwargs_to_move_to_acc_out_ty: Optional[Optional[List[Tuple[str, str]]]] = None,
Reported by Pylint.
Line: 83
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
allow_normalize_from_torch_package=False,
):
if op_and_target[0] == "call_function":
assert callable(op_and_target[1])
elif op_and_target[0] == "call_method":
assert isinstance(op_and_target[1], str)
elif op_and_target[0] == "call_module":
assert isinstance(op_and_target[1], type)
Reported by Bandit.
Line: 85
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if op_and_target[0] == "call_function":
assert callable(op_and_target[1])
elif op_and_target[0] == "call_method":
assert isinstance(op_and_target[1], str)
elif op_and_target[0] == "call_module":
assert isinstance(op_and_target[1], type)
# Finalize arg replacement tuples.
# 1. Check to see if they have the `is_optional` bool, and if not defaulting it to
Reported by Bandit.
Line: 87
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
elif op_and_target[0] == "call_method":
assert isinstance(op_and_target[1], str)
elif op_and_target[0] == "call_module":
assert isinstance(op_and_target[1], type)
# Finalize arg replacement tuples.
# 1. Check to see if they have the `is_optional` bool, and if not defaulting it to
# False.
# 2. Some kwargs might have aliases. e.g. "a", "x" and "x1" are aliases of "input".
Reported by Bandit.
Line: 99
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if len(arg_replacement_tuple) == 2:
orig_kwarg, new_kwarg, is_optional = *arg_replacement_tuple, False
else:
assert len(arg_replacement_tuple) == 3
orig_kwarg, new_kwarg, is_optional = arg_replacement_tuple
if not isinstance(orig_kwarg, tuple):
orig_kwarg = (orig_kwarg,)
Reported by Bandit.
Line: 115
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
(tuple(orig_kwarg_set), new_kwarg, is_optional)
)
assert op_and_target not in _normalization_dict.keys()
norm_info = NormalizationInfo(
new_fn_target=new_fn_target,
arg_replacement_tuples=final_arg_replacement_tuples,
custom_mapping_fn=custom_mapping_fn,
kwargs_to_move_to_acc_out_ty=kwargs_to_move_to_acc_out_ty,
Reported by Bandit.
Line: 137
Column: 1
_normalization_dict[torch_package_op_and_target] = norm_info
def _get_dup_signature_tuples(fn: Callable) -> List[Tuple[str, str]]:
"""
Helper that inspects the arg signature of `fn` and returns a list of tuples, where
each tuple is a pair of duplicated names which is used for arg_replacement_tuples.
"""
sig_tuples: List[Tuple[str, str]] = []
Reported by Pylint.
Line: 167
Column: 1
Use this decorator to map a non-acc operator to an acc operator.
Args:
op_and_target: A tuple that contains op and target of the node that represents the non-acc operator.
arg_replacement_tuples: Please refer to the comment on above for `ArgReplacementTuplesType`.
kwargs_to_move_to_acc_out_ty: The kwargs we want to move out from the non-acc op kwargs to acc_out_ty.
"""
def insert(new_fn_target: Callable):
Reported by Pylint.
caffe2/quantization/server/group_norm_dnnlowp_op_test.py
21 issues
Line: 6
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, utils, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
from caffe2.quantization.server.dnnlowp_test_utils import check_quantized_results_close
from hypothesis import given
Reported by Pylint.
Line: 11
Column: 1
from caffe2.python import core, dyndep, utils, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
from caffe2.quantization.server.dnnlowp_test_utils import check_quantized_results_close
from hypothesis import given
dyndep.InitOpsLibrary("//caffe2/caffe2/quantization/server:dnnlowp_ops")
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
Reported by Pylint.
Line: 43
Column: 9
out_quantized,
weight_quantized,
gc,
dc,
):
C = G * K
X = np.random.rand(N, C, H, W).astype(np.float32) * 5.0 - 1.0
if order == "NHWC":
Reported by Pylint.
Line: 1
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, utils, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
Reported by Pylint.
Line: 18
Column: 1
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
class DNNLowPOpGroupNormTest(hu.HypothesisTestCase):
@given(
N=st.integers(0, 4),
G=st.integers(2, 4),
K=st.integers(2, 12),
H=st.integers(4, 16),
Reported by Pylint.
Line: 30
Column: 5
out_quantized=st.booleans(),
weight_quantized=st.booleans(),
**hu.gcs_cpu_only
)
def test_dnnlowp_group_norm(
self,
N,
G,
K,
Reported by Pylint.
Line: 30
Column: 5
out_quantized=st.booleans(),
weight_quantized=st.booleans(),
**hu.gcs_cpu_only
)
def test_dnnlowp_group_norm(
self,
N,
G,
K,
Reported by Pylint.
Line: 30
Column: 5
out_quantized=st.booleans(),
weight_quantized=st.booleans(),
**hu.gcs_cpu_only
)
def test_dnnlowp_group_norm(
self,
N,
G,
K,
Reported by Pylint.
Line: 30
Column: 5
out_quantized=st.booleans(),
weight_quantized=st.booleans(),
**hu.gcs_cpu_only
)
def test_dnnlowp_group_norm(
self,
N,
G,
K,
Reported by Pylint.
Line: 30
Column: 5
out_quantized=st.booleans(),
weight_quantized=st.booleans(),
**hu.gcs_cpu_only
)
def test_dnnlowp_group_norm(
self,
N,
G,
K,
Reported by Pylint.
caffe2/python/operator_test/counter_ops_test.py
21 issues
Line: 69
Column: 17
workspace.RunOperatorOnce(core.CreateOperator(
'Save', ['serialized_c'], [], absolute_path=1,
db_type='minidb', db=tmp.name))
for i in range(10):
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['serialized_c'], ['t8']))
workspace.RunOperatorOnce(core.CreateOperator(
'RetrieveCount', ['serialized_c'], ['t8']))
assert workspace.FetchBlob('t8') == 12
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase
import tempfile
Reported by Pylint.
Line: 8
Column: 1
from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase
import tempfile
class TestCounterOps(TestCase):
def test_counter_ops(self):
Reported by Pylint.
Line: 11
Column: 1
import tempfile
class TestCounterOps(TestCase):
def test_counter_ops(self):
workspace.RunOperatorOnce(core.CreateOperator(
'CreateCounter', [], ['c'], init_count=1))
Reported by Pylint.
Line: 13
Column: 5
class TestCounterOps(TestCase):
def test_counter_ops(self):
workspace.RunOperatorOnce(core.CreateOperator(
'CreateCounter', [], ['c'], init_count=1))
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['c'], ['t1'])) # 1 -> 0
Reported by Pylint.
Line: 13
Column: 5
class TestCounterOps(TestCase):
def test_counter_ops(self):
workspace.RunOperatorOnce(core.CreateOperator(
'CreateCounter', [], ['c'], init_count=1))
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['c'], ['t1'])) # 1 -> 0
Reported by Pylint.
Line: 19
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['c'], ['t1'])) # 1 -> 0
assert not workspace.FetchBlob('t1')
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['c'], ['t2'])) # 0 -> -1
assert workspace.FetchBlob('t2')
Reported by Bandit.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['c'], ['t2'])) # 0 -> -1
assert workspace.FetchBlob('t2')
workspace.RunOperatorOnce(core.CreateOperator(
'CountUp', ['c'], ['t21'])) # -1 -> 0
assert workspace.FetchBlob('t21') == -1
workspace.RunOperatorOnce(core.CreateOperator(
Reported by Bandit.
Line: 27
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
workspace.RunOperatorOnce(core.CreateOperator(
'CountUp', ['c'], ['t21'])) # -1 -> 0
assert workspace.FetchBlob('t21') == -1
workspace.RunOperatorOnce(core.CreateOperator(
'RetrieveCount', ['c'], ['t22']))
assert workspace.FetchBlob('t22') == 0
workspace.RunOperatorOnce(core.CreateOperator(
Reported by Bandit.
Line: 30
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert workspace.FetchBlob('t21') == -1
workspace.RunOperatorOnce(core.CreateOperator(
'RetrieveCount', ['c'], ['t22']))
assert workspace.FetchBlob('t22') == 0
workspace.RunOperatorOnce(core.CreateOperator(
'ResetCounter', ['c'], [], init_count=1)) # -> 1
workspace.RunOperatorOnce(core.CreateOperator(
'CountDown', ['c'], ['t3'])) # 1 -> 0
Reported by Bandit.