The following issues were found:
caffe2/python/db_file_reader.py
14 issues
Line: 51
Column: 27
field_names=None,
):
assert db_path is not None, "db_path can't be None."
assert db_type in C.registered_dbs(), \
"db_type [{db_type}] is not available. \n" \
"Choose one of these: {registered_dbs}.".format(
db_type=db_type,
registered_dbs=C.registered_dbs(),
)
Reported by Pylint.
Line: 55
Column: 32
"db_type [{db_type}] is not available. \n" \
"Choose one of these: {registered_dbs}.".format(
db_type=db_type,
registered_dbs=C.registered_dbs(),
)
self.db_path = os.path.expanduser(db_path)
self.db_type = db_type
self.name = name or '{db_name}_{default_name_suffix}'.format(
Reported by Pylint.
Line: 16
Column: 1
import os
class DBFileReader(Reader):
default_name_suffix = 'db_file_reader'
"""Reader reads from a DB file.
Reported by Pylint.
Line: 1
Column: 1
## @package db_file_reader
# Module caffe2.python.db_file_reader
from caffe2.python import core, scope, workspace, _import_c_extension as C
from caffe2.python.dataio import Reader
Reported by Pylint.
Line: 13
Column: 1
from caffe2.python.dataset import Dataset
from caffe2.python.schema import from_column_list
import os
class DBFileReader(Reader):
default_name_suffix = 'db_file_reader'
Reported by Pylint.
Line: 16
Column: 1
import os
class DBFileReader(Reader):
default_name_suffix = 'db_file_reader'
"""Reader reads from a DB file.
Reported by Pylint.
Line: 41
Column: 5
Otherwise, schema will be automatically restored with
schema.field_names() sorted in alphabetic order.
"""
def __init__(
self,
db_path,
db_type,
name=None,
batch_size=100,
Reported by Pylint.
Line: 50
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
loop_over=False,
field_names=None,
):
assert db_path is not None, "db_path can't be None."
assert db_type in C.registered_dbs(), \
"db_type [{db_type}] is not available. \n" \
"Choose one of these: {registered_dbs}.".format(
db_type=db_type,
registered_dbs=C.registered_dbs(),
Reported by Bandit.
Line: 51
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
field_names=None,
):
assert db_path is not None, "db_path can't be None."
assert db_type in C.registered_dbs(), \
"db_type [{db_type}] is not available. \n" \
"Choose one of these: {registered_dbs}.".format(
db_type=db_type,
registered_dbs=C.registered_dbs(),
)
Reported by Bandit.
Line: 69
Column: 9
# Before self._init_reader_schema(...),
# self.db_path and self.db_type are required to be set.
super(DBFileReader, self).__init__(self._init_reader_schema(field_names))
self.ds = Dataset(self._schema, self.name + '_dataset')
self.ds_reader = None
def _init_name(self, name):
return name or self._extract_db_name_from_db_path(
Reported by Pylint.
benchmarks/framework_overhead_benchmark/framework_overhead_benchmark.py
14 issues
Line: 29
Column: 15
SUPPORTED_OPS = {"add_op"}
def parse_op_args(op):
op_list = ops.split(",")
def print_results(result):
print("===================================")
for key, value in result.items():
print("{}, latency per iter (us):{}".format(key, ms_to_us(value)))
Reported by Pylint.
Line: 8
Column: 1
from SimpleAddModule import SimpleAddModule, add_tensors_loop
from pt_wrapper_module import WrapperModule
""" Framework overhead benchmark script.
Benchmark framework overhead.
Currently supported ops: add.
As of now runs only forward pass.
Supports both graph mode and eager mode. In graph mode the module is traced via JIT tracing.
Debug option prints the traced graph if graph_mode is enabled.
Reported by Pylint.
Line: 28
Column: 19
SUPPORTED_OPS = {"add_op"}
def parse_op_args(op):
op_list = ops.split(",")
def print_results(result):
print("===================================")
for key, value in result.items():
Reported by Pylint.
Line: 29
Column: 5
SUPPORTED_OPS = {"add_op"}
def parse_op_args(op):
op_list = ops.split(",")
def print_results(result):
print("===================================")
for key, value in result.items():
print("{}, latency per iter (us):{}".format(key, ms_to_us(value)))
Reported by Pylint.
Line: 1
Column: 1
from utils import ms_to_us, benchmark_module, BenchmarkConfig, ModuleConfig
import argparse
from C2Module import C2SimpleNet
from SimpleAddModule import SimpleAddModule, add_tensors_loop
from pt_wrapper_module import WrapperModule
""" Framework overhead benchmark script.
Benchmark framework overhead.
Reported by Pylint.
Line: 2
Column: 1
from utils import ms_to_us, benchmark_module, BenchmarkConfig, ModuleConfig
import argparse
from C2Module import C2SimpleNet
from SimpleAddModule import SimpleAddModule, add_tensors_loop
from pt_wrapper_module import WrapperModule
""" Framework overhead benchmark script.
Benchmark framework overhead.
Reported by Pylint.
Line: 28
Column: 1
SUPPORTED_OPS = {"add_op"}
def parse_op_args(op):
op_list = ops.split(",")
def print_results(result):
print("===================================")
for key, value in result.items():
Reported by Pylint.
Line: 28
Column: 1
SUPPORTED_OPS = {"add_op"}
def parse_op_args(op):
op_list = ops.split(",")
def print_results(result):
print("===================================")
for key, value in result.items():
Reported by Pylint.
Line: 31
Column: 1
def parse_op_args(op):
op_list = ops.split(",")
def print_results(result):
print("===================================")
for key, value in result.items():
print("{}, latency per iter (us):{}".format(key, ms_to_us(value)))
print("===================================")
Reported by Pylint.
Line: 46
Column: 1
module_config: module_config which contains op, number of parameters that op takes
and whether graph mode is enabled or not.
module_type: Type of the module to be wrapped. e.g. SimpleAddModule for add op.
result: dictionary instance to be populated with the benchmark result (latency per iter).
"""
benchmark_c2_net = args.benchmark_c2_net
print("Benchmarking {}".format(module_type.__name__))
if benchmark_c2_net:
op_name = module_config.c2_op
Reported by Pylint.
benchmarks/instruction_counts/core/api.py
14 issues
Line: 15
Column: 5
# Benchmark utils are only partially strict compliant, so MyPy won't follow
# imports using the public namespace. (Due to an exclusion rule in
# mypy-strict.ini)
from torch.utils.benchmark.utils.timer import Language
else:
from torch.utils.benchmark import Language
# Note:
Reported by Pylint.
Line: 17
Column: 5
# mypy-strict.ini)
from torch.utils.benchmark.utils.timer import Language
else:
from torch.utils.benchmark import Language
# Note:
# WorkerTimerArgs is defined in worker.main so that the worker does not
# depend on any files, including core.api. We mirror it with a public symbol
Reported by Pylint.
Line: 44
Column: 15
"""Labels for a TimerArgs instance which are inferred during unpacking."""
runtime: RuntimeMode
autograd: AutogradMode
language: Language
@property
def as_dict(self) -> Dict[str, str]:
"""Dict representation for CI reporting."""
return {
Reported by Pylint.
Line: 27
Column: 1
TimerArgs = WorkerTimerArgs
class RuntimeMode(enum.Enum):
EAGER = "Eager"
JIT = "TorchScript"
EXPLICIT = ""
Reported by Pylint.
Line: 33
Column: 1
EXPLICIT = ""
class AutogradMode(enum.Enum):
FORWARD = "Forward"
FORWARD_BACKWARD = "Forward + Backward"
EXPLICIT = ""
Reported by Pylint.
Line: 57
Column: 1
@dataclasses.dataclass(frozen=True)
class GroupedSetup:
py_setup: str = ""
cpp_setup: str = ""
global_setup: str = ""
def __post_init__(self) -> None:
Reported by Pylint.
Line: 64
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def __post_init__(self) -> None:
for field in dataclasses.fields(self):
assert field.type == str
value: str = getattr(self, field.name)
object.__setattr__(self, field.name, textwrap.dedent(value))
@dataclasses.dataclass(frozen=True)
Reported by Bandit.
Line: 70
Column: 1
@dataclasses.dataclass(frozen=True)
class GroupedBenchmark:
"""Base class for defining groups of benchmarks.
Concrete interfaces:
- `core.api.GroupedStmts` (init_from_stmts)
- `core.api.GroupedModules` (init_from_model)
Reported by Pylint.
Line: 164
Column: 5
num_threads: Tuple[int, ...]
@classmethod
def init_from_stmts(
cls,
py_stmt: Optional[str] = None,
cpp_stmt: Optional[str] = None,
# Generic constructor arguments
Reported by Pylint.
Line: 211
Column: 5
)
@classmethod
def init_from_model(
cls,
py_model_setup: Optional[str] = None,
cpp_model_setup: Optional[str] = None,
# Generic constructor arguments
Reported by Pylint.
caffe2/python/binarysize.py
14 issues
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import argparse
import subprocess
import sys
class Trie(object):
"""A simple class that represents a Trie."""
Reported by Bandit.
Line: 27
Column: 1
import sys
class Trie(object):
"""A simple class that represents a Trie."""
def __init__(self, name):
"""Initializes a Trie object."""
self.name = name
Reported by Pylint.
Line: 27
Column: 1
import sys
class Trie(object):
"""A simple class that represents a Trie."""
def __init__(self, name):
"""Initializes a Trie object."""
self.name = name
Reported by Pylint.
Line: 37
Column: 1
self.dictionary = {}
def GetSymbolTrie(target, nm_command, max_depth):
"""Gets a symbol trie with the passed in target.
Args:
target: the target binary to inspect.
nm_command: the command to run nm.
Reported by Pylint.
Line: 46
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
max_depth: the maximum depth to create the trie.
"""
# Run nm to get a dump on the strings.
proc = subprocess.Popen(
[nm_command, '--radix=d', '--size-sort', '--print-size', target],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
nm_out, _ = proc.communicate()
if proc.returncode != 0:
print('NM command failed. Output is as follows:')
Reported by Bandit.
Line: 55
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b607_start_process_with_partial_path.html
print(nm_out)
sys.exit(1)
# Run c++filt to get proper symbols.
proc = subprocess.Popen(['c++filt'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, _ = proc.communicate(input=nm_out)
if proc.returncode != 0:
print('c++filt failed. Output is as follows:')
Reported by Bandit.
Line: 55
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
print(nm_out)
sys.exit(1)
# Run c++filt to get proper symbols.
proc = subprocess.Popen(['c++filt'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, _ = proc.communicate(input=nm_out)
if proc.returncode != 0:
print('c++filt failed. Output is as follows:')
Reported by Bandit.
Line: 75
Column: 13
symbol_trie = Trie('')
for size, name in data:
curr = symbol_trie
for c in name:
if c not in curr.dictionary:
curr.dictionary[c] = Trie(curr.name + c)
curr = curr.dictionary[c]
curr.size += size
if len(curr.name) > max_depth:
Reported by Pylint.
Line: 86
Column: 1
return symbol_trie
def MaybeAddColor(s, color):
"""Wrap the input string to the xterm green color, if color is set.
"""
if color:
return '\033[92m{0}\033[0m'.format(s)
else:
Reported by Pylint.
Line: 86
Column: 1
return symbol_trie
def MaybeAddColor(s, color):
"""Wrap the input string to the xterm green color, if color is set.
"""
if color:
return '\033[92m{0}\033[0m'.format(s)
else:
Reported by Pylint.
benchmarks/operator_benchmark/pt/groupnorm_test.py
14 issues
Line: 3
Column: 1
import operator_benchmark as op_bench
import torch
import torch.nn.functional as F
"""Microbenchmarks for groupnorm operator."""
groupnorm_configs_short = op_bench.cross_product_configs(
Reported by Pylint.
Line: 4
Column: 1
import operator_benchmark as op_bench
import torch
import torch.nn.functional as F
"""Microbenchmarks for groupnorm operator."""
groupnorm_configs_short = op_bench.cross_product_configs(
Reported by Pylint.
Line: 9
Column: 27
"""Microbenchmarks for groupnorm operator."""
groupnorm_configs_short = op_bench.cross_product_configs(
dims=(
(32, 8, 16),
(32, 8, 56, 56),
),
num_groups=(2, 4),
Reported by Pylint.
Line: 19
Column: 26
)
class GroupNormBenchmark(op_bench.TorchBenchmarkBase):
def init(self, dims, num_groups):
num_channels = dims[1]
self.inputs = {
"input": (torch.rand(*dims) - 0.5) * 256,
"num_groups": num_groups,
Reported by Pylint.
Line: 35
Column: 1
input, num_groups, weight=weight, bias=bias, eps=eps)
op_bench.generate_pt_test(groupnorm_configs_short, GroupNormBenchmark)
if __name__ == "__main__":
op_bench.benchmark_runner.main()
Reported by Pylint.
Line: 7
Column: 1
import torch.nn.functional as F
"""Microbenchmarks for groupnorm operator."""
groupnorm_configs_short = op_bench.cross_product_configs(
dims=(
(32, 8, 16),
(32, 8, 56, 56),
Reported by Pylint.
Line: 22
Column: 9
class GroupNormBenchmark(op_bench.TorchBenchmarkBase):
def init(self, dims, num_groups):
num_channels = dims[1]
self.inputs = {
"input": (torch.rand(*dims) - 0.5) * 256,
"num_groups": num_groups,
"weight": torch.rand(num_channels, dtype=torch.float),
"bias": torch.rand(num_channels, dtype=torch.float),
"eps": 1e-5
Reported by Pylint.
Line: 30
Column: 23
"eps": 1e-5
}
def forward(self, input, num_groups: int, weight, bias, eps: float):
return F.group_norm(
input, num_groups, weight=weight, bias=bias, eps=eps)
op_bench.generate_pt_test(groupnorm_configs_short, GroupNormBenchmark)
Reported by Pylint.
Line: 1
Column: 1
import operator_benchmark as op_bench
import torch
import torch.nn.functional as F
"""Microbenchmarks for groupnorm operator."""
groupnorm_configs_short = op_bench.cross_product_configs(
Reported by Pylint.
Line: 19
Column: 1
)
class GroupNormBenchmark(op_bench.TorchBenchmarkBase):
def init(self, dims, num_groups):
num_channels = dims[1]
self.inputs = {
"input": (torch.rand(*dims) - 0.5) * 256,
"num_groups": num_groups,
Reported by Pylint.
benchmarks/operator_benchmark/pt/gather_test.py
14 issues
Line: 2
Column: 1
import operator_benchmark as op_bench
import torch
import numpy
"""Microbenchmarks for gather operator."""
# An example input from this configuration is M=4, N=4, dim=0.
gather_configs_short = op_bench.config_list(
Reported by Pylint.
Line: 9
Column: 24
"""Microbenchmarks for gather operator."""
# An example input from this configuration is M=4, N=4, dim=0.
gather_configs_short = op_bench.config_list(
attr_names=["M", "N", "dim"],
attrs=[
[256, 512, 0],
[512, 512, 1],
],
Reported by Pylint.
Line: 22
Column: 23
)
gather_configs_long = op_bench.cross_product_configs(
M=[128, 1024],
N=[128, 1024],
dim=[0, 1],
device=['cpu', 'cuda'],
tags=["long"]
Reported by Pylint.
Line: 31
Column: 23
)
class GatherBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N, dim, device):
min_val = M if dim == 0 else N
numpy.random.seed((1 << 32) - 1)
self.inputs = {
"input_one": torch.rand(M, N, device=device),
Reported by Pylint.
Line: 46
Column: 1
return torch.gather(input_one, dim, index)
op_bench.generate_pt_test(gather_configs_short + gather_configs_long,
GatherBenchmark)
if __name__ == "__main__":
op_bench.benchmark_runner.main()
Reported by Pylint.
Line: 6
Column: 1
import numpy
"""Microbenchmarks for gather operator."""
# An example input from this configuration is M=4, N=4, dim=0.
gather_configs_short = op_bench.config_list(
attr_names=["M", "N", "dim"],
attrs=[
Reported by Pylint.
Line: 35
Column: 9
def init(self, M, N, dim, device):
min_val = M if dim == 0 else N
numpy.random.seed((1 << 32) - 1)
self.inputs = {
"input_one": torch.rand(M, N, device=device),
"dim": dim,
"index": torch.tensor(numpy.random.randint(0, min_val, (M, N)), device=device)
}
self.set_module_name("gather")
Reported by Pylint.
Line: 1
Column: 1
import operator_benchmark as op_bench
import torch
import numpy
"""Microbenchmarks for gather operator."""
# An example input from this configuration is M=4, N=4, dim=0.
gather_configs_short = op_bench.config_list(
Reported by Pylint.
Line: 31
Column: 1
)
class GatherBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N, dim, device):
min_val = M if dim == 0 else N
numpy.random.seed((1 << 32) - 1)
self.inputs = {
"input_one": torch.rand(M, N, device=device),
Reported by Pylint.
Line: 32
Column: 5
class GatherBenchmark(op_bench.TorchBenchmarkBase):
def init(self, M, N, dim, device):
min_val = M if dim == 0 else N
numpy.random.seed((1 << 32) - 1)
self.inputs = {
"input_one": torch.rand(M, N, device=device),
"dim": dim,
Reported by Pylint.
.github/scripts/generate_pytorch_test_matrix.py
13 issues
Line: 15
Column: 1
import re
from typing import Dict
from typing_extensions import TypedDict
class Config(TypedDict):
num_shards: int
runner: str
Reported by Pylint.
Line: 18
Column: 1
from typing_extensions import TypedDict
class Config(TypedDict):
num_shards: int
runner: str
def get_disabled_issues() -> str:
Reported by Pylint.
Line: 18
Column: 1
from typing_extensions import TypedDict
class Config(TypedDict):
num_shards: int
runner: str
def get_disabled_issues() -> str:
Reported by Pylint.
Line: 23
Column: 1
runner: str
def get_disabled_issues() -> str:
pr_body = os.getenv('PR_BODY', '')
# The below regex is meant to match all *case-insensitive* keywords that
# GitHub has delineated would link PRs to issues, more details here:
# https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue.
# E.g., "Close #62851", "fixES #62851" and "RESOLVED #62851" would all match, but not
Reported by Pylint.
Line: 35
Column: 1
return ','.join(issue_numbers)
def main() -> None:
TEST_RUNNER_TYPE = os.getenv('TEST_RUNNER_TYPE')
assert TEST_RUNNER_TYPE is not None
ON_PULL_REQUEST = os.getenv('GITHUB_HEAD_REF')
NUM_TEST_SHARDS_ON_PULL_REQUEST = os.getenv('NUM_TEST_SHARDS_ON_PULL_REQUEST')
NUM_TEST_SHARDS = int(os.getenv('NUM_TEST_SHARDS', '1'))
Reported by Pylint.
Line: 36
Column: 5
def main() -> None:
TEST_RUNNER_TYPE = os.getenv('TEST_RUNNER_TYPE')
assert TEST_RUNNER_TYPE is not None
ON_PULL_REQUEST = os.getenv('GITHUB_HEAD_REF')
NUM_TEST_SHARDS_ON_PULL_REQUEST = os.getenv('NUM_TEST_SHARDS_ON_PULL_REQUEST')
NUM_TEST_SHARDS = int(os.getenv('NUM_TEST_SHARDS', '1'))
if ON_PULL_REQUEST and NUM_TEST_SHARDS_ON_PULL_REQUEST:
Reported by Pylint.
Line: 37
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def main() -> None:
TEST_RUNNER_TYPE = os.getenv('TEST_RUNNER_TYPE')
assert TEST_RUNNER_TYPE is not None
ON_PULL_REQUEST = os.getenv('GITHUB_HEAD_REF')
NUM_TEST_SHARDS_ON_PULL_REQUEST = os.getenv('NUM_TEST_SHARDS_ON_PULL_REQUEST')
NUM_TEST_SHARDS = int(os.getenv('NUM_TEST_SHARDS', '1'))
if ON_PULL_REQUEST and NUM_TEST_SHARDS_ON_PULL_REQUEST:
NUM_TEST_SHARDS = int(NUM_TEST_SHARDS_ON_PULL_REQUEST)
Reported by Bandit.
Line: 38
Column: 5
def main() -> None:
TEST_RUNNER_TYPE = os.getenv('TEST_RUNNER_TYPE')
assert TEST_RUNNER_TYPE is not None
ON_PULL_REQUEST = os.getenv('GITHUB_HEAD_REF')
NUM_TEST_SHARDS_ON_PULL_REQUEST = os.getenv('NUM_TEST_SHARDS_ON_PULL_REQUEST')
NUM_TEST_SHARDS = int(os.getenv('NUM_TEST_SHARDS', '1'))
if ON_PULL_REQUEST and NUM_TEST_SHARDS_ON_PULL_REQUEST:
NUM_TEST_SHARDS = int(NUM_TEST_SHARDS_ON_PULL_REQUEST)
MULTIGPU_RUNNER_TYPE = os.getenv('MULTIGPU_RUNNER_TYPE')
Reported by Pylint.
Line: 39
Column: 5
TEST_RUNNER_TYPE = os.getenv('TEST_RUNNER_TYPE')
assert TEST_RUNNER_TYPE is not None
ON_PULL_REQUEST = os.getenv('GITHUB_HEAD_REF')
NUM_TEST_SHARDS_ON_PULL_REQUEST = os.getenv('NUM_TEST_SHARDS_ON_PULL_REQUEST')
NUM_TEST_SHARDS = int(os.getenv('NUM_TEST_SHARDS', '1'))
if ON_PULL_REQUEST and NUM_TEST_SHARDS_ON_PULL_REQUEST:
NUM_TEST_SHARDS = int(NUM_TEST_SHARDS_ON_PULL_REQUEST)
MULTIGPU_RUNNER_TYPE = os.getenv('MULTIGPU_RUNNER_TYPE')
NOGPU_RUNNER_TYPE = os.getenv('NOGPU_RUNNER_TYPE')
Reported by Pylint.
Line: 40
Column: 5
assert TEST_RUNNER_TYPE is not None
ON_PULL_REQUEST = os.getenv('GITHUB_HEAD_REF')
NUM_TEST_SHARDS_ON_PULL_REQUEST = os.getenv('NUM_TEST_SHARDS_ON_PULL_REQUEST')
NUM_TEST_SHARDS = int(os.getenv('NUM_TEST_SHARDS', '1'))
if ON_PULL_REQUEST and NUM_TEST_SHARDS_ON_PULL_REQUEST:
NUM_TEST_SHARDS = int(NUM_TEST_SHARDS_ON_PULL_REQUEST)
MULTIGPU_RUNNER_TYPE = os.getenv('MULTIGPU_RUNNER_TYPE')
NOGPU_RUNNER_TYPE = os.getenv('NOGPU_RUNNER_TYPE')
configs: Dict[str, Config] = {}
Reported by Pylint.
caffe2/python/fused_8bit_rowwise_conversion_ops_test.py
13 issues
Line: 11
Column: 1
import numpy as np
import struct
from hypothesis import given
# Eigen/Python round 0.5 away from 0, Numpy rounds to even
round_to_nearest = np.vectorize(round)
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import numpy as np
Reported by Pylint.
Line: 10
Column: 1
import caffe2.python.hypothesis_test_util as hu
import numpy as np
import struct
from hypothesis import given
# Eigen/Python round 0.5 away from 0, Numpy rounds to even
round_to_nearest = np.vectorize(round)
Reported by Pylint.
Line: 17
Column: 1
round_to_nearest = np.vectorize(round)
def bytes_to_floats(byte_matrix):
floats = np.empty([np.shape(byte_matrix)[0], 1], dtype=np.float32)
for i, byte_values in enumerate(byte_matrix):
floats[i], = struct.unpack('f', bytearray(byte_values))
return floats
Reported by Pylint.
Line: 24
Column: 1
return floats
def floats_to_bytes(floats):
byte_matrix = np.empty([np.shape(floats)[0], 4], dtype=np.uint8)
for i, value in enumerate(floats):
assert isinstance(value, np.float32), (value, floats)
as_bytes = struct.pack('f', value)
# In Python3 bytes will be a list of int, in Python2 a list of string
Reported by Pylint.
Line: 27
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def floats_to_bytes(floats):
byte_matrix = np.empty([np.shape(floats)[0], 4], dtype=np.uint8)
for i, value in enumerate(floats):
assert isinstance(value, np.float32), (value, floats)
as_bytes = struct.pack('f', value)
# In Python3 bytes will be a list of int, in Python2 a list of string
if isinstance(as_bytes[0], int):
byte_matrix[i] = list(as_bytes)
else:
Reported by Bandit.
Line: 37
Column: 1
return byte_matrix
def fused_rowwise_8bit_quantize_reference(data):
minimum = np.min(data, axis=-1, keepdims=True)
maximum = np.max(data, axis=-1, keepdims=True)
span = maximum - minimum
bias = minimum
scale = span / 255.0
Reported by Pylint.
Line: 53
Column: 1
return np.concatenate([quantized_data, scale_bytes, bias_bytes], axis=-1)
def fused_rowwise_8bit_quantize_dequantize_reference(data):
fused_quantized = fused_rowwise_8bit_quantize_reference(data)
scale = bytes_to_floats(fused_quantized[..., -8:-4].astype(np.uint8).reshape(-1, 4))
scale = scale.reshape(fused_quantized.shape[:-1] + (scale.shape[-1],))
bias = bytes_to_floats(fused_quantized[..., -4:].astype(np.uint8).reshape(-1, 4))
bias = bias.reshape(fused_quantized.shape[:-1] + (bias.shape[-1],))
Reported by Pylint.
Line: 63
Column: 1
return quantized_data * scale + bias
class TestFused8BitRowwiseQuantizationConversion(hu.HypothesisTestCase):
@given(input_data=hu.tensor(min_dim=1, max_dim=3, max_value=33))
def test_quantize_op(self, input_data):
quantize = core.CreateOperator(
'FloatToFused8BitRowwiseQuantized',
['input_data'],
Reported by Pylint.
Line: 65
Column: 5
class TestFused8BitRowwiseQuantizationConversion(hu.HypothesisTestCase):
@given(input_data=hu.tensor(min_dim=1, max_dim=3, max_value=33))
def test_quantize_op(self, input_data):
quantize = core.CreateOperator(
'FloatToFused8BitRowwiseQuantized',
['input_data'],
['quantized_data'],
)
Reported by Pylint.
caffe2/python/mkl/mkl_relu_op_test.py
13 issues
Line: 7
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given
import numpy as np
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.mkl_test_util as mu
Reported by Pylint.
Line: 8
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given
import numpy as np
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.mkl_test_util as mu
Reported by Pylint.
Line: 15
Column: 22
import caffe2.python.mkl_test_util as mu
@unittest.skipIf(not workspace.C.has_mkldnn,
"Skipping as we do not have mkldnn.")
class MKLReluTest(hu.HypothesisTestCase):
@given(size=st.integers(8, 20),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 3),
Reported by Pylint.
Line: 23
Column: 72
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_relu(self, size, input_channels, batch_size, inplace, gc, dc):
op = core.CreateOperator(
"Relu",
["X"],
["Y"] if not inplace else ["X"],
)
Reported by Pylint.
Line: 35
Column: 5
if __name__ == "__main__":
import unittest
unittest.main()
Reported by Pylint.
Line: 1
Column: 1
import unittest
import hypothesis.strategies as st
from hypothesis import given
import numpy as np
Reported by Pylint.
Line: 17
Column: 1
@unittest.skipIf(not workspace.C.has_mkldnn,
"Skipping as we do not have mkldnn.")
class MKLReluTest(hu.HypothesisTestCase):
@given(size=st.integers(8, 20),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_relu(self, size, input_channels, batch_size, inplace, gc, dc):
op = core.CreateOperator(
"Relu",
["X"],
["Y"] if not inplace else ["X"],
)
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_relu(self, size, input_channels, batch_size, inplace, gc, dc):
op = core.CreateOperator(
"Relu",
["X"],
["Y"] if not inplace else ["X"],
)
Reported by Pylint.
Line: 23
Column: 5
batch_size=st.integers(1, 3),
inplace=st.booleans(),
**mu.gcs)
def test_mkl_relu(self, size, input_channels, batch_size, inplace, gc, dc):
op = core.CreateOperator(
"Relu",
["X"],
["Y"] if not inplace else ["X"],
)
Reported by Pylint.
caffe2/python/layers/functional.py
13 issues
Line: 12
Column: 1
from caffe2.python.layers.layers import (
ModelLayer,
)
import caffe2.proto.caffe2_pb2 as caffe2_pb2
import numpy as np
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Reported by Pylint.
Line: 92
Column: 36
elif shapes[blob][0] == 0:
shape = tuple(shapes[blob][1:])
else:
logger.warning("unexpected shape: {}".format(shapes[blob]))
# If batch dimension is not first - give up on shape
# inference for that blob
had_issues = True
continue
Reported by Pylint.
Line: 98
Column: 3
had_issues = True
continue
# TODO(amalevich): Move it to some shared library
dtype = None
if types[blob] == caffe2_pb2.TensorProto.DOUBLE:
dtype = (np.float64, shape)
elif types[blob] == caffe2_pb2.TensorProto.FLOAT:
dtype = (np.float32, shape)
Reported by Pylint.
Line: 119
Column: 17
if had_issues:
logger.warning(
"Type inference had problems for layer: {}".format(self.name))
def add_ops(self, net):
self._function(
net, self.input_record, self.output_schema, **(self._kwargs))
Reported by Pylint.
Line: 1
Column: 1
# @package functional
# Module caffe2.python.layers.functional
from caffe2.python import core, schema, scope, workspace
from caffe2.python.layers.layers import (
Reported by Pylint.
Line: 14
Column: 1
)
import caffe2.proto.caffe2_pb2 as caffe2_pb2
import numpy as np
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Reported by Pylint.
Line: 20
Column: 1
logger.setLevel(logging.INFO)
class Functional(ModelLayer):
def __init__(self, model, input_record, output_names_or_num, function,
name='functional', output_dtypes=None, tags=None, **kwargs):
# allow coercion
Reported by Pylint.
Line: 22
Column: 5
class Functional(ModelLayer):
def __init__(self, model, input_record, output_names_or_num, function,
name='functional', output_dtypes=None, tags=None, **kwargs):
# allow coercion
input_record = schema.as_record(input_record)
Reported by Pylint.
Line: 22
Column: 5
class Functional(ModelLayer):
def __init__(self, model, input_record, output_names_or_num, function,
name='functional', output_dtypes=None, tags=None, **kwargs):
# allow coercion
input_record = schema.as_record(input_record)
Reported by Pylint.
Line: 22
Column: 5
class Functional(ModelLayer):
def __init__(self, model, input_record, output_names_or_num, function,
name='functional', output_dtypes=None, tags=None, **kwargs):
# allow coercion
input_record = schema.as_record(input_record)
Reported by Pylint.