The following issues were found:
caffe2/python/mkl/mkl_speed_test.py
13 issues
Line: 12
Column: 22
from caffe2.python import core, workspace, test_util
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testReLUSpeed(self):
X = np.random.randn(128, 4096).astype(np.float32)
mkl_do = core.DeviceOption(caffe2_pb2.MKLDNN)
# Makes sure that feed works.
Reported by Pylint.
Line: 1
Column: 1
import unittest
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, test_util
Reported by Pylint.
Line: 13
Column: 1
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testReLUSpeed(self):
X = np.random.randn(128, 4096).astype(np.float32)
mkl_do = core.DeviceOption(caffe2_pb2.MKLDNN)
# Makes sure that feed works.
workspace.FeedBlob("X", X)
Reported by Pylint.
Line: 14
Column: 5
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testReLUSpeed(self):
X = np.random.randn(128, 4096).astype(np.float32)
mkl_do = core.DeviceOption(caffe2_pb2.MKLDNN)
# Makes sure that feed works.
workspace.FeedBlob("X", X)
workspace.FeedBlob("X_mkl", X, device_option=mkl_do)
Reported by Pylint.
Line: 14
Column: 5
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testReLUSpeed(self):
X = np.random.randn(128, 4096).astype(np.float32)
mkl_do = core.DeviceOption(caffe2_pb2.MKLDNN)
# Makes sure that feed works.
workspace.FeedBlob("X", X)
workspace.FeedBlob("X_mkl", X, device_option=mkl_do)
Reported by Pylint.
Line: 14
Column: 5
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testReLUSpeed(self):
X = np.random.randn(128, 4096).astype(np.float32)
mkl_do = core.DeviceOption(caffe2_pb2.MKLDNN)
# Makes sure that feed works.
workspace.FeedBlob("X", X)
workspace.FeedBlob("X_mkl", X, device_option=mkl_do)
Reported by Pylint.
Line: 15
Column: 9
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testReLUSpeed(self):
X = np.random.randn(128, 4096).astype(np.float32)
mkl_do = core.DeviceOption(caffe2_pb2.MKLDNN)
# Makes sure that feed works.
workspace.FeedBlob("X", X)
workspace.FeedBlob("X_mkl", X, device_option=mkl_do)
net = core.Net("test")
Reported by Pylint.
Line: 46
Column: 5
print("Relu CPU runtime {}, MKL runtime {}.".format(runtime[1], runtime[2]))
def testConvSpeed(self):
# We randomly select a shape to test the speed. Intentionally we
# test a batch size of 1 since this may be the most frequent use
# case for MKL during deployment time.
X = np.random.rand(1, 256, 27, 27).astype(np.float32) - 0.5
W = np.random.rand(192, 256, 3, 3).astype(np.float32) - 0.5
Reported by Pylint.
Line: 46
Column: 5
print("Relu CPU runtime {}, MKL runtime {}.".format(runtime[1], runtime[2]))
def testConvSpeed(self):
# We randomly select a shape to test the speed. Intentionally we
# test a batch size of 1 since this may be the most frequent use
# case for MKL during deployment time.
X = np.random.rand(1, 256, 27, 27).astype(np.float32) - 0.5
W = np.random.rand(192, 256, 3, 3).astype(np.float32) - 0.5
Reported by Pylint.
Line: 46
Column: 5
print("Relu CPU runtime {}, MKL runtime {}.".format(runtime[1], runtime[2]))
def testConvSpeed(self):
# We randomly select a shape to test the speed. Intentionally we
# test a batch size of 1 since this may be the most frequent use
# case for MKL during deployment time.
X = np.random.rand(1, 256, 27, 27).astype(np.float32) - 0.5
W = np.random.rand(192, 256, 3, 3).astype(np.float32) - 0.5
Reported by Pylint.
test/cpp/api/optim_baseline.py
13 issues
Line: 7
Column: 1
import math
import sys
import torch
import torch.optim
HEADER = """
#include <torch/types.h>
Reported by Pylint.
Line: 8
Column: 1
import sys
import torch
import torch.optim
HEADER = """
#include <torch/types.h>
Reported by Pylint.
Line: 67
Column: 5
optimizer = OPTIMIZERS[optimizer_name](model.parameters())
input = torch.tensor([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]], dtype=torch.float64)
values = []
for i in range(iterations):
optimizer.zero_grad()
Reported by Pylint.
Line: 28
Column: 1
"LBFGS_with_line_search" : lambda p: torch.optim.LBFGS(p, 1.0, line_search_fn="strong_wolfe"),
"Adam": lambda p: torch.optim.Adam(p, 1.0),
"Adam_with_weight_decay": lambda p: torch.optim.Adam(p, 1.0, weight_decay=1e-2),
"Adam_with_weight_decay_and_amsgrad": lambda p: torch.optim.Adam(p, 1.0, weight_decay=1e-6, amsgrad=True),
"AdamW": lambda p: torch.optim.AdamW(p, 1.0),
"AdamW_without_weight_decay": lambda p: torch.optim.AdamW(p, 1.0, weight_decay=0),
"AdamW_with_amsgrad": lambda p: torch.optim.AdamW(p, 1.0, amsgrad=True),
"Adagrad": lambda p: torch.optim.Adagrad(p, 1.0),
"Adagrad_with_weight_decay": lambda p: torch.optim.Adagrad(p, 1.0, weight_decay=1e-2),
Reported by Pylint.
Line: 34
Column: 1
"AdamW_with_amsgrad": lambda p: torch.optim.AdamW(p, 1.0, amsgrad=True),
"Adagrad": lambda p: torch.optim.Adagrad(p, 1.0),
"Adagrad_with_weight_decay": lambda p: torch.optim.Adagrad(p, 1.0, weight_decay=1e-2),
"Adagrad_with_weight_decay_and_lr_decay": lambda p: torch.optim.Adagrad(p, 1.0, weight_decay=1e-6, lr_decay=1e-3),
"RMSprop": lambda p: torch.optim.RMSprop(p, 0.1),
"RMSprop_with_weight_decay": lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-2),
"RMSprop_with_weight_decay_and_centered": lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-6, centered=True),
"RMSprop_with_weight_decay_and_centered_and_momentum":
lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-6, centered=True, momentum=0.9),
Reported by Pylint.
Line: 37
Column: 1
"Adagrad_with_weight_decay_and_lr_decay": lambda p: torch.optim.Adagrad(p, 1.0, weight_decay=1e-6, lr_decay=1e-3),
"RMSprop": lambda p: torch.optim.RMSprop(p, 0.1),
"RMSprop_with_weight_decay": lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-2),
"RMSprop_with_weight_decay_and_centered": lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-6, centered=True),
"RMSprop_with_weight_decay_and_centered_and_momentum":
lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-6, centered=True, momentum=0.9),
"SGD": lambda p: torch.optim.SGD(p, 0.1),
"SGD_with_weight_decay": lambda p: torch.optim.SGD(p, 0.1, weight_decay=1e-2),
"SGD_with_weight_decay_and_momentum": lambda p: torch.optim.SGD(p, 0.1, momentum=0.9, weight_decay=1e-2),
Reported by Pylint.
Line: 42
Column: 1
lambda p: torch.optim.RMSprop(p, 0.1, weight_decay=1e-6, centered=True, momentum=0.9),
"SGD": lambda p: torch.optim.SGD(p, 0.1),
"SGD_with_weight_decay": lambda p: torch.optim.SGD(p, 0.1, weight_decay=1e-2),
"SGD_with_weight_decay_and_momentum": lambda p: torch.optim.SGD(p, 0.1, momentum=0.9, weight_decay=1e-2),
"SGD_with_weight_decay_and_nesterov_momentum":
lambda p: torch.optim.SGD(p, 0.1, momentum=0.9, weight_decay=1e-6, nesterov=True),
}
Reported by Pylint.
Line: 48
Column: 1
}
def weight_init(module):
if isinstance(module, torch.nn.Linear):
stdev = 1.0 / math.sqrt(module.weight.size(1))
for p in module.parameters():
p.data.uniform_(-stdev, stdev)
Reported by Pylint.
Line: 51
Column: 13
def weight_init(module):
if isinstance(module, torch.nn.Linear):
stdev = 1.0 / math.sqrt(module.weight.size(1))
for p in module.parameters():
p.data.uniform_(-stdev, stdev)
def run(optimizer_name, iterations, sample_every):
torch.manual_seed(0)
Reported by Pylint.
Line: 55
Column: 1
p.data.uniform_(-stdev, stdev)
def run(optimizer_name, iterations, sample_every):
torch.manual_seed(0)
model = torch.nn.Sequential(
torch.nn.Linear(2, 3),
torch.nn.Sigmoid(),
torch.nn.Linear(3, 1),
Reported by Pylint.
caffe2/python/trt/transform.py
13 issues
Line: 31
Column: 9
def check_gpu_():
try:
C.get_cuda_version()
except Exception as _:
raise Exception("TensorRT related functions require CUDA support")
def convert_onnx_model_to_trt_op(onnx_model,
max_batch_size=64,
Reported by Pylint.
Line: 44
Column: 15
Convert the whole ONNX model to a TensorRT C2 op
"""
check_gpu_()
trt_str = C.onnx_to_trt_op(onnx_model.SerializeToString(),
_get_output_shapes(onnx_model.graph.output),
max_batch_size,
max_workspace_size,
verbosity,
debug_builder)
Reported by Pylint.
Line: 99
Column: 20
for k,v in input_shapes.items():
shape_hints[k] = v
pred_net_str = C.transform_trt(pred_net.SerializeToString(),
shape_hints,
max_batch_size,
max_workspace_size,
verbosity,
debug_builder,
Reported by Pylint.
Line: 33
Column: 8
try:
C.get_cuda_version()
except Exception as _:
raise Exception("TensorRT related functions require CUDA support")
def convert_onnx_model_to_trt_op(onnx_model,
max_batch_size=64,
max_workspace_size=2*1024*1024,
verbosity=1,
Reported by Pylint.
Line: 56
Column: 29
# Assume the workspace is already filled with init weights
def _infer_shapes(pred_net, inputs):
workspace.RunNetOnce(pred_net)
hints = {}
for op in pred_net.op:
for o in op.output:
if o not in hints:
Reported by Pylint.
Line: 29
Column: 1
return dict(zip(names, shapes))
def check_gpu_():
try:
C.get_cuda_version()
except Exception as _:
raise Exception("TensorRT related functions require CUDA support")
Reported by Pylint.
Line: 33
Column: 1
try:
C.get_cuda_version()
except Exception as _:
raise Exception("TensorRT related functions require CUDA support")
def convert_onnx_model_to_trt_op(onnx_model,
max_batch_size=64,
max_workspace_size=2*1024*1024,
verbosity=1,
Reported by Pylint.
Line: 50
Column: 5
max_workspace_size,
verbosity,
debug_builder)
op = caffe2_pb2.OperatorDef()
op.ParseFromString(trt_str)
return op
# Assume the workspace is already filled with init weights
Reported by Pylint.
Line: 59
Column: 9
def _infer_shapes(pred_net, inputs):
workspace.RunNetOnce(pred_net)
hints = {}
for op in pred_net.op:
for o in op.output:
if o not in hints:
blob = workspace.FetchBlob(o)
if hasattr(blob, 'shape'):
hints[o] = blob.shape
Reported by Pylint.
Line: 60
Column: 13
workspace.RunNetOnce(pred_net)
hints = {}
for op in pred_net.op:
for o in op.output:
if o not in hints:
blob = workspace.FetchBlob(o)
if hasattr(blob, 'shape'):
hints[o] = blob.shape
for i in op.input:
Reported by Pylint.
test/distributed/elastic/timer/local_timer_example.py
13 issues
Line: 14
Column: 1
import time
import unittest
import torch.distributed.elastic.timer as timer
import torch.multiprocessing as torch_mp
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
run_tests,
Reported by Pylint.
Line: 15
Column: 1
import unittest
import torch.distributed.elastic.timer as timer
import torch.multiprocessing as torch_mp
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
run_tests,
IS_WINDOWS,
Reported by Pylint.
Line: 16
Column: 1
import torch.distributed.elastic.timer as timer
import torch.multiprocessing as torch_mp
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
run_tests,
IS_WINDOWS,
IS_MACOS,
Reported by Pylint.
Line: 31
Column: 21
)
def _happy_function(rank, mp_queue):
timer.configure(timer.LocalTimerClient(mp_queue))
with timer.expires(after=1):
time.sleep(0.5)
Reported by Pylint.
Line: 37
Column: 21
time.sleep(0.5)
def _stuck_function(rank, mp_queue):
timer.configure(timer.LocalTimerClient(mp_queue))
with timer.expires(after=1):
time.sleep(5)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import logging
import multiprocessing as mp
Reported by Pylint.
Line: 59
Column: 9
"""
@sandcastle_skip_if(TEST_WITH_DEV_DBG_ASAN or TEST_WITH_TSAN, "test is a/tsan incompatible")
def test_torch_mp_example(self):
# in practice set the max_interval to a larger value (e.g. 60 seconds)
mp_queue = mp.get_context("spawn").Queue()
server = timer.LocalTimerServer(mp_queue, max_interval=0.01)
server.start()
Reported by Pylint.
Line: 84
Column: 9
server.stop()
@sandcastle_skip_if(TEST_WITH_DEV_DBG_ASAN or TEST_WITH_TSAN, "test is a/tsan incompatible")
def test_example_start_method_spawn(self):
self._run_example_with(start_method="spawn")
# @sandcastle_skip_if(TEST_WITH_DEV_DBG_ASAN or TEST_WITH_TSAN, "test is a/tsan incompatible")
# def test_example_start_method_forkserver(self):
# self._run_example_with(start_method="forkserver")
Reported by Pylint.
Line: 87
Column: 1
def test_example_start_method_spawn(self):
self._run_example_with(start_method="spawn")
# @sandcastle_skip_if(TEST_WITH_DEV_DBG_ASAN or TEST_WITH_TSAN, "test is a/tsan incompatible")
# def test_example_start_method_forkserver(self):
# self._run_example_with(start_method="forkserver")
@sandcastle_skip_if(TEST_WITH_TSAN, "test is tsan incompatible")
def test_example_start_method_fork(self):
Reported by Pylint.
Line: 92
Column: 9
# self._run_example_with(start_method="forkserver")
@sandcastle_skip_if(TEST_WITH_TSAN, "test is tsan incompatible")
def test_example_start_method_fork(self):
self._run_example_with(start_method="fork")
def _run_example_with(self, start_method):
spawn_ctx = mp.get_context(start_method)
mp_queue = spawn_ctx.Queue()
Reported by Pylint.
caffe2/python/operator_test/scale_op_test.py
13 issues
Line: 10
Column: 1
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
class TestScaleOps(serial.SerializedTestCase):
@serial.given(dim=st.sampled_from([[1, 386, 1], [386, 1, 1],
Reported by Pylint.
Line: 22
Column: 59
scale=st.floats(0.0, 10.0),
num_tensors=st.integers(1, 10),
**hu.gcs)
def test_scale_ops(self, dim, scale, num_tensors, gc, dc):
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
Reported by Pylint.
Line: 26
Column: 9
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
for i in range(num_tensors):
tensor = "X_{}".format(i)
X = np.random.rand(*dim).astype(np.float32) - 0.5
in_tensors.append(tensor)
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
Reported by Pylint.
Line: 14
Column: 1
import numpy as np
class TestScaleOps(serial.SerializedTestCase):
@serial.given(dim=st.sampled_from([[1, 386, 1], [386, 1, 1],
[1, 256, 1], [256, 1, 1],
[1024, 256, 1], [1, 1024, 1],
[1, 1, 1]]),
scale=st.floats(0.0, 10.0),
Reported by Pylint.
Line: 22
Column: 5
scale=st.floats(0.0, 10.0),
num_tensors=st.integers(1, 10),
**hu.gcs)
def test_scale_ops(self, dim, scale, num_tensors, gc, dc):
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
Reported by Pylint.
Line: 22
Column: 5
scale=st.floats(0.0, 10.0),
num_tensors=st.integers(1, 10),
**hu.gcs)
def test_scale_ops(self, dim, scale, num_tensors, gc, dc):
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
Reported by Pylint.
Line: 22
Column: 5
scale=st.floats(0.0, 10.0),
num_tensors=st.integers(1, 10),
**hu.gcs)
def test_scale_ops(self, dim, scale, num_tensors, gc, dc):
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
Reported by Pylint.
Line: 22
Column: 5
scale=st.floats(0.0, 10.0),
num_tensors=st.integers(1, 10),
**hu.gcs)
def test_scale_ops(self, dim, scale, num_tensors, gc, dc):
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
Reported by Pylint.
Line: 22
Column: 5
scale=st.floats(0.0, 10.0),
num_tensors=st.integers(1, 10),
**hu.gcs)
def test_scale_ops(self, dim, scale, num_tensors, gc, dc):
in_tensors = []
in_tensor_ps = []
out_tensors = []
out_ref_tensors = []
# initialize tensors
Reported by Pylint.
caffe2/python/operator_test/given_tensor_fill_op_test.py
13 issues
Line: 7
Column: 1
from caffe2.python import core
from hypothesis import given
import hypothesis.strategies as st
import caffe2.python.hypothesis_test_util as hu
import numpy as np
import unittest
Reported by Pylint.
Line: 8
Column: 1
from caffe2.python import core
from hypothesis import given
import hypothesis.strategies as st
import caffe2.python.hypothesis_test_util as hu
import numpy as np
import unittest
Reported by Pylint.
Line: 39
Column: 1
values=X.reshape((1, X.size)),
)
def constant_fill(*args, **kw):
return [X]
self.assertReferenceChecks(gc, op, [], constant_fill)
self.assertDeviceChecks(dc, op, [], [0])
Reported by Pylint.
Line: 39
Column: 1
values=X.reshape((1, X.size)),
)
def constant_fill(*args, **kw):
return [X]
self.assertReferenceChecks(gc, op, [], constant_fill)
self.assertDeviceChecks(dc, op, [], [0])
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
from hypothesis import given
import hypothesis.strategies as st
import caffe2.python.hypothesis_test_util as hu
Reported by Pylint.
Line: 12
Column: 1
import caffe2.python.hypothesis_test_util as hu
import numpy as np
import unittest
class TestGivenTensorFillOps(hu.HypothesisTestCase):
@given(X=hu.tensor(min_dim=1, max_dim=4, dtype=np.int32),
t=st.sampled_from([
Reported by Pylint.
Line: 15
Column: 1
import unittest
class TestGivenTensorFillOps(hu.HypothesisTestCase):
@given(X=hu.tensor(min_dim=1, max_dim=4, dtype=np.int32),
t=st.sampled_from([
(core.DataType.BOOL, np.bool_, "GivenTensorFill"),
(core.DataType.INT32, np.int32, "GivenTensorFill"),
(core.DataType.FLOAT, np.float32, "GivenTensorFill"),
Reported by Pylint.
Line: 29
Column: 5
(core.DataType.INT32, np.double, "GivenTensorDoubleFill"),
]),
**hu.gcs)
def test_given_tensor_fill(self, X, t, gc, dc):
X = X.astype(t[1])
print('X: ', str(X))
op = core.CreateOperator(
t[2], [], ["Y"],
shape=X.shape,
Reported by Pylint.
Line: 29
Column: 5
(core.DataType.INT32, np.double, "GivenTensorDoubleFill"),
]),
**hu.gcs)
def test_given_tensor_fill(self, X, t, gc, dc):
X = X.astype(t[1])
print('X: ', str(X))
op = core.CreateOperator(
t[2], [], ["Y"],
shape=X.shape,
Reported by Pylint.
Line: 29
Column: 5
(core.DataType.INT32, np.double, "GivenTensorDoubleFill"),
]),
**hu.gcs)
def test_given_tensor_fill(self, X, t, gc, dc):
X = X.astype(t[1])
print('X: ', str(X))
op = core.CreateOperator(
t[2], [], ["Y"],
shape=X.shape,
Reported by Pylint.
caffe2/python/operator_test/map_ops_test.py
13 issues
Line: 1
Column: 1
import itertools
import numpy as np
import tempfile
import unittest
Reported by Pylint.
Line: 8
Column: 1
import itertools
import numpy as np
import tempfile
import unittest
import os
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
Reported by Pylint.
Line: 9
Column: 1
import itertools
import numpy as np
import tempfile
import unittest
import os
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
Reported by Pylint.
Line: 10
Column: 1
import numpy as np
import tempfile
import unittest
import os
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
Reported by Pylint.
Line: 16
Column: 1
import caffe2.python.hypothesis_test_util as hu
class TestMap(hu.HypothesisTestCase):
def test_create_map(self):
dtypes = [core.DataType.INT32, core.DataType.INT64]
for key_dtype, value_dtype in itertools.product(dtypes, dtypes):
op = core.CreateOperator(
Reported by Pylint.
Line: 18
Column: 5
class TestMap(hu.HypothesisTestCase):
def test_create_map(self):
dtypes = [core.DataType.INT32, core.DataType.INT64]
for key_dtype, value_dtype in itertools.product(dtypes, dtypes):
op = core.CreateOperator(
'CreateMap',
[],
Reported by Pylint.
Line: 21
Column: 13
def test_create_map(self):
dtypes = [core.DataType.INT32, core.DataType.INT64]
for key_dtype, value_dtype in itertools.product(dtypes, dtypes):
op = core.CreateOperator(
'CreateMap',
[],
['map'],
key_dtype=key_dtype,
value_dtype=value_dtype,
Reported by Pylint.
Line: 31
Column: 5
workspace.RunOperatorOnce(op)
self.assertTrue(workspace.HasBlob('map'))
def test_map(self):
def test_map_func(KEY_T, VALUE_T):
model_file = os.path.join(tempfile.mkdtemp(), 'db')
key_data = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=KEY_T)
value_data = np.asarray([2, 3, 3, 3, 3, 2, 3, 3, 3, 3], dtype=VALUE_T)
Reported by Pylint.
Line: 31
Column: 5
workspace.RunOperatorOnce(op)
self.assertTrue(workspace.HasBlob('map'))
def test_map(self):
def test_map_func(KEY_T, VALUE_T):
model_file = os.path.join(tempfile.mkdtemp(), 'db')
key_data = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=KEY_T)
value_data = np.asarray([2, 3, 3, 3, 3, 2, 3, 3, 3, 3], dtype=VALUE_T)
Reported by Pylint.
Line: 33
Column: 9
def test_map(self):
def test_map_func(KEY_T, VALUE_T):
model_file = os.path.join(tempfile.mkdtemp(), 'db')
key_data = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=KEY_T)
value_data = np.asarray([2, 3, 3, 3, 3, 2, 3, 3, 3, 3], dtype=VALUE_T)
workspace.FeedBlob("key_data", key_data)
workspace.FeedBlob("value_data", value_data)
Reported by Pylint.
test/backward_compatibility/check_backward_compatibility.py
13 issues
Line: 7
Column: 1
import sys
from collections import defaultdict
import torch
from torch._C import parse_schema
# The date specifies how long the allowlist exclusion should apply to.
#
Reported by Pylint.
Line: 8
Column: 1
from collections import defaultdict
import torch
from torch._C import parse_schema
# The date specifies how long the allowlist exclusion should apply to.
#
# - If we NEVER give BC guarantee for an operator, you can put the
Reported by Pylint.
Line: 159
Column: 19
def check_bc(existing_schemas):
new_schemas = torch._C._jit_get_all_schemas()
new_schemas += torch._C._jit_get_custom_class_schemas()
new_schema_dict = defaultdict(list)
for s in new_schemas:
new_schema_dict[s.name].append(s)
Reported by Pylint.
Line: 159
Column: 19
def check_bc(existing_schemas):
new_schemas = torch._C._jit_get_all_schemas()
new_schemas += torch._C._jit_get_custom_class_schemas()
new_schema_dict = defaultdict(list)
for s in new_schemas:
new_schema_dict[s.name].append(s)
Reported by Pylint.
Line: 160
Column: 20
def check_bc(existing_schemas):
new_schemas = torch._C._jit_get_all_schemas()
new_schemas += torch._C._jit_get_custom_class_schemas()
new_schema_dict = defaultdict(list)
for s in new_schemas:
new_schema_dict[s.name].append(s)
is_bc = True
Reported by Pylint.
Line: 160
Column: 20
def check_bc(existing_schemas):
new_schemas = torch._C._jit_get_all_schemas()
new_schemas += torch._C._jit_get_custom_class_schemas()
new_schema_dict = defaultdict(list)
for s in new_schemas:
new_schema_dict[s.name].append(s)
is_bc = True
Reported by Pylint.
Line: 162
Column: 9
new_schemas = torch._C._jit_get_all_schemas()
new_schemas += torch._C._jit_get_custom_class_schemas()
new_schema_dict = defaultdict(list)
for s in new_schemas:
new_schema_dict[s.name].append(s)
is_bc = True
broken_ops = []
for existing_schema in existing_schemas:
Reported by Pylint.
Line: 186
Column: 3
"\n\t".join(str(s) for s in matching_new_schemas),
)
)
# TODO Print out more details about why candidates don't match.
broken_ops.append(str(existing_schema))
is_bc = False
if is_bc:
print("Found backward compatible schemas for all existing schemas")
else:
Reported by Pylint.
Line: 1
Column: 1
import argparse
import datetime
import re
import sys
from collections import defaultdict
import torch
from torch._C import parse_schema
Reported by Pylint.
Line: 129
Column: 1
) for item in ALLOW_LIST if item[1] >= datetime.date.today()
]
def allow_listed(schema):
for item in ALLOW_LIST_COMPILED:
if item[0].search(str(schema)):
if len(item) > 2 and item[2] is not None:
# if arguments regex is present, use it
return bool(item[2].search(str(schema)))
Reported by Pylint.
caffe2/python/operator_test/cosine_embedding_criterion_op_test.py
13 issues
Line: 6
Column: 1
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
Reported by Pylint.
Line: 1
Column: 1
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core
Reported by Pylint.
Line: 14
Column: 1
import caffe2.python.serialized_test.serialized_test_util as serial
class TestCosineEmbeddingCriterion(serial.SerializedTestCase):
@serial.given(N=st.integers(min_value=10, max_value=20),
seed=st.integers(min_value=0, max_value=65535),
margin=st.floats(min_value=-0.5, max_value=0.5),
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
Reported by Pylint.
Line: 19
Column: 5
seed=st.integers(min_value=0, max_value=65535),
margin=st.floats(min_value=-0.5, max_value=0.5),
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
Reported by Pylint.
Line: 19
Column: 5
seed=st.integers(min_value=0, max_value=65535),
margin=st.floats(min_value=-0.5, max_value=0.5),
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
Reported by Pylint.
Line: 19
Column: 5
seed=st.integers(min_value=0, max_value=65535),
margin=st.floats(min_value=-0.5, max_value=0.5),
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
Reported by Pylint.
Line: 19
Column: 5
seed=st.integers(min_value=0, max_value=65535),
margin=st.floats(min_value=-0.5, max_value=0.5),
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
Reported by Pylint.
Line: 19
Column: 5
seed=st.integers(min_value=0, max_value=65535),
margin=st.floats(min_value=-0.5, max_value=0.5),
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
Reported by Pylint.
Line: 21
Column: 9
**hu.gcs)
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
margin=margin)
Reported by Pylint.
Line: 22
Column: 9
def test_cosine_embedding_criterion(self, N, seed, margin, gc, dc):
np.random.seed(seed)
S = np.random.randn(N).astype(np.float32)
Y = np.random.choice([-1, 1], size=N).astype(np.int32)
op = core.CreateOperator(
"CosineEmbeddingCriterion", ["S", "Y"], ["output"],
margin=margin)
def ref_cec(S, Y):
Reported by Pylint.
test/distributed/test_launcher.py
13 issues
Line: 5
Column: 1
import sys
from contextlib import closing
import torch.distributed as dist
import torch.distributed.launch as launch
from torch.distributed.elastic.utils import get_socket_with_port
if not dist.is_available():
print("Distributed not available, skipping tests", file=sys.stderr)
Reported by Pylint.
Line: 6
Column: 1
from contextlib import closing
import torch.distributed as dist
import torch.distributed.launch as launch
from torch.distributed.elastic.utils import get_socket_with_port
if not dist.is_available():
print("Distributed not available, skipping tests", file=sys.stderr)
sys.exit(0)
Reported by Pylint.
Line: 7
Column: 1
import torch.distributed as dist
import torch.distributed.launch as launch
from torch.distributed.elastic.utils import get_socket_with_port
if not dist.is_available():
print("Distributed not available, skipping tests", file=sys.stderr)
sys.exit(0)
Reported by Pylint.
Line: 13
Column: 1
print("Distributed not available, skipping tests", file=sys.stderr)
sys.exit(0)
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
TestCase,
run_tests,
)
Reported by Pylint.
Line: 36
Column: 9
def test_launch_user_script(self):
nnodes = 1
nproc_per_node = 4
world_size = nnodes * nproc_per_node
sock = get_socket_with_port()
with closing(sock):
master_port = sock.getsockname()[1]
args = [
f"--nnodes={nnodes}",
Reported by Pylint.
Line: 1
Column: 1
import os
import sys
from contextlib import closing
import torch.distributed as dist
import torch.distributed.launch as launch
from torch.distributed.elastic.utils import get_socket_with_port
if not dist.is_available():
Reported by Pylint.
Line: 13
Column: 1
print("Distributed not available, skipping tests", file=sys.stderr)
sys.exit(0)
from torch.testing._internal.common_utils import (
TEST_WITH_DEV_DBG_ASAN,
TEST_WITH_TSAN,
TestCase,
run_tests,
)
Reported by Pylint.
Line: 21
Column: 1
)
def path(script):
return os.path.join(os.path.dirname(__file__), script)
if TEST_WITH_DEV_DBG_ASAN:
print("Skip ASAN as torch + multiprocessing spawn have known issues", file=sys.stderr)
sys.exit(0)
Reported by Pylint.
Line: 29
Column: 1
sys.exit(0)
if TEST_WITH_TSAN:
print("Skip as TSAN is not fork-safe since we're forking in a multi-threaded environment", file=sys.stderr)
sys.exit(0)
class TestDistributedLaunch(TestCase):
def test_launch_user_script(self):
nnodes = 1
Reported by Pylint.
Line: 32
Column: 1
print("Skip as TSAN is not fork-safe since we're forking in a multi-threaded environment", file=sys.stderr)
sys.exit(0)
class TestDistributedLaunch(TestCase):
def test_launch_user_script(self):
nnodes = 1
nproc_per_node = 4
world_size = nnodes * nproc_per_node
sock = get_socket_with_port()
Reported by Pylint.