The following issues were found:
torch/utils/benchmark/utils/compare.py
48 issues
Line: 8
Column: 1
from typing import DefaultDict, List, Optional, Tuple
from torch.utils.benchmark.utils import common
from torch import tensor as _tensor
__all__ = ["Compare"]
BEST = "\033[92m"
GOOD = "\033[34m"
Reported by Pylint.
Line: 20
Column: 1
TERMINATE = "\033[0m"
class Colorize(enum.Enum):
NONE = "none"
COLUMNWISE = "columnwise"
ROWWISE = "rowwise"
Reported by Pylint.
Line: 27
Column: 1
# Classes to separate internal bookkeeping from what is rendered.
class _Column(object):
def __init__(
self,
grouped_results: List[Tuple[Optional[common.Measurement], ...]],
time_scale: float,
time_unit: str,
Reported by Pylint.
Line: 28
Column: 5
# Classes to separate internal bookkeeping from what is rendered.
class _Column(object):
def __init__(
self,
grouped_results: List[Tuple[Optional[common.Measurement], ...]],
time_scale: float,
time_unit: str,
trim_significant_figures: bool,
Reported by Pylint.
Line: 56
Column: 1
if (m is not None) and (digits is not None)
) if self._trim_significant_figures else 1
length = unit_digits + decimal_digits + (1 if decimal_digits else 0)
self._template = f"{{:>{length}.{decimal_digits}f}}{{:>{7 if self._highlight_warnings else 0}}}"
def get_results_for(self, group):
return self._grouped_results[group]
def num_to_str(self, value: Optional[float], estimated_sigfigs: int, spread: Optional[float]):
Reported by Pylint.
Line: 58
Column: 5
length = unit_digits + decimal_digits + (1 if decimal_digits else 0)
self._template = f"{{:>{length}.{decimal_digits}f}}{{:>{7 if self._highlight_warnings else 0}}}"
def get_results_for(self, group):
return self._grouped_results[group]
def num_to_str(self, value: Optional[float], estimated_sigfigs: int, spread: Optional[float]):
if value is None:
return " " * len(self.num_to_str(1, estimated_sigfigs, None))
Reported by Pylint.
Line: 61
Column: 5
def get_results_for(self, group):
return self._grouped_results[group]
def num_to_str(self, value: Optional[float], estimated_sigfigs: int, spread: Optional[float]):
if value is None:
return " " * len(self.num_to_str(1, estimated_sigfigs, None))
if self._trim_significant_figures:
value = common.trim_sigfig(value, estimated_sigfigs)
Reported by Pylint.
Line: 73
Column: 1
f" (! {spread * 100:.0f}%)" if self._highlight_warnings and spread is not None else "")
def optional_min(seq):
l = list(seq)
return None if len(l) == 0 else min(l)
class _Row(object):
Reported by Pylint.
Line: 74
Column: 5
def optional_min(seq):
l = list(seq)
return None if len(l) == 0 else min(l)
class _Row(object):
def __init__(self, results, row_group, render_env, env_str_len,
Reported by Pylint.
Line: 78
Column: 1
return None if len(l) == 0 else min(l)
class _Row(object):
def __init__(self, results, row_group, render_env, env_str_len,
row_name_str_len, time_scale, colorize, num_threads=None):
super(_Row, self).__init__()
self._results = results
self._row_group = row_group
Reported by Pylint.
test/distributed/pipeline/sync/skip/test_gpipe.py
48 issues
Line: 7
Column: 1
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import pytest
import torch
from torch import nn
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
Reported by Pylint.
Line: 8
Column: 1
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import pytest
import torch
from torch import nn
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
Reported by Pylint.
Line: 9
Column: 1
# LICENSE file in the root directory of this source tree.
import pytest
import torch
from torch import nn
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
from torch.distributed.pipeline.sync.utils import partition_model
Reported by Pylint.
Line: 11
Column: 1
import torch
from torch import nn
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
from torch.distributed.pipeline.sync.utils import partition_model
Reported by Pylint.
Line: 12
Column: 1
from torch import nn
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
from torch.distributed.pipeline.sync.utils import partition_model
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda required")
Reported by Pylint.
Line: 13
Column: 1
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
from torch.distributed.pipeline.sync.utils import partition_model
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda required")
@pytest.mark.parametrize("balance", [[3], [1, 2], [2, 1], [1, 1, 1]], ids=["3", "1:2", "2:1", "1:1:1"])
Reported by Pylint.
Line: 14
Column: 1
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.skip import pop, skippable, stash
from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
from torch.distributed.pipeline.sync.utils import partition_model
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda required")
@pytest.mark.parametrize("balance", [[3], [1, 2], [2, 1], [1, 1, 1]], ids=["3", "1:2", "2:1", "1:1:1"])
@pytest.mark.parametrize("checkpoint", ["never", "always", "except_last"])
Reported by Pylint.
Line: 20
Column: 36
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda required")
@pytest.mark.parametrize("balance", [[3], [1, 2], [2, 1], [1, 1, 1]], ids=["3", "1:2", "2:1", "1:1:1"])
@pytest.mark.parametrize("checkpoint", ["never", "always", "except_last"])
def test_1to3(balance, checkpoint, setup_rpc):
if torch.cuda.device_count() < len(balance):
pytest.skip("at least %d cuda devices required" % len(balance))
@skippable(stash=["1to3"])
class Layer1(nn.Module):
Reported by Pylint.
Line: 30
Column: 27
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
def forward(self, input):
yield stash("1to3", input)
output = self.conv(input)
return output # noqa: B901
class Layer2(nn.Module):
Reported by Pylint.
Line: 40
Column: 27
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
def forward(self, input):
output = self.conv(input)
return output
@skippable(pop=["1to3"])
class Layer3(nn.Module):
Reported by Pylint.
caffe2/python/pipeline.py
48 issues
Line: 78
Column: 9
TODO(azzolini): simplify once all processors use NetBuilder API.
"""
if isinstance(output, Output):
""" Processor returned an Output. """
return output
elif isinstance(output, Field):
""" Processor returned a record. """
return Output(record=output)
elif isinstance(output, tuple):
Reported by Pylint.
Line: 81
Column: 9
""" Processor returned an Output. """
return output
elif isinstance(output, Field):
""" Processor returned a record. """
return Output(record=output)
elif isinstance(output, tuple):
is_record_and_blob = (
len(output) == 2 and
isinstance(output[0], Field) and
Reported by Pylint.
Line: 89
Column: 13
isinstance(output[0], Field) and
isinstance(output[1], core.BlobReference))
if is_record_and_blob:
""" Processor returned (record, stop_blob) """
return Output(None, *output)
else:
""" Processor returned (nets, record, stop_blob) """
return Output(*output)
else:
Reported by Pylint.
Line: 92
Column: 13
""" Processor returned (record, stop_blob) """
return Output(None, *output)
else:
""" Processor returned (nets, record, stop_blob) """
return Output(*output)
else:
""" Processor returned nets, no output """
return Output(output)
Reported by Pylint.
Line: 95
Column: 9
""" Processor returned (nets, record, stop_blob) """
return Output(*output)
else:
""" Processor returned nets, no output """
return Output(output)
def pipe(
input, output=None, num_threads=1, processor=None, name=None,
Reported by Pylint.
Line: 100
Column: 9
def pipe(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=1):
"""
Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
Queue or DataStream in `output`, creates a Task that, when run, will
pipe the input into the output, using multiple parallel threads.
Reported by Pylint.
Line: 153
Column: 9
def pipe_and_output(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
"""
Similar to `pipe`, with the additional ability for the pipe Task to
return output values to the `Session` once done.
Reported by Pylint.
Line: 196
Column: 9
node_name,
"pipe",
name,
processor_name(input) if input else "NoInput",
processor_name(output) if output else "NoOutput")
with Task(name=name, group=group, outputs=final_outputs,
num_instances=num_threads) as task:
global_exit_net = core.Net('pipe:exit')
Reported by Pylint.
Line: 256
Column: 9
node_name,
"pipe",
name,
processor_name(input) if input else "NoInput",
processor_name(output) if output else "NoOutput")
with Task(name=name, group=group, outputs=final_outputs) as task:
global_exit_net = core.Net('exit')
global_init_net = core.Net('init')
Reported by Pylint.
Line: 315
Column: 9
def _pipe_step(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=None, final_outputs=None):
"""
"""
assert num_threads <= 1 or num_runtime_threads <= 1, (
'Only one of num_threads or num_runtime_threads must be set.')
Reported by Pylint.
torch/utils/bundled_inputs.py
48 issues
Line: 361
Column: 46
return arg.clone(), ref
# Example inputs commonly come from torch.zeros, torch.ones, or torch.full.
# These can be represented compactly.
for fmt in [torch.contiguous_format, torch.channels_last]:
if arg.is_contiguous(memory_format=fmt) and (arg == arg.flatten()[0]).all().item():
return (arg.flatten()[0].clone().expand(*arg.size()),
f"{ref}.contiguous(memory_format={fmt})")
# Prevent big tensors from being bundled by default.
# TODO: Provide more useful diagnostics.
Reported by Pylint.
Line: 361
Column: 21
return arg.clone(), ref
# Example inputs commonly come from torch.zeros, torch.ones, or torch.full.
# These can be represented compactly.
for fmt in [torch.contiguous_format, torch.channels_last]:
if arg.is_contiguous(memory_format=fmt) and (arg == arg.flatten()[0]).all().item():
return (arg.flatten()[0].clone().expand(*arg.size()),
f"{ref}.contiguous(memory_format={fmt})")
# Prevent big tensors from being bundled by default.
# TODO: Provide more useful diagnostics.
Reported by Pylint.
Line: 398
Column: 12
def bundle_randn(*size, dtype=None):
"""Generate a tensor that will be inflated with torch.randn."""
stub = torch.zeros(1, dtype=dtype).expand(*size)
return InflatableArg(value=stub, fmt="torch.randn_like({})")
def bundle_large_tensor(t):
"""Wrap a tensor to allow bundling regardless of size."""
Reported by Pylint.
Line: 103
Column: 13
raise Exception("Only ScriptModule is supported.")
ignored_methods, ignored_attrs = _get_bundled_inputs_attributes_and_methods(model)
clone = torch._C._hack_do_not_use_clone_module_with_class( # type: ignore[attr-defined]
model._c,
ignored_methods,
ignored_attrs,
)
Reported by Pylint.
Line: 103
Column: 13
raise Exception("Only ScriptModule is supported.")
ignored_methods, ignored_attrs = _get_bundled_inputs_attributes_and_methods(model)
clone = torch._C._hack_do_not_use_clone_module_with_class( # type: ignore[attr-defined]
model._c,
ignored_methods,
ignored_attrs,
)
Reported by Pylint.
Line: 104
Column: 9
ignored_methods, ignored_attrs = _get_bundled_inputs_attributes_and_methods(model)
clone = torch._C._hack_do_not_use_clone_module_with_class( # type: ignore[attr-defined]
model._c,
ignored_methods,
ignored_attrs,
)
# The above cloning function returns a torch._C.scriptmodule and we need a torch.jit.scriptmodule.
Reported by Pylint.
Line: 252
Column: 9
function_arg_types = [arg.type for arg in function.schema.arguments[1:]] # type: ignore[attr-defined]
deflated_inputs_type: ListType = ListType(TupleType(function_arg_types))
model._c._register_attribute("_bundled_inputs_deflated_{name}".format(name=function_name), deflated_inputs_type, [])
if hasattr(model, "_generate_bundled_inputs_for_" + function_name):
if input_list is not None:
raise Exception(
"inputs[{name}] is not None, but _generate_bundled_inputs_for_{name} is already defined".format(
Reported by Pylint.
Line: 252
Column: 9
function_arg_types = [arg.type for arg in function.schema.arguments[1:]] # type: ignore[attr-defined]
deflated_inputs_type: ListType = ListType(TupleType(function_arg_types))
model._c._register_attribute("_bundled_inputs_deflated_{name}".format(name=function_name), deflated_inputs_type, [])
if hasattr(model, "_generate_bundled_inputs_for_" + function_name):
if input_list is not None:
raise Exception(
"inputs[{name}] is not None, but _generate_bundled_inputs_for_{name} is already defined".format(
Reported by Pylint.
Line: 275
Column: 56
deflated_inputs = []
parts = []
for inp_idx, args in enumerate(input_list):
if not isinstance(args, Tuple) and not isinstance(args, List): # type: ignore[arg-type]
raise TypeError(
"Error bundled input for function {0} idx: {1} is not a Tuple or a List".format(function_name, inp_idx)
)
deflated_args = []
parts.append("(")
Reported by Pylint.
Line: 275
Column: 24
deflated_inputs = []
parts = []
for inp_idx, args in enumerate(input_list):
if not isinstance(args, Tuple) and not isinstance(args, List): # type: ignore[arg-type]
raise TypeError(
"Error bundled input for function {0} idx: {1} is not a Tuple or a List".format(function_name, inp_idx)
)
deflated_args = []
parts.append("(")
Reported by Pylint.
torch/optim/_functional.py
48 issues
Line: 12
Column: 16
def _make_sparse(grad, grad_indices, values):
size = grad.size()
if grad_indices.numel() == 0 or values.numel() == 0:
return torch.empty_like(grad)
return torch.sparse_coo_tensor(grad_indices, values, size)
def adagrad(params: List[Tensor],
grads: List[Tensor],
Reported by Pylint.
Line: 13
Column: 12
size = grad.size()
if grad_indices.numel() == 0 or values.numel() == 0:
return torch.empty_like(grad)
return torch.sparse_coo_tensor(grad_indices, values, size)
def adagrad(params: List[Tensor],
grads: List[Tensor],
state_sums: List[Tensor],
Reported by Pylint.
Line: 90
Column: 13
exp_avg_sq.mul_(beta2).addcmul_(grad, grad.conj(), value=1 - beta2)
if amsgrad:
# Maintains the maximum of all 2nd moment running avg. till now
torch.maximum(max_exp_avg_sqs[i], exp_avg_sq, out=max_exp_avg_sqs[i])
# Use the max. for normalizing running avg. of gradient
denom = (max_exp_avg_sqs[i].sqrt() / math.sqrt(bias_correction2)).add_(eps)
else:
denom = (exp_avg_sq.sqrt() / math.sqrt(bias_correction2)).add_(eps)
Reported by Pylint.
Line: 135
Column: 13
exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)
if amsgrad:
# Maintains the maximum of all 2nd moment running avg. till now
torch.maximum(max_exp_avg_sqs[i], exp_avg_sq, out=max_exp_avg_sqs[i])
# Use the max. for normalizing running avg. of gradient
denom = (max_exp_avg_sqs[i].sqrt() / math.sqrt(bias_correction2)).add_(eps)
else:
denom = (exp_avg_sq.sqrt() / math.sqrt(bias_correction2)).add_(eps)
Reported by Pylint.
Line: 170
Column: 23
buf = momentum_buffer_list[i]
if buf is None:
buf = torch.clone(d_p).detach()
momentum_buffer_list[i] = buf
else:
buf.mul_(momentum).add_(d_p, alpha=1 - dampening)
if nesterov:
Reported by Pylint.
Line: 278
Column: 41
# for dir<0, dfdx=0
# for dir>=0 dfdx=dfdx
grad = grad.clone(memory_format=torch.preserve_format)
grad[sign.eq(etaminus)] = 0
# update parameters
param.addcmul_(grad.sign(), step_size, value=-1)
Reported by Pylint.
Line: 315
Column: 20
# Update biased first moment estimate.
exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
# Update the exponentially weighted infinity norm.
norm_buf = torch.cat([
exp_inf.mul_(beta2).unsqueeze(0),
grad.abs().add_(eps).unsqueeze_(0)
], 0)
torch.amax(norm_buf, 0, keepdim=False, out=exp_inf)
Reported by Pylint.
Line: 319
Column: 9
exp_inf.mul_(beta2).unsqueeze(0),
grad.abs().add_(eps).unsqueeze_(0)
], 0)
torch.amax(norm_buf, 0, keepdim=False, out=exp_inf)
bias_correction = 1 - beta1 ** step
clr = lr / bias_correction
param.addcdiv_(exp_avg, exp_inf, value=-clr)
Reported by Pylint.
Line: 7
Column: 3
from torch import Tensor
from typing import List, Optional
# TODO: use foreach API in optim._functional to do all the computation
def _make_sparse(grad, grad_indices, values):
size = grad.size()
if grad_indices.numel() == 0 or values.numel() == 0:
return torch.empty_like(grad)
Reported by Pylint.
Line: 40
Column: 28
if grad.is_sparse:
grad = grad.coalesce() # the update is non-linear so indices must be unique
grad_indices = grad._indices()
grad_values = grad._values()
size = grad.size()
state_sum.add_(_make_sparse(grad, grad_indices, grad_values.pow(2)))
std = state_sum.sparse_mask(grad)
Reported by Pylint.
torch/utils/benchmark/utils/common.py
47 issues
Line: 146
Column: 27
self._lazy_init()
n_total = len(self._sorted_times)
lower_bound = int(n_total // 4)
upper_bound = int(torch.tensor(3 * n_total / 4).ceil())
interquartile_points: Tuple[float, ...] = self._sorted_times[lower_bound:upper_bound]
std = torch.tensor(interquartile_points).std(unbiased=False).item()
sqrt_n = torch.tensor(len(interquartile_points)).sqrt().item()
# Rough estimates. These are by no means statistically rigorous.
Reported by Pylint.
Line: 148
Column: 15
lower_bound = int(n_total // 4)
upper_bound = int(torch.tensor(3 * n_total / 4).ceil())
interquartile_points: Tuple[float, ...] = self._sorted_times[lower_bound:upper_bound]
std = torch.tensor(interquartile_points).std(unbiased=False).item()
sqrt_n = torch.tensor(len(interquartile_points)).sqrt().item()
# Rough estimates. These are by no means statistically rigorous.
confidence_interval = max(1.645 * std / sqrt_n, _MIN_CONFIDENCE_INTERVAL)
relative_ci = torch.tensor(self._median / confidence_interval).log10().item()
Reported by Pylint.
Line: 149
Column: 18
upper_bound = int(torch.tensor(3 * n_total / 4).ceil())
interquartile_points: Tuple[float, ...] = self._sorted_times[lower_bound:upper_bound]
std = torch.tensor(interquartile_points).std(unbiased=False).item()
sqrt_n = torch.tensor(len(interquartile_points)).sqrt().item()
# Rough estimates. These are by no means statistically rigorous.
confidence_interval = max(1.645 * std / sqrt_n, _MIN_CONFIDENCE_INTERVAL)
relative_ci = torch.tensor(self._median / confidence_interval).log10().item()
num_significant_figures = int(torch.tensor(relative_ci).floor())
Reported by Pylint.
Line: 153
Column: 23
# Rough estimates. These are by no means statistically rigorous.
confidence_interval = max(1.645 * std / sqrt_n, _MIN_CONFIDENCE_INTERVAL)
relative_ci = torch.tensor(self._median / confidence_interval).log10().item()
num_significant_figures = int(torch.tensor(relative_ci).floor())
return min(max(num_significant_figures, 1), _MAX_SIGNIFICANT_FIGURES)
@property
def has_warnings(self) -> bool:
Reported by Pylint.
Line: 154
Column: 39
# Rough estimates. These are by no means statistically rigorous.
confidence_interval = max(1.645 * std / sqrt_n, _MIN_CONFIDENCE_INTERVAL)
relative_ci = torch.tensor(self._median / confidence_interval).log10().item()
num_significant_figures = int(torch.tensor(relative_ci).floor())
return min(max(num_significant_figures, 1), _MAX_SIGNIFICANT_FIGURES)
@property
def has_warnings(self) -> bool:
self._lazy_init()
Reported by Pylint.
Line: 165
Column: 29
def _lazy_init(self) -> None:
if self.raw_times and not self._sorted_times:
self._sorted_times = tuple(sorted(self.times))
_sorted_times = torch.tensor(self._sorted_times, dtype=torch.float64)
self._median = _sorted_times.quantile(.5).item()
self._mean = _sorted_times.mean().item()
self._p25 = _sorted_times.quantile(.25).item()
self._p75 = _sorted_times.quantile(.75).item()
Reported by Pylint.
Line: 165
Column: 68
def _lazy_init(self) -> None:
if self.raw_times and not self._sorted_times:
self._sorted_times = tuple(sorted(self.times))
_sorted_times = torch.tensor(self._sorted_times, dtype=torch.float64)
self._median = _sorted_times.quantile(.5).item()
self._mean = _sorted_times.mean().item()
self._p25 = _sorted_times.quantile(.25).item()
self._p75 = _sorted_times.quantile(.75).item()
Reported by Pylint.
Line: 262
Column: 56
This utility is used to format numbers for human consumption.
"""
time_unit = {-3: "ns", -2: "us", -1: "ms"}.get(int(torch.tensor(t).log10().item() // 3), "s")
time_scale = {"ns": 1e-9, "us": 1e-6, "ms": 1e-3, "s": 1}[time_unit]
return time_unit, time_scale
def unit_to_english(u: str) -> str:
Reported by Pylint.
Line: 279
Column: 21
def trim_sigfig(x: float, n: int) -> float:
"""Trim `x` to `n` significant figures. (e.g. 3.14159, 2 -> 3.10000)"""
assert n == int(n)
magnitude = int(torch.tensor(x).abs().log10().ceil().item())
scale = 10 ** (magnitude - n)
return float(torch.tensor(x / scale).round() * scale)
def ordered_unique(elements: Iterable[Any]) -> List[Any]:
Reported by Pylint.
Line: 281
Column: 18
assert n == int(n)
magnitude = int(torch.tensor(x).abs().log10().ceil().item())
scale = 10 ** (magnitude - n)
return float(torch.tensor(x / scale).round() * scale)
def ordered_unique(elements: Iterable[Any]) -> List[Any]:
return list(collections.OrderedDict({i: None for i in elements}).keys())
Reported by Pylint.
caffe2/python/predictor/predictor_py_utils.py
47 issues
Line: 1
Column: 1
## @package predictor_py_utils
# Module caffe2.python.predictor.predictor_py_utils
from caffe2.python import core, scope
def create_predict_net(predictor_export_meta):
"""
Reported by Pylint.
Line: 28
Column: 1
return net.Proto()
def create_predict_init_net(ws, predictor_export_meta):
"""
Return an initialization net that zero-fill all the input and
output blobs, using the shapes from the provided workspace. This is
necessary as there is no shape inference functionality in Caffe2.
"""
Reported by Pylint.
Line: 68
Column: 1
return net.Proto()
def get_comp_name(string, name):
if name:
return string + "_" + name
return string
Reported by Pylint.
Line: 78
Column: 5
"""
Construct dict from kv_list
"""
d = {}
for item in kv_list:
if item.key not in d:
d[item.key] = item.value
return d
Reported by Pylint.
Line: 85
Column: 1
return d
def _ProtoMapGet(field, key):
"""
Given the key, get the value of the repeated field.
Helper function used by protobuf since it doesn't have map construct
"""
for v in field:
Reported by Pylint.
Line: 90
Column: 9
Given the key, get the value of the repeated field.
Helper function used by protobuf since it doesn't have map construct
"""
for v in field:
if v.key == key:
return v.value
return None
Reported by Pylint.
Line: 96
Column: 1
return None
def GetPlan(meta_net_def, key):
return _ProtoMapGet(meta_net_def.plans, key)
def GetPlanOriginal(meta_net_def, key):
return _ProtoMapGet(meta_net_def.plans, key)
Reported by Pylint.
Line: 96
Column: 1
return None
def GetPlan(meta_net_def, key):
return _ProtoMapGet(meta_net_def.plans, key)
def GetPlanOriginal(meta_net_def, key):
return _ProtoMapGet(meta_net_def.plans, key)
Reported by Pylint.
Line: 100
Column: 1
return _ProtoMapGet(meta_net_def.plans, key)
def GetPlanOriginal(meta_net_def, key):
return _ProtoMapGet(meta_net_def.plans, key)
def GetBlobs(meta_net_def, key):
blobs = _ProtoMapGet(meta_net_def.blobs, key)
Reported by Pylint.
Line: 100
Column: 1
return _ProtoMapGet(meta_net_def.plans, key)
def GetPlanOriginal(meta_net_def, key):
return _ProtoMapGet(meta_net_def.plans, key)
def GetBlobs(meta_net_def, key):
blobs = _ProtoMapGet(meta_net_def.blobs, key)
Reported by Pylint.
test/test_module_init.py
47 issues
Line: 2
Column: 1
import inspect
import torch
from unittest import mock
from unittest.mock import MagicMock, patch
from torch.testing import floating_types
from torch.testing._internal.common_device_type import instantiate_device_type_tests, dtypes
from torch.testing._internal.common_quantization import skipIfNoFBGEMM
from torch.testing._internal.common_utils import TestCase, run_tests
Reported by Pylint.
Line: 5
Column: 1
import torch
from unittest import mock
from unittest.mock import MagicMock, patch
from torch.testing import floating_types
from torch.testing._internal.common_device_type import instantiate_device_type_tests, dtypes
from torch.testing._internal.common_quantization import skipIfNoFBGEMM
from torch.testing._internal.common_utils import TestCase, run_tests
Reported by Pylint.
Line: 6
Column: 1
from unittest import mock
from unittest.mock import MagicMock, patch
from torch.testing import floating_types
from torch.testing._internal.common_device_type import instantiate_device_type_tests, dtypes
from torch.testing._internal.common_quantization import skipIfNoFBGEMM
from torch.testing._internal.common_utils import TestCase, run_tests
# Returns a database of args & kwargs that can be used to construct each module.
Reported by Pylint.
Line: 7
Column: 1
from unittest.mock import MagicMock, patch
from torch.testing import floating_types
from torch.testing._internal.common_device_type import instantiate_device_type_tests, dtypes
from torch.testing._internal.common_quantization import skipIfNoFBGEMM
from torch.testing._internal.common_utils import TestCase, run_tests
# Returns a database of args & kwargs that can be used to construct each module.
# Each entry is in class -> (args, kwargs) format.
Reported by Pylint.
Line: 8
Column: 1
from torch.testing import floating_types
from torch.testing._internal.common_device_type import instantiate_device_type_tests, dtypes
from torch.testing._internal.common_quantization import skipIfNoFBGEMM
from torch.testing._internal.common_utils import TestCase, run_tests
# Returns a database of args & kwargs that can be used to construct each module.
# Each entry is in class -> (args, kwargs) format.
# Example: torch.nn.Linear -> ([10, 5], {})
Reported by Pylint.
Line: 14
Column: 3
# Returns a database of args & kwargs that can be used to construct each module.
# Each entry is in class -> (args, kwargs) format.
# Example: torch.nn.Linear -> ([10, 5], {})
# TODO: Merge this in with the initial ModuleInfo implementation.
def build_constructor_arg_db():
return {
torch.nn.AdaptiveAvgPool1d: ((5,), {}),
torch.nn.AdaptiveAvgPool2d: ((5,), {}),
torch.nn.AdaptiveAvgPool3d: ((5,), {}),
Reported by Pylint.
Line: 237
Column: 5
# Returns a function that calls the real implementation of a method
# in addition to passing args to a mock object.
def mock_wrapper(method):
mock = MagicMock()
def wrapper(self, *args, **kwargs):
mock(*args, **kwargs)
return method(self, *args, **kwargs)
wrapper.mock = mock
Reported by Pylint.
Line: 264
Column: 24
return args, kwargs
def generate_test_func(test_cls, module_cls, constructor_arg_db,
verify_kwargs=True, module_is_lazy=False, check_nonexistent_arg=True):
# Generate a function for testing the given module.
@dtypes(*floating_types())
def run_test(test_cls, device, dtype, module_cls=module_cls):
# Check if this module creates parameters or registers buffers.
Reported by Pylint.
Line: 356
Column: 9
torch.nn.Module,
torch.nn.Container, # deprecated
torch.nn.NLLLoss2d, # deprecated
torch.nn.quantized._ConvNd, # base class in __all__ for some reason
# TODO: Remove these 2 from this list once the ASan issue is fixed.
# See https://github.com/pytorch/pytorch/issues/55396
torch.nn.quantized.Embedding,
torch.nn.quantized.EmbeddingBag,
}
Reported by Pylint.
Line: 357
Column: 3
torch.nn.Container, # deprecated
torch.nn.NLLLoss2d, # deprecated
torch.nn.quantized._ConvNd, # base class in __all__ for some reason
# TODO: Remove these 2 from this list once the ASan issue is fixed.
# See https://github.com/pytorch/pytorch/issues/55396
torch.nn.quantized.Embedding,
torch.nn.quantized.EmbeddingBag,
}
# no need to support kwargs for these modules even though
Reported by Pylint.
test/jit/test_custom_operators.py
47 issues
Line: 5
Column: 1
import sys
import unittest
import torch
# Make the helper files in test/ importable
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.append(pytorch_test_dir)
from torch.testing._internal.jit_utils import JitTestCase
Reported by Pylint.
Line: 10
Column: 1
# Make the helper files in test/ importable
pytorch_test_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.append(pytorch_test_dir)
from torch.testing._internal.jit_utils import JitTestCase
if __name__ == '__main__':
raise RuntimeError("This test file is not meant to be run directly, use:\n\n"
"\tpython test/test_jit.py TESTNAME\n\n"
"instead.")
Reported by Pylint.
Line: 23
Column: 9
class TestCustomOperators(JitTestCase):
def test_dynamic_op_registry(self):
from torch._ops import _OpNamespace
self.assertTrue(hasattr(torch, 'ops'))
if '_test' in torch.ops.__dict__:
torch.ops.__dict__.pop('_test')
Reported by Pylint.
Line: 18
Column: 12
"instead.")
def canonical(graph):
return torch._C._jit_pass_canonicalize(graph).str(False)
class TestCustomOperators(JitTestCase):
def test_dynamic_op_registry(self):
from torch._ops import _OpNamespace
Reported by Pylint.
Line: 18
Column: 12
"instead.")
def canonical(graph):
return torch._C._jit_pass_canonicalize(graph).str(False)
class TestCustomOperators(JitTestCase):
def test_dynamic_op_registry(self):
from torch._ops import _OpNamespace
Reported by Pylint.
Line: 31
Column: 9
# Don't use `hasattr()` because it will call `__getattr__`.
self.assertNotIn('_test', torch.ops.__dict__)
torch.ops._test
self.assertIn('_test', torch.ops.__dict__)
self.assertEqual(type(torch.ops._test), _OpNamespace)
self.assertNotIn('leaky_relu', torch.ops._test.__dict__)
op = torch.ops._test.leaky_relu
Reported by Pylint.
Line: 31
Column: 9
# Don't use `hasattr()` because it will call `__getattr__`.
self.assertNotIn('_test', torch.ops.__dict__)
torch.ops._test
self.assertIn('_test', torch.ops.__dict__)
self.assertEqual(type(torch.ops._test), _OpNamespace)
self.assertNotIn('leaky_relu', torch.ops._test.__dict__)
op = torch.ops._test.leaky_relu
Reported by Pylint.
Line: 33
Column: 31
self.assertNotIn('_test', torch.ops.__dict__)
torch.ops._test
self.assertIn('_test', torch.ops.__dict__)
self.assertEqual(type(torch.ops._test), _OpNamespace)
self.assertNotIn('leaky_relu', torch.ops._test.__dict__)
op = torch.ops._test.leaky_relu
self.assertTrue(callable(op))
self.assertIn('leaky_relu', torch.ops._test.__dict__)
Reported by Pylint.
Line: 35
Column: 40
self.assertIn('_test', torch.ops.__dict__)
self.assertEqual(type(torch.ops._test), _OpNamespace)
self.assertNotIn('leaky_relu', torch.ops._test.__dict__)
op = torch.ops._test.leaky_relu
self.assertTrue(callable(op))
self.assertIn('leaky_relu', torch.ops._test.__dict__)
op2 = torch.ops._test.leaky_relu
self.assertEqual(op, op2)
Reported by Pylint.
Line: 36
Column: 14
self.assertEqual(type(torch.ops._test), _OpNamespace)
self.assertNotIn('leaky_relu', torch.ops._test.__dict__)
op = torch.ops._test.leaky_relu
self.assertTrue(callable(op))
self.assertIn('leaky_relu', torch.ops._test.__dict__)
op2 = torch.ops._test.leaky_relu
self.assertEqual(op, op2)
Reported by Pylint.
caffe2/python/net_drawer.py
47 issues
Line: 384
Column: 21
)
for key, operators in viewitems(graphs):
if args.minimal:
graph = GetPydotGraphMinimal(
operators,
name=key,
rankdir=args.rankdir,
node_producer=GetOpNodeProducer(args.append_output, **OP_STYLE),
minimal_dependency=args.minimal_dependency)
Reported by Pylint.
Line: 391
Column: 21
node_producer=GetOpNodeProducer(args.append_output, **OP_STYLE),
minimal_dependency=args.minimal_dependency)
else:
graph = GetPydotGraph(
operators,
name=key,
rankdir=args.rankdir,
node_producer=GetOpNodeProducer(args.append_output, **OP_STYLE))
filename = args.output_prefix + graph.get_name() + '.dot'
Reported by Pylint.
Line: 210
Column: 18
kMaxParallelSteps = 3
def get_label():
label = [step.name + '\n']
if step.report_net:
label.append('Reporter: {}'.format(step.report_net))
if step.should_stop_blob:
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
Reported by Pylint.
Line: 211
Column: 12
def get_label():
label = [step.name + '\n']
if step.report_net:
label.append('Reporter: {}'.format(step.report_net))
if step.should_stop_blob:
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
label.append('Concurrent')
Reported by Pylint.
Line: 212
Column: 48
def get_label():
label = [step.name + '\n']
if step.report_net:
label.append('Reporter: {}'.format(step.report_net))
if step.should_stop_blob:
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
label.append('Concurrent')
if step.only_once:
Reported by Pylint.
Line: 213
Column: 12
label = [step.name + '\n']
if step.report_net:
label.append('Reporter: {}'.format(step.report_net))
if step.should_stop_blob:
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
label.append('Concurrent')
if step.only_once:
label.append('Once')
Reported by Pylint.
Line: 214
Column: 47
if step.report_net:
label.append('Reporter: {}'.format(step.report_net))
if step.should_stop_blob:
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
label.append('Concurrent')
if step.only_once:
label.append('Once')
return '\n'.join(label)
Reported by Pylint.
Line: 215
Column: 12
label.append('Reporter: {}'.format(step.report_net))
if step.should_stop_blob:
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
label.append('Concurrent')
if step.only_once:
label.append('Once')
return '\n'.join(label)
Reported by Pylint.
Line: 217
Column: 12
label.append('Stopper: {}'.format(step.should_stop_blob))
if step.concurrent_substeps:
label.append('Concurrent')
if step.only_once:
label.append('Once')
return '\n'.join(label)
def substep_edge(start, end):
return pydot.Edge(start, end, arrowhead='dot', style='dashed')
Reported by Pylint.
Line: 341
Column: 12
if not isinstance(graph, pydot.Dot):
raise ValueError("func is expected to return pydot.Dot")
return graph.create_png()
except Exception as e:
logger.error("Failed to draw graph: {}".format(e))
return _DummyPngImage
def main():
Reported by Pylint.