The following issues were found
tools/autograd/gen_trace_type.py
51 issues
Line: 47
Column: 3
'convolution', 'conv1d', 'conv2d', 'conv3d', 'conv_transpose1d',
'conv_transpose2d', 'conv_transpose3d', 'lstm_cell', 'gru_cell',
'rnn_tanh_cell', 'rnn_relu_cell',
# FIXME: figure out a better way when we support sparse tensors in jit
'_coalesced',
}
def should_trace(f: NativeFunction) -> bool:
# Operations involving Storage or Type are not traceable at the moment
Reported by Pylint.
Line: 81
Column: 3
}
def format_trace_op_name(f: NativeFunction) -> str:
# TODO: byte-for-byte compatible with old codegen behavior - should clean up
if f.func.kind() in (SchemaKind.functional, SchemaKind.out) or f.func.name.name.dunder_method:
# special case for *_out functions: the in-place and out-of-place ops
# are overloaded with the same name in the JIT
trace_name = str(f.func.name.name)
trace_name = RENAME_TRACE.get(trace_name, trace_name)
Reported by Pylint.
Line: 128
Column: 3
# *_out functions take the result as a separate argument, but we don't want to
# trace that argument directly. Instead, we trace its TensorOptions.
# So first, we need to remove the out argument from the list of arguments to trace.
# TODO: byte-for-byte compatible with old codegen behavior - it's incorrect to assume
# there is only one output argument.
args = args[:-1]
trace_inputs = itertools.chain.from_iterable(dispatch_trace_input(arg) for arg in args)
Reported by Pylint.
Line: 137
Column: 3
if f.func.is_out_fn():
# for *_out functions, handle the result argument differently for inplace/outplace.
# For inplace: just add the input to the end to conform to the JIT schema
name = f.func.arguments.out[0].name # TODO: old codegen behavior - should fix
inplace = ADD_TRACE_INPUT.substitute(name=name, input=name)
# for outplace: do nothing, except if the function is a factory.
# Factories are a bit special because their out-of-place overloads
# take an extra TensorOptions argument, which is missing in the _out function
Reported by Pylint.
Line: 236
Column: 3
if not should_trace(f):
return ''
# TODO: clean up old codegen behavior
is_inplace = f.func.kind() in (SchemaKind.inplace, SchemaKind.out) and not f.func.name.name.dunder_method
add_args = RENAME_TRACE_ADD_ARGS.get(f.func.name.name.base, '') if is_inplace else ''
additional_inputs = SELECT.substitute(
cond='tracer_state->force_outplace',
true=add_args,
Reported by Pylint.
Line: 1
Column: 1
import itertools
from typing import List, Sequence, Union, Dict
from tools.codegen.api.types import CppSignatureGroup, DispatcherSignature
from tools.codegen.api import cpp
from tools.codegen.code_template import CodeTemplate
from tools.codegen.context import with_native_function
from tools.codegen.gen import parse_native_yaml, FileManager
from tools.codegen.model import (Argument, NativeFunction, SchemaKind,
Reported by Pylint.
Line: 51
Column: 1
'_coalesced',
}
def should_trace(f: NativeFunction) -> bool:
# Operations involving Storage or Type are not traceable at the moment
if any(str(arg.type) in {'Storage', 'Type', 'ConstQuantizerPtr'}
for arg in f.func.schema_order_arguments()):
return False
# We can't trace functions which don't have any Tensor or TensorList returns
Reported by Pylint.
Line: 51
Column: 1
'_coalesced',
}
def should_trace(f: NativeFunction) -> bool:
# Operations involving Storage or Type are not traceable at the moment
if any(str(arg.type) in {'Storage', 'Type', 'ConstQuantizerPtr'}
for arg in f.func.schema_order_arguments()):
return False
# We can't trace functions which don't have any Tensor or TensorList returns
Reported by Pylint.
Line: 80
Column: 1
'fill': 'full_like', # replacing aten::fill_ with aten::full_like
}
def format_trace_op_name(f: NativeFunction) -> str:
# TODO: byte-for-byte compatible with old codegen behavior - should clean up
if f.func.kind() in (SchemaKind.functional, SchemaKind.out) or f.func.name.name.dunder_method:
# special case for *_out functions: the in-place and out-of-place ops
# are overloaded with the same name in the JIT
trace_name = str(f.func.name.name)
Reported by Pylint.
Line: 80
Column: 1
'fill': 'full_like', # replacing aten::fill_ with aten::full_like
}
def format_trace_op_name(f: NativeFunction) -> str:
# TODO: byte-for-byte compatible with old codegen behavior - should clean up
if f.func.kind() in (SchemaKind.functional, SchemaKind.out) or f.func.name.name.dunder_method:
# special case for *_out functions: the in-place and out-of-place ops
# are overloaded with the same name in the JIT
trace_name = str(f.func.name.name)
Reported by Pylint.
test/test_vulkan.py
51 issues
Line: 2
Column: 1
import unittest
import torch
from torch.nn import functional as F
from torch.testing._internal.common_utils import TestCase, run_tests
from torch.testing import FileCheck
import io
@unittest.skipUnless(torch.is_vulkan_available(),
Reported by Pylint.
Line: 3
Column: 1
import unittest
import torch
from torch.nn import functional as F
from torch.testing._internal.common_utils import TestCase, run_tests
from torch.testing import FileCheck
import io
@unittest.skipUnless(torch.is_vulkan_available(),
Reported by Pylint.
Line: 5
Column: 1
import torch
from torch.nn import functional as F
from torch.testing._internal.common_utils import TestCase, run_tests
from torch.testing import FileCheck
import io
@unittest.skipUnless(torch.is_vulkan_available(),
"Vulkan backend must be available for these tests.")
Reported by Pylint.
Line: 6
Column: 1
from torch.nn import functional as F
from torch.testing._internal.common_utils import TestCase, run_tests
from torch.testing import FileCheck
import io
@unittest.skipUnless(torch.is_vulkan_available(),
"Vulkan backend must be available for these tests.")
class TestVulkanRewritePass(TestCase):
Reported by Pylint.
Line: 13
Column: 5
"Vulkan backend must be available for these tests.")
class TestVulkanRewritePass(TestCase):
@staticmethod
def validate_transformed_module(
# To please flake
self,
pattern_count_map,
data_shape,
prepack_removal=False,
Reported by Pylint.
Line: 24
Column: 9
scripted_model = torch.jit.script(module_instance)
scripted_model.eval()
input_data = torch.normal(1, 20, size=data_shape)
ref_result = scripted_model(input_data)
torch._C._jit_pass_vulkan_insert_prepacked_ops(scripted_model._c)
if fuse_clamping_ops or prepack_removal:
scripted_model._c = torch._C._freeze_module(scripted_model._c)
if fuse_clamping_ops:
torch._C._jit_pass_vulkan_fuse_clamp_w_prepacked_conv(scripted_model._c)
Reported by Pylint.
Line: 25
Column: 9
scripted_model.eval()
input_data = torch.normal(1, 20, size=data_shape)
ref_result = scripted_model(input_data)
torch._C._jit_pass_vulkan_insert_prepacked_ops(scripted_model._c)
if fuse_clamping_ops or prepack_removal:
scripted_model._c = torch._C._freeze_module(scripted_model._c)
if fuse_clamping_ops:
torch._C._jit_pass_vulkan_fuse_clamp_w_prepacked_conv(scripted_model._c)
if prepack_removal:
Reported by Pylint.
Line: 25
Column: 9
scripted_model.eval()
input_data = torch.normal(1, 20, size=data_shape)
ref_result = scripted_model(input_data)
torch._C._jit_pass_vulkan_insert_prepacked_ops(scripted_model._c)
if fuse_clamping_ops or prepack_removal:
scripted_model._c = torch._C._freeze_module(scripted_model._c)
if fuse_clamping_ops:
torch._C._jit_pass_vulkan_fuse_clamp_w_prepacked_conv(scripted_model._c)
if prepack_removal:
Reported by Pylint.
Line: 25
Column: 56
scripted_model.eval()
input_data = torch.normal(1, 20, size=data_shape)
ref_result = scripted_model(input_data)
torch._C._jit_pass_vulkan_insert_prepacked_ops(scripted_model._c)
if fuse_clamping_ops or prepack_removal:
scripted_model._c = torch._C._freeze_module(scripted_model._c)
if fuse_clamping_ops:
torch._C._jit_pass_vulkan_fuse_clamp_w_prepacked_conv(scripted_model._c)
if prepack_removal:
Reported by Pylint.
Line: 27
Column: 13
ref_result = scripted_model(input_data)
torch._C._jit_pass_vulkan_insert_prepacked_ops(scripted_model._c)
if fuse_clamping_ops or prepack_removal:
scripted_model._c = torch._C._freeze_module(scripted_model._c)
if fuse_clamping_ops:
torch._C._jit_pass_vulkan_fuse_clamp_w_prepacked_conv(scripted_model._c)
if prepack_removal:
torch._C._jit_pass_vulkan_fold_prepacking_ops(scripted_model._c)
Reported by Pylint.
caffe2/python/checkpoint_test.py
50 issues
Line: 103
Column: 22
tmpdir = tempfile.mkdtemp()
def builder():
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = CheckpointManager(tmpdir, 'temp_node', 'minidb')
return session, checkpoint
self.run_with(builder)
Reported by Pylint.
Line: 116
Column: 13
try:
tmpdir = tempfile.mkdtemp()
def builder():
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
return session, checkpoint
Reported by Pylint.
Line: 117
Column: 22
tmpdir = tempfile.mkdtemp()
def builder():
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
return session, checkpoint
self.run_with(builder)
Reported by Pylint.
Line: 153
Column: 22
tmpdir = tempfile.mkdtemp()
workspace.ResetWorkspace()
for node_id in range(num_nodes):
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
with Cluster():
with Job() as job:
build_pipeline(node_id)
Reported by Pylint.
Line: 168
Column: 18
# (only blobs on init_group are checkpointed)
self.assertEquals(len(ws.blobs), 17)
ws = workspace.C.Workspace()
session = LocalSession(ws)
self.assertEquals(len(ws.blobs), 0)
model_blob_names = ['trainer_1/task_2/GivenTensorInt64Fill:0',
'trainer_2/task_2/GivenTensorInt64Fill:0']
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
Reported by Pylint.
Line: 218
Column: 22
# Create and run the job runner.
for node_id in range(3):
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
with Cluster():
with Job() as job:
build_pipeline(node_id)
Reported by Pylint.
Line: 252
Column: 18
# Check the saving checkpoint failure does not cause job failure
workspace.ResetWorkspace()
for node_id in range(num_nodes):
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
with Cluster():
with Job() as job:
build_pipeline(node_id)
Reported by Pylint.
Line: 297
Column: 14
epoch_limiter(job, 1)
ws = workspace.C.Workspace()
session = LocalSession(ws)
job_runner = JobRunner(job)
job_runner.train(session)
expected_result = np.full(8, 2.0).astype(np.float32)
Reported by Pylint.
Line: 315
Column: 18
"""
try:
tmpdir = tempfile.mkdtemp()
ws = workspace.C.Workspace()
session = LocalSession(ws)
checkpoint = MultiNodeCheckpointManager(tmpdir, 'minidb')
with Job() as job:
outputs = build_pipeline(node_id=0)
Reported by Pylint.
Line: 47
Column: 17
def local_copy_op(src, dest):
def copy_op(inputs, outputs):
shutil.copyfile(src, dest)
return copy_op
class UploadToLocalFile(UploadTaskGroupBuilder):
Reported by Pylint.
caffe2/quantization/server/conv_dnnlowp_acc16_op_test.py
50 issues
Line: 6
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, utils, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
from caffe2.quantization.server.dnnlowp_test_utils import (
check_quantized_results_close,
Reported by Pylint.
Line: 14
Column: 1
check_quantized_results_close,
run_conv_or_fc
)
from hypothesis import assume, given, settings
dyndep.InitOpsLibrary("//caffe2/caffe2/quantization/server:dnnlowp_ops")
workspace.GlobalInit(
[
Reported by Pylint.
Line: 65
Column: 9
preserve_activation_sparsity,
preserve_weight_sparsity,
gc,
dc,
):
assume(group == 1 or dilation == 1)
assume(size >= dilation * (kernel - 1) + 1)
input_channels = input_channels_per_group * group
Reported by Pylint.
Line: 117
Column: 9
# No input quantization error in bias
b = np.round(np.random.randn(output_channels)).astype(np.float32)
Output = collections.namedtuple("Output", ["Y", "op_type", "engine", "order"])
outputs = []
op_engine_list = [
("Conv", ""),
("Conv", "DNNLOWP_ACC16"),
Reported by Pylint.
Line: 244
Column: 9
preserve_activation_sparsity,
preserve_weight_sparsity,
gc,
dc,
):
assume(group == 1 or dilation == 1)
assume(size >= dilation * (kernel - 1) + 1)
input_channels = input_channels_per_group * group
Reported by Pylint.
Line: 284
Column: 9
b = np.round(np.random.randn(output_channels)).astype(np.float32)
Output = collections.namedtuple("Output", ["Y", "op_type", "engine", "order"])
outputs = []
op_engine_list = [
("Conv", ""),
("Conv", "DNNLOWP_ACC16"),
Reported by Pylint.
Line: 1
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, utils, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
Reported by Pylint.
Line: 28
Column: 1
)
class DNNLowPOpConvAcc16OpTest(hu.HypothesisTestCase):
# correctness test with no quantization error in inputs
@given(
stride=st.integers(1, 2),
pad=st.integers(0, 2),
kernel=st.integers(1, 5),
Reported by Pylint.
Line: 47
Column: 5
preserve_weight_sparsity=st.booleans(),
**hu.gcs_cpu_only
)
@settings(deadline=10000)
def test_dnnlowp_conv_acc16_int(
self,
stride,
pad,
kernel,
Reported by Pylint.
Line: 47
Column: 5
preserve_weight_sparsity=st.booleans(),
**hu.gcs_cpu_only
)
@settings(deadline=10000)
def test_dnnlowp_conv_acc16_int(
self,
stride,
pad,
kernel,
Reported by Pylint.
torch/nn/utils/parametrize.py
50 issues
Line: 47
Column: 5
for x in xs:
out_rnn = self.rnn_cell(x, out_rnn)
"""
global _cache
global _cache_enabled
_cache_enabled += 1
try:
yield
finally:
Reported by Pylint.
Line: 48
Column: 5
out_rnn = self.rnn_cell(x, out_rnn)
"""
global _cache
global _cache_enabled
_cache_enabled += 1
try:
yield
finally:
_cache_enabled -= 1
Reported by Pylint.
Line: 312
Column: 9
@torch.jit.unused
def get_cached_parametrization(parametrization) -> Tensor:
global _cache
key = (id(module), tensor_name)
tensor = _cache.get(key)
if tensor is None:
tensor = parametrization()
_cache[key] = tensor
Reported by Pylint.
Line: 327
Column: 18
# Scripting
raise RuntimeError('Caching is not implemented for scripting. '
'Either disable caching or avoid scripting.')
elif torch._C._get_tracing_state() is not None:
# Tracing
raise RuntimeError('Cannot trace a model while caching parametrizations.')
else:
return get_cached_parametrization(parametrization)
else:
Reported by Pylint.
Line: 327
Column: 18
# Scripting
raise RuntimeError('Caching is not implemented for scripting. '
'Either disable caching or avoid scripting.')
elif torch._C._get_tracing_state() is not None:
# Tracing
raise RuntimeError('Cannot trace a model while caching parametrizations.')
else:
return get_cached_parametrization(parametrization)
else:
Reported by Pylint.
Line: 512
Column: 59
module.parametrizations[tensor_name].append(parametrization)
# If unsafe was True in previous parametrization, keep it enabled
module.parametrizations[tensor_name].unsafe |= unsafe # type: ignore[index, union-attr]
elif tensor_name in module._buffers or tensor_name in module._parameters:
# Set the parametrization mechanism
# Fetch the original buffer or parameter
original = getattr(module, tensor_name)
# We create this early to check for possible errors
parametrizations = ParametrizationList([parametrization], original, unsafe=unsafe)
Reported by Pylint.
Line: 512
Column: 25
module.parametrizations[tensor_name].append(parametrization)
# If unsafe was True in previous parametrization, keep it enabled
module.parametrizations[tensor_name].unsafe |= unsafe # type: ignore[index, union-attr]
elif tensor_name in module._buffers or tensor_name in module._parameters:
# Set the parametrization mechanism
# Fetch the original buffer or parameter
original = getattr(module, tensor_name)
# We create this early to check for possible errors
parametrizations = ParametrizationList([parametrization], original, unsafe=unsafe)
Reported by Pylint.
Line: 1
Column: 1
import torch
from torch.nn.modules.container import ModuleList, ModuleDict, Module
from torch.nn.parameter import Parameter
from torch import Tensor
import collections
from contextlib import contextmanager
from typing import Union, Optional, Dict, Tuple, Sequence
Reported by Pylint.
Line: 6
Column: 1
from torch.nn.parameter import Parameter
from torch import Tensor
import collections
from contextlib import contextmanager
from typing import Union, Optional, Dict, Tuple, Sequence
_cache_enabled = 0
Reported by Pylint.
Line: 7
Column: 1
from torch import Tensor
import collections
from contextlib import contextmanager
from typing import Union, Optional, Dict, Tuple, Sequence
_cache_enabled = 0
_cache: Dict[Tuple[int, str], Optional[Tensor]] = {}
Reported by Pylint.
torch/distributed/elastic/agent/server/api.py
50 issues
Line: 496
Column: 36
raise NotImplementedError()
@abc.abstractmethod
def _shutdown(self, death_sig: signal.Signals = signal.SIGTERM) -> None:
"""
Cleans up any resources that were allocated during the agent's work.
Args:
death_sig: Signal to send to the child process, SIGTERM is default
Reported by Pylint.
Line: 76
Column: 3
local_world_size: int
rdzv_handler: rdzv.RendezvousHandler
fn: Optional[Callable] = None
# TODO @kiuk - make entrypoint a required field
entrypoint: Union[Callable, str, None] = None
args: Tuple = ()
max_restarts: int = 3
monitor_interval: float = 30.0
master_port: Optional[int] = None
Reported by Pylint.
Line: 372
Column: 17
host="localhost", port=None, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM
)
for addr in addrs:
family, type, proto, _, _ = addr
s = socket.socket(family, type, proto)
try:
s.bind(("localhost", 0))
s.listen(0)
return s
Reported by Pylint.
Line: 552
Column: 9
master_addr, master_port = self._get_master_addr_port(store)
restart_count = spec.max_restarts - self._remaining_restarts
log.info(
f"[{spec.role}] Rendezvous complete for workers. Result:\n"
f" restart_count={restart_count}\n"
f" master_addr={master_addr}\n"
f" master_port={master_port}\n"
f" group_rank={group_rank}\n"
Reported by Pylint.
Line: 672
Column: 9
of state to ``_monitor_workers()`` method
"""
role = worker_group.spec.role
log.info(f"[{role}] Rendezvous'ing worker group")
# TODO after stopping workers, wait at least monitor_interval*2 for
# workers on different nodes to fail on a collective op before waiting
# on the rdzv barrier, this way we ensure that nodes enter rdzv
# at around the same time and reduce false positive rdzv timeout errors
Reported by Pylint.
Line: 674
Column: 3
role = worker_group.spec.role
log.info(f"[{role}] Rendezvous'ing worker group")
# TODO after stopping workers, wait at least monitor_interval*2 for
# workers on different nodes to fail on a collective op before waiting
# on the rdzv barrier, this way we ensure that nodes enter rdzv
# at around the same time and reduce false positive rdzv timeout errors
self._rendezvous(worker_group)
Reported by Pylint.
Line: 680
Column: 9
# at around the same time and reduce false positive rdzv timeout errors
self._rendezvous(worker_group)
log.info(f"[{role}] Starting worker group")
worker_ids = self._start_workers(worker_group)
for local_rank, w_id in worker_ids.items():
worker = worker_group.workers[local_rank]
worker.id = w_id
Reported by Pylint.
Line: 697
Column: 9
"""
role = worker_group.spec.role
log.info(f"[{role}] Stopping worker group")
self._stop_workers(worker_group)
worker_group.state = WorkerState.STOPPED
self._initialize_workers(worker_group)
# pyre-fixme[56]: Pyre was not able to infer the type of the decorator
Reported by Pylint.
Line: 715
Column: 13
self._record_worker_events(result)
return result
except SignalException as e:
log.warning(f"Received {e.sigval} death signal, shutting down workers")
self._shutdown(e.sigval)
shutdown_called = True
raise
finally:
if not shutdown_called:
Reported by Pylint.
Line: 833
Column: 9
spec = self._worker_group.spec
role = spec.role
log.info(
f"[{role}] starting workers for entrypoint: {spec.get_entrypoint_name()}"
)
self._initialize_workers(self._worker_group)
monitor_interval = spec.monitor_interval
Reported by Pylint.
test/distributed/launcher/run_test.py
50 issues
Line: 19
Column: 1
from unittest import mock
from unittest.mock import Mock, patch
import torch.distributed.run as launch
from torch.distributed.elastic.agent.server.api import RunResult, WorkerState
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer
from torch.distributed.elastic.utils import get_socket_with_port
from torch.testing._internal.common_utils import (
Reported by Pylint.
Line: 20
Column: 1
from unittest.mock import Mock, patch
import torch.distributed.run as launch
from torch.distributed.elastic.agent.server.api import RunResult, WorkerState
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer
from torch.distributed.elastic.utils import get_socket_with_port
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
Reported by Pylint.
Line: 21
Column: 1
import torch.distributed.run as launch
from torch.distributed.elastic.agent.server.api import RunResult, WorkerState
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer
from torch.distributed.elastic.utils import get_socket_with_port
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
Reported by Pylint.
Line: 22
Column: 1
import torch.distributed.run as launch
from torch.distributed.elastic.agent.server.api import RunResult, WorkerState
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer
from torch.distributed.elastic.utils import get_socket_with_port
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
sandcastle_skip_if,
Reported by Pylint.
Line: 23
Column: 1
from torch.distributed.elastic.agent.server.api import RunResult, WorkerState
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer
from torch.distributed.elastic.utils import get_socket_with_port
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
sandcastle_skip_if,
)
Reported by Pylint.
Line: 24
Column: 1
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer
from torch.distributed.elastic.utils import get_socket_with_port
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
sandcastle_skip_if,
)
Reported by Pylint.
Line: 450
Column: 9
def test_min_max_nodes_parse(self):
min_nodes, max_nodes = launch.parse_min_max_nnodes("1")
self.assertTrue(min_nodes, max_nodes)
self.assertTrue(1, min_nodes)
min_nodes, max_nodes = launch.parse_min_max_nnodes("2:20")
self.assertTrue(2, min_nodes)
self.assertTrue(20, max_nodes)
with self.assertRaises(RuntimeError):
launch.parse_min_max_nnodes("2:20:30")
Reported by Pylint.
Line: 452
Column: 9
self.assertTrue(min_nodes, max_nodes)
self.assertTrue(1, min_nodes)
min_nodes, max_nodes = launch.parse_min_max_nnodes("2:20")
self.assertTrue(2, min_nodes)
self.assertTrue(20, max_nodes)
with self.assertRaises(RuntimeError):
launch.parse_min_max_nnodes("2:20:30")
@patch("torch.distributed.launcher.api.LocalElasticAgent")
Reported by Pylint.
Line: 453
Column: 9
self.assertTrue(1, min_nodes)
min_nodes, max_nodes = launch.parse_min_max_nnodes("2:20")
self.assertTrue(2, min_nodes)
self.assertTrue(20, max_nodes)
with self.assertRaises(RuntimeError):
launch.parse_min_max_nnodes("2:20:30")
@patch("torch.distributed.launcher.api.LocalElasticAgent")
def test_launch_shutdown(self, agent_mock_cls):
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import multiprocessing as mp
import os
Reported by Pylint.
caffe2/python/operator_test/elementwise_logical_ops_test.py
50 issues
Line: 9
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
import unittest
Reported by Pylint.
Line: 10
Column: 1
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
import unittest
def mux(select, left, right):
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given, settings
Reported by Pylint.
Line: 12
Column: 1
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
import unittest
def mux(select, left, right):
return [np.vectorize(lambda c, x, y: x if c else y)(select, left, right)]
Reported by Pylint.
Line: 15
Column: 1
import unittest
def mux(select, left, right):
return [np.vectorize(lambda c, x, y: x if c else y)(select, left, right)]
def rowmux(select_vec, left, right):
select = [[s] * len(left) for s in select_vec]
Reported by Pylint.
Line: 19
Column: 1
return [np.vectorize(lambda c, x, y: x if c else y)(select, left, right)]
def rowmux(select_vec, left, right):
select = [[s] * len(left) for s in select_vec]
return mux(select, left, right)
class TestWhere(serial.SerializedTestCase):
Reported by Pylint.
Line: 24
Column: 1
return mux(select, left, right)
class TestWhere(serial.SerializedTestCase):
def test_reference(self):
self.assertTrue((
np.array([1, 4]) == mux([True, False],
[1, 2],
Reported by Pylint.
Line: 26
Column: 5
class TestWhere(serial.SerializedTestCase):
def test_reference(self):
self.assertTrue((
np.array([1, 4]) == mux([True, False],
[1, 2],
[3, 4])[0]
).all())
Reported by Pylint.
Line: 42
Column: 5
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs_cpu_only)
@settings(deadline=10000)
def test_where(self, N, gc, dc, engine):
C = np.random.rand(N).astype(bool)
X = np.random.rand(N).astype(np.float32)
Y = np.random.rand(N).astype(np.float32)
op = core.CreateOperator("Where", ["C", "X", "Y"], ["Z"], engine=engine)
self.assertDeviceChecks(dc, op, [C, X, Y], [0])
Reported by Pylint.
Line: 42
Column: 5
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs_cpu_only)
@settings(deadline=10000)
def test_where(self, N, gc, dc, engine):
C = np.random.rand(N).astype(bool)
X = np.random.rand(N).astype(np.float32)
Y = np.random.rand(N).astype(np.float32)
op = core.CreateOperator("Where", ["C", "X", "Y"], ["Z"], engine=engine)
self.assertDeviceChecks(dc, op, [C, X, Y], [0])
Reported by Pylint.
.github/scripts/generate_ci_workflows.py
50 issues
Line: 492
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b701_jinja2_autoescape_false.html
]
if __name__ == "__main__":
jinja_env = jinja2.Environment(
variable_start_string="!{{",
loader=jinja2.FileSystemLoader(str(GITHUB_DIR.joinpath("templates"))),
)
template_and_workflows = [
(jinja_env.get_template("linux_ci_workflow.yml.j2"), LINUX_WORKFLOWS),
Reported by Bandit.
Line: 10
Column: 1
import jinja2
import json
import os
from typing_extensions import Literal
YamlShellBool = Literal["''", 1]
Arch = Literal["windows", "linux"]
DOCKER_REGISTRY = "308535385114.dkr.ecr.us-east-1.amazonaws.com"
Reported by Pylint.
Line: 57
Column: 3
trigger_action_only: bool = False
def gen_root_job_condition(self) -> None:
# TODO: Make conditions strict
# At the beginning of the rollout of ciflow, we keep everything the same as what we have
# Once fully rolled out, we can have strict constraints
# e.g. ADD env.GITHUB_ACTOR == '{self.trigger_actor}
# REMOVE github.event.action !='{self.trigger_action}'
label_conditions = [
Reported by Pylint.
Line: 506
Column: 16
for w in existing_workflows:
try:
os.remove(w)
except Exception as e:
print(f"Error occurred when deleting file {w}: {e}")
ciflow_ruleset = CIFlowRuleset()
for template, workflows in template_and_workflows:
for workflow in workflows:
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Dict, Set
import jinja2
import json
import os
Reported by Pylint.
Line: 8
Column: 1
from typing import Dict, Set
import jinja2
import json
import os
from typing_extensions import Literal
YamlShellBool = Literal["''", 1]
Arch = Literal["windows", "linux"]
Reported by Pylint.
Line: 9
Column: 1
import jinja2
import json
import os
from typing_extensions import Literal
YamlShellBool = Literal["''", 1]
Arch = Literal["windows", "linux"]
Reported by Pylint.
Line: 43
Column: 1
@dataclass
class CIFlowConfig:
enabled: bool = False
labels: Set[str] = field(default_factory=set)
trigger_action: str = 'unassigned'
trigger_actor: str = 'pytorchbot'
root_job_name: str = 'ciflow_should_run'
Reported by Pylint.
Line: 56
Column: 5
# ciflow (via probot) is not automated yet.
trigger_action_only: bool = False
def gen_root_job_condition(self) -> None:
# TODO: Make conditions strict
# At the beginning of the rollout of ciflow, we keep everything the same as what we have
# Once fully rolled out, we can have strict constraints
# e.g. ADD env.GITHUB_ACTOR == '{self.trigger_actor}
# REMOVE github.event.action !='{self.trigger_action}'
Reported by Pylint.
Line: 63
Column: 1
# e.g. ADD env.GITHUB_ACTOR == '{self.trigger_actor}
# REMOVE github.event.action !='{self.trigger_action}'
label_conditions = [
f"contains(github.event.pull_request.labels.*.name, '{label}')" for label in sorted(self.labels)]
self.root_job_condition = f"(github.event_name != 'pull_request') || " \
f"(github.event.action !='{self.trigger_action}') || " \
f"({' || '.join(label_conditions)})"
def reset_root_job(self) -> None:
Reported by Pylint.
tools/actions_local_runner.py
49 issues
Line: 117
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b604_any_other_function_with_shell_equals_true.html
proc = await asyncio.create_subprocess_shell(
cmd_str,
shell=True,
cwd=REPO_ROOT,
env=env,
stdout=subprocess.PIPE if redirect else None,
stderr=subprocess.PIPE if redirect else None,
executable=shutil.which("bash"),
Reported by Bandit.
Line: 317
Column: 1
)
class YamlStep(Check):
def __init__(self, step: Dict[str, Any], job_name: str, quiet: bool):
super().__init__(files=None, quiet=quiet)
self.step = step
self.name = f'{job_name}: {self.step["name"]}'
Reported by Pylint.
Line: 325
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
async def full(self) -> CommandResult:
env = os.environ.copy()
env["GITHUB_WORKSPACE"] = "/tmp"
script = self.step["run"]
if self.quiet:
# TODO: Either lint that GHA scripts only use 'set -eux' or make this more
# resilient
Reported by Bandit.
Line: 329
Column: 3
script = self.step["run"]
if self.quiet:
# TODO: Either lint that GHA scripts only use 'set -eux' or make this more
# resilient
script = script.replace("set -eux", "set -eu")
script = re.sub(r"^time ", "", script, flags=re.MULTILINE)
return await shell_cmd(script, env=env)
Reported by Pylint.
Line: 338
Column: 5
def changed_files() -> Optional[List[str]]:
changed_files: Optional[List[str]] = None
try:
changed_files = sorted(find_changed_files())
except Exception:
# If the git commands failed for some reason, bail out and use the whole list
print(
Reported by Pylint.
Line: 341
Column: 12
changed_files: Optional[List[str]] = None
try:
changed_files = sorted(find_changed_files())
except Exception:
# If the git commands failed for some reason, bail out and use the whole list
print(
"Could not query git for changed files, falling back to testing all files instead",
file=sys.stderr,
)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import sys
import os
import argparse
import yaml
import asyncio
Reported by Pylint.
Line: 4
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import sys
import os
import argparse
import yaml
import asyncio
Reported by Bandit.
Line: 9
Column: 1
import os
import argparse
import yaml
import asyncio
import shutil
import re
import fnmatch
import shlex
import configparser
Reported by Pylint.
Line: 10
Column: 1
import argparse
import yaml
import asyncio
import shutil
import re
import fnmatch
import shlex
import configparser
Reported by Pylint.