The following issues were found:
torch/autograd/anomaly_mode.py
13 issues
Line: 72
Column: 21
"""
def __init__(self) -> None:
self.prev = torch.is_anomaly_enabled()
warnings.warn('Anomaly Detection has been enabled. '
'This mode will increase the runtime '
'and should only be enabled for debugging.', stacklevel=2)
def __enter__(self) -> None:
Reported by Pylint.
Line: 78
Column: 9
'and should only be enabled for debugging.', stacklevel=2)
def __enter__(self) -> None:
torch.set_anomaly_enabled(True)
def __exit__(self, *args: Any) -> None:
torch.set_anomaly_enabled(self.prev)
Reported by Pylint.
Line: 81
Column: 9
torch.set_anomaly_enabled(True)
def __exit__(self, *args: Any) -> None:
torch.set_anomaly_enabled(self.prev)
class set_detect_anomaly(object):
r"""Context-manager that sets the anomaly detection for the autograd engine on or off.
Reported by Pylint.
Line: 100
Column: 21
"""
def __init__(self, mode: bool) -> None:
self.prev = torch.is_anomaly_enabled()
torch.set_anomaly_enabled(mode)
def __enter__(self) -> None:
pass
Reported by Pylint.
Line: 101
Column: 9
def __init__(self, mode: bool) -> None:
self.prev = torch.is_anomaly_enabled()
torch.set_anomaly_enabled(mode)
def __enter__(self) -> None:
pass
def __exit__(self, *args: Any) -> None:
Reported by Pylint.
Line: 107
Column: 9
pass
def __exit__(self, *args: Any) -> None:
torch.set_anomaly_enabled(self.prev)
Reported by Pylint.
Line: 1
Column: 1
import torch
import warnings
from typing import Any
class detect_anomaly(object):
r"""Context-manager that enable anomaly detection for the autograd engine.
This does two things:
Reported by Pylint.
Line: 2
Column: 1
import torch
import warnings
from typing import Any
class detect_anomaly(object):
r"""Context-manager that enable anomaly detection for the autograd engine.
This does two things:
Reported by Pylint.
Line: 4
Column: 1
import torch
import warnings
from typing import Any
class detect_anomaly(object):
r"""Context-manager that enable anomaly detection for the autograd engine.
This does two things:
Reported by Pylint.
Line: 6
Column: 1
from typing import Any
class detect_anomaly(object):
r"""Context-manager that enable anomaly detection for the autograd engine.
This does two things:
- Running the forward pass with detection enabled will allow the backward
Reported by Pylint.
tools/code_coverage/package/tool/utils.py
13 issues
Line: 3
Column: 1
import subprocess
from ..util.setting import TestPlatform
from ..util.utils import print_error
def run_cpp_test(binary_file: str) -> None:
# cpp test binary
try:
Reported by Pylint.
Line: 4
Column: 1
import subprocess
from ..util.setting import TestPlatform
from ..util.utils import print_error
def run_cpp_test(binary_file: str) -> None:
# cpp test binary
try:
Reported by Pylint.
Line: 17
Column: 9
def get_tool_path_by_platform(platform: TestPlatform) -> str:
if platform == TestPlatform.FBCODE:
from caffe2.fb.code_coverage.tool.package.fbcode.utils import get_llvm_tool_path # type: ignore[import]
return get_llvm_tool_path() # type: ignore[no-any-return]
else:
from ..oss.utils import get_llvm_tool_path # type: ignore[no-redef]
Reported by Pylint.
Line: 21
Column: 9
return get_llvm_tool_path() # type: ignore[no-any-return]
else:
from ..oss.utils import get_llvm_tool_path # type: ignore[no-redef]
return get_llvm_tool_path() # type: ignore[no-any-return]
Reported by Pylint.
Line: 1
Column: 1
import subprocess
from ..util.setting import TestPlatform
from ..util.utils import print_error
def run_cpp_test(binary_file: str) -> None:
# cpp test binary
try:
Reported by Pylint.
Line: 1
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import subprocess
from ..util.setting import TestPlatform
from ..util.utils import print_error
def run_cpp_test(binary_file: str) -> None:
# cpp test binary
try:
Reported by Bandit.
Line: 7
Column: 1
from ..util.utils import print_error
def run_cpp_test(binary_file: str) -> None:
# cpp test binary
try:
subprocess.check_call(binary_file)
except subprocess.CalledProcessError:
print_error(f"Binary failed to run: {binary_file}")
Reported by Pylint.
Line: 10
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
def run_cpp_test(binary_file: str) -> None:
# cpp test binary
try:
subprocess.check_call(binary_file)
except subprocess.CalledProcessError:
print_error(f"Binary failed to run: {binary_file}")
def get_tool_path_by_platform(platform: TestPlatform) -> str:
Reported by Bandit.
Line: 15
Column: 1
print_error(f"Binary failed to run: {binary_file}")
def get_tool_path_by_platform(platform: TestPlatform) -> str:
if platform == TestPlatform.FBCODE:
from caffe2.fb.code_coverage.tool.package.fbcode.utils import get_llvm_tool_path # type: ignore[import]
return get_llvm_tool_path() # type: ignore[no-any-return]
else:
Reported by Pylint.
Line: 16
Column: 5
def get_tool_path_by_platform(platform: TestPlatform) -> str:
if platform == TestPlatform.FBCODE:
from caffe2.fb.code_coverage.tool.package.fbcode.utils import get_llvm_tool_path # type: ignore[import]
return get_llvm_tool_path() # type: ignore[no-any-return]
else:
from ..oss.utils import get_llvm_tool_path # type: ignore[no-redef]
Reported by Pylint.
torch/distributed/optim/functional_adadelta.py
13 issues
Line: 69
Column: 37
if param not in self.state:
self.state[param] = {}
state = self.state[param]
state['step'] = torch.tensor(0.0)
state['square_avg'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state['acc_delta'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state = self.state[param]
square_avgs.append(state['square_avg'])
Reported by Pylint.
Line: 70
Column: 81
self.state[param] = {}
state = self.state[param]
state['step'] = torch.tensor(0.0)
state['square_avg'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state['acc_delta'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state = self.state[param]
square_avgs.append(state['square_avg'])
acc_deltas.append(state['acc_delta'])
Reported by Pylint.
Line: 70
Column: 43
self.state[param] = {}
state = self.state[param]
state['step'] = torch.tensor(0.0)
state['square_avg'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state['acc_delta'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state = self.state[param]
square_avgs.append(state['square_avg'])
acc_deltas.append(state['acc_delta'])
Reported by Pylint.
Line: 71
Column: 80
state = self.state[param]
state['step'] = torch.tensor(0.0)
state['square_avg'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state['acc_delta'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state = self.state[param]
square_avgs.append(state['square_avg'])
acc_deltas.append(state['acc_delta'])
Reported by Pylint.
Line: 71
Column: 42
state = self.state[param]
state['step'] = torch.tensor(0.0)
state['square_avg'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state['acc_delta'] = torch.zeros_like(param, memory_format=torch.preserve_format)
state = self.state[param]
square_avgs.append(state['square_avg'])
acc_deltas.append(state['acc_delta'])
Reported by Pylint.
Line: 1
Column: 1
from typing import List, Dict, Optional
import torch
import torch.optim._functional as F
from torch import Tensor
# Define a TorchScript compatible Functional Adadelta Optimizer
# where we use these optimizer in a functional way.
# Instead of using the `param.grad` when updating parameters,
Reported by Pylint.
Line: 17
Column: 1
# NOTE: This should be only used by distributed optimizer internals
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalAdadelta(object):
def __init__(
self,
params: List[Tensor],
lr: float = 1.0,
rho: float = 0.9,
Reported by Pylint.
Line: 17
Column: 1
# NOTE: This should be only used by distributed optimizer internals
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalAdadelta(object):
def __init__(
self,
params: List[Tensor],
lr: float = 1.0,
rho: float = 0.9,
Reported by Pylint.
Line: 18
Column: 5
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalAdadelta(object):
def __init__(
self,
params: List[Tensor],
lr: float = 1.0,
rho: float = 0.9,
eps: float = 1e-6,
Reported by Pylint.
Line: 43
Column: 5
self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})
def step(self, gradients: List[Optional[Tensor]]):
params = self.param_group['params']
params_with_grad = []
grads = []
square_avgs = []
acc_deltas = []
Reported by Pylint.
torch/csrc/distributed/c10d/FileStore.cpp
12 issues
Line: 144
Column: 13
CWE codes:
362
while (true) {
#ifdef _WIN32
fd_ = syscall(std::bind(
::open, path.c_str(), flags | _O_BINARY, _S_IREAD | _S_IWRITE));
#else
fd_ = syscall(std::bind(::open, path.c_str(), flags, 0644));
#endif
// Only retry when the file doesn't exist, since we are waiting for the
// file to be created in this case to address the following issue:
Reported by FlawFinder.
Line: 146
Column: 33
CWE codes:
362
fd_ = syscall(std::bind(
::open, path.c_str(), flags | _O_BINARY, _S_IREAD | _S_IWRITE));
#else
fd_ = syscall(std::bind(::open, path.c_str(), flags, 0644));
#endif
// Only retry when the file doesn't exist, since we are waiting for the
// file to be created in this case to address the following issue:
// https://github.com/pytorch/pytorch/issues/13750
if (fd_ >= 0 || errno != ENOENT) {
Reported by FlawFinder.
Line: 204
Column: 8
CWE codes:
120
20
}
}
void read(void* buf, size_t count) {
while (count > 0) {
auto rv = syscall(std::bind(::read, fd_, buf, count));
SYSASSERT(rv, "read");
buf = (uint8_t*)buf + rv;
count -= rv;
Reported by FlawFinder.
Line: 206
Column: 37
CWE codes:
120
20
void read(void* buf, size_t count) {
while (count > 0) {
auto rv = syscall(std::bind(::read, fd_, buf, count));
SYSASSERT(rv, "read");
buf = (uint8_t*)buf + rv;
count -= rv;
}
}
Reported by FlawFinder.
Line: 227
Column: 8
CWE codes:
120
20
write(data.data(), len);
}
void read(std::string& str) {
uint32_t len;
read(&len, sizeof(len));
std::vector<uint8_t> buf(len);
read(buf.data(), len);
str.assign(buf.begin(), buf.end());
Reported by FlawFinder.
Line: 229
Column: 5
CWE codes:
120
20
void read(std::string& str) {
uint32_t len;
read(&len, sizeof(len));
std::vector<uint8_t> buf(len);
read(buf.data(), len);
str.assign(buf.begin(), buf.end());
}
Reported by FlawFinder.
Line: 231
Column: 5
CWE codes:
120
20
uint32_t len;
read(&len, sizeof(len));
std::vector<uint8_t> buf(len);
read(buf.data(), len);
str.assign(buf.begin(), buf.end());
}
void read(std::vector<uint8_t>& data) {
uint32_t len;
Reported by FlawFinder.
Line: 235
Column: 8
CWE codes:
120
20
str.assign(buf.begin(), buf.end());
}
void read(std::vector<uint8_t>& data) {
uint32_t len;
read(&len, sizeof(len));
data.resize(len);
read(data.data(), len);
}
Reported by FlawFinder.
torch/distributed/rpc/_testing/faulty_agent_backend_registry.py
12 issues
Line: 39
Column: 5
num_fail_sends,
**kwargs
):
from . import FaultyTensorPipeRpcBackendOptions
return FaultyTensorPipeRpcBackendOptions(
num_worker_threads=num_worker_threads,
rpc_timeout=rpc_timeout,
init_method=init_method,
Reported by Pylint.
Line: 54
Column: 5
def _faulty_tensorpipe_init_backend_handler(
store, name, rank, world_size, rpc_backend_options
):
from . import FaultyTensorPipeAgent
from . import FaultyTensorPipeRpcBackendOptions
from torch.distributed.rpc import api
if not isinstance(store, dist.Store):
raise TypeError("`store` must be a c10d::Store. {}".format(store))
Reported by Pylint.
Line: 55
Column: 5
store, name, rank, world_size, rpc_backend_options
):
from . import FaultyTensorPipeAgent
from . import FaultyTensorPipeRpcBackendOptions
from torch.distributed.rpc import api
if not isinstance(store, dist.Store):
raise TypeError("`store` must be a c10d::Store. {}".format(store))
Reported by Pylint.
Line: 31
Column: 1
def _faulty_tensorpipe_construct_rpc_backend_options_handler(
rpc_timeout,
init_method,
num_worker_threads,
messages_to_fail,
messages_to_delay,
num_fail_sends,
Reported by Pylint.
Line: 81
Column: 5
{}, # reverse_device_map
[], # devices
)
api._init_rpc_states(agent)
return agent
rpc.backend_registry.register_backend(
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
import torch.distributed as dist
import torch.distributed.rpc as rpc
from torch.distributed.rpc import constants as rpc_constants
def _init_process_group(store, rank, world_size):
# Initialize ProcessGroup.
process_group_timeout = rpc_constants.DEFAULT_PROCESS_GROUP_TIMEOUT
Reported by Pylint.
Line: 15
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# default group to be initialized.
group = dist.ProcessGroupGloo(store, rank, world_size, process_group_timeout)
assert group is not None, "Failed to initialize default ProcessGroup."
if (rank != -1) and (rank != group.rank()):
raise RuntimeError(
"rank argument {} doesn't match pg rank {}".format(rank, group.rank())
)
Reported by Bandit.
Line: 30
Column: 1
return group
def _faulty_tensorpipe_construct_rpc_backend_options_handler(
rpc_timeout,
init_method,
num_worker_threads,
messages_to_fail,
messages_to_delay,
Reported by Pylint.
Line: 39
Column: 5
num_fail_sends,
**kwargs
):
from . import FaultyTensorPipeRpcBackendOptions
return FaultyTensorPipeRpcBackendOptions(
num_worker_threads=num_worker_threads,
rpc_timeout=rpc_timeout,
init_method=init_method,
Reported by Pylint.
Line: 54
Column: 5
def _faulty_tensorpipe_init_backend_handler(
store, name, rank, world_size, rpc_backend_options
):
from . import FaultyTensorPipeAgent
from . import FaultyTensorPipeRpcBackendOptions
from torch.distributed.rpc import api
if not isinstance(store, dist.Store):
raise TypeError("`store` must be a c10d::Store. {}".format(store))
Reported by Pylint.
torch/distributed/pipeline/sync/skip/layout.py
12 issues
Line: 12
Column: 1
from torch import nn
from .namespace import Namespace
__all__: List[str] = []
class SkipLayout:
Reported by Pylint.
Line: 69
Column: 5
# NOTE(sublee): Hide circular import inside this subroutine. Circular
# import is not ideal but placing this logic near to SkipLayout may
# increase cohesion of code.
from .skippable import Skippable
skip_routes: Dict[Tuple[Namespace, str], Tuple[int, int]] = {}
stashed_at: Dict[Tuple[Namespace, str], int] = {}
for j, partition in enumerate(partitions):
Reported by Pylint.
Line: 80
Column: 42
return
for ns, name in layer.stashable():
stashed_at[(ns, name)] = j
for ns, name in layer.poppable():
prev_j = stashed_at.pop((ns, name))
skip_routes[(ns, name)] = (prev_j, j)
Reported by Pylint.
Line: 84
Column: 52
for ns, name in layer.poppable():
prev_j = stashed_at.pop((ns, name))
skip_routes[(ns, name)] = (prev_j, j)
if isinstance(partition, nn.Sequential):
for layer in partition:
inspect_layer(layer)
else:
Reported by Pylint.
Line: 26
Column: 1
# Skip routes indexed by partition number 'j': [[next_j]: [(prev_j, ns, name), ...], ...]
by_partition: List[List[Tuple[int, Namespace, str]]]
def __init__(self, num_partitions: int, skip_routes: Dict[Tuple[Namespace, str], Tuple[int, int]],) -> None:
# The skip routes are already indexed by 'ns, name'.
self.by_ns_name = skip_routes
# Index skip routes by partition number 'j'.
self.by_partition = [[] for _ in range(num_partitions)]
Reported by Pylint.
Line: 33
Column: 14
# Index skip routes by partition number 'j'.
self.by_partition = [[] for _ in range(num_partitions)]
for (ns, name), (prev_j, next_j) in skip_routes.items():
self.by_partition[next_j].append((prev_j, ns, name))
for p in self.by_partition:
p.sort()
Reported by Pylint.
Line: 36
Column: 13
for (ns, name), (prev_j, next_j) in skip_routes.items():
self.by_partition[next_j].append((prev_j, ns, name))
for p in self.by_partition:
p.sort()
def copy_policy(self, next_j: int) -> Iterable[Tuple[int, Namespace, str]]:
"""Generates skip routes for the given destination partition number.
The skip routes are sorted by source partition number in ascending
Reported by Pylint.
Line: 48
Column: 21
Each tuple of (source partition number, namespace, name).
"""
for prev_j, ns, name in self.by_partition[next_j]:
if prev_j == next_j:
# This skip tensor will be popped at the same partition where
# it is stashed. In this case, copy is not required.
continue
Reported by Pylint.
Line: 56
Column: 5
yield (prev_j, ns, name)
def requires_copy(self, ns: Namespace, name: str) -> bool:
"""Whether the given namespace and name requires partition-to-partition
copy or not.
"""
prev_j, next_j = self.by_ns_name.get((ns, name), (-1, -1))
return prev_j != next_j
Reported by Pylint.
Line: 69
Column: 5
# NOTE(sublee): Hide circular import inside this subroutine. Circular
# import is not ideal but placing this logic near to SkipLayout may
# increase cohesion of code.
from .skippable import Skippable
skip_routes: Dict[Tuple[Namespace, str], Tuple[int, int]] = {}
stashed_at: Dict[Tuple[Namespace, str], int] = {}
for j, partition in enumerate(partitions):
Reported by Pylint.
test/test_show_pickle.py
12 issues
Line: 4
Column: 1
import unittest
import io
import tempfile
import torch
import torch.utils.show_pickle
from torch.testing._internal.common_utils import TestCase, run_tests, IS_WINDOWS
class TestShowPickle(TestCase):
Reported by Pylint.
Line: 5
Column: 1
import io
import tempfile
import torch
import torch.utils.show_pickle
from torch.testing._internal.common_utils import TestCase, run_tests, IS_WINDOWS
class TestShowPickle(TestCase):
Reported by Pylint.
Line: 7
Column: 1
import torch
import torch.utils.show_pickle
from torch.testing._internal.common_utils import TestCase, run_tests, IS_WINDOWS
class TestShowPickle(TestCase):
@unittest.skipIf(IS_WINDOWS, "Can't re-open temp file on Windows")
def test_scripted_model(self):
Reported by Pylint.
Line: 1
Column: 1
import unittest
import io
import tempfile
import torch
import torch.utils.show_pickle
from torch.testing._internal.common_utils import TestCase, run_tests, IS_WINDOWS
class TestShowPickle(TestCase):
Reported by Pylint.
Line: 9
Column: 1
from torch.testing._internal.common_utils import TestCase, run_tests, IS_WINDOWS
class TestShowPickle(TestCase):
@unittest.skipIf(IS_WINDOWS, "Can't re-open temp file on Windows")
def test_scripted_model(self):
class MyCoolModule(torch.nn.Module):
def __init__(self, weight):
Reported by Pylint.
Line: 9
Column: 1
from torch.testing._internal.common_utils import TestCase, run_tests, IS_WINDOWS
class TestShowPickle(TestCase):
@unittest.skipIf(IS_WINDOWS, "Can't re-open temp file on Windows")
def test_scripted_model(self):
class MyCoolModule(torch.nn.Module):
def __init__(self, weight):
Reported by Pylint.
Line: 12
Column: 5
class TestShowPickle(TestCase):
@unittest.skipIf(IS_WINDOWS, "Can't re-open temp file on Windows")
def test_scripted_model(self):
class MyCoolModule(torch.nn.Module):
def __init__(self, weight):
super().__init__()
self.weight = weight
Reported by Pylint.
Line: 13
Column: 9
@unittest.skipIf(IS_WINDOWS, "Can't re-open temp file on Windows")
def test_scripted_model(self):
class MyCoolModule(torch.nn.Module):
def __init__(self, weight):
super().__init__()
self.weight = weight
def forward(self, x):
Reported by Pylint.
Line: 13
Column: 9
@unittest.skipIf(IS_WINDOWS, "Can't re-open temp file on Windows")
def test_scripted_model(self):
class MyCoolModule(torch.nn.Module):
def __init__(self, weight):
super().__init__()
self.weight = weight
def forward(self, x):
Reported by Pylint.
Line: 18
Column: 13
super().__init__()
self.weight = weight
def forward(self, x):
return x * self.weight
m = torch.jit.script(MyCoolModule(torch.tensor([2.0])))
with tempfile.NamedTemporaryFile() as tmp:
Reported by Pylint.
tools/test/test_max_tokens_pragma.py
12 issues
Line: 2
Column: 1
import unittest
from tools.linter.clang_tidy.max_tokens_pragma import (
add_max_tokens_pragma,
strip_max_tokens_pragmas,
)
def compare_code(a: str, b: str) -> bool:
a_lines = [line.strip() for line in a.splitlines()]
Reported by Pylint.
Line: 16
Column: 9
class TestMaxTokensPragma(unittest.TestCase):
def test_no_prior_pragmas(self) -> None:
input = """\
// File without any prior pragmas
int main() {
for (int i = 0; i < 10; i++);
return 0;
Reported by Pylint.
Line: 41
Column: 9
self.assertTrue(compare_code(output, input))
def test_single_prior_pragma(self) -> None:
input = """\
// File with prior pragmas
#pragma clang max_tokens_total 1
int main() {
Reported by Pylint.
Line: 79
Column: 9
self.assertTrue(compare_code(output, stripped))
def test_multiple_prior_pragmas(self) -> None:
input = """\
// File with multiple prior pragmas
#pragma clang max_tokens_total 1
// Different pragma; script should ignore this
Reported by Pylint.
Line: 1
Column: 1
import unittest
from tools.linter.clang_tidy.max_tokens_pragma import (
add_max_tokens_pragma,
strip_max_tokens_pragmas,
)
def compare_code(a: str, b: str) -> bool:
a_lines = [line.strip() for line in a.splitlines()]
Reported by Pylint.
Line: 8
Column: 1
)
def compare_code(a: str, b: str) -> bool:
a_lines = [line.strip() for line in a.splitlines()]
b_lines = [line.strip() for line in b.splitlines()]
return a_lines == b_lines
Reported by Pylint.
Line: 8
Column: 1
)
def compare_code(a: str, b: str) -> bool:
a_lines = [line.strip() for line in a.splitlines()]
b_lines = [line.strip() for line in b.splitlines()]
return a_lines == b_lines
Reported by Pylint.
Line: 8
Column: 1
)
def compare_code(a: str, b: str) -> bool:
a_lines = [line.strip() for line in a.splitlines()]
b_lines = [line.strip() for line in b.splitlines()]
return a_lines == b_lines
Reported by Pylint.
Line: 14
Column: 1
return a_lines == b_lines
class TestMaxTokensPragma(unittest.TestCase):
def test_no_prior_pragmas(self) -> None:
input = """\
// File without any prior pragmas
int main() {
Reported by Pylint.
Line: 15
Column: 5
class TestMaxTokensPragma(unittest.TestCase):
def test_no_prior_pragmas(self) -> None:
input = """\
// File without any prior pragmas
int main() {
for (int i = 0; i < 10; i++);
Reported by Pylint.
torch/autograd/_functions/tensor.py
12 issues
Line: 4
Column: 1
from functools import reduce
import torch
import torch._utils
from ..function import Function
class Type(Function):
@staticmethod
Reported by Pylint.
Line: 24
Column: 3
return grad_output.type(ctx.input_type), None
# TODO: deprecate this
class Resize(Function):
@staticmethod
def forward(ctx, tensor, sizes):
ctx.sizes = sizes
Reported by Pylint.
Line: 1
Column: 1
from functools import reduce
import torch
import torch._utils
from ..function import Function
class Type(Function):
@staticmethod
Reported by Pylint.
Line: 7
Column: 1
from ..function import Function
class Type(Function):
@staticmethod
def forward(ctx, i, dest_type):
ctx.input_type = type(i)
ctx.input_device = -1 if not i.is_cuda else i.get_device()
Reported by Pylint.
Line: 10
Column: 5
class Type(Function):
@staticmethod
def forward(ctx, i, dest_type):
ctx.input_type = type(i)
ctx.input_device = -1 if not i.is_cuda else i.get_device()
return i.type(dest_type)
@staticmethod
Reported by Pylint.
Line: 16
Column: 5
return i.type(dest_type)
@staticmethod
def backward(ctx, grad_output):
if ctx.input_device == -1:
return grad_output.type(ctx.input_type), None
else:
with torch.cuda.device(ctx.input_device):
return grad_output.type(ctx.input_type), None
Reported by Pylint.
Line: 17
Column: 9
@staticmethod
def backward(ctx, grad_output):
if ctx.input_device == -1:
return grad_output.type(ctx.input_type), None
else:
with torch.cuda.device(ctx.input_device):
return grad_output.type(ctx.input_type), None
Reported by Pylint.
Line: 25
Column: 1
# TODO: deprecate this
class Resize(Function):
@staticmethod
def forward(ctx, tensor, sizes):
ctx.sizes = sizes
ctx.numel = reduce(lambda x, y: x * y, sizes, 1)
Reported by Pylint.
Line: 28
Column: 5
class Resize(Function):
@staticmethod
def forward(ctx, tensor, sizes):
ctx.sizes = sizes
ctx.numel = reduce(lambda x, y: x * y, sizes, 1)
if tensor.numel() != ctx.numel:
raise RuntimeError(("requested resize to {} ({} elements in total), "
"but the given tensor has a size of {} ({} elements). "
Reported by Pylint.
Line: 42
Column: 9
if tensor.is_quantized:
tensor.copy_(tensor)
return tensor.contiguous().view(*sizes)
if tensor.is_contiguous():
result = tensor.new(tensor).contiguous().view(*sizes)
return result
else:
return tensor.contiguous().view(*sizes)
Reported by Pylint.
test/package/test_resources.py
12 issues
Line: 7
Column: 1
from textwrap import dedent
from unittest import skipIf
from torch.package import PackageExporter, PackageImporter
from torch.testing._internal.common_utils import run_tests
try:
from .common import PackageTestCase
except ImportError:
Reported by Pylint.
Line: 8
Column: 1
from unittest import skipIf
from torch.package import PackageExporter, PackageImporter
from torch.testing._internal.common_utils import run_tests
try:
from .common import PackageTestCase
except ImportError:
# Support the case where we run this file directly.
Reported by Pylint.
Line: 1
Column: 1
# -*- coding: utf-8 -*-
from io import BytesIO
from sys import version_info
from textwrap import dedent
from unittest import skipIf
from torch.package import PackageExporter, PackageImporter
from torch.testing._internal.common_utils import run_tests
Reported by Pylint.
Line: 24
Column: 41
def test_resource_reader(self):
"""Test compliance with the get_resource_reader importlib API."""
buffer = BytesIO()
with PackageExporter(buffer) as pe:
# Layout looks like:
# package
# ├── one/
# │ ├── a.txt
# │ ├── b.txt
Reported by Pylint.
Line: 92
Column: 41
"""
)
buffer = BytesIO()
with PackageExporter(buffer) as pe:
pe.save_source_string("foo.bar", mod_src)
pe.save_text("my_cool_resources", "sekrit.txt", "my sekrit plays")
buffer.seek(0)
importer = PackageImporter(buffer)
Reported by Pylint.
Line: 102
Column: 5
importer.import_module("foo.bar").secret_message(), "my sekrit plays"
)
def test_importer_access(self):
buffer = BytesIO()
with PackageExporter(buffer) as he:
he.save_text("main", "main", "my string")
he.save_binary("main", "main_binary", "my string".encode("utf-8"))
src = dedent(
Reported by Pylint.
Line: 104
Column: 41
def test_importer_access(self):
buffer = BytesIO()
with PackageExporter(buffer) as he:
he.save_text("main", "main", "my string")
he.save_binary("main", "main_binary", "my string".encode("utf-8"))
src = dedent(
"""\
import importlib
Reported by Pylint.
Line: 118
Column: 9
)
he.save_source_string("main", src, is_package=True)
buffer.seek(0)
hi = PackageImporter(buffer)
m = hi.import_module("main")
self.assertEqual(m.t, "my string")
self.assertEqual(m.b, "my string".encode("utf-8"))
def test_resource_access_by_path(self):
Reported by Pylint.
Line: 119
Column: 9
he.save_source_string("main", src, is_package=True)
buffer.seek(0)
hi = PackageImporter(buffer)
m = hi.import_module("main")
self.assertEqual(m.t, "my string")
self.assertEqual(m.b, "my string".encode("utf-8"))
def test_resource_access_by_path(self):
"""
Reported by Pylint.
Line: 128
Column: 41
Tests that packaged code can used importlib.resources.path.
"""
buffer = BytesIO()
with PackageExporter(buffer) as he:
he.save_binary("string_module", "my_string", "my string".encode("utf-8"))
src = dedent(
"""\
import importlib.resources
import string_module
Reported by Pylint.