The following issues were found:
caffe2/python/test/executor_test.py
10 issues
Line: 16
Column: 1
from caffe2.python.test_util import TestCase
from hypothesis import given
import hypothesis.strategies as st
import unittest
Reported by Pylint.
Line: 17
Column: 1
from caffe2.python.test_util import TestCase
from hypothesis import given
import hypothesis.strategies as st
import unittest
EXECUTORS = ["parallel", "async_scheduling"]
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
from caffe2.python.test.executor_test_util import (
build_conv_model,
build_resnet50_dataparallel_model,
run_resnet50_epoch,
Reported by Pylint.
Line: 19
Column: 1
from hypothesis import given
import hypothesis.strategies as st
import unittest
EXECUTORS = ["parallel", "async_scheduling"]
ITERATIONS = 1
Reported by Pylint.
Line: 26
Column: 1
ITERATIONS = 1
class ExecutorCPUConvNetTest(ExecutorTestBase):
@given(executor=st.sampled_from(EXECUTORS),
model_name=st.sampled_from(executor_test_model_names()),
batch_size=st.sampled_from([1]),
num_workers=st.sampled_from([8]))
@executor_test_settings
Reported by Pylint.
Line: 32
Column: 5
batch_size=st.sampled_from([1]),
num_workers=st.sampled_from([8]))
@executor_test_settings
def test_executor(self, executor, model_name, batch_size, num_workers):
model = build_conv_model(model_name, batch_size)
model.Proto().num_workers = num_workers
def run_model():
iterations = ITERATIONS
Reported by Pylint.
Line: 51
Column: 1
@unittest.skipIf(not workspace.has_gpu_support, "no gpu")
class ExecutorGPUResNetTest(ExecutorTestBase):
@given(executor=st.sampled_from(EXECUTORS),
num_workers=st.sampled_from([8]))
@executor_test_settings
def test_executor(self, executor, num_workers):
model = build_resnet50_dataparallel_model(
Reported by Pylint.
Line: 55
Column: 5
@given(executor=st.sampled_from(EXECUTORS),
num_workers=st.sampled_from([8]))
@executor_test_settings
def test_executor(self, executor, num_workers):
model = build_resnet50_dataparallel_model(
num_gpus=workspace.NumGpuDevices(), batch_size=8, epoch_size=8)
model.Proto().num_workers = num_workers
def run_model():
Reported by Pylint.
Line: 71
Column: 1
)
class ExecutorFailingOpTest(TestCase):
def test_failing_op(self):
def create_failing_net(throw_exception):
net = core.Net("failing_net")
if throw_exception:
net.ThrowException([], [])
Reported by Pylint.
Line: 72
Column: 5
class ExecutorFailingOpTest(TestCase):
def test_failing_op(self):
def create_failing_net(throw_exception):
net = core.Net("failing_net")
if throw_exception:
net.ThrowException([], [])
else:
Reported by Pylint.
test/distributed/_sharding_spec/test_sharding_spec.py
10 issues
Line: 1
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase
from torch.distributed._sharding_spec import (
ChunkShardingSpec,
DevicePlacementSpec,
EnumerableShardingSpec,
ShardMetadata,
)
from torch.distributed._sharding_spec._internals import check_tensor
Reported by Pylint.
Line: 2
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase
from torch.distributed._sharding_spec import (
ChunkShardingSpec,
DevicePlacementSpec,
EnumerableShardingSpec,
ShardMetadata,
)
from torch.distributed._sharding_spec._internals import check_tensor
Reported by Pylint.
Line: 3
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase
from torch.distributed._sharding_spec import (
ChunkShardingSpec,
DevicePlacementSpec,
EnumerableShardingSpec,
ShardMetadata,
)
from torch.distributed._sharding_spec._internals import check_tensor
Reported by Pylint.
Line: 9
Column: 1
EnumerableShardingSpec,
ShardMetadata,
)
from torch.distributed._sharding_spec._internals import check_tensor
from torch.testing._internal.common_utils import (
run_tests,
sandcastle_skip_if,
)
Reported by Pylint.
Line: 11
Column: 1
)
from torch.distributed._sharding_spec._internals import check_tensor
from torch.testing._internal.common_utils import (
run_tests,
sandcastle_skip_if,
)
class TestShardingSpec(TestCase):
Reported by Pylint.
Line: 1
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase
from torch.distributed._sharding_spec import (
ChunkShardingSpec,
DevicePlacementSpec,
EnumerableShardingSpec,
ShardMetadata,
)
from torch.distributed._sharding_spec._internals import check_tensor
Reported by Pylint.
Line: 16
Column: 1
sandcastle_skip_if,
)
class TestShardingSpec(TestCase):
@sandcastle_skip_if(torch.cuda.device_count() < 2, '2 CUDA GPUs are needed')
def test_device_placement(self):
# valid devices
DevicePlacementSpec("cuda:0")
Reported by Pylint.
Line: 19
Column: 5
class TestShardingSpec(TestCase):
@sandcastle_skip_if(torch.cuda.device_count() < 2, '2 CUDA GPUs are needed')
def test_device_placement(self):
# valid devices
DevicePlacementSpec("cuda:0")
DevicePlacementSpec(torch.device(0))
DevicePlacementSpec(torch.device("cuda:0"))
DevicePlacementSpec("rank:0/cuda:0")
Reported by Pylint.
Line: 39
Column: 5
DevicePlacementSpec("rank:0/cpu2")
@sandcastle_skip_if(torch.cuda.device_count() < 2, '2 CUDA GPUs are needed')
def test_chunked_sharding_spec(self):
# Test valid specs.
ChunkShardingSpec(0, [torch.device(0), torch.device(1)])
# Named dimension.
ChunkShardingSpec("N", ["cuda:0", "cuda:1"])
ChunkShardingSpec(0, [torch.device("cuda:0"), torch.device("cuda:1")])
Reported by Pylint.
Line: 69
Column: 5
ChunkShardingSpec(0, ["rank:0/cuda:foo", "cuda:1"])
@sandcastle_skip_if(torch.cuda.device_count() < 2, '2 CUDA GPUs are needed')
def test_enumerable_sharding_spec(self):
# test valid specs
# test row-wise sharding
spec = EnumerableShardingSpec([
ShardMetadata(
Reported by Pylint.
caffe2/python/session_test.py
10 issues
Line: 55
Column: 14
out2 = pipe(out1, processor=proc2)
pipe(out2, dst_ds.writer())
ws = workspace.C.Workspace()
FeedRecord(src_blobs, src_values, ws)
session = LocalSession(ws)
session.run(init_net)
session.run(tg)
output = FetchRecord(dst_blobs, ws=ws)
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python.schema import (
Struct, FetchRecord, NewRecord, FeedRecord, InitEmptyRecord)
from caffe2.python import core, workspace
from caffe2.python.session import LocalSession
Reported by Pylint.
Line: 17
Column: 1
import numpy as np
class TestLocalSession(TestCase):
def test_local_session(self):
init_net = core.Net('init')
src_values = Struct(
('uid', np.array([1, 2, 6])),
('value', np.array([1.4, 1.6, 1.7])))
Reported by Pylint.
Line: 18
Column: 5
class TestLocalSession(TestCase):
def test_local_session(self):
init_net = core.Net('init')
src_values = Struct(
('uid', np.array([1, 2, 6])),
('value', np.array([1.4, 1.6, 1.7])))
expected_dst = Struct(
Reported by Pylint.
Line: 18
Column: 5
class TestLocalSession(TestCase):
def test_local_session(self):
init_net = core.Net('init')
src_values = Struct(
('uid', np.array([1, 2, 6])),
('value', np.array([1.4, 1.6, 1.7])))
expected_dst = Struct(
Reported by Pylint.
Line: 18
Column: 5
class TestLocalSession(TestCase):
def test_local_session(self):
init_net = core.Net('init')
src_values = Struct(
('uid', np.array([1, 2, 6])),
('value', np.array([1.4, 1.6, 1.7])))
expected_dst = Struct(
Reported by Pylint.
Line: 50
Column: 29
src_ds = Dataset(src_blobs)
dst_ds = Dataset(dst_blobs)
with TaskGroup() as tg:
out1 = pipe(src_ds.reader(), processor=proc1)
out2 = pipe(out1, processor=proc2)
pipe(out2, dst_ds.writer())
ws = workspace.C.Workspace()
Reported by Pylint.
Line: 55
Column: 9
out2 = pipe(out1, processor=proc2)
pipe(out2, dst_ds.writer())
ws = workspace.C.Workspace()
FeedRecord(src_blobs, src_values, ws)
session = LocalSession(ws)
session.run(init_net)
session.run(tg)
output = FetchRecord(dst_blobs, ws=ws)
Reported by Pylint.
Line: 62
Column: 13
session.run(tg)
output = FetchRecord(dst_blobs, ws=ws)
for a, b in zip(output.field_blobs(), expected_dst.field_blobs()):
np.testing.assert_array_equal(a, b)
Reported by Pylint.
Line: 62
Column: 16
session.run(tg)
output = FetchRecord(dst_blobs, ws=ws)
for a, b in zip(output.field_blobs(), expected_dst.field_blobs()):
np.testing.assert_array_equal(a, b)
Reported by Pylint.
caffe2/python/operator_test/bucketize_op_test.py
10 issues
Line: 6
Column: 1
from caffe2.python import core
from hypothesis import given
import caffe2.python.hypothesis_test_util as hu
import numpy as np
class TestBucketizeOp(hu.HypothesisTestCase):
Reported by Pylint.
Line: 17
Column: 40
min_dim=1, max_dim=2, dtype=np.float32,
elements=hu.floats(min_value=-5, max_value=5)),
**hu.gcs)
def test_bucketize_op(self, x, gc, dc):
length = np.random.randint(low=1, high=5)
boundaries = np.random.randn(length) * 5
boundaries.sort()
def ref(x, boundaries):
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
from hypothesis import given
import caffe2.python.hypothesis_test_util as hu
import numpy as np
Reported by Pylint.
Line: 11
Column: 1
import numpy as np
class TestBucketizeOp(hu.HypothesisTestCase):
@given(
x=hu.tensor(
min_dim=1, max_dim=2, dtype=np.float32,
elements=hu.floats(min_value=-5, max_value=5)),
**hu.gcs)
Reported by Pylint.
Line: 17
Column: 5
min_dim=1, max_dim=2, dtype=np.float32,
elements=hu.floats(min_value=-5, max_value=5)),
**hu.gcs)
def test_bucketize_op(self, x, gc, dc):
length = np.random.randint(low=1, high=5)
boundaries = np.random.randn(length) * 5
boundaries.sort()
def ref(x, boundaries):
Reported by Pylint.
Line: 17
Column: 5
min_dim=1, max_dim=2, dtype=np.float32,
elements=hu.floats(min_value=-5, max_value=5)),
**hu.gcs)
def test_bucketize_op(self, x, gc, dc):
length = np.random.randint(low=1, high=5)
boundaries = np.random.randn(length) * 5
boundaries.sort()
def ref(x, boundaries):
Reported by Pylint.
Line: 17
Column: 5
min_dim=1, max_dim=2, dtype=np.float32,
elements=hu.floats(min_value=-5, max_value=5)),
**hu.gcs)
def test_bucketize_op(self, x, gc, dc):
length = np.random.randint(low=1, high=5)
boundaries = np.random.randn(length) * 5
boundaries.sort()
def ref(x, boundaries):
Reported by Pylint.
Line: 17
Column: 5
min_dim=1, max_dim=2, dtype=np.float32,
elements=hu.floats(min_value=-5, max_value=5)),
**hu.gcs)
def test_bucketize_op(self, x, gc, dc):
length = np.random.randint(low=1, high=5)
boundaries = np.random.randn(length) * 5
boundaries.sort()
def ref(x, boundaries):
Reported by Pylint.
Line: 22
Column: 9
boundaries = np.random.randn(length) * 5
boundaries.sort()
def ref(x, boundaries):
bucket_idx = np.digitize(x, boundaries, right=True)
return [bucket_idx]
op = core.CreateOperator('Bucketize',
["X"], ["INDICES"],
Reported by Pylint.
Line: 26
Column: 9
bucket_idx = np.digitize(x, boundaries, right=True)
return [bucket_idx]
op = core.CreateOperator('Bucketize',
["X"], ["INDICES"],
boundaries=boundaries)
self.assertReferenceChecks(gc, op, [x, boundaries], ref)
Reported by Pylint.
test/cpp/jit/test_gpu.cpp
10 issues
Line: 101
// (These tests exercise IrGraphGenerator through a non-trivial IR,
// to make sure that it runs w/o crashing. The actual output is not
// validated)
TEST(NVFuserTest, IrGraphGenerator_CUDA) {
Fusion fusion;
FusionGuard fg(&fusion);
// Make sure we can handle empty IRs
TORCH_CHECK(!IrGraphGenerator::toGraphviz(
Reported by Cppcheck.
Line: 472
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
at::Tensor tv2_ref = input2 + 2.0;
at::Tensor output_ref = input1 + tv2_ref;
TORCH_CHECK(output_ref.equal(outputs[0]));
}
TEST(NVFuserTest, FusionCopy_CUDA) {
Fusion original_fusion;
Reported by FlawFinder.
Line: 1168
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
fe.compileFusion(fusion.get());
auto outputs = fe.runFusion({input1, input2});
at::Tensor output_ref = input1 * input2 * input1;
TORCH_CHECK(output_ref.equal(outputs[0]));
}
TEST(NVFuserTest, FusionForLoop_CUDA) {
// TODO(kir): re-enable this test
// due to the current "GpuLower guard" approach, we can only create
Reported by FlawFinder.
Line: 1246
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
at::Tensor output_ref = at::zeros_like(output, options);
output_ref = output_ref + 0.0 + 1.0 + 2.0 + 3.0;
TORCH_CHECK(output_ref.equal(output));
}
TEST(NVFuserTest, FusionCodeGen2_CUDA) {
Fusion fusion;
FusionGuard fg(&fusion);
Reported by FlawFinder.
Line: 1288
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
at::Tensor tv2_ref = input2 + 2.0;
at::Tensor output_ref = input1 + tv2_ref;
TORCH_CHECK(output_ref.equal(outputs[0]));
}
TEST(NVFuserTest, FusionSimplePWise_CUDA) {
Fusion fusion;
FusionGuard fg(&fusion);
Reported by FlawFinder.
Line: 1345
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
at::Tensor tv2_ref = input2 + 2.0;
at::Tensor output_ref = input1 + tv2_ref;
TORCH_CHECK(output_ref.equal(output));
}
TEST(NVFuserTest, FusionExecKernel_CUDA) {
Fusion fusion;
FusionGuard fg(&fusion);
Reported by FlawFinder.
Line: 1395
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
at::Tensor check = at::full({1, 128}, 4, options);
;
TORCH_CHECK(outputs[0].equal(check));
}
int ceilDiv_(int a, int b) {
return (a + b - 1) / b;
}
Reported by FlawFinder.
Line: 2413
Column: 26
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
fe.compileFusion(&fusion);
auto outputs = fe.runFusion({input0, input1});
TORCH_CHECK(outputs[0].equal(input0.add(input1.add(2.0))));
}
/*
* Helper function for single op testing that generates a codegen operand
*/
Reported by FlawFinder.
Line: 2566
Column: 21
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
TORCH_CHECK(
(output.scalar_type() == at::kBool
? output.equal(ref_output)
:
// The absolute Tolerance was raised to 1e-07 from 1e-08 to allow
// allow for the remainder function to pass.
output.allclose(ref_output, /*rtol*/ 1e-05, /*atol*/ 1e-07)),
"\nOp Type: -- ",
Reported by FlawFinder.
Line: 2899
Column: 18
CWE codes:
126
Suggestion:
This function is often discouraged by most C++ coding standards in favor of its safer alternatives provided since C++14. Consider using a form of this function that checks the second iterator before potentially overflowing it
ref_output = at::_cast_Half(at::_cast_Float(input1));
TORCH_CHECK(
outputs[0].equal(ref_output),
"\nOp Type: -- ",
"cast FP16->FP32->FP16",
" -- had a mismatch.\n",
"\nABS MAX DIFF: ",
outputs[0].sub(ref_output).abs().max(),
Reported by FlawFinder.
test/custom_backend/test_custom_backend.py
10 issues
Line: 3
Column: 1
import os
import tempfile
import torch
from backend import Model, to_custom_backend, get_custom_backend_library_path
from torch.testing._internal.common_utils import TestCase, run_tests
class TestCustomBackend(TestCase):
Reported by Pylint.
Line: 6
Column: 1
import torch
from backend import Model, to_custom_backend, get_custom_backend_library_path
from torch.testing._internal.common_utils import TestCase, run_tests
class TestCustomBackend(TestCase):
def setUp(self):
# Load the library containing the custom backend.
Reported by Pylint.
Line: 1
Column: 1
import os
import tempfile
import torch
from backend import Model, to_custom_backend, get_custom_backend_library_path
from torch.testing._internal.common_utils import TestCase, run_tests
class TestCustomBackend(TestCase):
Reported by Pylint.
Line: 6
Column: 1
import torch
from backend import Model, to_custom_backend, get_custom_backend_library_path
from torch.testing._internal.common_utils import TestCase, run_tests
class TestCustomBackend(TestCase):
def setUp(self):
# Load the library containing the custom backend.
Reported by Pylint.
Line: 9
Column: 1
from torch.testing._internal.common_utils import TestCase, run_tests
class TestCustomBackend(TestCase):
def setUp(self):
# Load the library containing the custom backend.
self.library_path = get_custom_backend_library_path()
torch.ops.load_library(self.library_path)
# Create an instance of the test Module and lower it for
Reported by Pylint.
Line: 10
Column: 5
class TestCustomBackend(TestCase):
def setUp(self):
# Load the library containing the custom backend.
self.library_path = get_custom_backend_library_path()
torch.ops.load_library(self.library_path)
# Create an instance of the test Module and lower it for
# the custom backend.
Reported by Pylint.
Line: 10
Column: 5
class TestCustomBackend(TestCase):
def setUp(self):
# Load the library containing the custom backend.
self.library_path = get_custom_backend_library_path()
torch.ops.load_library(self.library_path)
# Create an instance of the test Module and lower it for
# the custom backend.
Reported by Pylint.
Line: 22
Column: 9
"""
Test execution using the custom backend.
"""
a = torch.randn(4)
b = torch.randn(4)
# The custom backend is hardcoded to compute f(a, b) = (a + b, a - b).
expected = (a + b, a - b)
out = self.model(a, b)
self.assertTrue(expected[0].allclose(out[0]))
Reported by Pylint.
Line: 23
Column: 9
Test execution using the custom backend.
"""
a = torch.randn(4)
b = torch.randn(4)
# The custom backend is hardcoded to compute f(a, b) = (a + b, a - b).
expected = (a + b, a - b)
out = self.model(a, b)
self.assertTrue(expected[0].allclose(out[0]))
self.assertTrue(expected[1].allclose(out[1]))
Reported by Pylint.
Line: 40
Column: 9
self.test_execute()
# Save and load.
f = tempfile.NamedTemporaryFile(delete=False)
try:
f.close()
torch.jit.save(self.model, f.name)
loaded = torch.jit.load(f.name)
finally:
Reported by Pylint.
test/distributed/elastic/utils/data/cycling_iterator_test.py
10 issues
Line: 10
Column: 1
# LICENSE file in the root directory of this source tree.
import unittest
from torch.distributed.elastic.utils.data import CyclingIterator
class CyclingIteratorTest(unittest.TestCase):
def generator(self, epoch, stride, max_epochs):
# generate an continuously incrementing list each epoch
Reported by Pylint.
Line: 14
Column: 40
class CyclingIteratorTest(unittest.TestCase):
def generator(self, epoch, stride, max_epochs):
# generate an continuously incrementing list each epoch
# e.g. [0,1,2] [3,4,5] [6,7,8] ...
return iter([stride * epoch + i for i in range(0, stride)])
def test_cycling_iterator(self):
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import unittest
Reported by Pylint.
Line: 13
Column: 1
from torch.distributed.elastic.utils.data import CyclingIterator
class CyclingIteratorTest(unittest.TestCase):
def generator(self, epoch, stride, max_epochs):
# generate an continuously incrementing list each epoch
# e.g. [0,1,2] [3,4,5] [6,7,8] ...
return iter([stride * epoch + i for i in range(0, stride)])
Reported by Pylint.
Line: 14
Column: 5
class CyclingIteratorTest(unittest.TestCase):
def generator(self, epoch, stride, max_epochs):
# generate an continuously incrementing list each epoch
# e.g. [0,1,2] [3,4,5] [6,7,8] ...
return iter([stride * epoch + i for i in range(0, stride)])
def test_cycling_iterator(self):
Reported by Pylint.
Line: 14
Column: 5
class CyclingIteratorTest(unittest.TestCase):
def generator(self, epoch, stride, max_epochs):
# generate an continuously incrementing list each epoch
# e.g. [0,1,2] [3,4,5] [6,7,8] ...
return iter([stride * epoch + i for i in range(0, stride)])
def test_cycling_iterator(self):
Reported by Pylint.
Line: 19
Column: 5
# e.g. [0,1,2] [3,4,5] [6,7,8] ...
return iter([stride * epoch + i for i in range(0, stride)])
def test_cycling_iterator(self):
stride = 3
max_epochs = 90
def generator_fn(epoch):
return self.generator(epoch, stride, max_epochs)
Reported by Pylint.
Line: 26
Column: 9
def generator_fn(epoch):
return self.generator(epoch, stride, max_epochs)
it = CyclingIterator(n=max_epochs, generator_fn=generator_fn)
for i in range(0, stride * max_epochs):
self.assertEqual(i, next(it))
with self.assertRaises(StopIteration):
next(it)
Reported by Pylint.
Line: 33
Column: 5
with self.assertRaises(StopIteration):
next(it)
def test_cycling_iterator_start_epoch(self):
stride = 3
max_epochs = 2
start_epoch = 1
def generator_fn(epoch):
Reported by Pylint.
Line: 41
Column: 9
def generator_fn(epoch):
return self.generator(epoch, stride, max_epochs)
it = CyclingIterator(max_epochs, generator_fn, start_epoch)
for i in range(stride * start_epoch, stride * max_epochs):
self.assertEqual(i, next(it))
with self.assertRaises(StopIteration):
next(it)
Reported by Pylint.
caffe2/quantization/server/dequantize_dnnlowp_op_test.py
10 issues
Line: 6
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, workspace
from caffe2.quantization.server.dnnlowp_test_utils import check_quantized_results_close
from hypothesis import given
Reported by Pylint.
Line: 10
Column: 1
import numpy as np
from caffe2.python import core, dyndep, workspace
from caffe2.quantization.server.dnnlowp_test_utils import check_quantized_results_close
from hypothesis import given
dyndep.InitOpsLibrary("//caffe2/caffe2/quantization/server:dnnlowp_ops")
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
Reported by Pylint.
Line: 19
Column: 59
class DNNLowPDequantizeOpTest(hu.HypothesisTestCase):
@given(size=st.integers(1024, 2048), is_empty=st.booleans(), **hu.gcs_cpu_only)
def test_dnnlowp_dequantize(self, size, is_empty, gc, dc):
if is_empty:
size = 0
min_ = -10.0
max_ = 20.0
X = (np.random.rand(size) * (max_ - min_) + min_).astype(np.float32)
Reported by Pylint.
Line: 1
Column: 1
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, workspace
from caffe2.quantization.server.dnnlowp_test_utils import check_quantized_results_close
Reported by Pylint.
Line: 17
Column: 1
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
class DNNLowPDequantizeOpTest(hu.HypothesisTestCase):
@given(size=st.integers(1024, 2048), is_empty=st.booleans(), **hu.gcs_cpu_only)
def test_dnnlowp_dequantize(self, size, is_empty, gc, dc):
if is_empty:
size = 0
min_ = -10.0
Reported by Pylint.
Line: 19
Column: 5
class DNNLowPDequantizeOpTest(hu.HypothesisTestCase):
@given(size=st.integers(1024, 2048), is_empty=st.booleans(), **hu.gcs_cpu_only)
def test_dnnlowp_dequantize(self, size, is_empty, gc, dc):
if is_empty:
size = 0
min_ = -10.0
max_ = 20.0
X = (np.random.rand(size) * (max_ - min_) + min_).astype(np.float32)
Reported by Pylint.
Line: 19
Column: 5
class DNNLowPDequantizeOpTest(hu.HypothesisTestCase):
@given(size=st.integers(1024, 2048), is_empty=st.booleans(), **hu.gcs_cpu_only)
def test_dnnlowp_dequantize(self, size, is_empty, gc, dc):
if is_empty:
size = 0
min_ = -10.0
max_ = 20.0
X = (np.random.rand(size) * (max_ - min_) + min_).astype(np.float32)
Reported by Pylint.
Line: 19
Column: 5
class DNNLowPDequantizeOpTest(hu.HypothesisTestCase):
@given(size=st.integers(1024, 2048), is_empty=st.booleans(), **hu.gcs_cpu_only)
def test_dnnlowp_dequantize(self, size, is_empty, gc, dc):
if is_empty:
size = 0
min_ = -10.0
max_ = 20.0
X = (np.random.rand(size) * (max_ - min_) + min_).astype(np.float32)
Reported by Pylint.
Line: 19
Column: 5
class DNNLowPDequantizeOpTest(hu.HypothesisTestCase):
@given(size=st.integers(1024, 2048), is_empty=st.booleans(), **hu.gcs_cpu_only)
def test_dnnlowp_dequantize(self, size, is_empty, gc, dc):
if is_empty:
size = 0
min_ = -10.0
max_ = 20.0
X = (np.random.rand(size) * (max_ - min_) + min_).astype(np.float32)
Reported by Pylint.
Line: 24
Column: 9
size = 0
min_ = -10.0
max_ = 20.0
X = (np.random.rand(size) * (max_ - min_) + min_).astype(np.float32)
Output = collections.namedtuple("Output", ["Y", "op_type", "engine"])
outputs = []
op_type_list = ["Dequantize", "Int8Dequantize"]
Reported by Pylint.
caffe2/python/nomnigraph_transformations.py
10 issues
Line: 1
Column: 1
from collections import defaultdict
import caffe2.python.nomnigraph as ng
from caffe2.python import core, utils
def transpose_network(nn):
Reported by Pylint.
Line: 9
Column: 1
from caffe2.python import core, utils
def transpose_network(nn):
"""
Convert all Convolutions operators which are in the NCHW order
to NHWC order and also transform their inputs and outputs so that the
rest of the graph is not affected.
"""
Reported by Pylint.
Line: 9
Column: 1
from caffe2.python import core, utils
def transpose_network(nn):
"""
Convert all Convolutions operators which are in the NCHW order
to NHWC order and also transform their inputs and outputs so that the
rest of the graph is not affected.
"""
Reported by Pylint.
Line: 9
Column: 1
from caffe2.python import core, utils
def transpose_network(nn):
"""
Convert all Convolutions operators which are in the NCHW order
to NHWC order and also transform their inputs and outputs so that the
rest of the graph is not affected.
"""
Reported by Pylint.
Line: 20
Column: 1
# track outgoing tensors from NCHW2NHWC operators
outgoing = defaultdict(lambda: []) # input tensor -> list of operators
dfg = nn.dataFlow
orig_nodes = [x for x in nn.nodes]
for node in orig_nodes:
if node.isOperator() and node.name == "Conv":
arg_dict = utils.ArgsToDict(node.annotation.operator_def.arg)
# a missing "order" argument implies default NCHW order
if "order" in arg_dict and arg_dict["order"] != "NCHW":
Reported by Pylint.
Line: 27
Column: 1
# a missing "order" argument implies default NCHW order
if "order" in arg_dict and arg_dict["order"] != "NCHW":
continue
inputs = [x for x in node.inputs]
assert len(inputs) >= 2, "Conv operator should have two inputs"
outputs = [x for x in node.outputs]
assert len(outputs) >= 1, "Conv operator should have an output"
for inp in inputs:
nn.deleteEdge(inp, node)
Reported by Pylint.
Line: 28
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if "order" in arg_dict and arg_dict["order"] != "NCHW":
continue
inputs = [x for x in node.inputs]
assert len(inputs) >= 2, "Conv operator should have two inputs"
outputs = [x for x in node.outputs]
assert len(outputs) >= 1, "Conv operator should have an output"
for inp in inputs:
nn.deleteEdge(inp, node)
for outp in outputs:
Reported by Bandit.
Line: 29
Column: 1
continue
inputs = [x for x in node.inputs]
assert len(inputs) >= 2, "Conv operator should have two inputs"
outputs = [x for x in node.outputs]
assert len(outputs) >= 1, "Conv operator should have an output"
for inp in inputs:
nn.deleteEdge(inp, node)
for outp in outputs:
nn.deleteEdge(node, outp)
Reported by Pylint.
Line: 30
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
inputs = [x for x in node.inputs]
assert len(inputs) >= 2, "Conv operator should have two inputs"
outputs = [x for x in node.outputs]
assert len(outputs) >= 1, "Conv operator should have an output"
for inp in inputs:
nn.deleteEdge(inp, node)
for outp in outputs:
nn.deleteEdge(node, outp)
# only the first two inputs of the Convolution the data and the
Reported by Bandit.
Line: 44
Column: 13
nn.createEdge(transp, new_inp)
outgoing[inputs[idx]].append(transp)
inputs[idx] = new_inp
for idx in range(len(outputs)):
new_outp = nn.createUniqueDataNode(outputs[idx].name)
transp = dfg.createNode(ng.NeuralNetOperator("NHWC2NCHW"))
nn.createEdge(transp, outputs[idx])
nn.createEdge(new_outp, transp)
incoming[outputs[idx]] = new_outp
Reported by Pylint.
caffe2/python/operator_test/async_net_barrier_test.py
10 issues
Line: 4
Column: 1
#!/usr/bin/env python3
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core
from hypothesis import given
Reported by Pylint.
Line: 7
Column: 1
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core
from hypothesis import given
class TestAsyncNetBarrierOp(hu.HypothesisTestCase):
@given(
n=st.integers(1, 5),
Reported by Pylint.
Line: 16
Column: 51
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
**hu.gcs
)
def test_async_net_barrier_op(self, n, shape, dc, gc):
test_inputs = [(100 * np.random.random(shape)).astype(np.float32) for _ in range(n)]
test_input_blobs = ["x_{}".format(i) for i in range(n)]
barrier_op = core.CreateOperator(
"AsyncNetBarrier",
Reported by Pylint.
Line: 28
Column: 13
)
def reference_func(*args):
self.assertEquals(len(args), n)
return args
self.assertReferenceChecks(gc, barrier_op, test_inputs, reference_func)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core
from hypothesis import given
Reported by Pylint.
Line: 10
Column: 1
from hypothesis import given
class TestAsyncNetBarrierOp(hu.HypothesisTestCase):
@given(
n=st.integers(1, 5),
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
**hu.gcs
)
Reported by Pylint.
Line: 15
Column: 5
n=st.integers(1, 5),
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
**hu.gcs
)
def test_async_net_barrier_op(self, n, shape, dc, gc):
test_inputs = [(100 * np.random.random(shape)).astype(np.float32) for _ in range(n)]
test_input_blobs = ["x_{}".format(i) for i in range(n)]
barrier_op = core.CreateOperator(
Reported by Pylint.
Line: 15
Column: 5
n=st.integers(1, 5),
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
**hu.gcs
)
def test_async_net_barrier_op(self, n, shape, dc, gc):
test_inputs = [(100 * np.random.random(shape)).astype(np.float32) for _ in range(n)]
test_input_blobs = ["x_{}".format(i) for i in range(n)]
barrier_op = core.CreateOperator(
Reported by Pylint.
Line: 15
Column: 5
n=st.integers(1, 5),
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
**hu.gcs
)
def test_async_net_barrier_op(self, n, shape, dc, gc):
test_inputs = [(100 * np.random.random(shape)).astype(np.float32) for _ in range(n)]
test_input_blobs = ["x_{}".format(i) for i in range(n)]
barrier_op = core.CreateOperator(
Reported by Pylint.
Line: 15
Column: 5
n=st.integers(1, 5),
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
**hu.gcs
)
def test_async_net_barrier_op(self, n, shape, dc, gc):
test_inputs = [(100 * np.random.random(shape)).astype(np.float32) for _ in range(n)]
test_input_blobs = ["x_{}".format(i) for i in range(n)]
barrier_op = core.CreateOperator(
Reported by Pylint.