The following issues were found
caffe2/python/operator_test/loss_ops_test.py
11 issues
Line: 9
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
class TestLossOps(serial.SerializedTestCase):
Reported by Pylint.
Line: 16
Column: 41
class TestLossOps(serial.SerializedTestCase):
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
Reported by Pylint.
Line: 13
Column: 1
import numpy as np
class TestLossOps(serial.SerializedTestCase):
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
Reported by Pylint.
Line: 16
Column: 5
class TestLossOps(serial.SerializedTestCase):
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
Reported by Pylint.
Line: 16
Column: 5
class TestLossOps(serial.SerializedTestCase):
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
Reported by Pylint.
Line: 16
Column: 5
class TestLossOps(serial.SerializedTestCase):
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
Reported by Pylint.
Line: 16
Column: 5
class TestLossOps(serial.SerializedTestCase):
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
Reported by Pylint.
Line: 17
Column: 9
@serial.given(n=st.integers(1, 8), **hu.gcs)
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
op = core.CreateOperator(
Reported by Pylint.
Line: 19
Column: 9
def test_averaged_loss(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
op = core.CreateOperator(
"AveragedLoss",
["X"],
Reported by Pylint.
caffe2/python/operator_fp_exceptions_test.py
11 issues
Line: 34
Column: 13
exception_raised = False
try:
workspace.RunNetOnce(net)
except Exception as e:
exception_raised = True
self.assertEquals(exception_raised, throw_if_fp_exceptions)
if __name__ == '__main__':
Reported by Pylint.
Line: 34
Column: 20
exception_raised = False
try:
workspace.RunNetOnce(net)
except Exception as e:
exception_raised = True
self.assertEquals(exception_raised, throw_if_fp_exceptions)
if __name__ == '__main__':
Reported by Pylint.
Line: 36
Column: 13
workspace.RunNetOnce(net)
except Exception as e:
exception_raised = True
self.assertEquals(exception_raised, throw_if_fp_exceptions)
if __name__ == '__main__':
unittest.main()
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase
import numpy as np
import unittest
Reported by Pylint.
Line: 9
Column: 1
from caffe2.python.test_util import TestCase
import numpy as np
import unittest
def setThrowIfFpExceptions(enabled):
core.GlobalInit(["caffe2", "--caffe2_operator_throw_if_fp_exceptions=%d" % (1 if enabled else 0)])
Reported by Pylint.
Line: 12
Column: 1
import unittest
def setThrowIfFpExceptions(enabled):
core.GlobalInit(["caffe2", "--caffe2_operator_throw_if_fp_exceptions=%d" % (1 if enabled else 0)])
class OperatorFPExceptionsTest(TestCase):
def test_fp_exception_divbyzero(self):
Reported by Pylint.
Line: 12
Column: 1
import unittest
def setThrowIfFpExceptions(enabled):
core.GlobalInit(["caffe2", "--caffe2_operator_throw_if_fp_exceptions=%d" % (1 if enabled else 0)])
class OperatorFPExceptionsTest(TestCase):
def test_fp_exception_divbyzero(self):
Reported by Pylint.
Line: 13
Column: 1
def setThrowIfFpExceptions(enabled):
core.GlobalInit(["caffe2", "--caffe2_operator_throw_if_fp_exceptions=%d" % (1 if enabled else 0)])
class OperatorFPExceptionsTest(TestCase):
def test_fp_exception_divbyzero(self):
# This test asserts the followings
Reported by Pylint.
Line: 16
Column: 1
core.GlobalInit(["caffe2", "--caffe2_operator_throw_if_fp_exceptions=%d" % (1 if enabled else 0)])
class OperatorFPExceptionsTest(TestCase):
def test_fp_exception_divbyzero(self):
# This test asserts the followings
# - If flag caffe2_operator_throw_if_fp_exceptions is set,
# floating point exceptions will be thrown
# - If flag caffe2_operator_throw_if_fp_exceptions is not set,
Reported by Pylint.
Line: 17
Column: 5
class OperatorFPExceptionsTest(TestCase):
def test_fp_exception_divbyzero(self):
# This test asserts the followings
# - If flag caffe2_operator_throw_if_fp_exceptions is set,
# floating point exceptions will be thrown
# - If flag caffe2_operator_throw_if_fp_exceptions is not set,
# floating point exceptions will not be thrown
Reported by Pylint.
test/distributed/elastic/utils/distributed_test.py
11 issues
Line: 15
Column: 1
import unittest
from contextlib import closing
from torch.distributed.elastic.utils.distributed import (
create_c10d_store,
get_free_port,
get_socket_with_port,
)
from torch.testing._internal.common_utils import IS_MACOS, IS_WINDOWS, run_tests
Reported by Pylint.
Line: 20
Column: 1
get_free_port,
get_socket_with_port,
)
from torch.testing._internal.common_utils import IS_MACOS, IS_WINDOWS, run_tests
def _create_c10d_store_mp(is_server, server_addr, port, world_size):
store = create_c10d_store(is_server, server_addr, port, world_size, timeout=2)
if store is None:
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import multiprocessing as mp
import os
Reported by Pylint.
Line: 35
Column: 1
print("tests incompatible with tsan or asan", file=sys.stderr)
sys.exit(0)
class DistributedUtilTest(unittest.TestCase):
def test_create_store_single_server(self):
store = create_c10d_store(is_server=True, server_addr=socket.gethostname())
self.assertIsNotNone(store)
def test_create_store_no_port_multi(self):
Reported by Pylint.
Line: 36
Column: 5
sys.exit(0)
class DistributedUtilTest(unittest.TestCase):
def test_create_store_single_server(self):
store = create_c10d_store(is_server=True, server_addr=socket.gethostname())
self.assertIsNotNone(store)
def test_create_store_no_port_multi(self):
with self.assertRaises(ValueError):
Reported by Pylint.
Line: 40
Column: 5
store = create_c10d_store(is_server=True, server_addr=socket.gethostname())
self.assertIsNotNone(store)
def test_create_store_no_port_multi(self):
with self.assertRaises(ValueError):
create_c10d_store(
is_server=True, server_addr=socket.gethostname(), world_size=2
)
Reported by Pylint.
Line: 46
Column: 5
is_server=True, server_addr=socket.gethostname(), world_size=2
)
def test_create_store_multi(self):
world_size = 3
server_port = get_free_port()
localhost = socket.gethostname()
worker0 = mp.Process(
target=_create_c10d_store_mp,
Reported by Pylint.
Line: 85
Column: 5
self.assertEqual(0, worker0.exitcode)
self.assertEqual(0, worker1.exitcode)
def test_create_store_timeout_on_server(self):
with self.assertRaises(TimeoutError):
port = get_free_port()
create_c10d_store(
is_server=True,
server_addr=socket.gethostname(),
Reported by Pylint.
Line: 96
Column: 5
timeout=1,
)
def test_create_store_timeout_on_worker(self):
with self.assertRaises(TimeoutError):
port = get_free_port()
create_c10d_store(
is_server=False,
server_addr=socket.gethostname(),
Reported by Pylint.
Line: 107
Column: 5
timeout=1,
)
def test_port_already_in_use_on_server(self):
# try to create the TCPStore server twice on the same port
# the second should fail due to a port conflict
# first store binds onto a free port
# try creating the second store on the port that the first store binded to
server_addr = socket.gethostname()
Reported by Pylint.
test/distributed/elastic/multiprocessing/tail_log_test.py
11 issues
Line: 21
Column: 1
from typing import Dict, Set
from unittest import mock
from torch.distributed.elastic.multiprocessing.tail_log import TailLog
def write(max: int, sleep: float, file: str):
with open(file, "w") as fp:
for i in range(max):
Reported by Pylint.
Line: 24
Column: 11
from torch.distributed.elastic.multiprocessing.tail_log import TailLog
def write(max: int, sleep: float, file: str):
with open(file, "w") as fp:
for i in range(max):
print(i, file=fp, flush=True)
time.sleep(sleep)
Reported by Pylint.
Line: 46
Column: 9
and validate that all lines are accounted for.
"""
nprocs = 32
max = 1000
interval_sec = 0.0001
log_files = {
local_rank: os.path.join(self.test_dir, f"{local_rank}_stdout.log")
for local_rank in range(nprocs)
Reported by Pylint.
Line: 104
Column: 25
tail = TailLog("writer", log_files={0: "foobar.log"}, dst=sys.stdout).start()
tail.stop()
self.assertTrue(tail.stopped())
self.assertTrue(tail._threadpool._shutdown)
@mock.patch("torch.distributed.elastic.multiprocessing.tail_log.log")
def test_tail_logfile_error_in_tail_fn(self, mock_logger):
"""
Ensures that when there is an error in the tail_fn (the one that runs in the
Reported by Pylint.
Line: 104
Column: 25
tail = TailLog("writer", log_files={0: "foobar.log"}, dst=sys.stdout).start()
tail.stop()
self.assertTrue(tail.stopped())
self.assertTrue(tail._threadpool._shutdown)
@mock.patch("torch.distributed.elastic.multiprocessing.tail_log.log")
def test_tail_logfile_error_in_tail_fn(self, mock_logger):
"""
Ensures that when there is an error in the tail_fn (the one that runs in the
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import io
import os
Reported by Pylint.
Line: 24
Column: 1
from torch.distributed.elastic.multiprocessing.tail_log import TailLog
def write(max: int, sleep: float, file: str):
with open(file, "w") as fp:
for i in range(max):
print(i, file=fp, flush=True)
time.sleep(sleep)
Reported by Pylint.
Line: 25
Column: 29
def write(max: int, sleep: float, file: str):
with open(file, "w") as fp:
for i in range(max):
print(i, file=fp, flush=True)
time.sleep(sleep)
Reported by Pylint.
Line: 31
Column: 1
time.sleep(sleep)
class TailLogTest(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp(prefix=f"{self.__class__.__name__}_")
self.threadpool = ThreadPoolExecutor()
def tearDown(self):
Reported by Pylint.
Line: 39
Column: 5
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_tail(self):
"""
writer() writes 0 - max (on number on each line) to a log file.
Run nprocs such writers and tail the log files into an IOString
and validate that all lines are accounted for.
"""
Reported by Pylint.
tools/linter/clang_tidy/generate_build_files.py
11 issues
Line: 9
Column: 14
def run_cmd(cmd: List[str]) -> None:
print(f"Running: {cmd}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)
stdout, stderr = result.stdout.decode("utf-8").strip(), result.stderr.decode("utf-8").strip()
print(stdout)
print(stderr)
if result.returncode != 0:
print(f"Failed to run {cmd}")
Reported by Pylint.
Line: 1
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import subprocess
import sys
import os
from typing import List
def run_cmd(cmd: List[str]) -> None:
print(f"Running: {cmd}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)
Reported by Bandit.
Line: 1
Column: 1
import subprocess
import sys
import os
from typing import List
def run_cmd(cmd: List[str]) -> None:
print(f"Running: {cmd}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)
Reported by Pylint.
Line: 7
Column: 1
from typing import List
def run_cmd(cmd: List[str]) -> None:
print(f"Running: {cmd}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)
stdout, stderr = result.stdout.decode("utf-8").strip(), result.stderr.decode("utf-8").strip()
print(stdout)
print(stderr)
Reported by Pylint.
Line: 9
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
def run_cmd(cmd: List[str]) -> None:
print(f"Running: {cmd}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)
stdout, stderr = result.stdout.decode("utf-8").strip(), result.stderr.decode("utf-8").strip()
print(stdout)
print(stderr)
if result.returncode != 0:
print(f"Failed to run {cmd}")
Reported by Bandit.
Line: 15
Column: 9
print(stderr)
if result.returncode != 0:
print(f"Failed to run {cmd}")
exit(1)
def run_timed_cmd(cmd: List[str]) -> None:
run_cmd(["time"] + cmd)
Reported by Pylint.
Line: 18
Column: 1
exit(1)
def run_timed_cmd(cmd: List[str]) -> None:
run_cmd(["time"] + cmd)
def update_submodules() -> None:
run_cmd(["git", "submodule", "update", "--init", "--recursive"])
Reported by Pylint.
Line: 22
Column: 1
run_cmd(["time"] + cmd)
def update_submodules() -> None:
run_cmd(["git", "submodule", "update", "--init", "--recursive"])
def gen_compile_commands() -> None:
os.environ["USE_NCCL"] = "0"
Reported by Pylint.
Line: 26
Column: 1
run_cmd(["git", "submodule", "update", "--init", "--recursive"])
def gen_compile_commands() -> None:
os.environ["USE_NCCL"] = "0"
os.environ["USE_DEPLOY"] = "1"
os.environ["CC"] = "clang"
os.environ["CXX"] = "clang++"
run_timed_cmd([sys.executable, "setup.py", "--cmake-only", "build"])
Reported by Pylint.
Line: 34
Column: 1
run_timed_cmd([sys.executable, "setup.py", "--cmake-only", "build"])
def run_autogen() -> None:
run_timed_cmd(
[
sys.executable,
"-m",
"tools.codegen.gen",
Reported by Pylint.
torch/distributed/algorithms/ddp_comm_hooks/__init__.py
11 issues
Line: 7
Column: 1
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
quantization_hooks as quantization,
)
Reported by Pylint.
Line: 7
Column: 1
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
quantization_hooks as quantization,
)
Reported by Pylint.
Line: 7
Column: 1
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
quantization_hooks as quantization,
)
Reported by Pylint.
Line: 7
Column: 1
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
quantization_hooks as quantization,
)
Reported by Pylint.
Line: 7
Column: 1
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
quantization_hooks as quantization,
)
Reported by Pylint.
Line: 7
Column: 1
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
quantization_hooks as quantization,
)
Reported by Pylint.
Line: 1
Column: 1
from enum import Enum
from functools import partial
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from . import (
default_hooks as default,
powerSGD_hook as powerSGD,
Reported by Pylint.
Line: 18
Column: 1
model.register_comm_hook(state, comm_hook)
def _powerSGD_comm_hook_wrapper(
comm_hook,
model,
state,
matrix_approximation_rank,
start_powerSGD_iter,
Reported by Pylint.
Line: 18
Column: 1
model.register_comm_hook(state, comm_hook)
def _powerSGD_comm_hook_wrapper(
comm_hook,
model,
state,
matrix_approximation_rank,
start_powerSGD_iter,
Reported by Pylint.
Line: 26
Column: 1
start_powerSGD_iter,
):
"""
To be consistent with the wrappers of other DDP comm hooks, the input state only needs to be a process group,
which will be wrapped up with other state info.
"""
powerSGD_state = powerSGD.PowerSGDState(
process_group=state,
matrix_approximation_rank=matrix_approximation_rank,
Reported by Pylint.
torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py
11 issues
Line: 57
Column: 44
group_to_use = process_group if process_group is not None else dist.group.WORLD
world_size = group_to_use.size()
compressed_tensor = bucket.buffer().to(torch.float16).div_(world_size)
fut = dist.all_reduce(
compressed_tensor, group=group_to_use, async_op=True
).get_future()
Reported by Pylint.
Line: 148
Column: 46
hook_state, bucket: dist.GradBucket
) -> torch.futures.Future[torch.Tensor]:
# Cast bucket tensor to FP16.
bucket.set_buffer(bucket.buffer().to(torch.float16))
fut = hook(hook_state, bucket)
def decompress(fut):
decompressed_tensor = bucket.buffer()
Reported by Pylint.
Line: 96
Column: 3
)
# TODO: Add an example to use such a wrapper.
def _hook_then_optimizer(
hook: Callable[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]],
optimizer_state: _OptimizerHookState,
) -> Callable[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]:
r"""
Reported by Pylint.
Line: 115
Column: 28
# Run original hook
fut = hook(hook_state, bucket)
def optimizer_step(fut):
gradient_tensors = bucket.gradients()
model_params = bucket.parameters()
for grad_tensor, model_param in zip(gradient_tensors, model_params):
optimizer_state.functional_optimizer.step_param(
model_param,
Reported by Pylint.
Line: 1
Column: 1
from typing import Any, Callable
import torch
import torch.distributed as dist
def _allreduce_fut(
process_group: dist.ProcessGroup, tensor: torch.Tensor
) -> torch.futures.Future[torch.Tensor]:
Reported by Pylint.
Line: 46
Column: 1
) -> torch.futures.Future[torch.Tensor]:
"""
This DDP communication hook implements a simple gradient compression
approach that casts ``GradBucket`` tensor to half-precision floating-point format (``torch.float16``)
and then divides it by the process group size.
It allreduces those ``float16`` gradient tensors. Once compressed gradient
tensors are allreduced, the chained callback ``decompress`` casts it back to the input data type (such as ``float32``).
Example::
Reported by Pylint.
Line: 49
Column: 1
approach that casts ``GradBucket`` tensor to half-precision floating-point format (``torch.float16``)
and then divides it by the process group size.
It allreduces those ``float16`` gradient tensors. Once compressed gradient
tensors are allreduced, the chained callback ``decompress`` casts it back to the input data type (such as ``float32``).
Example::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
"""
group_to_use = process_group if process_group is not None else dist.group.WORLD
Reported by Pylint.
Line: 73
Column: 1
return fut.then(decompress)
class _OptimizerHookState(object):
"""
Holds state for running optimizer in-line after DDP communication hook.
Currently contains only optimizer class which must have a method `step_param`.
"""
Reported by Pylint.
Line: 73
Column: 1
return fut.then(decompress)
class _OptimizerHookState(object):
"""
Holds state for running optimizer in-line after DDP communication hook.
Currently contains only optimizer class which must have a method `step_param`.
"""
Reported by Pylint.
Line: 134
Column: 1
) -> Callable[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]:
"""
This wrapper casts the input gradient tensor of a given DDP communication hook to half-precision
floating point format (``torch.float16``), and casts the resulting tensor of the given hook back to
the input data type, such as ``float32``.
Therefore, ``fp16_compress_hook`` is equivalent to ``fp16_compress_wrapper(allreduce_hook)``.
Example::
Reported by Pylint.
torch/distributed/autograd/__init__.py
11 issues
Line: 14
Column: 5
raise RuntimeError("Failed to initialize torch.distributed.autograd")
if is_available():
from torch._C._distributed_autograd import (
get_gradients,
backward,
_init,
_new_context,
_release_context,
Reported by Pylint.
Line: 7
Column: 20
def is_available():
return hasattr(torch._C, "_dist_autograd_init")
if is_available() and not torch._C._dist_autograd_init():
raise RuntimeError("Failed to initialize torch.distributed.autograd")
Reported by Pylint.
Line: 10
Column: 27
return hasattr(torch._C, "_dist_autograd_init")
if is_available() and not torch._C._dist_autograd_init():
raise RuntimeError("Failed to initialize torch.distributed.autograd")
if is_available():
from torch._C._distributed_autograd import (
get_gradients,
Reported by Pylint.
Line: 10
Column: 27
return hasattr(torch._C, "_dist_autograd_init")
if is_available() and not torch._C._dist_autograd_init():
raise RuntimeError("Failed to initialize torch.distributed.autograd")
if is_available():
from torch._C._distributed_autograd import (
get_gradients,
Reported by Pylint.
Line: 46
Column: 9
>>> dist_autograd.backward(context_id, [loss])
'''
def __enter__(self):
self.autograd_context = _new_context()
return self.autograd_context._context_id()
def __exit__(self, type, value, traceback):
_release_context(self.autograd_context._context_id())
Reported by Pylint.
Line: 49
Column: 24
self.autograd_context = _new_context()
return self.autograd_context._context_id()
def __exit__(self, type, value, traceback):
_release_context(self.autograd_context._context_id())
Reported by Pylint.
Line: 1
Column: 1
import sys
import torch
def is_available():
return hasattr(torch._C, "_dist_autograd_init")
Reported by Pylint.
Line: 6
Column: 1
import torch
def is_available():
return hasattr(torch._C, "_dist_autograd_init")
if is_available() and not torch._C._dist_autograd_init():
raise RuntimeError("Failed to initialize torch.distributed.autograd")
Reported by Pylint.
Line: 28
Column: 1
DistAutogradContext,
)
class context(object):
'''
Context object to wrap forward and backward passes when using
distributed autograd. The ``context_id`` generated in the ``with``
statement is required to uniquely identify a distributed backward pass
on all workers. Each worker stores metadata associated with this
Reported by Pylint.
Line: 28
Column: 1
DistAutogradContext,
)
class context(object):
'''
Context object to wrap forward and backward passes when using
distributed autograd. The ``context_id`` generated in the ``with``
statement is required to uniquely identify a distributed backward pass
on all workers. Each worker stores metadata associated with this
Reported by Pylint.
test/onnx/test_caffe2_common.py
11 issues
Line: 3
Column: 1
import glob
import numpy as np
import onnx.backend.test
import caffe2.python.onnx.backend as c2
import os
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
Reported by Pylint.
Line: 4
Column: 1
import glob
import numpy as np
import onnx.backend.test
import caffe2.python.onnx.backend as c2
import os
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
Reported by Pylint.
Line: 6
Column: 1
import onnx.backend.test
import caffe2.python.onnx.backend as c2
import os
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
tensor = onnx.TensorProto()
with open(f, "rb") as file:
Reported by Pylint.
Line: 1
Column: 1
import glob
import numpy as np
import onnx.backend.test
import caffe2.python.onnx.backend as c2
import os
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
Reported by Pylint.
Line: 4
Column: 1
import glob
import numpy as np
import onnx.backend.test
import caffe2.python.onnx.backend as c2
import os
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
Reported by Pylint.
Line: 5
Column: 1
import numpy as np
import onnx.backend.test
import caffe2.python.onnx.backend as c2
import os
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
tensor = onnx.TensorProto()
Reported by Pylint.
Line: 9
Column: 1
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
tensor = onnx.TensorProto()
with open(f, "rb") as file:
tensor.ParseFromString(file.read())
return tensor
Reported by Pylint.
Line: 9
Column: 1
from onnx import numpy_helper
def load_tensor_as_numpy_array(f):
tensor = onnx.TensorProto()
with open(f, "rb") as file:
tensor.ParseFromString(file.read())
return tensor
Reported by Pylint.
Line: 16
Column: 1
return tensor
def assert_similar(ref, real):
np.testing.assert_equal(len(ref), len(real))
for i in range(len(ref)):
np.testing.assert_allclose(ref[i], real[i], rtol=1e-3)
Reported by Pylint.
Line: 18
Column: 5
def assert_similar(ref, real):
np.testing.assert_equal(len(ref), len(real))
for i in range(len(ref)):
np.testing.assert_allclose(ref[i], real[i], rtol=1e-3)
def run_generated_test(model_file, data_dir, device="CPU"):
model = onnx.load(model_file)
Reported by Pylint.
tools/stats/import_test_stats.py
11 issues
Line: 55
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b310-urllib-urlopen
with open(path, "r") as f:
return cast(Dict[str, Any], json.load(f))
try:
contents = urlopen(url, timeout=1).read().decode('utf-8')
processed_contents = process_fn(json.loads(contents))
with open(path, "w") as f:
f.write(json.dumps(processed_contents))
return processed_contents
except Exception as e:
Reported by Bandit.
Line: 60
Column: 12
with open(path, "w") as f:
f.write(json.dumps(processed_contents))
return processed_contents
except Exception as e:
print(f'Could not download {url} because of error {e}.')
return {}
def get_slow_tests(dirpath: str, filename: str = SLOW_TESTS_FILE) -> Optional[Dict[str, float]]:
Reported by Pylint.
Line: 69
Column: 12
url = "https://raw.githubusercontent.com/pytorch/test-infra/main/stats/slow-tests.json"
try:
return fetch_and_cache(dirpath, filename, url, lambda x: x)
except Exception:
print("Couldn't download slow test set, leaving all tests enabled...")
return {}
def get_disabled_tests(dirpath: str, filename: str = DISABLED_TESTS_FILE) -> Optional[Dict[str, Any]]:
Reported by Pylint.
Line: 97
Column: 12
try:
url = 'https://raw.githubusercontent.com/pytorch/test-infra/main/stats/disabled-tests.json'
return fetch_and_cache(dirpath, filename, url, process_disabled_test)
except Exception:
print("Couldn't download test skip set, leaving all tests enabled...")
return {}
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
import datetime
import json
import os
import pathlib
import re
from typing import Any, Callable, Dict, List, Optional, cast
from urllib.request import urlopen
Reported by Pylint.
Line: 52
Column: 33
if os.path.exists(path) and is_cached_file_valid():
# Another test process already downloaded the file, so don't re-do it
with open(path, "r") as f:
return cast(Dict[str, Any], json.load(f))
try:
contents = urlopen(url, timeout=1).read().decode('utf-8')
processed_contents = process_fn(json.loads(contents))
with open(path, "w") as f:
Reported by Pylint.
Line: 57
Column: 33
try:
contents = urlopen(url, timeout=1).read().decode('utf-8')
processed_contents = process_fn(json.loads(contents))
with open(path, "w") as f:
f.write(json.dumps(processed_contents))
return processed_contents
except Exception as e:
print(f'Could not download {url} because of error {e}.')
return {}
Reported by Pylint.
Line: 60
Column: 5
with open(path, "w") as f:
f.write(json.dumps(processed_contents))
return processed_contents
except Exception as e:
print(f'Could not download {url} because of error {e}.')
return {}
def get_slow_tests(dirpath: str, filename: str = SLOW_TESTS_FILE) -> Optional[Dict[str, float]]:
Reported by Pylint.
Line: 65
Column: 1
return {}
def get_slow_tests(dirpath: str, filename: str = SLOW_TESTS_FILE) -> Optional[Dict[str, float]]:
url = "https://raw.githubusercontent.com/pytorch/test-infra/main/stats/slow-tests.json"
try:
return fetch_and_cache(dirpath, filename, url, lambda x: x)
except Exception:
print("Couldn't download slow test set, leaving all tests enabled...")
Reported by Pylint.
Line: 74
Column: 1
return {}
def get_disabled_tests(dirpath: str, filename: str = DISABLED_TESTS_FILE) -> Optional[Dict[str, Any]]:
def process_disabled_test(the_response: Dict[str, Any]) -> Dict[str, Any]:
disabled_test_from_issues = dict()
for item in the_response['items']:
title = item['title']
key = 'DISABLED '
Reported by Pylint.