The following issues were found:
caffe2/python/operator_test/reduction_ops_test.py
55 issues
Line: 7
Column: 1
from caffe2.python import core, workspace
from hypothesis import assume, given, settings
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
Reported by Pylint.
Line: 10
Column: 1
from hypothesis import assume, given, settings
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
class TestReductionOps(serial.SerializedTestCase):
Reported by Pylint.
Line: 17
Column: 43
class TestReductionOps(serial.SerializedTestCase):
@serial.given(n=st.integers(5, 8), **hu.gcs)
def test_elementwise_sum(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def sum_op(X):
return [np.sum(X)]
Reported by Pylint.
Line: 46
Column: 47
@given(n=st.integers(5, 8), **hu.gcs)
@settings(deadline=10000)
def test_elementwise_int_sum(self, n, gc, dc):
X = np.random.rand(n).astype(np.int32)
def sum_op(X):
return [np.sum(X)]
Reported by Pylint.
Line: 97
Column: 43
)
@given(n=st.integers(5, 8), **hu.gcs)
def test_elementwise_avg(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def avg_op(X):
return [np.mean(X)]
Reported by Pylint.
Line: 129
Column: 54
m=st.integers(1, 3),
n=st.integers(1, 4),
**hu.gcs)
def test_rowwise_max(self, batch_size, m, n, gc, dc):
X = np.random.rand(batch_size, m, n).astype(np.float32)
def rowwise_max(X):
return [np.max(X, axis=2)]
Reported by Pylint.
Line: 152
Column: 57
m=st.integers(1, 3),
n=st.integers(1, 4),
**hu.gcs)
def test_columnwise_max(self, batch_size, m, n, gc, dc):
X = np.random.rand(batch_size, m, n).astype(np.float32)
def columnwise_max(X):
return [np.max(X, axis=1)]
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
from hypothesis import assume, given, settings
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
Reported by Pylint.
Line: 14
Column: 1
import numpy as np
class TestReductionOps(serial.SerializedTestCase):
@serial.given(n=st.integers(5, 8), **hu.gcs)
def test_elementwise_sum(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
Reported by Pylint.
Line: 17
Column: 5
class TestReductionOps(serial.SerializedTestCase):
@serial.given(n=st.integers(5, 8), **hu.gcs)
def test_elementwise_sum(self, n, gc, dc):
X = np.random.rand(n).astype(np.float32)
def sum_op(X):
return [np.sum(X)]
Reported by Pylint.
test/test_quantization.py
55 issues
Line: 3
Column: 1
# -*- coding: utf-8 -*-
from torch.testing._internal.common_utils import run_tests
# Quantization core tests. These include tests for
# - quantized kernels
# - quantized functional operators
# - quantized workflow modules
# - quantized workflow operators
Reported by Pylint.
Line: 13
Column: 3
# - quantized tensor
# 1. Quantized Kernels
# TODO: merge the different quantized op tests into one test class
from quantization.core.test_quantized_op import TestQuantizedOps # noqa: F401
from quantization.core.test_quantized_op import TestQNNPackOps # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
Reported by Pylint.
Line: 14
Column: 1
# 1. Quantized Kernels
# TODO: merge the different quantized op tests into one test class
from quantization.core.test_quantized_op import TestQuantizedOps # noqa: F401
from quantization.core.test_quantized_op import TestQNNPackOps # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
Reported by Pylint.
Line: 15
Column: 1
# 1. Quantized Kernels
# TODO: merge the different quantized op tests into one test class
from quantization.core.test_quantized_op import TestQuantizedOps # noqa: F401
from quantization.core.test_quantized_op import TestQNNPackOps # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
Reported by Pylint.
Line: 16
Column: 1
# TODO: merge the different quantized op tests into one test class
from quantization.core.test_quantized_op import TestQuantizedOps # noqa: F401
from quantization.core.test_quantized_op import TestQNNPackOps # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedEmbeddingOps # noqa: F401
Reported by Pylint.
Line: 17
Column: 1
from quantization.core.test_quantized_op import TestQuantizedOps # noqa: F401
from quantization.core.test_quantized_op import TestQNNPackOps # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedEmbeddingOps # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedRNNOp # noqa: F401
Reported by Pylint.
Line: 18
Column: 1
from quantization.core.test_quantized_op import TestQNNPackOps # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedEmbeddingOps # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedRNNOp # noqa: F401
# 2. Quantized Functional/Workflow Ops
Reported by Pylint.
Line: 19
Column: 1
from quantization.core.test_quantized_op import TestQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedEmbeddingOps # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedRNNOp # noqa: F401
# 2. Quantized Functional/Workflow Ops
from quantization.core.test_quantized_functional import TestQuantizedFunctionalOps # noqa: F401
Reported by Pylint.
Line: 20
Column: 1
from quantization.core.test_quantized_op import TestQuantizedConv # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedEmbeddingOps # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedRNNOp # noqa: F401
# 2. Quantized Functional/Workflow Ops
from quantization.core.test_quantized_functional import TestQuantizedFunctionalOps # noqa: F401
from quantization.core.test_workflow_ops import TestFakeQuantizeOps # noqa: F401
Reported by Pylint.
Line: 21
Column: 1
from quantization.core.test_quantized_op import TestDynamicQuantizedLinear # noqa: F401
from quantization.core.test_quantized_op import TestComparatorOps # noqa: F401
from quantization.core.test_quantized_op import TestPadding # noqa: F401
from quantization.core.test_quantized_op import TestQuantizedEmbeddingOps # noqa: F401
from quantization.core.test_quantized_op import TestDynamicQuantizedRNNOp # noqa: F401
# 2. Quantized Functional/Workflow Ops
from quantization.core.test_quantized_functional import TestQuantizedFunctionalOps # noqa: F401
from quantization.core.test_workflow_ops import TestFakeQuantizeOps # noqa: F401
from quantization.core.test_workflow_ops import TestFusedObsFakeQuant # noqa: F401
Reported by Pylint.
caffe2/python/operator_test/piecewise_linear_transform_test.py
55 issues
Line: 10
Column: 1
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
import unittest
Reported by Pylint.
Line: 11
Column: 1
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
import unittest
class TestPiecewiseLinearTransform(serial.SerializedTestCase):
Reported by Pylint.
Line: 52
Column: 1
intercepts=intercepts.flatten().tolist(),
)
def piecewise(x, *args, **kw):
x_0 = self.transform(
x[:, 0], bounds[0, :], slopes[0, :], intercepts[0, :])
x_1 = self.transform(
x[:, 1], bounds[1, :], slopes[1, :], intercepts[1, :])
Reported by Pylint.
Line: 52
Column: 1
intercepts=intercepts.flatten().tolist(),
)
def piecewise(x, *args, **kw):
x_0 = self.transform(
x[:, 0], bounds[0, :], slopes[0, :], intercepts[0, :])
x_1 = self.transform(
x[:, 1], bounds[1, :], slopes[1, :], intercepts[1, :])
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
Reported by Pylint.
Line: 13
Column: 1
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
import unittest
class TestPiecewiseLinearTransform(serial.SerializedTestCase):
def constrain(self, v, min_val, max_val):
def constrain_internal(x):
Reported by Pylint.
Line: 16
Column: 1
import unittest
class TestPiecewiseLinearTransform(serial.SerializedTestCase):
def constrain(self, v, min_val, max_val):
def constrain_internal(x):
return min(max(x, min_val), max_val)
return np.array([constrain_internal(x) for x in v])
Reported by Pylint.
Line: 17
Column: 5
class TestPiecewiseLinearTransform(serial.SerializedTestCase):
def constrain(self, v, min_val, max_val):
def constrain_internal(x):
return min(max(x, min_val), max_val)
return np.array([constrain_internal(x) for x in v])
def transform(self, x, bounds, slopes, intercepts):
Reported by Pylint.
Line: 17
Column: 5
class TestPiecewiseLinearTransform(serial.SerializedTestCase):
def constrain(self, v, min_val, max_val):
def constrain_internal(x):
return min(max(x, min_val), max_val)
return np.array([constrain_internal(x) for x in v])
def transform(self, x, bounds, slopes, intercepts):
Reported by Pylint.
Line: 17
Column: 5
class TestPiecewiseLinearTransform(serial.SerializedTestCase):
def constrain(self, v, min_val, max_val):
def constrain_internal(x):
return min(max(x, min_val), max_val)
return np.array([constrain_internal(x) for x in v])
def transform(self, x, bounds, slopes, intercepts):
Reported by Pylint.
torch/ao/nn/sparse/quantized/linear.py
55 issues
Line: 11
Column: 66
class LinearPackedParams(torch.nn.Module):
_version = 1
def __init__(self, row_block_size=1, col_block_size=4, dtype=torch.qint8):
super().__init__()
self.prepack_op = torch.ops.sparse.qlinear_prepack
self.unpack_op = torch.ops.sparse.qlinear_unpack
if dtype != torch.qint8:
Reported by Pylint.
Line: 16
Column: 21
self.prepack_op = torch.ops.sparse.qlinear_prepack
self.unpack_op = torch.ops.sparse.qlinear_unpack
if dtype != torch.qint8:
raise NotImplementedError("Linear prepacking only supports QINT8")
self.dtype = dtype
wq = torch._empty_affine_quantized([1, 1], scale=1.0, zero_point=0, dtype=torch.qint8)
self.set_weight_bias(wq, None, row_block_size, col_block_size)
# Hack to make torch.jit.script/torch.jit.load work
Reported by Pylint.
Line: 19
Column: 14
if dtype != torch.qint8:
raise NotImplementedError("Linear prepacking only supports QINT8")
self.dtype = dtype
wq = torch._empty_affine_quantized([1, 1], scale=1.0, zero_point=0, dtype=torch.qint8)
self.set_weight_bias(wq, None, row_block_size, col_block_size)
# Hack to make torch.jit.script/torch.jit.load work
# Once we have self.unpack_op working we won't need this.
self.__annotations__['bias'] = Optional[torch.Tensor]
Reported by Pylint.
Line: 19
Column: 83
if dtype != torch.qint8:
raise NotImplementedError("Linear prepacking only supports QINT8")
self.dtype = dtype
wq = torch._empty_affine_quantized([1, 1], scale=1.0, zero_point=0, dtype=torch.qint8)
self.set_weight_bias(wq, None, row_block_size, col_block_size)
# Hack to make torch.jit.script/torch.jit.load work
# Once we have self.unpack_op working we wont need this.
self.__annotations__['bias'] = Optional[torch.Tensor]
Reported by Pylint.
Line: 87
Column: 100
_version = 1
_FLOAT_MODULE = torch.nn.Linear
def __init__(self, in_features, out_features, row_block_size, col_block_size, bias=True, dtype=torch.qint8):
super().__init__()
if dtype != torch.qint8:
raise NotImplementedError("Only QINT8 is supported for Sparse Quantized Linear")
Reported by Pylint.
Line: 90
Column: 21
def __init__(self, in_features, out_features, row_block_size, col_block_size, bias=True, dtype=torch.qint8):
super().__init__()
if dtype != torch.qint8:
raise NotImplementedError("Only QINT8 is supported for Sparse Quantized Linear")
self.in_features = in_features
self.out_features = out_features
Reported by Pylint.
Line: 97
Column: 57
self.out_features = out_features
if bias:
bias = torch.zeros(self.out_features, dtype=torch.float)
else:
bias = None
qweight = torch._empty_affine_quantized([out_features, in_features],
scale=1, zero_point=0, dtype=torch.qint8)
Reported by Pylint.
Line: 97
Column: 20
self.out_features = out_features
if bias:
bias = torch.zeros(self.out_features, dtype=torch.float)
else:
bias = None
qweight = torch._empty_affine_quantized([out_features, in_features],
scale=1, zero_point=0, dtype=torch.qint8)
Reported by Pylint.
Line: 101
Column: 19
else:
bias = None
qweight = torch._empty_affine_quantized([out_features, in_features],
scale=1, zero_point=0, dtype=torch.qint8)
self._packed_params = LinearPackedParams(dtype)
self._packed_params.set_weight_bias(qweight, bias, row_block_size, col_block_size)
self.scale = 1.0
self.zero_point = 0
Reported by Pylint.
Line: 102
Column: 78
bias = None
qweight = torch._empty_affine_quantized([out_features, in_features],
scale=1, zero_point=0, dtype=torch.qint8)
self._packed_params = LinearPackedParams(dtype)
self._packed_params.set_weight_bias(qweight, bias, row_block_size, col_block_size)
self.scale = 1.0
self.zero_point = 0
Reported by Pylint.
caffe2/python/operator_test/resize_op_test.py
55 issues
Line: 6
Column: 1
import numpy as np
import hypothesis.strategies as st
import unittest
import caffe2.python.hypothesis_test_util as hu
from caffe2.python import core
from caffe2.proto import caffe2_pb2
from hypothesis import assume, given, settings
Reported by Pylint.
Line: 11
Column: 1
import caffe2.python.hypothesis_test_util as hu
from caffe2.python import core
from caffe2.proto import caffe2_pb2
from hypothesis import assume, given, settings
class TestResize(hu.HypothesisTestCase):
@given(height_scale=st.floats(0.25, 4.0) | st.just(2.0),
width_scale=st.floats(0.25, 4.0) | st.just(2.0),
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
import hypothesis.strategies as st
import unittest
import caffe2.python.hypothesis_test_util as hu
from caffe2.python import core
Reported by Pylint.
Line: 7
Column: 1
import numpy as np
import hypothesis.strategies as st
import unittest
import caffe2.python.hypothesis_test_util as hu
from caffe2.python import core
from caffe2.proto import caffe2_pb2
from hypothesis import assume, given, settings
Reported by Pylint.
Line: 14
Column: 1
from hypothesis import assume, given, settings
class TestResize(hu.HypothesisTestCase):
@given(height_scale=st.floats(0.25, 4.0) | st.just(2.0),
width_scale=st.floats(0.25, 4.0) | st.just(2.0),
height=st.integers(4, 32),
width=st.integers(4, 32),
num_channels=st.integers(1, 4),
Reported by Pylint.
Line: 25
Column: 5
order=st.sampled_from(["NCHW", "NHWC"]),
**hu.gcs)
@settings(max_examples=10, deadline=None)
def test_nearest(self, height_scale, width_scale, height, width,
num_channels, batch_size, seed, order,
gc, dc):
assume(order == "NCHW" or gc.device_type == caffe2_pb2.CPU)
# NHWC currently only supported for CPU. Ignore other devices.
Reported by Pylint.
Line: 25
Column: 5
order=st.sampled_from(["NCHW", "NHWC"]),
**hu.gcs)
@settings(max_examples=10, deadline=None)
def test_nearest(self, height_scale, width_scale, height, width,
num_channels, batch_size, seed, order,
gc, dc):
assume(order == "NCHW" or gc.device_type == caffe2_pb2.CPU)
# NHWC currently only supported for CPU. Ignore other devices.
Reported by Pylint.
Line: 25
Column: 5
order=st.sampled_from(["NCHW", "NHWC"]),
**hu.gcs)
@settings(max_examples=10, deadline=None)
def test_nearest(self, height_scale, width_scale, height, width,
num_channels, batch_size, seed, order,
gc, dc):
assume(order == "NCHW" or gc.device_type == caffe2_pb2.CPU)
# NHWC currently only supported for CPU. Ignore other devices.
Reported by Pylint.
Line: 25
Column: 5
order=st.sampled_from(["NCHW", "NHWC"]),
**hu.gcs)
@settings(max_examples=10, deadline=None)
def test_nearest(self, height_scale, width_scale, height, width,
num_channels, batch_size, seed, order,
gc, dc):
assume(order == "NCHW" or gc.device_type == caffe2_pb2.CPU)
# NHWC currently only supported for CPU. Ignore other devices.
Reported by Pylint.
Line: 35
Column: 9
dc = [d for d in dc if d.device_type == caffe2_pb2.CPU]
np.random.seed(seed)
op = core.CreateOperator(
"ResizeNearest",
["X"],
["Y"],
width_scale=width_scale,
height_scale=height_scale,
Reported by Pylint.
caffe2/python/operator_test/dataset_ops_test.py
55 issues
Line: 5
Column: 1
import operator
import string
import hypothesis.strategies as st
import numpy as np
import numpy.testing as npt
from caffe2.python import core, dataset, workspace
from caffe2.python.dataset import Const
from caffe2.python.schema import (
Reported by Pylint.
Line: 22
Column: 1
from_blob_list,
)
from caffe2.python.test_util import TestCase
from hypothesis import given
def _assert_arrays_equal(actual, ref, err_msg):
if ref.dtype.kind in ("S", "O", "U"):
np.testing.assert_array_equal(actual, ref, err_msg=err_msg)
Reported by Pylint.
Line: 46
Column: 1
@st.composite
def _sparse_features_map(draw, num_records, **kwargs):
sparse_maps_lengths = draw(
st.lists(
st.integers(min_value=1, max_value=10),
min_size=num_records,
max_size=num_records,
Reported by Pylint.
Line: 94
Column: 1
@st.composite
def _dense_features_map(draw, num_records, **kwargs):
float_lengths = draw(
st.lists(
st.integers(min_value=1, max_value=10),
min_size=num_records,
max_size=num_records,
Reported by Pylint.
Line: 122
Column: 1
@st.composite
def _dataset(draw, min_elements=3, max_elements=10, **kwargs):
schema = Struct(
# Dense Features Map
("floats", Map(Scalar(np.int32), Scalar(np.float32))),
# Sparse Features Map
(
Reported by Pylint.
Line: 168
Column: 32
class TestDatasetOps(TestCase):
@given(_dataset())
def test_pack_unpack(self, input):
"""
Tests if packing and unpacking of the whole dataset is an identity.
"""
(schema, contents, num_records) = input
Reported by Pylint.
Line: 172
Column: 28
"""
Tests if packing and unpacking of the whole dataset is an identity.
"""
(schema, contents, num_records) = input
dataset_fields = schema.field_names()
for pack_to_single_shared_ptr in (True, False):
net = core.Net("pack_unpack_net")
Reported by Pylint.
Line: 242
Column: 9
),
),
)
"""
This is what the flattened fields for this schema look like, along
with its type. Each one of these fields will be stored, read and
written as a tensor.
"""
expected_fields = [
Reported by Pylint.
Line: 267
Column: 13
]
zipped = zip(expected_fields, schema.field_names(), schema.field_types())
for (ref_name, ref_type), name, dtype in zipped:
self.assertEquals(ref_name, name)
self.assertEquals(np.dtype(ref_type), dtype)
"""
2. The contents of our dataset.
Contents as defined below could represent, for example, a log of
Reported by Pylint.
Line: 268
Column: 13
zipped = zip(expected_fields, schema.field_names(), schema.field_types())
for (ref_name, ref_type), name, dtype in zipped:
self.assertEquals(ref_name, name)
self.assertEquals(np.dtype(ref_type), dtype)
"""
2. The contents of our dataset.
Contents as defined below could represent, for example, a log of
search queries along with dense, sparse features and metadata.
Reported by Pylint.
caffe2/contrib/gloo/gloo_test.py
55 issues
Line: 8
Column: 1
from hypothesis import given, settings
import hypothesis.strategies as st
from multiprocessing import Process, Queue
import numpy as np
import os
Reported by Pylint.
Line: 9
Column: 1
from hypothesis import given, settings
import hypothesis.strategies as st
from multiprocessing import Process, Queue
import numpy as np
import os
import pickle
Reported by Pylint.
Line: 20
Column: 1
from caffe2.python import core, workspace, dyndep
import caffe2.python.hypothesis_test_util as hu
from gloo.python import IoError
dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:redis_store_handler_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:store_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
Reported by Pylint.
Line: 32
Column: 9
class TemporaryDirectory:
def __enter__(self):
self.tmpdir = tempfile.mkdtemp()
return self.tmpdir
def __exit__(self, type, value, traceback):
shutil.rmtree(self.tmpdir)
Reported by Pylint.
Line: 35
Column: 24
self.tmpdir = tempfile.mkdtemp()
return self.tmpdir
def __exit__(self, type, value, traceback):
shutil.rmtree(self.tmpdir)
class TestCase(hu.HypothesisTestCase):
test_counter = 0
Reported by Pylint.
Line: 54
Column: 20
fn(*args, **kwargs)
workspace.ResetWorkspace()
queue.put(True)
except Exception as ex:
queue.put(ex)
# Start N processes in the background
procs = []
for i in range(kwargs['comm_size']):
Reported by Pylint.
Line: 102
Column: 30
# If REDIS_HOST is set, use RedisStoreHandler for rendezvous.
if existing_cw is None:
redis_host = os.getenv("REDIS_HOST")
redis_port = int(os.getenv("REDIS_PORT", 6379))
if redis_host is not None:
workspace.RunOperatorOnce(
core.CreateOperator(
"RedisStoreHandlerCreate",
[],
Reported by Pylint.
Line: 159
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b301-pickle
"StoreGet",
[store_handler],
[blob]))
return pickle.loads(workspace.FetchBlob(blob))
def _test_broadcast(self,
comm_rank=None,
comm_size=None,
blob_size=None,
Reported by Bandit.
Line: 555
Column: 9
comm_size=None,
tmpdir=None,
):
store_handler, common_world = self.create_common_world(
comm_rank=comm_rank, comm_size=comm_size, tmpdir=tmpdir
)
net = core.Net("barrier")
net.Barrier(
Reported by Pylint.
Line: 604
Column: 9
# https://www.youtube.com/watch?v=QMFwFgG9NE8
closer = comm_rank == comm_size // 2,
store_handler, common_world = self.create_common_world(
comm_rank=comm_rank, comm_size=comm_size, tmpdir=tmpdir
)
net = core.Net("barrier_or_close")
if not closer:
Reported by Pylint.
caffe2/python/ideep/copy_op_test.py
55 issues
Line: 13
Column: 22
from caffe2.python import core, workspace
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class CopyTest(unittest.TestCase):
def _get_deep_device(self):
return caffe2_pb2.DeviceOption(device_type=caffe2_pb2.IDEEP)
def test_copy_to_ideep(self):
Reported by Pylint.
Line: 1
Column: 1
import unittest
import numpy as np
from random import randint
from caffe2.proto import caffe2_pb2
Reported by Pylint.
Line: 8
Column: 1
import unittest
import numpy as np
from random import randint
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
Reported by Pylint.
Line: 14
Column: 1
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class CopyTest(unittest.TestCase):
def _get_deep_device(self):
return caffe2_pb2.DeviceOption(device_type=caffe2_pb2.IDEEP)
def test_copy_to_ideep(self):
op = core.CreateOperator(
Reported by Pylint.
Line: 15
Column: 5
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class CopyTest(unittest.TestCase):
def _get_deep_device(self):
return caffe2_pb2.DeviceOption(device_type=caffe2_pb2.IDEEP)
def test_copy_to_ideep(self):
op = core.CreateOperator(
"CopyCPUToIDEEP",
Reported by Pylint.
Line: 18
Column: 5
def _get_deep_device(self):
return caffe2_pb2.DeviceOption(device_type=caffe2_pb2.IDEEP)
def test_copy_to_ideep(self):
op = core.CreateOperator(
"CopyCPUToIDEEP",
["X"],
["X_ideep"],
)
Reported by Pylint.
Line: 19
Column: 9
return caffe2_pb2.DeviceOption(device_type=caffe2_pb2.IDEEP)
def test_copy_to_ideep(self):
op = core.CreateOperator(
"CopyCPUToIDEEP",
["X"],
["X_ideep"],
)
op.device_option.CopyFrom(self._get_deep_device())
Reported by Pylint.
Line: 25
Column: 9
["X_ideep"],
)
op.device_option.CopyFrom(self._get_deep_device())
n = randint(1, 128)
c = randint(1, 64)
h = randint(1, 128)
w = randint(1, 128)
X = np.random.rand(n, c, h, w).astype(np.float32)
workspace.FeedBlob("X", X)
Reported by Pylint.
Line: 25
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
["X_ideep"],
)
op.device_option.CopyFrom(self._get_deep_device())
n = randint(1, 128)
c = randint(1, 64)
h = randint(1, 128)
w = randint(1, 128)
X = np.random.rand(n, c, h, w).astype(np.float32)
workspace.FeedBlob("X", X)
Reported by Bandit.
Line: 26
Column: 9
)
op.device_option.CopyFrom(self._get_deep_device())
n = randint(1, 128)
c = randint(1, 64)
h = randint(1, 128)
w = randint(1, 128)
X = np.random.rand(n, c, h, w).astype(np.float32)
workspace.FeedBlob("X", X)
workspace.RunOperatorOnce(op)
Reported by Pylint.
tools/codegen/api/autograd.py
55 issues
Line: 96
Column: 3
# Among these variants, we choose the one having the same name as the
# derivatives.yaml entry. If there is no exact match, then we choose the
# in-place variant.
# TODO: maybe the logic to search for all variants is no longer necessary?
func: NativeFunction
# The name of the generated autograd function.
# It's set only if we will calculate a derivative, i.e.
# 'args_with_derivatives' is not empty.
Reported by Pylint.
Line: 166
Column: 3
name: str
type: Type
# TODO: only to keep it byte-for-byte compatible with the old codegen, should remove.
cpp_type: str
# Represents a differentiable `Return`.
# How is it different from the `Return` type?
# - The name in `Return` is optional. Here it is always populated using the same
Reported by Pylint.
Line: 173
Column: 5
# How is it different from the `Return` type?
# - The name in `Return` is optional. Here it is always populated using the same
# `cpp.return_names()` method.
# TODO: some cpp naming logic (e.g. resolving name conflict) might be irrelevant?
# - It's processed Returns which are differentiable, in compliance with the
# `output_differentiability` field defined in derivatives.yaml (if specified),
# and are only used in the context of the autograd codegen;
@dataclass(frozen=True)
class DifferentiableOutput:
Reported by Pylint.
Line: 182
Column: 3
name: str
type: Type
# TODO: only to keep it byte-for-byte compatible with the old codegen, should remove.
cpp_type: str
@dataclass(frozen=True)
class NativeFunctionWithDifferentiabilityInfo:
func: NativeFunction
Reported by Pylint.
Line: 191
Column: 3
info: Optional[DifferentiabilityInfo]
fw_derivatives: Sequence[ForwardDerivative]
# TODO: Update comment below since it is out of date.
def dispatch_strategy(fn: NativeFunctionWithDifferentiabilityInfo) -> str:
"""How are we going to call the underlying implementation of a
declaration? There are two strategies:
- use_derived: we want to call the implementation on CPUDoubleType
(or a similar, derived Type instance). Because these derived
Reported by Pylint.
Line: 352
Column: 34
return result
def is_differentiable(name: str, type: Type, info: Optional[DifferentiabilityInfo]) -> bool:
return type.is_tensor_like() and (info is None or name not in info.non_differentiable_arg_names)
def gen_differentiable_outputs(fn: NativeFunctionWithDifferentiabilityInfo) -> List[DifferentiableOutput]:
f = fn.func
info = fn.info
Reported by Pylint.
Line: 1
Column: 1
from dataclasses import dataclass
import re
from typing import Optional, Sequence, List, Tuple, Match
from tools.codegen.api import cpp
from tools.codegen.api.types import Binding, NamedCType
from tools.codegen.model import NativeFunction, Type, SchemaKind
from tools.codegen.utils import IDENT_REGEX
Reported by Pylint.
Line: 14
Column: 1
# Note that it can be a derived property of an input argument, e.g.:
# we could save `other.scalar_type()` instead of the entire `other` tensor.
@dataclass(frozen=True)
class SavedAttribute:
# The NamedCType holds the updated name and cpp type of the attribute
# for the name, Suffix is appended if it's derived property, e.g.: `other_scalar_type`
nctype: NamedCType
# The expression to read the derived property at save time, e.g.:
Reported by Pylint.
Line: 26
Column: 1
# Represents a backward formula that calculates derivatives for one
# or more tensors.
@dataclass(frozen=True)
class Derivative:
# The formula string (legit C++ expression).
# Note that expressions against input arguments have been replaced with the
# corresponding saved attributes.
# E.g.:
# raw formula: `mul_tensor_backward(grad, self, other.scalar_type())`
Reported by Pylint.
Line: 50
Column: 1
# Represents a forward formula that calculates forward derivatives
# for one tensor.
@dataclass(frozen=True)
class ForwardDerivative:
# The formula string (legit C++ expression).
# Note that special keywords such as "linear" or "element_wise" have been
# replaced by the automatically generated formula.
formula: str
Reported by Pylint.
torch/distributed/elastic/rendezvous/etcd_rendezvous.py
54 issues
Line: 17
Column: 1
import time
from typing import Optional
import etcd # type: ignore[import]
from torch.distributed.elastic.rendezvous import (
RendezvousClosedError,
RendezvousError,
RendezvousHandler,
RendezvousParameters,
Reported by Pylint.
Line: 26
Column: 1
RendezvousTimeoutError,
)
from .utils import parse_rendezvous_endpoint
from .etcd_store import EtcdStore, cas_delay
_log_fmt = logging.Formatter("%(levelname)s %(asctime)s %(message)s")
_log_handler = logging.StreamHandler(sys.stderr)
Reported by Pylint.
Line: 27
Column: 1
)
from .utils import parse_rendezvous_endpoint
from .etcd_store import EtcdStore, cas_delay
_log_fmt = logging.Formatter("%(levelname)s %(asctime)s %(message)s")
_log_handler = logging.StreamHandler(sys.stderr)
_log_handler.setFormatter(_log_fmt)
Reported by Pylint.
Line: 145
Column: 3
self._rdzv_impl = rdzv_impl
def __del__(self):
# TODO: look into using weakref here instead.
del self._rdzv_impl
def get_backend(self) -> str:
return "etcd"
Reported by Pylint.
Line: 180
Column: 16
return 0
def get_run_id(self) -> str:
return self._rdzv_impl._run_id
def shutdown(self) -> bool:
try:
self.set_closed()
return True
Reported by Pylint.
Line: 186
Column: 16
try:
self.set_closed()
return True
except BaseException as e:
log.warning(f"Shutdown failed. Error occurred: {str(e)}")
return False
# TODO: we should probably handle a few additional errors,
Reported by Pylint.
Line: 187
Column: 13
self.set_closed()
return True
except BaseException as e:
log.warning(f"Shutdown failed. Error occurred: {str(e)}")
return False
# TODO: we should probably handle a few additional errors,
# like EtcdLeaderElectionInProgress and EtcdWatcherCleared. These are
Reported by Pylint.
Line: 191
Column: 3
return False
# TODO: we should probably handle a few additional errors,
# like EtcdLeaderElectionInProgress and EtcdWatcherCleared. These are
# only relevant for multi-node Etcd ensemble. A simple retry would work,
# but is verbose to add everywhere. Consider wrapping the client calls
# into auto-retry for these errors?
#
Reported by Pylint.
Line: 214
Column: 9
last_call_timeout,
):
self.client = client
log.info("Etcd machines: " + str(self.client.machines))
self._prefix = prefix
self._run_id = run_id
self._num_min_workers = num_min_workers
self._num_max_workers = num_max_workers
Reported by Pylint.
Line: 252
Column: 3
pass
def __del__(self):
# TODO: look into using weakref here instead.
if self._lease_run_id_stop is not None:
self._lease_run_id_stop.set()
if self._lease_this_rank_stop is not None:
self._lease_this_rank_stop.set()
Reported by Pylint.