The following issues were found
caffe2/python/mkl/mkl_sbn_speed_test.py
12 issues
Line: 12
Column: 22
from caffe2.python import core, workspace, test_util
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testSpatialBNTestingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
Reported by Pylint.
Line: 1
Column: 1
import unittest
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, test_util
Reported by Pylint.
Line: 13
Column: 1
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testSpatialBNTestingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
Reported by Pylint.
Line: 14
Column: 5
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testSpatialBNTestingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
bias = np.random.rand(input_channel).astype(np.float32) - 0.5
Reported by Pylint.
Line: 14
Column: 5
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testSpatialBNTestingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
bias = np.random.rand(input_channel).astype(np.float32) - 0.5
Reported by Pylint.
Line: 14
Column: 5
@unittest.skipIf(not workspace.C.has_mkldnn, "Skipping as we do not have mkldnn.")
class TestMKLBasic(test_util.TestCase):
def testSpatialBNTestingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
bias = np.random.rand(input_channel).astype(np.float32) - 0.5
Reported by Pylint.
Line: 17
Column: 9
def testSpatialBNTestingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
bias = np.random.rand(input_channel).astype(np.float32) - 0.5
mean = np.random.randn(input_channel).astype(np.float32)
var = np.random.rand(input_channel).astype(np.float32) + 0.5
Reported by Pylint.
Line: 40
Column: 1
net.SpatialBN(["X", "scale", "bias","mean","var"], "Y", order="NCHW",
is_test=True,
epsilon=1e-5)
net.SpatialBN(["X_mkl", "scale_mkl", "bias_mkl","mean_mkl","var_mkl"], "Y_mkl", order="NCHW",
is_test=True,
epsilon=1e-5, device_option=mkl_do)
workspace.CreateNet(net)
workspace.RunNet(net)
Reported by Pylint.
Line: 56
Column: 5
print("FC CPU runtime {}, MKL runtime {}.".format(runtime[1], runtime[2]))
def testSpatialBNTrainingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
bias = np.random.rand(input_channel).astype(np.float32) - 0.5
mean = np.random.randn(input_channel).astype(np.float32)
Reported by Pylint.
Line: 56
Column: 5
print("FC CPU runtime {}, MKL runtime {}.".format(runtime[1], runtime[2]))
def testSpatialBNTrainingSpeed(self):
input_channel = 10
X = np.random.rand(1, input_channel, 100, 100).astype(np.float32) - 0.5
scale = np.random.rand(input_channel).astype(np.float32) + 0.5
bias = np.random.rand(input_channel).astype(np.float32) - 0.5
mean = np.random.randn(input_channel).astype(np.float32)
Reported by Pylint.
caffe2/contrib/playground/resnetdemo/IN1k_resnet.py
12 issues
Line: 34
Column: 34
self.log.info("Model creation completed")
def fun_per_epoch_b4RunNet(self, epoch):
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
Reported by Pylint.
Line: 34
Column: 28
self.log.info("Model creation completed")
def fun_per_epoch_b4RunNet(self, epoch):
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
Reported by Pylint.
Line: 38
Column: 40
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
learning_rate = 0.05
for idx in range(self.opts['distributed']['first_xpu_id'],
self.opts['distributed']['first_xpu_id'] +
self.opts['distributed']['num_xpus']):
Reported by Pylint.
Line: 38
Column: 33
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
learning_rate = 0.05
for idx in range(self.opts['distributed']['first_xpu_id'],
self.opts['distributed']['first_xpu_id'] +
self.opts['distributed']['num_xpus']):
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
from caffe2.python import workspace, cnn, core
from caffe2.python import timeout_guard
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
from caffe2.python import workspace, cnn, core
from caffe2.python import timeout_guard
Reported by Pylint.
Line: 13
Column: 1
from caffe2.proto import caffe2_pb2
def init_model(self):
train_model = cnn.CNNModelHelper(
order="NCHW",
name="resnet",
use_cudnn=True,
cudnn_exhaustive_search=False
Reported by Pylint.
Line: 34
Column: 1
self.log.info("Model creation completed")
def fun_per_epoch_b4RunNet(self, epoch):
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
Reported by Pylint.
Line: 34
Column: 1
self.log.info("Model creation completed")
def fun_per_epoch_b4RunNet(self, epoch):
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
Reported by Pylint.
Line: 38
Column: 1
pass
def fun_per_iter_b4RunNet(self, epoch, epoch_iter):
learning_rate = 0.05
for idx in range(self.opts['distributed']['first_xpu_id'],
self.opts['distributed']['first_xpu_id'] +
self.opts['distributed']['num_xpus']):
Reported by Pylint.
caffe2/python/modeling/parameter_info.py
12 issues
Line: 1
Column: 1
from caffe2.python import core
import numpy as np
Reported by Pylint.
Line: 11
Column: 1
import numpy as np
class ParameterTags(object):
BIAS = 'BIAS'
WEIGHT = 'WEIGHT'
COMPUTED_PARAM = 'COMPUTED_PARAM'
Reported by Pylint.
Line: 11
Column: 1
import numpy as np
class ParameterTags(object):
BIAS = 'BIAS'
WEIGHT = 'WEIGHT'
COMPUTED_PARAM = 'COMPUTED_PARAM'
Reported by Pylint.
Line: 11
Column: 1
import numpy as np
class ParameterTags(object):
BIAS = 'BIAS'
WEIGHT = 'WEIGHT'
COMPUTED_PARAM = 'COMPUTED_PARAM'
Reported by Pylint.
Line: 17
Column: 1
COMPUTED_PARAM = 'COMPUTED_PARAM'
class ParameterInfo(object):
def __init__(
self, param_id, param, key=None, shape=None, length=None,
grad=None, blob_copy=None):
assert isinstance(param, core.BlobReference)
Reported by Pylint.
Line: 17
Column: 1
COMPUTED_PARAM = 'COMPUTED_PARAM'
class ParameterInfo(object):
def __init__(
self, param_id, param, key=None, shape=None, length=None,
grad=None, blob_copy=None):
assert isinstance(param, core.BlobReference)
Reported by Pylint.
Line: 17
Column: 1
COMPUTED_PARAM = 'COMPUTED_PARAM'
class ParameterInfo(object):
def __init__(
self, param_id, param, key=None, shape=None, length=None,
grad=None, blob_copy=None):
assert isinstance(param, core.BlobReference)
Reported by Pylint.
Line: 19
Column: 5
class ParameterInfo(object):
def __init__(
self, param_id, param, key=None, shape=None, length=None,
grad=None, blob_copy=None):
assert isinstance(param, core.BlobReference)
self.param_id = param_id
self.name = str(param)
Reported by Pylint.
Line: 22
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def __init__(
self, param_id, param, key=None, shape=None, length=None,
grad=None, blob_copy=None):
assert isinstance(param, core.BlobReference)
self.param_id = param_id
self.name = str(param)
self.blob = param
self.key = key
self.shape = shape
Reported by Bandit.
Line: 41
Column: 5
self._optimizer = None
@property
def parameter(self):
return self.blob
@property
def optimizer(self):
return self._optimizer
Reported by Pylint.
benchmarks/operator_benchmark/common/tests/jit_forward_test.py
12 issues
Line: 1
Column: 1
import operator_benchmark as op_bench
import torch
intraop_bench_configs = op_bench.config_list(
attrs=[
[8, 16],
],
attr_names=["M", "N"],
tags=["short"],
Reported by Pylint.
Line: 2
Column: 1
import operator_benchmark as op_bench
import torch
intraop_bench_configs = op_bench.config_list(
attrs=[
[8, 16],
],
attr_names=["M", "N"],
tags=["short"],
Reported by Pylint.
Line: 24
Column: 9
class TorchSumBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N):
self.input_one = torch.rand(M, N)
self.set_module_name("sum")
# This is a very temporary method and will be removed soon, so
# don't use this method in your benchmark
# TODO(mingzhe): use one forward method for both JIT and Eager
Reported by Pylint.
Line: 29
Column: 3
# This is a very temporary method and will be removed soon, so
# don't use this method in your benchmark
# TODO(mingzhe): use one forward method for both JIT and Eager
def jit_forward(self, iters):
return torch_sumall(self.input_one, iters)
op_bench.generate_pt_test(intraop_bench_configs, TorchSumBenchmark)
Reported by Pylint.
Line: 1
Column: 1
import operator_benchmark as op_bench
import torch
intraop_bench_configs = op_bench.config_list(
attrs=[
[8, 16],
],
attr_names=["M", "N"],
tags=["short"],
Reported by Pylint.
Line: 13
Column: 1
)
@torch.jit.script
def torch_sumall(a, iterations):
# type: (Tensor, int)
result = 0.0
for _ in range(iterations):
result += float(torch.sum(a))
a[0][0] += 0.01
Reported by Pylint.
Line: 13
Column: 1
)
@torch.jit.script
def torch_sumall(a, iterations):
# type: (Tensor, int)
result = 0.0
for _ in range(iterations):
result += float(torch.sum(a))
a[0][0] += 0.01
Reported by Pylint.
Line: 22
Column: 1
return result
class TorchSumBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N):
self.input_one = torch.rand(M, N)
self.set_module_name("sum")
# This is a very temporary method and will be removed soon, so
Reported by Pylint.
Line: 23
Column: 5
class TorchSumBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N):
self.input_one = torch.rand(M, N)
self.set_module_name("sum")
# This is a very temporary method and will be removed soon, so
# don't use this method in your benchmark
Reported by Pylint.
Line: 23
Column: 5
class TorchSumBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N):
self.input_one = torch.rand(M, N)
self.set_module_name("sum")
# This is a very temporary method and will be removed soon, so
# don't use this method in your benchmark
Reported by Pylint.
benchmarks/distributed/rpc/rl/observer.py
12 issues
Line: 4
Column: 1
import random
import time
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import rpc_sync
from agent import AgentBase
Reported by Pylint.
Line: 5
Column: 1
import time
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import rpc_sync
from agent import AgentBase
Reported by Pylint.
Line: 6
Column: 1
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import rpc_sync
from agent import AgentBase
class ObserverBase:
Reported by Pylint.
Line: 25
Column: 9
state_size (list): List of integers denoting dimensions of state
batch (bool): Whether agent will be using batch select action
"""
self.state_size = state_size
self.select_action = AgentBase.select_action_batch if batch else AgentBase.select_action_non_batch
def reset(self):
r"""
Resets state randomly
Reported by Pylint.
Line: 26
Column: 9
batch (bool): Whether agent will be using batch select action
"""
self.state_size = state_size
self.select_action = AgentBase.select_action_batch if batch else AgentBase.select_action_non_batch
def reset(self):
r"""
Resets state randomly
"""
Reported by Pylint.
Line: 35
Column: 20
state = torch.rand(self.state_size)
return state
def step(self, action):
r"""
Generates random state and reward
Args:
action (int): Int received from agent representing action to take on state
"""
Reported by Pylint.
Line: 1
Column: 1
import random
import time
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import rpc_sync
from agent import AgentBase
Reported by Pylint.
Line: 11
Column: 1
from agent import AgentBase
class ObserverBase:
def __init__(self):
r"""
Inits observer class
"""
self.id = rpc.get_worker_info().id
Reported by Pylint.
Line: 16
Column: 9
r"""
Inits observer class
"""
self.id = rpc.get_worker_info().id
def set_state(self, state_size, batch):
r"""
Further initializes observer to be aware of rpc environment
Args:
Reported by Pylint.
Line: 26
Column: 1
batch (bool): Whether agent will be using batch select action
"""
self.state_size = state_size
self.select_action = AgentBase.select_action_batch if batch else AgentBase.select_action_non_batch
def reset(self):
r"""
Resets state randomly
"""
Reported by Pylint.
caffe2/python/mkl/mkl_elementwise_add_op_test.py
12 issues
Line: 7
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given
import numpy as np
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.mkl_test_util as mu
Reported by Pylint.
Line: 8
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given
import numpy as np
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.mkl_test_util as mu
Reported by Pylint.
Line: 15
Column: 22
import caffe2.python.mkl_test_util as mu
@unittest.skipIf(not workspace.C.has_mkldnn,
"Skipping as we do not have mkldnn.")
class MKLElementwiseAddTest(hu.HypothesisTestCase):
@given(size=st.integers(7, 9),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 3),
Reported by Pylint.
Line: 28
Column: 34
input_channels,
batch_size,
inplace,
gc,
dc):
op = core.CreateOperator(
"Add",
["X0", "X1"],
["X0" if inplace else "Y"],
Reported by Pylint.
Line: 1
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given
import numpy as np
Reported by Pylint.
Line: 17
Column: 1
@unittest.skipIf(not workspace.C.has_mkldnn,
"Skipping as we do not have mkldnn.")
class MKLElementwiseAddTest(hu.HypothesisTestCase):
@given(size=st.integers(7, 9),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_elementwise_add(self,
size,
input_channels,
batch_size,
inplace,
gc,
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_elementwise_add(self,
size,
input_channels,
batch_size,
inplace,
gc,
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_elementwise_add(self,
size,
input_channels,
batch_size,
inplace,
gc,
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_elementwise_add(self,
size,
input_channels,
batch_size,
inplace,
gc,
Reported by Pylint.
benchmarks/distributed/rpc/parameter_server/metrics/MetricsLogger.py
12 issues
Line: 1
Column: 1
from .CPUMetric import CPUMetric
from .CUDAMetric import CUDAMetric
class MetricsLogger:
def __init__(self, rank=None):
self.rank = rank
self.metrics = {}
Reported by Pylint.
Line: 2
Column: 1
from .CPUMetric import CPUMetric
from .CUDAMetric import CUDAMetric
class MetricsLogger:
def __init__(self, rank=None):
self.rank = rank
self.metrics = {}
Reported by Pylint.
Line: 11
Column: 28
self.rank = rank
self.metrics = {}
def record_start(self, type, key, name, cuda):
if type in self.metrics and key in self.metrics[type]:
raise RuntimeError(f"metric_type={type} with key={key} already exists")
if cuda:
if self.rank is None:
raise RuntimeError("rank is required for cuda")
Reported by Pylint.
Line: 25
Column: 26
self.metrics[type][key] = metric
metric.record_start()
def record_end(self, type, key):
if type not in self.metrics or key not in self.metrics[type]:
raise RuntimeError(f"metric_type={type} with key={key} not found")
if self.metrics[type][key].get_end() is not None:
raise RuntimeError(f"end for metric_type={type} with key={key} already exists")
self.metrics[type][key].record_end()
Reported by Pylint.
Line: 1
Column: 1
from .CPUMetric import CPUMetric
from .CUDAMetric import CUDAMetric
class MetricsLogger:
def __init__(self, rank=None):
self.rank = rank
self.metrics = {}
Reported by Pylint.
Line: 1
Column: 1
from .CPUMetric import CPUMetric
from .CUDAMetric import CUDAMetric
class MetricsLogger:
def __init__(self, rank=None):
self.rank = rank
self.metrics = {}
Reported by Pylint.
Line: 5
Column: 1
from .CUDAMetric import CUDAMetric
class MetricsLogger:
def __init__(self, rank=None):
self.rank = rank
self.metrics = {}
Reported by Pylint.
Line: 11
Column: 5
self.rank = rank
self.metrics = {}
def record_start(self, type, key, name, cuda):
if type in self.metrics and key in self.metrics[type]:
raise RuntimeError(f"metric_type={type} with key={key} already exists")
if cuda:
if self.rank is None:
raise RuntimeError("rank is required for cuda")
Reported by Pylint.
Line: 25
Column: 5
self.metrics[type][key] = metric
metric.record_start()
def record_end(self, type, key):
if type not in self.metrics or key not in self.metrics[type]:
raise RuntimeError(f"metric_type={type} with key={key} not found")
if self.metrics[type][key].get_end() is not None:
raise RuntimeError(f"end for metric_type={type} with key={key} already exists")
self.metrics[type][key].record_end()
Reported by Pylint.
Line: 32
Column: 5
raise RuntimeError(f"end for metric_type={type} with key={key} already exists")
self.metrics[type][key].record_end()
def clear_metrics(self):
self.metrics.clear()
def get_metrics(self):
return self.metrics
Reported by Pylint.
benchmarks/fastrnns/runner.py
12 issues
Line: 3
Column: 1
from collections import namedtuple
from functools import partial
import torch
import torchvision.models as cnn
from .factory import (dropoutlstm_creator, imagenet_cnn_creator,
layernorm_pytorch_lstm_creator, lnlstm_creator,
lstm_creator, lstm_multilayer_creator,
lstm_premul_bias_creator, lstm_premul_creator,
Reported by Pylint.
Line: 4
Column: 1
from collections import namedtuple
from functools import partial
import torch
import torchvision.models as cnn
from .factory import (dropoutlstm_creator, imagenet_cnn_creator,
layernorm_pytorch_lstm_creator, lnlstm_creator,
lstm_creator, lstm_multilayer_creator,
lstm_premul_bias_creator, lstm_premul_creator,
Reported by Pylint.
Line: 6
Column: 1
import torch
import torchvision.models as cnn
from .factory import (dropoutlstm_creator, imagenet_cnn_creator,
layernorm_pytorch_lstm_creator, lnlstm_creator,
lstm_creator, lstm_multilayer_creator,
lstm_premul_bias_creator, lstm_premul_creator,
lstm_simple_creator, pytorch_lstm_creator,
varlen_lstm_creator, varlen_pytorch_lstm_creator)
Reported by Pylint.
Line: 16
Column: 9
class DisableCuDNN():
def __enter__(self):
self.saved = torch.backends.cudnn.enabled
torch.backends.cudnn.enabled = False
def __exit__(self, *args, **kwargs):
torch.backends.cudnn.enabled = self.saved
Reported by Pylint.
Line: 1
Column: 1
from collections import namedtuple
from functools import partial
import torch
import torchvision.models as cnn
from .factory import (dropoutlstm_creator, imagenet_cnn_creator,
layernorm_pytorch_lstm_creator, lnlstm_creator,
lstm_creator, lstm_multilayer_creator,
lstm_premul_bias_creator, lstm_premul_creator,
Reported by Pylint.
Line: 14
Column: 1
varlen_lstm_creator, varlen_pytorch_lstm_creator)
class DisableCuDNN():
def __enter__(self):
self.saved = torch.backends.cudnn.enabled
torch.backends.cudnn.enabled = False
def __exit__(self, *args, **kwargs):
Reported by Pylint.
Line: 23
Column: 1
torch.backends.cudnn.enabled = self.saved
class DummyContext():
def __enter__(self):
pass
def __exit__(self, *args, **kwargs):
pass
Reported by Pylint.
Line: 31
Column: 1
pass
class AssertNoJIT():
def __enter__(self):
import os
enabled = os.environ.get('PYTORCH_JIT', 1)
assert not enabled
Reported by Pylint.
Line: 33
Column: 9
class AssertNoJIT():
def __enter__(self):
import os
enabled = os.environ.get('PYTORCH_JIT', 1)
assert not enabled
def __exit__(self, *args, **kwargs):
pass
Reported by Pylint.
Line: 35
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def __enter__(self):
import os
enabled = os.environ.get('PYTORCH_JIT', 1)
assert not enabled
def __exit__(self, *args, **kwargs):
pass
Reported by Bandit.
caffe2/python/ideep/channel_shuffle_op_test.py
12 issues
Line: 7
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.ideep_test_util as mu
Reported by Pylint.
Line: 8
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.ideep_test_util as mu
Reported by Pylint.
Line: 15
Column: 22
import caffe2.python.ideep_test_util as mu
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class ChannelShuffleTest(hu.HypothesisTestCase):
@given(size=st.integers(8, 10),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 32),
group=st.integers(2, 4),
Reported by Pylint.
Line: 1
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
Reported by Pylint.
Line: 16
Column: 1
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class ChannelShuffleTest(hu.HypothesisTestCase):
@given(size=st.integers(8, 10),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 32),
group=st.integers(2, 4),
stride=st.integers(1, 3),
Reported by Pylint.
Line: 26
Column: 1
kernel=st.integers(3, 5),
**mu.gcs)
@settings(max_examples=10, deadline=None)
def test_channel_shuffle(self, size, input_channels, batch_size, group, stride, pad, kernel, gc, dc):
op = core.CreateOperator(
"ChannelShuffle",
["X"],
["Y"],
group=group,
Reported by Pylint.
Line: 26
Column: 5
kernel=st.integers(3, 5),
**mu.gcs)
@settings(max_examples=10, deadline=None)
def test_channel_shuffle(self, size, input_channels, batch_size, group, stride, pad, kernel, gc, dc):
op = core.CreateOperator(
"ChannelShuffle",
["X"],
["Y"],
group=group,
Reported by Pylint.
Line: 26
Column: 5
kernel=st.integers(3, 5),
**mu.gcs)
@settings(max_examples=10, deadline=None)
def test_channel_shuffle(self, size, input_channels, batch_size, group, stride, pad, kernel, gc, dc):
op = core.CreateOperator(
"ChannelShuffle",
["X"],
["Y"],
group=group,
Reported by Pylint.
Line: 26
Column: 5
kernel=st.integers(3, 5),
**mu.gcs)
@settings(max_examples=10, deadline=None)
def test_channel_shuffle(self, size, input_channels, batch_size, group, stride, pad, kernel, gc, dc):
op = core.CreateOperator(
"ChannelShuffle",
["X"],
["Y"],
group=group,
Reported by Pylint.
Line: 26
Column: 5
kernel=st.integers(3, 5),
**mu.gcs)
@settings(max_examples=10, deadline=None)
def test_channel_shuffle(self, size, input_channels, batch_size, group, stride, pad, kernel, gc, dc):
op = core.CreateOperator(
"ChannelShuffle",
["X"],
["Y"],
group=group,
Reported by Pylint.
torch/testing/_check_kernel_launches.py
12 issues
Line: 1
Column: 1
import os
import re
import sys
from typing import List
__all__ = [
"check_code_for_cuda_kernel_launches",
"check_cuda_kernel_launches",
]
Reported by Pylint.
Line: 46
Column: 1
r"\s*;(?![^;}]*C10_CUDA_KERNEL_LAUNCH_CHECK\(\);)", flags=re.MULTILINE
)
def find_matching_paren(s: str, startpos: int) -> int:
"""Given a string "prefix (unknown number of characters) suffix"
and the position of the first `(` returns the index of the character
1 past the `)`, accounting for paren nesting
"""
opening = 0
Reported by Pylint.
Line: 52
Column: 12
1 past the `)`, accounting for paren nesting
"""
opening = 0
for i, c in enumerate(s[startpos:]):
if c == '(':
opening += 1
elif c == ')':
opening -= 1
if opening == 0:
Reported by Pylint.
Line: 63
Column: 1
raise IndexError("Closing parens not found!")
def should_exclude_file(filename) -> bool:
for exclude_suffix in exclude_files:
if filename.endswith(exclude_suffix):
return True
return False
Reported by Pylint.
Line: 91
Column: 9
code = '\n'.join(code) # Put it back together
num_launches_without_checks = 0
for m in kernel_launch_start.finditer(code):
end_paren = find_matching_paren(code, m.end() - 1)
if has_check.match(code, end_paren):
num_launches_without_checks += 1
context = code[m.start():end_paren + 1]
print(f"Missing C10_CUDA_KERNEL_LAUNCH_CHECK in '{filename}'. Context:\n{context}", file=sys.stderr)
Reported by Pylint.
Line: 96
Column: 1
if has_check.match(code, end_paren):
num_launches_without_checks += 1
context = code[m.start():end_paren + 1]
print(f"Missing C10_CUDA_KERNEL_LAUNCH_CHECK in '{filename}'. Context:\n{context}", file=sys.stderr)
return num_launches_without_checks
def check_file(filename):
Reported by Pylint.
Line: 114
Column: 5
return 0
if should_exclude_file(filename):
return 0
fo = open(filename, "r")
contents = fo.read()
unsafeCount = check_code_for_cuda_kernel_launches(contents, filename)
fo.close()
return unsafeCount
Reported by Pylint.
Line: 116
Column: 5
return 0
fo = open(filename, "r")
contents = fo.read()
unsafeCount = check_code_for_cuda_kernel_launches(contents, filename)
fo.close()
return unsafeCount
def check_cuda_kernel_launches():
Reported by Pylint.
Line: 136
Column: 1
for root, dirnames, filenames in os.walk(torch_dir):
# `$BASE/build` and `$BASE/torch/include` are generated
# so we don't want to flag their contents
if root == os.path.join(torch_dir, "build") or root == os.path.join(torch_dir, "torch/include"):
# Curtail search by modifying dirnames and filenames in place
# Yes, this is the way to do this, see `help(os.walk)`
dirnames[:] = []
continue
Reported by Pylint.
Line: 142
Column: 13
dirnames[:] = []
continue
for x in filenames:
filename = os.path.join(root, x)
file_result = check_file(filename)
if file_result > 0:
kernels_without_checks += file_result
files_without_checks.append(filename)
Reported by Pylint.