The following issues were found:
torch/testing/_internal/distributed/pipe_with_ddp_test.py
40 issues
Line: 82
Column: 63
    def _run_basic_test(self, backend, checkpoint, find_unused_parameters=False, static_graph=False):
        dist.init_process_group(
            backend=backend,
            init_method=INIT_METHOD_TEMPLATE.format(file_name=self.file_name),
            world_size=self.world_size,
            rank=self.rank,
        )
        # Use 4 GPUs, two replicas of a pipe across GPU 0 and 1 and another
Reported by Pylint.
Line: 84
Column: 18
            backend=backend,
            init_method=INIT_METHOD_TEMPLATE.format(file_name=self.file_name),
            world_size=self.world_size,
            rank=self.rank,
        )
        # Use 4 GPUs, two replicas of a pipe across GPU 0 and 1 and another
        # pipe between GPU 2 and 3. Both replicas are replicated via DDP.
        fc1 = nn.Linear(16, 8, bias=False).cuda(2 * self.rank)
Reported by Pylint.
Line: 89
Column: 53
        # Use 4 GPUs, two replicas of a pipe across GPU 0 and 1 and another
        # pipe between GPU 2 and 3. Both replicas are replicated via DDP.
        fc1 = nn.Linear(16, 8, bias=False).cuda(2 * self.rank)
        class MyModule(nn.Module):
            def __init__(self, device):
                super(MyModule, self).__init__()
                self.fc2 = nn.Linear(8, 4, bias=False).cuda(device)
Reported by Pylint.
Line: 103
Column: 31
                else:
                    return self.fc3(self.fc2(inp))
        layer2 = MyModule(2 * self.rank + 1)
        model = nn.Sequential(
            fc1,
            layer2
        )
        model = Pipe(model, chunks=2, checkpoint=checkpoint)
Reported by Pylint.
Line: 115
Column: 65
        # Ensure inputs are different across ranks to verify that gradient
        # sync indeed occurs.
        model_input = torch.rand(16, 16).cuda(2 * self.rank) * (self.rank + 1)
        out = model(model_input).local_value()
        out.sum().backward()
        # Run forward again for find_unused_parameters to trigger any potential errors.
        if find_unused_parameters:
Reported by Pylint.
Line: 115
Column: 23
        # Ensure inputs are different across ranks to verify that gradient
        # sync indeed occurs.
        model_input = torch.rand(16, 16).cuda(2 * self.rank) * (self.rank + 1)
        out = model(model_input).local_value()
        out.sum().backward()
        # Run forward again for find_unused_parameters to trigger any potential errors.
        if find_unused_parameters:
Reported by Pylint.
Line: 115
Column: 51
        # Ensure inputs are different across ranks to verify that gradient
        # sync indeed occurs.
        model_input = torch.rand(16, 16).cuda(2 * self.rank) * (self.rank + 1)
        out = model(model_input).local_value()
        out.sum().backward()
        # Run forward again for find_unused_parameters to trigger any potential errors.
        if find_unused_parameters:
Reported by Pylint.
Line: 123
Column: 62
        if find_unused_parameters:
            # Ensure inputs are different across ranks to verify that gradient
            # sync indeed occurs.
            unused_param_input = torch.rand(16, 16).cuda(2 * self.rank) * (self.rank + 1)
            model(unused_param_input).local_value().sum().backward()
        # Run a few more iterations of fwd + bwd to ensure gradient synchronization
        # occurs properly across iterations via delay_all_reduce/bucketized allreduce.
        for _ in range(3):
Reported by Pylint.
Line: 123
Column: 34
        if find_unused_parameters:
            # Ensure inputs are different across ranks to verify that gradient
            # sync indeed occurs.
            unused_param_input = torch.rand(16, 16).cuda(2 * self.rank) * (self.rank + 1)
            model(unused_param_input).local_value().sum().backward()
        # Run a few more iterations of fwd + bwd to ensure gradient synchronization
        # occurs properly across iterations via delay_all_reduce/bucketized allreduce.
        for _ in range(3):
Reported by Pylint.
Line: 123
Column: 76
        if find_unused_parameters:
            # Ensure inputs are different across ranks to verify that gradient
            # sync indeed occurs.
            unused_param_input = torch.rand(16, 16).cuda(2 * self.rank) * (self.rank + 1)
            model(unused_param_input).local_value().sum().backward()
        # Run a few more iterations of fwd + bwd to ensure gradient synchronization
        # occurs properly across iterations via delay_all_reduce/bucketized allreduce.
        for _ in range(3):
Reported by Pylint.
aten/src/ATen/native/quantized/cpu/qnnpack/deps/clog/src/clog.c
40 issues
Line: 70
Column: 20
CWE codes:
134
Suggestion:
Use a constant for the format specification
  if (module == NULL) {
    memcpy(stack_buffer, CLOG_FATAL_PREFIX, CLOG_FATAL_PREFIX_LENGTH);
  } else {
    prefix_chars = snprintf(
        stack_buffer, CLOG_STACK_BUFFER_SIZE, CLOG_FATAL_PREFIX_FORMAT, module);
    if (prefix_chars < 0) {
      /* Format error in prefix (possible if prefix is modified): skip prefix
       * and continue as if nothing happened. */
      prefix_chars = 0;
Reported by FlawFinder.
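FlawFinder flags every printf-family call whose format argument is not a string literal at the call site. In clog, CLOG_FATAL_PREFIX_FORMAT appears to be a macro expanding to a fixed string, so these findings are likely false positives, but the pattern the suggestion asks for looks like the following minimal C sketch (function and macro names here are illustrative, not clog's API):

    #include <stdio.h>

    #define PREFIX_FORMAT "Fatal error in %s: " /* constant format specification */

    /* Risky shape (CWE-134): if fmt can ever be influenced by input, the
     * caller controls the conversion specifiers. */
    static void log_prefix_risky(const char* fmt, const char* module) {
      printf(fmt, module);
    }

    /* Suggested shape: the format specification is a compile-time constant
     * and untrusted data only ever flows through the %s argument. */
    static void log_prefix_const(const char* module) {
      printf(PREFIX_FORMAT, module);
    }

    int main(void) {
      log_prefix_risky(PREFIX_FORMAT, "qnnpack"); /* safe only because the argument is constant */
      log_prefix_const("qnnpack");
      return 0;
    }

The same reasoning applies to the snprintf/vsnprintf findings below: the warnings fire because the format is a macro or a function parameter, which the analyzer cannot prove to be constant.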
Line: 86
Column: 20
CWE codes:
134
Suggestion:
Use a constant for the format specification
     * use on-heap buffer. Do not even try to format the string into on-stack
     * buffer.
     */
    format_chars = vsnprintf(NULL, 0, format, args);
  } else {
    format_chars = vsnprintf(
        &stack_buffer[prefix_chars],
        CLOG_STACK_BUFFER_SIZE - prefix_chars - CLOG_SUFFIX_LENGTH,
        format,
Reported by FlawFinder.
Line: 88
Column: 20
CWE codes:
134
Suggestion:
Use a constant for the format specification
     */
    format_chars = vsnprintf(NULL, 0, format, args);
  } else {
    format_chars = vsnprintf(
        &stack_buffer[prefix_chars],
        CLOG_STACK_BUFFER_SIZE - prefix_chars - CLOG_SUFFIX_LENGTH,
        format,
        args);
  }
Reported by FlawFinder.
Line: 109
Column: 7
CWE codes:
134
Suggestion:
Use a constant for the format specification
    if (prefix_chars > CLOG_STACK_BUFFER_SIZE) {
      /* Prefix didn't fit into on-stack buffer, re-format it again to on-heap
       * buffer */
      snprintf(
          heap_buffer,
          prefix_chars + 1 /* for '\0'-terminator */,
          CLOG_FATAL_PREFIX_FORMAT,
          module);
    } else {
Reported by FlawFinder.
Line: 118
Column: 5
CWE codes:
134
Suggestion:
Use a constant for the format specification
      /* Copy pre-formatted prefix from on-stack buffer to on-heap buffer */
      memcpy(heap_buffer, stack_buffer, prefix_chars);
    }
    vsnprintf(
        heap_buffer + prefix_chars,
        format_chars + CLOG_SUFFIX_LENGTH,
        format,
        args_copy);
    out_buffer = heap_buffer;
Reported by FlawFinder.
Line: 164
Column: 20
CWE codes:
134
Suggestion:
Use a constant for the format specification
  if (module == NULL) {
    memcpy(stack_buffer, CLOG_ERROR_PREFIX, CLOG_ERROR_PREFIX_LENGTH);
  } else {
    prefix_chars = snprintf(
        stack_buffer, CLOG_STACK_BUFFER_SIZE, CLOG_ERROR_PREFIX_FORMAT, module);
    if (prefix_chars < 0) {
      /* Format error in prefix (possible if prefix is modified): skip prefix
       * and continue as if nothing happened. */
      prefix_chars = 0;
Reported by FlawFinder.
Line: 180
Column: 20
CWE codes:
134
Suggestion:
Use a constant for the format specification
     * use on-heap buffer. Do not even try to format the string into on-stack
     * buffer.
     */
    format_chars = vsnprintf(NULL, 0, format, args);
  } else {
    format_chars = vsnprintf(
        &stack_buffer[prefix_chars],
        CLOG_STACK_BUFFER_SIZE - prefix_chars - CLOG_SUFFIX_LENGTH,
        format,
Reported by FlawFinder.
Line: 182
Column: 20
CWE codes:
134
Suggestion:
Use a constant for the format specification
     */
    format_chars = vsnprintf(NULL, 0, format, args);
  } else {
    format_chars = vsnprintf(
        &stack_buffer[prefix_chars],
        CLOG_STACK_BUFFER_SIZE - prefix_chars - CLOG_SUFFIX_LENGTH,
        format,
        args);
  }
Reported by FlawFinder.
Line: 203
Column: 7
CWE codes:
134
Suggestion:
Use a constant for the format specification
    if (prefix_chars > CLOG_STACK_BUFFER_SIZE) {
      /* Prefix didn't fit into on-stack buffer, re-format it again to on-heap
       * buffer */
      snprintf(
          heap_buffer,
          prefix_chars + 1 /* for '\0'-terminator */,
          CLOG_ERROR_PREFIX_FORMAT,
          module);
    } else {
Reported by FlawFinder.
Line: 212
Column: 5
CWE codes:
134
Suggestion:
Use a constant for the format specification
      /* Copy pre-formatted prefix from on-stack buffer to on-heap buffer */
      memcpy(heap_buffer, stack_buffer, prefix_chars);
    }
    vsnprintf(
        heap_buffer + prefix_chars,
        format_chars + CLOG_SUFFIX_LENGTH,
        format,
        args_copy);
    out_buffer = heap_buffer;
Reported by FlawFinder.
caffe2/python/operator_test/feature_maps_ops_test.py
40 issues
Line: 1
Column: 1
from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase
import numpy as np
Reported by Pylint.
Line: 10
Column: 1
import numpy as np
class TestFeatureMapsOps(TestCase):
    def test_merge_dense_feature_tensors(self):
        op = core.CreateOperator(
            "MergeDenseFeatureTensors",
            [
Reported by Pylint.
Line: 12
Column: 5
class TestFeatureMapsOps(TestCase):
    def test_merge_dense_feature_tensors(self):
        op = core.CreateOperator(
            "MergeDenseFeatureTensors",
            [
                "in1", "in1_presence",
            ],
Reported by Pylint.
Line: 12
Column: 5
class TestFeatureMapsOps(TestCase):
    def test_merge_dense_feature_tensors(self):
        op = core.CreateOperator(
            "MergeDenseFeatureTensors",
            [
                "in1", "in1_presence",
            ],
Reported by Pylint.
Line: 13
Column: 9
class TestFeatureMapsOps(TestCase):
    def test_merge_dense_feature_tensors(self):
        op = core.CreateOperator(
            "MergeDenseFeatureTensors",
            [
                "in1", "in1_presence",
            ],
            [
Reported by Pylint.
Line: 49
Column: 5
        )
    def test_merge_single_scalar_feature_tensors(self):
        op = core.CreateOperator(
            "MergeSingleScalarFeatureTensors",
            [
                "in1", "in1_presence",
                "in2", "in2_presence",
Reported by Pylint.
Line: 49
Column: 5
        )
    def test_merge_single_scalar_feature_tensors(self):
        op = core.CreateOperator(
            "MergeSingleScalarFeatureTensors",
            [
                "in1", "in1_presence",
                "in2", "in2_presence",
Reported by Pylint.
Line: 50
Column: 9
    def test_merge_single_scalar_feature_tensors(self):
        op = core.CreateOperator(
            "MergeSingleScalarFeatureTensors",
            [
                "in1", "in1_presence",
                "in2", "in2_presence",
            ],
Reported by Pylint.
Line: 96
Column: 5
            np.array([11.1, 12.1, 12.2], dtype=np.float)
        )
    def test_merge_single_scalar_feature_tensors_gradient(self):
        op = core.CreateOperator(
            "MergeSingleScalarFeatureTensorsGradient",
            [
                "in1_presence",
                "in2_presence",
Reported by Pylint.
Line: 96
Column: 5
            np.array([11.1, 12.1, 12.2], dtype=np.float)
        )
    def test_merge_single_scalar_feature_tensors_gradient(self):
        op = core.CreateOperator(
            "MergeSingleScalarFeatureTensorsGradient",
            [
                "in1_presence",
                "in2_presence",
Reported by Pylint.
caffe2/quantization/server/fully_connected_dnnlowp_acc16_op_test.py
40 issues
Line: 6
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
from caffe2.quantization.server.dnnlowp_test_utils import (
    check_quantized_results_close,
Reported by Pylint.
Line: 14
Column: 1
    check_quantized_results_close,
    run_conv_or_fc
)
from hypothesis import given
dyndep.InitOpsLibrary("//caffe2/caffe2/quantization/server:dnnlowp_ops")
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
Reported by Pylint.
Line: 40
Column: 9
        in_quantized,
        out_quantized,
        gc,
        dc,
    ):
        # X and W have scale 1, so exactly represented after quantization
        # This was made sure by having at least one 0 and one 255 for unsigned
        # 8-bit tensors, and at least one -128 and one 127 for signed 8-bit
        # tensors.
Reported by Pylint.
Line: 74
Column: 9
        # No input quantization error in bias
        b = np.round(np.random.randn(output_channels)).astype(np.float32)
        Output = collections.namedtuple("Output", ["Y", "op_type", "engine"])
        outputs = []
        op_engine_list = [
            ("FC", ""),
            ("FC", "DNNLOWP_ACC16"),
Reported by Pylint.
Line: 137
Column: 9
        out_quantized,
        prepack_weight,
        gc,
        dc,
    ):
        # X and W have scale 1, so exactly represented after quantization
        # This was made sure by having at least one 0 and one 255 for unsigned
        # 8-bit tensors, and at least one -128 and one 127 for signed 8-bit
        # tensors.
Reported by Pylint.
Line: 169
Column: 9
        W[:, 1] = W_min + 128
        # No input quantization error in bias
        b = np.round(np.random.randn(output_channels)).astype(np.float32)
        Output = collections.namedtuple("Output", ["Y", "op_type", "engine"])
        outputs = []
        op_engine_list = [
            ("FC", ""),
            ("FC", "DNNLOWP_ACC16"),
Reported by Pylint.
Line: 1
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
Reported by Pylint.
Line: 21
Column: 1
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
class DNNLowPFullyConnectedAcc16OpTest(hu.HypothesisTestCase):
    # correctness test with no quantization error in inputs
    # fbgemm currently only supports N a multiple of 64
    @given(
        input_channels=st.sampled_from([32, 64]),
        output_channels=st.sampled_from([64, 128, 256]),
Reported by Pylint.
Line: 31
Column: 5
        in_quantized=st.booleans(),
        out_quantized=st.booleans(),
        **hu.gcs_cpu_only
    )
    def test_dnnlowp_fully_connected_acc16_int(
        self,
        input_channels,
        output_channels,
        batch_size,
Reported by Pylint.
Line: 31
Column: 5
        in_quantized=st.booleans(),
        out_quantized=st.booleans(),
        **hu.gcs_cpu_only
    )
    def test_dnnlowp_fully_connected_acc16_int(
        self,
        input_channels,
        output_channels,
        batch_size,
Reported by Pylint.
torch/fx/experimental/unification/multipledispatch/dispatcher.py
40 issues
Line: 3
Column: 1
from warnings import warn
import inspect
from .conflict import ordering, ambiguities, super_signature, AmbiguityWarning
from .utils import expand_tuples
from .variadic import Variadic, isvariadic
import itertools as itl
class MDNotImplementedError(NotImplementedError):
Reported by Pylint.
Line: 4
Column: 1
from warnings import warn
import inspect
from .conflict import ordering, ambiguities, super_signature, AmbiguityWarning
from .utils import expand_tuples
from .variadic import Variadic, isvariadic
import itertools as itl
class MDNotImplementedError(NotImplementedError):
Reported by Pylint.
Line: 5
Column: 1
import inspect
from .conflict import ordering, ambiguities, super_signature, AmbiguityWarning
from .utils import expand_tuples
from .variadic import Variadic, isvariadic
import itertools as itl
class MDNotImplementedError(NotImplementedError):
""" A NotImplementedError for multiple dispatch """
Reported by Pylint.
Line: 13
Column: 32
""" A NotImplementedError for multiple dispatch """
def ambiguity_warn(dispatcher, ambiguities):
""" Raise warning when ambiguity is detected
Parameters
----------
dispatcher : Dispatcher
The dispatcher on which the ambiguity was detected
Reported by Pylint.
Line: 37
Column: 22
    )
def restart_ordering(on_ambiguity=ambiguity_warn):
    """Deprecated interface to temporarily resume ordering.
    """
    warn(
        'restart_ordering is deprecated, if you would like to eagerly order'
        'the dispatchers, you should call the ``reorder()`` method on each'
Reported by Pylint.
Line: 75
Column: 5
            # we're not matching a variadic argument, so move to the next
            # element in the signature
            sig = next(sigiter)
    else:
        try:
            sig = next(sigiter)
        except StopIteration:
            assert isvariadic(sig)
            yield True
Reported by Pylint.
Line: 241
Column: 9
        return self.reorder()
    def reorder(self, on_ambiguity=ambiguity_warn):
        self._ordering = od = ordering(self.funcs)
        amb = ambiguities(self.funcs)
        if amb:
            on_ambiguity(self, amb)
        return od
Reported by Pylint.
Line: 254
Column: 17
        except KeyError:
            func = self.dispatch(*types)
            if not func:
                raise NotImplementedError(
                    'Could not find signature for %s: <%s>' %
                    (self.name, str_signature(types)))
            self._cache[types] = func
        try:
            return func(*args, **kwargs)
Reported by Pylint.
Line: 270
Column: 13
            except MDNotImplementedError:
                pass
        raise NotImplementedError(
            "Matching functions for "
            "%s: <%s> found, but none completed successfully" % (
                self.name, str_signature(types),),)
    def __str__(self):
Reported by Pylint.
Line: 333
Column: 9
    def __setstate__(self, d):
        self.name = d['name']
        self.funcs = d['funcs']
        self._ordering = ordering(self.funcs)
        self._cache = dict()
    @property
    def __doc__(self):
        docs = ["Multiply dispatched method: %s" % self.name]
Reported by Pylint.
test/fx/test_fx_const_fold.py
40 issues
Line: 3
Column: 1
import unittest
import torch
from torch.fx.experimental import const_fold
class TestConstFold(unittest.TestCase):
    def _verify_const_fold_mod(self, mod_folded: const_fold.FoldedGraphModule):
        self.assertTrue(mod_folded.const_subgraph_module is not None)
Reported by Pylint.
Line: 4
Column: 1
import unittest
import torch
from torch.fx.experimental import const_fold
class TestConstFold(unittest.TestCase):
    def _verify_const_fold_mod(self, mod_folded: const_fold.FoldedGraphModule):
        self.assertTrue(mod_folded.const_subgraph_module is not None)
Reported by Pylint.
Line: 1
Column: 1
import unittest
import torch
from torch.fx.experimental import const_fold
class TestConstFold(unittest.TestCase):
    def _verify_const_fold_mod(self, mod_folded: const_fold.FoldedGraphModule):
        self.assertTrue(mod_folded.const_subgraph_module is not None)
Reported by Pylint.
Line: 7
Column: 1
from torch.fx.experimental import const_fold
class TestConstFold(unittest.TestCase):
    def _verify_const_fold_mod(self, mod_folded: const_fold.FoldedGraphModule):
        self.assertTrue(mod_folded.const_subgraph_module is not None)
        # Check that the constants are attributes in the main subgraph.
        num_folded_attrs = 0
Reported by Pylint.
Line: 41
Column: 9
            output
        """
        class ConstFoldTestModule(torch.nn.Module):
            def __init__(self):
                super().__init__()
                self.attr_1 = torch.nn.Parameter(torch.tensor([[-0.9]]))
                self.attr_2 = torch.nn.Parameter(torch.tensor([[17.1]]))
Reported by Pylint.
Line: 41
Column: 9
            output
        """
        class ConstFoldTestModule(torch.nn.Module):
            def __init__(self):
                super().__init__()
                self.attr_1 = torch.nn.Parameter(torch.tensor([[-0.9]]))
                self.attr_2 = torch.nn.Parameter(torch.tensor([[17.1]]))
Reported by Pylint.
Line: 47
Column: 13
                self.attr_1 = torch.nn.Parameter(torch.tensor([[-0.9]]))
                self.attr_2 = torch.nn.Parameter(torch.tensor([[17.1]]))
            def forward(self, x, y):
                a = self.attr_1 + self.attr_1
                x = x - a
                return x * y + self.attr_2
        mod = ConstFoldTestModule()
Reported by Pylint.
Line: 47
Column: 13
                self.attr_1 = torch.nn.Parameter(torch.tensor([[-0.9]]))
                self.attr_2 = torch.nn.Parameter(torch.tensor([[17.1]]))
            def forward(self, x, y):
                a = self.attr_1 + self.attr_1
                x = x - a
                return x * y + self.attr_2
        mod = ConstFoldTestModule()
Reported by Pylint.
Line: 47
Column: 13
                self.attr_1 = torch.nn.Parameter(torch.tensor([[-0.9]]))
                self.attr_2 = torch.nn.Parameter(torch.tensor([[17.1]]))
            def forward(self, x, y):
                a = self.attr_1 + self.attr_1
                x = x - a
                return x * y + self.attr_2
        mod = ConstFoldTestModule()
Reported by Pylint.
Line: 48
Column: 17
                self.attr_2 = torch.nn.Parameter(torch.tensor([[17.1]]))
            def forward(self, x, y):
                a = self.attr_1 + self.attr_1
                x = x - a
                return x * y + self.attr_2
        mod = ConstFoldTestModule()
        mod_folded: const_fold.FoldedGraphModule = const_fold.split_const_subgraphs(mod)
Reported by Pylint.
tools/testing/test_selections.py
40 issues
Line: 17
Column: 1
)
from typing import Any, Dict, List, Optional, Tuple, cast
from typing_extensions import TypedDict
class JobTimeJSON(TypedDict):
    commit: str
    JOB_BASE_NAME: str
    job_times: Dict[str, float]
Reported by Pylint.
Line: 6
Column: 1
import os
import subprocess
from tools.stats.s3_stat_parser import (
    get_previous_reports_for_branch,
    get_previous_reports_for_pr,
    Report, Version2Report,
    HAVE_BOTO3)
from tools.stats.import_test_stats import (
Reported by Pylint.
Line: 160
Column: 12
def _query_changed_test_files() -> List[str]:
    cmd = ["git", "diff", "--name-only", "origin/master", "HEAD"]
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise RuntimeError("Unable to get changed files")
    lines = proc.stdout.decode().strip().split("\n")
Reported by Pylint.
Line: 251
Column: 16
    if len(prioritized_tests) == 0:
        try:
            changed_files = _query_changed_test_files()
        except Exception:
            # If unable to get changed files from git, quit without doing any sorting
            return tests
        prefix = f"test{os.path.sep}"
        prioritized_tests = [f for f in changed_files if f.startswith(prefix) and f.endswith(".py")]
Reported by Pylint.
Line: 279
Column: 3
    return tests
# TODO Refactor this and unify with tools.stats.export_slow_tests
def export_S3_test_times(test_times_filename: Optional[str] = None) -> Dict[str, float]:
    test_times: Dict[str, float] = _pull_job_times_from_S3()
    if test_times_filename is not None:
        print(f'Exporting S3 test stats to {test_times_filename}.')
        if os.path.exists(test_times_filename):
Reported by Pylint.
Line: 1
Column: 1
import csv
import json
import os
import subprocess
from tools.stats.s3_stat_parser import (
    get_previous_reports_for_branch,
    get_previous_reports_for_pr,
    Report, Version2Report,
Reported by Pylint.
Line: 4
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import csv
import json
import os
import subprocess
from tools.stats.s3_stat_parser import (
    get_previous_reports_for_branch,
    get_previous_reports_for_pr,
    Report, Version2Report,
Reported by Bandit.
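Bandit's B404 flags the import itself as a cue to audit each use of the module. The call in _query_changed_test_files already follows the main mitigations (argv passed as a list, no shell=True); a hardened variant would also fail loudly on a non-zero exit. A minimal sketch under those assumptions (the run_git helper is hypothetical, not part of this file):

    import subprocess
    from typing import List

    def run_git(args: List[str]) -> str:
        # argv as a list means no shell interpretation of the arguments;
        # check=True raises CalledProcessError instead of returning bad data.
        proc = subprocess.run(
            ["git"] + args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        return proc.stdout.decode()

    # Usage mirroring the flagged call site:
    # files = run_git(["diff", "--name-only", "origin/master", "HEAD"]).strip().split("\n")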
Line: 16
Column: 1
    get_slow_tests
)
from typing import Any, Dict, List, Optional, Tuple, cast
from typing_extensions import TypedDict
class JobTimeJSON(TypedDict):
    commit: str
    JOB_BASE_NAME: str
Reported by Pylint.
Line: 19
Column: 1
from typing import Any, Dict, List, Optional, Tuple, cast
from typing_extensions import TypedDict
class JobTimeJSON(TypedDict):
    commit: str
    JOB_BASE_NAME: str
    job_times: Dict[str, float]
Reported by Pylint.
Line: 19
Column: 1
from typing import Any, Dict, List, Optional, Tuple, cast
from typing_extensions import TypedDict
class JobTimeJSON(TypedDict):
    commit: str
    JOB_BASE_NAME: str
    job_times: Dict[str, float]
Reported by Pylint.
test/distributed/elastic/timer/local_timer_test.py
40 issues
Line: 12
Column: 1
import unittest
import unittest.mock as mock
import torch.distributed.elastic.timer as timer
from torch.distributed.elastic.timer.api import TimerRequest
from torch.distributed.elastic.timer.local_timer import MultiprocessingRequestQueue
from torch.testing._internal.common_utils import (
    TEST_WITH_TSAN,
    run_tests,
Reported by Pylint.
Line: 13
Column: 1
import unittest.mock as mock
import torch.distributed.elastic.timer as timer
from torch.distributed.elastic.timer.api import TimerRequest
from torch.distributed.elastic.timer.local_timer import MultiprocessingRequestQueue
from torch.testing._internal.common_utils import (
    TEST_WITH_TSAN,
    run_tests,
    IS_WINDOWS,
Reported by Pylint.
Line: 14
Column: 1
import torch.distributed.elastic.timer as timer
from torch.distributed.elastic.timer.api import TimerRequest
from torch.distributed.elastic.timer.local_timer import MultiprocessingRequestQueue
from torch.testing._internal.common_utils import (
    TEST_WITH_TSAN,
    run_tests,
    IS_WINDOWS,
    IS_MACOS,
Reported by Pylint.
Line: 15
Column: 1
import torch.distributed.elastic.timer as timer
from torch.distributed.elastic.timer.api import TimerRequest
from torch.distributed.elastic.timer.local_timer import MultiprocessingRequestQueue
from torch.testing._internal.common_utils import (
    TEST_WITH_TSAN,
    run_tests,
    IS_WINDOWS,
    IS_MACOS,
    sandcastle_skip_if,
Reported by Pylint.
Line: 201
Column: 13
"""
checks that the watchdog function ran wait/interval +- 1 times
"""
self.server._run_watchdog = mock.MagicMock(wraps=self.server._run_watchdog)
wait = 0.1
self.server.start()
time.sleep(wait)
Reported by Pylint.
Line: 201
Column: 62
"""
checks that the watchdog function ran wait/interval +- 1 times
"""
self.server._run_watchdog = mock.MagicMock(wraps=self.server._run_watchdog)
wait = 0.1
self.server.start()
time.sleep(wait)
Reported by Pylint.
Line: 208
Column: 35
        self.server.start()
        time.sleep(wait)
        self.server.stop()
        watchdog_call_count = self.server._run_watchdog.call_count
        self.assertGreaterEqual(watchdog_call_count, int(wait / self.max_interval) - 1)
        self.assertLessEqual(watchdog_call_count, int(wait / self.max_interval) + 1)
    def test_watchdog_empty_queue(self):
        """
Reported by Pylint.
Line: 216
Column: 13
"""
checks that the watchdog can run on an empty queue
"""
self.server._run_watchdog()
def _expired_timer(self, pid, scope):
expired = time.time() - 60
return TimerRequest(worker_id=pid, scope_id=scope, expiration_time=expired)
Reported by Pylint.
Line: 240
Column: 13
        self.mp_queue.put(self._expired_timer(pid=test_pid, scope="test1"))
        self.mp_queue.put(self._valid_timer(pid=test_pid, scope="test2"))
        self.server._run_watchdog()
        self.assertEqual(0, len(self.server._timers))
        mock_os_kill.assert_called_once_with(test_pid, signal.SIGKILL)
    @mock.patch("os.kill")
Reported by Pylint.
Line: 242
Column: 37
        self.server._run_watchdog()
        self.assertEqual(0, len(self.server._timers))
        mock_os_kill.assert_called_once_with(test_pid, signal.SIGKILL)
    @mock.patch("os.kill")
    def test_acquire_release(self, mock_os_kill):
        """
Reported by Pylint.
tools/linter/clang_tidy/run.py
40 issues
Line: 173
Column: 1
    return base
async def run_shell_command(
    cmd: List[str], on_completed: Any = None, *args: Any
) -> CommandResult:
    """Executes a shell command and runs an optional callback when complete"""
    if VERBOSE:
        log("Running: ", " ".join(cmd))
Reported by Pylint.
Line: 258
Column: 13
    if options.print_include_paths:
        base += ["--extra-arg", "-v"]
    if options.include_dir:
        for dir in options.include_dir:
            base += ["--extra-arg", f"-I{dir}"]
    base += options.extra_args
    if line_filters:
        base += ["-line-filter", json.dumps(line_filters)]
Reported by Pylint.
Line: 465
Column: 5
async def _run(options: Any) -> Tuple[CommandResult, List[ClangTidyWarning]]:
    # These flags are pervasive enough to set it globally. It makes the code
    # cleaner compared to threading it through every single function.
    global VERBOSE
    global QUIET
    VERBOSE = options.verbose
    QUIET = options.quiet
    # Normalize the paths first
Reported by Pylint.
Line: 466
Column: 5
    # These flags are pervasive enough to set it globally. It makes the code
    # cleaner compared to threading it through every single function.
    global VERBOSE
    global QUIET
    VERBOSE = options.verbose
    QUIET = options.quiet
    # Normalize the paths first
    paths = [path.rstrip("/") for path in options.paths]
Reported by Pylint.
Line: 471
Column: 5
    QUIET = options.quiet
    # Normalize the paths first
    paths = [path.rstrip("/") for path in options.paths]
    # Filter files
    if options.diff_file:
        files, line_filters = filter_from_diff_file(options.paths, options.diff_file)
    else:
Reported by Pylint.
Line: 45
Column: 1
QUIET = False
def log(*args: Any, **kwargs: Any) -> None:
    if not QUIET:
        print(*args, **kwargs)
class CommandResult:
Reported by Pylint.
Line: 50
Column: 1
        print(*args, **kwargs)
class CommandResult:
    def __init__(self, returncode: int, stdout: str, stderr: str):
        self.returncode = returncode
        self.stdout = stdout.strip()
        self.stderr = stderr.strip()
Reported by Pylint.
Line: 56
Column: 5
        self.stdout = stdout.strip()
        self.stderr = stderr.strip()
    def failed(self) -> bool:
        return self.returncode != 0
    def __add__(self, other: "CommandResult") -> "CommandResult":
        return CommandResult(
            self.returncode + other.returncode,
Reported by Pylint.
Line: 77
Column: 1
)
class ProgressMeter:
    def __init__(
        self, num_items: int, start_msg: str = "", disable_progress_bar: bool = False
    ) -> None:
        self.num_items = num_items
        self.num_processed = 0
Reported by Pylint.
Line: 77
Column: 1
)
class ProgressMeter:
    def __init__(
        self, num_items: int, start_msg: str = "", disable_progress_bar: bool = False
    ) -> None:
        self.num_items = num_items
        self.num_processed = 0
Reported by Pylint.
caffe2/contrib/playground/checkpoint.py
40 issues
Line: 30
Column: 14
def initialize_master_xpu_model_params(model, weights_file, opts, reset_epoch):
    log.info("Initializing model params from file: {}".format(weights_file))
    with open(weights_file, 'r') as fopen:
        blobs = pickle.load(fopen)
    if 'blobs' in blobs:
        blobs = blobs['blobs']
Reported by Pylint.
Line: 32
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b301-pickle
def initialize_master_xpu_model_params(model, weights_file, opts, reset_epoch):
    log.info("Initializing model params from file: {}".format(weights_file))
    with open(weights_file, 'r') as fopen:
        blobs = pickle.load(fopen)
    if 'blobs' in blobs:
        blobs = blobs['blobs']
    start_epoch = 0
    best_metric = float('-inf')
Reported by Bandit.
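B301 warns that pickle.load executes arbitrary callables named in the stream, so loading an untrusted weights file can run attacker code. (As an aside, the file is opened in text mode here; on Python 3, pickle.load requires a binary handle, i.e. open(weights_file, 'rb').) When the file format cannot change, the standard mitigation from the Python docs is a restricted unpickler; a minimal sketch follows (for these blob files the whitelist would need the numpy types the checkpoints actually contain):

    import builtins
    import io
    import pickle

    SAFE_BUILTINS = {"range", "complex", "set", "frozenset", "slice"}

    class RestrictedUnpickler(pickle.Unpickler):
        def find_class(self, module, name):
            # Import and return only whitelisted names; refuse everything else.
            if module == "builtins" and name in SAFE_BUILTINS:
                return getattr(builtins, name)
            raise pickle.UnpicklingError(
                "global '%s.%s' is forbidden" % (module, name))

    def restricted_loads(data: bytes):
        """Like pickle.loads, but refuses non-whitelisted globals."""
        return RestrictedUnpickler(io.BytesIO(data)).load()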
Line: 39
Column: 18
    start_epoch = 0
    best_metric = float('-inf')
    if 'epoch' in blobs:
        log.info('epoch {} is found in model file'.format(blobs['epoch']))
        if not reset_epoch:
            start_epoch = blobs['epoch']
        else:
            log.info('Reset epoch')
    else:
Reported by Pylint.
Line: 53
Column: 18
        best_metric = blobs['best_metric']
    if model is not None:
        log.info('initialize model parameters using weights file: {}'.format(
            weights_file
        ))
        ws_blobs = workspace.Blobs()
        unscoped_blob_names = OrderedDict()
        for blob in model.GetAllParams():
Reported by Pylint.
Line: 81
Column: 33
                    ws_blob = workspace.FetchBlob(scoped_blob_name)
                    if not ws_blob.shape == blobs[unscoped_blob_name].shape:
                        log.info(
                            ('Workspace blob {} with shape {} does '
                             'not match weights file shape {}').format(
                                unscoped_blob_name, ws_blob.shape,
                                blobs[unscoped_blob_name].shape)
                        )
                    else:
Reported by Pylint.
Line: 92
Column: 18
                            blobs[unscoped_blob_name].astype(
                                np.float32, copy=False))
    else:
        log.info('Skip initializing model parameters from file: {}'.format(
            weights_file
        ))
    log.info('Complete initialize_master_xpu_model_params')
    return start_epoch, lr, best_metric
Reported by Pylint.
Line: 116
Column: 22
    for idx in range(params_per_xpu):
        blobs = [param for param in params[idx::params_per_xpu]]
        data = workspace.FetchBlob(blobs[0])
        log.info('Broadcasting {} to'.format(str(blobs[0])))
        for i, p in enumerate(blobs[1:]):
            log.info(' |-> {}'.format(str(p)))
            with core.DeviceScope(core.DeviceOption(caffe2_pb2_DEVICE, i+1)):
                workspace.FeedBlob(p, data)
    log.info("Complete parameter broadcast")
Reported by Pylint.
Line: 118
Column: 26
        data = workspace.FetchBlob(blobs[0])
        log.info('Broadcasting {} to'.format(str(blobs[0])))
        for i, p in enumerate(blobs[1:]):
            log.info(' |-> {}'.format(str(p)))
            with core.DeviceScope(core.DeviceOption(caffe2_pb2_DEVICE, i+1)):
                workspace.FeedBlob(p, data)
    log.info("Complete parameter broadcast")
Reported by Pylint.
Line: 124
Column: 23
log.info("Complete parameter broadcast")
def save_model_params(is_checkpoint, model, checkpoint_path, epoch, opts, best_metric):
# best_metric=float('-inf')
if checkpoint_path is None:
return None
try:
Reported by Pylint.
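Several of the findings above point into log.info(...) calls that build their messages eagerly with str.format. Assuming these are Pylint's usual logging-format-interpolation warnings (the report omits the message IDs, so this is an inference), the conventional fix is to pass lazy %-style arguments and let logging format only when the record is actually emitted:

    import logging

    log = logging.getLogger(__name__)
    blob_name = "gpu_0/fc_w"  # illustrative value

    # Eager: the string is always built, even if INFO is disabled.
    log.info('Broadcasting {} to'.format(blob_name))

    # Lazy: logging interpolates %s only when the message is emitted.
    log.info('Broadcasting %s to', blob_name)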
Line: 133
Column: 12
        save_model_params_blob(
            model, checkpoint_path, epoch, opts, best_metric
        )
    except Exception as e:
        log.warning('Exception from save_model_params {}'.format(str(e)))
    return checkpoint_path
def save_model_params_blob(model, params_file, epoch, opts, best_metric):
Reported by Pylint.