The following issues were found:
caffe2/python/benchmarks/sparse_normalize_benchmark.py
7 issues
Line: 6
Column: 1
# import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, workspace
def benchmark_sparse_normalize(
categorical_limit,
embedding_size,
Reported by Pylint.
Line: 1
Column: 1
import argparse
import datetime
# import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, workspace
def benchmark_sparse_normalize(
Reported by Pylint.
Line: 9
Column: 1
from caffe2.python import core, workspace
def benchmark_sparse_normalize(
categorical_limit,
embedding_size,
average_len,
batch_size,
iterations,
Reported by Pylint.
Line: 9
Column: 1
from caffe2.python import core, workspace
def benchmark_sparse_normalize(
categorical_limit,
embedding_size,
average_len,
batch_size,
iterations,
Reported by Pylint.
Line: 27
Column: 9
init_net = core.Net("init_net")
if fp16:
op = core.CreateOperator("FloatToHalf", "X", "X_fp16")
init_net.Proto().op.extend([op])
l3_cache_size = 30 * 2 ** 20 // 4
# In order to produce truly random lengths and indices, we will embed a
# Python operator in the net to generate them.
Reported by Pylint.
Line: 33
Column: 5
# In order to produce truly random lengths and indices, we will embed a
# Python operator in the net to generate them.
def f(_, outputs):
lengths = np.random.randint(
int(average_len * 0.75), int(average_len * 1.25), batch_size
).astype(np.int32)
indices = np.random.randint(0, categorical_limit, np.sum(lengths)).astype(
np.int64
Reported by Pylint.
Line: 51
Column: 5
workspace.RunNetOnce(init_net)
net = core.Net("mynet")
op = core.CreateOperator(
"Float16SparseNormalize" if fp16 else "SparseNormalize",
["X_fp16", "indices"] if fp16 else ["X", "indices"],
"X_fp16" if fp16 else "X",
)
net.Proto().external_input.append("X")
Reported by Pylint.
caffe2/python/helpers/train.py
7 issues
Line: 22
Column: 1
return [w for w in model.weights if w.GetNameScope() == namescope]
def iter(model, blob_out, **kwargs):
if 'device_option' in kwargs:
del kwargs['device_option']
model.param_init_net.ConstantFill(
[],
blob_out,
Reported by Pylint.
Line: 1
Column: 1
## @package train
# Module caffe2.python.helpers.train
from caffe2.python import core, scope
from caffe2.proto import caffe2_pb2
Reported by Pylint.
Line: 16
Column: 5
if namescope is None:
namescope = scope.CurrentNameScope()
if namescope == '':
return model.weights[:]
else:
return [w for w in model.weights if w.GetNameScope() == namescope]
Reported by Pylint.
Line: 22
Column: 1
return [w for w in model.weights if w.GetNameScope() == namescope]
def iter(model, blob_out, **kwargs):
if 'device_option' in kwargs:
del kwargs['device_option']
model.param_init_net.ConstantFill(
[],
blob_out,
Reported by Pylint.
Line: 37
Column: 1
return model.net.Iter(blob_out, blob_out, **kwargs)
def accuracy(model, blob_in, blob_out, **kwargs):
dev = kwargs['device_option'] if 'device_option' in kwargs \
else scope.CurrentDeviceScope()
is_cpu = dev is None or dev.device_type == caffe2_pb2.CPU
# We support top_k > 1 only on CPU
Reported by Pylint.
Line: 68
Column: 5
"""
if weight_decay <= 0.0:
return
wd = model.param_init_net.ConstantFill(
[], 'wd', shape=[1], value=weight_decay
)
ONE = model.param_init_net.ConstantFill([], "ONE", shape=[1], value=1.0)
for param in _get_weights(model):
# Equivalent to: grad += wd * param
Reported by Pylint.
Line: 71
Column: 5
wd = model.param_init_net.ConstantFill(
[], 'wd', shape=[1], value=weight_decay
)
ONE = model.param_init_net.ConstantFill([], "ONE", shape=[1], value=1.0)
for param in _get_weights(model):
# Equivalent to: grad += wd * param
grad = model.param_to_grad[param]
model.net.WeightedSum(
[grad, ONE, param, wd],
Reported by Pylint.
caffe2/python/operator_test/conftest.py
7 issues
Line: 45
Column: 5
output = config.getoption('output', default=serial.DATA_DIR)
disable = config.getoption('disable', default=False)
disable_coverage = config.getoption('disable_coverage', default=False)
serial._output_context.__setattr__('should_generate_output', generate)
serial._output_context.__setattr__('output_dir', output)
serial._output_context.__setattr__('disable_serialized_check', disable)
serial._output_context.__setattr__('disable_gen_coverage', disable_coverage)
Reported by Pylint.
Line: 46
Column: 5
disable = config.getoption('disable', default=False)
disable_coverage = config.getoption('disable_coverage', default=False)
serial._output_context.__setattr__('should_generate_output', generate)
serial._output_context.__setattr__('output_dir', output)
serial._output_context.__setattr__('disable_serialized_check', disable)
serial._output_context.__setattr__('disable_gen_coverage', disable_coverage)
Reported by Pylint.
Line: 47
Column: 5
disable_coverage = config.getoption('disable_coverage', default=False)
serial._output_context.__setattr__('should_generate_output', generate)
serial._output_context.__setattr__('output_dir', output)
serial._output_context.__setattr__('disable_serialized_check', disable)
serial._output_context.__setattr__('disable_gen_coverage', disable_coverage)
Reported by Pylint.
Line: 48
Column: 5
serial._output_context.__setattr__('should_generate_output', generate)
serial._output_context.__setattr__('output_dir', output)
serial._output_context.__setattr__('disable_serialized_check', disable)
serial._output_context.__setattr__('disable_gen_coverage', disable_coverage)
Reported by Pylint.
Line: 1
Column: 1
import caffe2.python.serialized_test.serialized_test_util as serial
def pytest_addoption(parser):
Reported by Pylint.
Line: 9
Column: 1
import caffe2.python.serialized_test.serialized_test_util as serial
def pytest_addoption(parser):
parser.addoption(
'-G',
'--generate-serialized',
action='store_true',
dest='generate',
Reported by Pylint.
Line: 40
Column: 1
)
def pytest_configure(config):
generate = config.getoption('generate', default=False)
output = config.getoption('output', default=serial.DATA_DIR)
disable = config.getoption('disable', default=False)
disable_coverage = config.getoption('disable_coverage', default=False)
serial._output_context.__setattr__('should_generate_output', generate)
Reported by Pylint.
caffe2/python/predictor/mobile_exporter.py
7 issues
Line: 1
Column: 1
## @package mobile_exporter
# Module caffe2.python.mobile_exporter
from caffe2.python import core, utils
from caffe2.proto import caffe2_pb2
Reported by Pylint.
Line: 18
Column: 5
run the operator to put the blob to workspace.
uint8 is stored as an array of string with one element.
'''
kTypeNameMapper = {
np.dtype('float32'): "GivenTensorFill",
np.dtype('int32'): "GivenTensorIntFill",
np.dtype('int64'): "GivenTensorInt64Fill",
np.dtype('uint8'): "GivenTensorByteStringToUInt8Fill",
np.dtype('O'): "GivenTensorStringFill"
Reported by Pylint.
Line: 38
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# model which can be used for post processing results in subsequent ops.
if blob.dtype == np.dtype('O'):
for blob_val in blob:
assert(isinstance(blob_val, bytes))
op = core.CreateOperator(
kTypeNameMapper[blob.dtype],
[], [name],
arg=[
Reported by Bandit.
Line: 38
Column: 1
# model which can be used for post processing results in subsequent ops.
if blob.dtype == np.dtype('O'):
for blob_val in blob:
assert(isinstance(blob_val, bytes))
op = core.CreateOperator(
kTypeNameMapper[blob.dtype],
[], [name],
arg=[
Reported by Pylint.
Line: 40
Column: 5
for blob_val in blob:
assert(isinstance(blob_val, bytes))
op = core.CreateOperator(
kTypeNameMapper[blob.dtype],
[], [name],
arg=[
utils.MakeArgument("shape", shape),
utils.MakeArgument("values", values),
Reported by Pylint.
Line: 51
Column: 1
net.op.extend([op])
def Export(workspace, net, params):
"""Returns init_net and predict_net suitable for writing to disk
and loading into a Predictor"""
proto = net if isinstance(net, caffe2_pb2.NetDef) else net.Proto()
predict_net = caffe2_pb2.NetDef()
predict_net.CopyFrom(proto)
Reported by Pylint.
Line: 51
Column: 1
net.op.extend([op])
def Export(workspace, net, params):
"""Returns init_net and predict_net suitable for writing to disk
and loading into a Predictor"""
proto = net if isinstance(net, caffe2_pb2.NetDef) else net.Proto()
predict_net = caffe2_pb2.NetDef()
predict_net.CopyFrom(proto)
Reported by Pylint.
test/jit/test_python_ir.py
7 issues
Line: 1
Column: 1
import torch
from torch.testing._internal.jit_utils import JitTestCase
if __name__ == '__main__':
raise RuntimeError("This test file is not meant to be run directly, use:\n\n"
"\tpython test/test_jit.py TESTNAME\n\n"
"instead.")
class TestPythonIr(JitTestCase):
Reported by Pylint.
Line: 2
Column: 1
import torch
from torch.testing._internal.jit_utils import JitTestCase
if __name__ == '__main__':
raise RuntimeError("This test file is not meant to be run directly, use:\n\n"
"\tpython test/test_jit.py TESTNAME\n\n"
"instead.")
class TestPythonIr(JitTestCase):
Reported by Pylint.
Line: 1
Column: 1
import torch
from torch.testing._internal.jit_utils import JitTestCase
if __name__ == '__main__':
raise RuntimeError("This test file is not meant to be run directly, use:\n\n"
"\tpython test/test_jit.py TESTNAME\n\n"
"instead.")
class TestPythonIr(JitTestCase):
Reported by Pylint.
Line: 9
Column: 1
"\tpython test/test_jit.py TESTNAME\n\n"
"instead.")
class TestPythonIr(JitTestCase):
def test_param_strides(self):
def trace_me(arg):
return arg
t = torch.zeros(1, 3, 16, 16)
traced = torch.jit.trace(trace_me, t)
Reported by Pylint.
Line: 9
Column: 1
"\tpython test/test_jit.py TESTNAME\n\n"
"instead.")
class TestPythonIr(JitTestCase):
def test_param_strides(self):
def trace_me(arg):
return arg
t = torch.zeros(1, 3, 16, 16)
traced = torch.jit.trace(trace_me, t)
Reported by Pylint.
Line: 10
Column: 5
"instead.")
class TestPythonIr(JitTestCase):
def test_param_strides(self):
def trace_me(arg):
return arg
t = torch.zeros(1, 3, 16, 16)
traced = torch.jit.trace(trace_me, t)
value = list(traced.graph.param_node().outputs())[0]
Reported by Pylint.
Line: 13
Column: 9
def test_param_strides(self):
def trace_me(arg):
return arg
t = torch.zeros(1, 3, 16, 16)
traced = torch.jit.trace(trace_me, t)
value = list(traced.graph.param_node().outputs())[0]
real_strides = list(t.stride())
type_strides = value.type().strides()
self.assertEqual(real_strides, type_strides)
Reported by Pylint.
caffe2/python/onnx/tests/test_utils.py
7 issues
Line: 1
Column: 1
## @package onnx
# Module caffe2.python.onnx.tests.test_utils
import unittest
Reported by Pylint.
Line: 13
Column: 1
import numpy as np
class TestCase(unittest.TestCase):
def setUp(self):
np.random.seed(seed=0)
def assertSameOutputs(self, outputs1, outputs2, decimal=7):
Reported by Pylint.
Line: 18
Column: 5
def setUp(self):
np.random.seed(seed=0)
def assertSameOutputs(self, outputs1, outputs2, decimal=7):
self.assertEqual(len(outputs1), len(outputs2))
for o1, o2 in zip(outputs1, outputs2):
self.assertEqual(o1.dtype, o2.dtype)
np.testing.assert_almost_equal(o1, o2, decimal=decimal)
Reported by Pylint.
Line: 18
Column: 5
def setUp(self):
np.random.seed(seed=0)
def assertSameOutputs(self, outputs1, outputs2, decimal=7):
self.assertEqual(len(outputs1), len(outputs2))
for o1, o2 in zip(outputs1, outputs2):
self.assertEqual(o1.dtype, o2.dtype)
np.testing.assert_almost_equal(o1, o2, decimal=decimal)
Reported by Pylint.
Line: 20
Column: 13
def assertSameOutputs(self, outputs1, outputs2, decimal=7):
self.assertEqual(len(outputs1), len(outputs2))
for o1, o2 in zip(outputs1, outputs2):
self.assertEqual(o1.dtype, o2.dtype)
np.testing.assert_almost_equal(o1, o2, decimal=decimal)
def add_test_case(self, name, test_func):
if not name.startswith('test_'):
Reported by Pylint.
Line: 20
Column: 17
def assertSameOutputs(self, outputs1, outputs2, decimal=7):
self.assertEqual(len(outputs1), len(outputs2))
for o1, o2 in zip(outputs1, outputs2):
self.assertEqual(o1.dtype, o2.dtype)
np.testing.assert_almost_equal(o1, o2, decimal=decimal)
def add_test_case(self, name, test_func):
if not name.startswith('test_'):
Reported by Pylint.
Line: 24
Column: 5
self.assertEqual(o1.dtype, o2.dtype)
np.testing.assert_almost_equal(o1, o2, decimal=decimal)
def add_test_case(self, name, test_func):
if not name.startswith('test_'):
raise ValueError('Test name must start with test_: {}'.format(name))
if hasattr(self, name):
raise ValueError('Duplicated test name: {}'.format(name))
setattr(self, name, test_func)
Reported by Pylint.
caffe2/python/record_queue.py
7 issues
Line: 17
Column: 1
Struct, Field, from_column_list)
class _QueueReader(Reader):
def __init__(self, blobs_queue, schema, name=None):
"""Don't call this directly. Instead, use dataset.reader()"""
super(_QueueReader, self).__init__(schema)
self.blobs_queue = blobs_queue
self.name = name
Reported by Pylint.
Line: 20
Column: 9
class _QueueReader(Reader):
def __init__(self, blobs_queue, schema, name=None):
"""Don't call this directly. Instead, use dataset.reader()"""
super(_QueueReader, self).__init__(schema)
self.blobs_queue = blobs_queue
self.name = name
def read(self, read_net):
with core.NameScope(read_net.NextName(self.name)):
Reported by Pylint.
Line: 48
Column: 1
return status
class RecordQueue(object):
""" The class is used to feed data with some process from a reader into a
queue and provider a reader interface for data fetching from the queue.
"""
def __init__(self, fields, name=None, capacity=1,
enforce_unique_name=False, num_threads=1):
Reported by Pylint.
Line: 48
Column: 1
return status
class RecordQueue(object):
""" The class is used to feed data with some process from a reader into a
queue and provider a reader interface for data fetching from the queue.
"""
def __init__(self, fields, name=None, capacity=1,
enforce_unique_name=False, num_threads=1):
Reported by Pylint.
Line: 52
Column: 5
""" The class is used to feed data with some process from a reader into a
queue and provider a reader interface for data fetching from the queue.
"""
def __init__(self, fields, name=None, capacity=1,
enforce_unique_name=False, num_threads=1):
assert isinstance(fields, list) or isinstance(fields, Struct), (
'fields must be either a Struct or a list of raw field names.')
if isinstance(fields, list):
fields = from_column_list(fields)
Reported by Pylint.
Line: 54
Column: 16
"""
def __init__(self, fields, name=None, capacity=1,
enforce_unique_name=False, num_threads=1):
assert isinstance(fields, list) or isinstance(fields, Struct), (
'fields must be either a Struct or a list of raw field names.')
if isinstance(fields, list):
fields = from_column_list(fields)
self.schema = fields
self.name = name or 'queue'
Reported by Pylint.
Line: 54
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
"""
def __init__(self, fields, name=None, capacity=1,
enforce_unique_name=False, num_threads=1):
assert isinstance(fields, list) or isinstance(fields, Struct), (
'fields must be either a Struct or a list of raw field names.')
if isinstance(fields, list):
fields = from_column_list(fields)
self.schema = fields
self.name = name or 'queue'
Reported by Bandit.
caffe2/python/rnn/lstm_comparison.py
7 issues
Line: 9
Column: 13
from copy import copy
@utils.debug
def Compare(args):
results = []
num_iters = 1000
args.gpu = True
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, 0)):
for batch_size in [64, 128, 256]:
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import workspace, core, lstm_benchmark, utils
from copy import copy
@utils.debug
def Compare(args):
Reported by Pylint.
Line: 6
Column: 1
from caffe2.python import workspace, core, lstm_benchmark, utils
from copy import copy
@utils.debug
def Compare(args):
results = []
num_iters = 1000
Reported by Pylint.
Line: 9
Column: 1
from copy import copy
@utils.debug
def Compare(args):
results = []
num_iters = 1000
args.gpu = True
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, 0)):
for batch_size in [64, 128, 256]:
Reported by Pylint.
Line: 9
Column: 1
from copy import copy
@utils.debug
def Compare(args):
results = []
num_iters = 1000
args.gpu = True
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, 0)):
for batch_size in [64, 128, 256]:
Reported by Pylint.
Line: 33
Column: 9
print(args)
print("t_cudnn / t_own: {}".format(t_cudnn / t_own))
for args, t_own, t_cudnn in results:
print("{}: cudnn time: {}, own time: {}, ratio: {}".format(
str(args), t_cudnn, t_own, t_cudnn / t_own))
ratio_sum = 0
for args, t_own, t_cudnn in results:
Reported by Pylint.
Line: 38
Column: 9
str(args), t_cudnn, t_own, t_cudnn / t_own))
ratio_sum = 0
for args, t_own, t_cudnn in results:
ratio = float(t_cudnn) / t_own
ratio_sum += ratio
print("hidden_dim: {}, seq_lengths: {}, batch_size: {}, num_layers: {}:"
" cudnn time: {}, own time: {}, ratio: {}".format(
args.hidden_dim, args.seq_length, args.batch_size,
Reported by Pylint.
caffe2/python/task_test.py
7 issues
Line: 24
Column: 26
def testEffectlessRepr(self):
task_group = task.TaskGroup()
_repr = task_group.__repr__()
self.assertFalse(task_group._already_used)
Reported by Pylint.
Line: 1
Column: 1
import unittest
from caffe2.python import task
class TestTask(unittest.TestCase):
def testRepr(self):
cases = [
(task.Cluster(), "Cluster(nodes=[], node_kwargs={})"),
(task.Node(), "Node(name=local, kwargs={})"),
Reported by Pylint.
Line: 5
Column: 1
from caffe2.python import task
class TestTask(unittest.TestCase):
def testRepr(self):
cases = [
(task.Cluster(), "Cluster(nodes=[], node_kwargs={})"),
(task.Node(), "Node(name=local, kwargs={})"),
(
Reported by Pylint.
Line: 6
Column: 5
class TestTask(unittest.TestCase):
def testRepr(self):
cases = [
(task.Cluster(), "Cluster(nodes=[], node_kwargs={})"),
(task.Node(), "Node(name=local, kwargs={})"),
(
task.TaskGroup(),
Reported by Pylint.
Line: 6
Column: 5
class TestTask(unittest.TestCase):
def testRepr(self):
cases = [
(task.Cluster(), "Cluster(nodes=[], node_kwargs={})"),
(task.Node(), "Node(name=local, kwargs={})"),
(
task.TaskGroup(),
Reported by Pylint.
Line: 21
Column: 5
for obj, want in cases:
self.assertEqual(obj.__repr__(), want)
def testEffectlessRepr(self):
task_group = task.TaskGroup()
_repr = task_group.__repr__()
self.assertFalse(task_group._already_used)
Reported by Pylint.
Line: 21
Column: 5
for obj, want in cases:
self.assertEqual(obj.__repr__(), want)
def testEffectlessRepr(self):
task_group = task.TaskGroup()
_repr = task_group.__repr__()
self.assertFalse(task_group._already_used)
Reported by Pylint.
caffe2/python/test/blob_deallocation_test.py
7 issues
Line: 24
Column: 9
workspace.ResetWorkspace()
workspace.RunNetOnce(net)
self.assertTrue(True)
if __name__ == '__main__':
unittest.main()
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
import unittest
core.GlobalInit(['python'])
Reported by Pylint.
Line: 6
Column: 1
from caffe2.python import core, workspace
import unittest
core.GlobalInit(['python'])
class BlobDeallocationTest(unittest.TestCase):
Reported by Pylint.
Line: 11
Column: 1
core.GlobalInit(['python'])
class BlobDeallocationTest(unittest.TestCase):
def test(self):
net = core.Net('net')
x = net.GivenTensorStringFill([], ['x'], shape=[3], values=['a', 'b', 'c'])
y = net.GivenTensorStringFill([], ['y'], shape=[3], values=['d', 'e', 'f'])
Reported by Pylint.
Line: 12
Column: 5
class BlobDeallocationTest(unittest.TestCase):
def test(self):
net = core.Net('net')
x = net.GivenTensorStringFill([], ['x'], shape=[3], values=['a', 'b', 'c'])
y = net.GivenTensorStringFill([], ['y'], shape=[3], values=['d', 'e', 'f'])
net.Concat([x, y], ['concated', '_'], axis=0)
Reported by Pylint.
Line: 15
Column: 9
def test(self):
net = core.Net('net')
x = net.GivenTensorStringFill([], ['x'], shape=[3], values=['a', 'b', 'c'])
y = net.GivenTensorStringFill([], ['y'], shape=[3], values=['d', 'e', 'f'])
net.Concat([x, y], ['concated', '_'], axis=0)
workspace.ResetWorkspace()
workspace.RunNetOnce(net)
Reported by Pylint.
Line: 16
Column: 9
net = core.Net('net')
x = net.GivenTensorStringFill([], ['x'], shape=[3], values=['a', 'b', 'c'])
y = net.GivenTensorStringFill([], ['y'], shape=[3], values=['d', 'e', 'f'])
net.Concat([x, y], ['concated', '_'], axis=0)
workspace.ResetWorkspace()
workspace.RunNetOnce(net)
Reported by Pylint.