The following issues were found:
benchmarks/distributed/rpc/rl/launcher.py
22 issues
Line: 6
Column: 1
import time
import json
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from coordinator import CoordinatorBase
Reported by Pylint.
Line: 7
Column: 1
import json
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from coordinator import CoordinatorBase
COORDINATOR_NAME = "coordinator"
Reported by Pylint.
Line: 81
Column: 25
rank=rank, world_size=world_size)
rpc.shutdown()
def find_graph_variable(args):
r"""
Determines if user specified multiple entries for a single argument, in which case
benchmark is run for each of these entries. Comma separated values in a given argument indicate multiple entries.
Output is presented so that user can use plot repo to plot the results with each of the
variable argument's entries on the x-axis. Args is modified in accordance with this.
Reported by Pylint.
Line: 1
Column: 1
import argparse
import os
import time
import json
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
Reported by Pylint.
Line: 20
Column: 1
TOTAL_EPISODE_STEPS = 100
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
Reported by Pylint.
Line: 20
Column: 1
TOTAL_EPISODE_STEPS = 100
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
Reported by Pylint.
Line: 23
Column: 5
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
Reported by Pylint.
Line: 45
Column: 1
args = parser.parse_args()
args = vars(args)
def run_worker(rank, world_size, master_addr, master_port, batch, state_size, nlayers, out_features, queue):
r"""
inits an rpc worker
Args:
rank (int): Rpc rank of worker machine
world_size (int): Number of workers in rpc network (number of observers +
Reported by Pylint.
Line: 45
Column: 1
args = parser.parse_args()
args = vars(args)
def run_worker(rank, world_size, master_addr, master_port, batch, state_size, nlayers, out_features, queue):
r"""
inits an rpc worker
Args:
rank (int): Rpc rank of worker machine
world_size (int): Number of workers in rpc network (number of observers +
Reported by Pylint.
Line: 84
Column: 1
def find_graph_variable(args):
r"""
Determines if user specified multiple entries for a single argument, in which case
benchmark is run for each of these entries. Comma separated values in a given argument indicate multiple entries.
Output is presented so that user can use plot repo to plot the results with each of the
variable argument's entries on the x-axis. Args is modified in accordance with this.
More than 1 argument with multiple entries is not permitted.
Args:
args (dict): Dictionary containing arguments passed by the user (and default arguments)
Reported by Pylint.
benchmarks/instruction_counts/execution/work.py
22 issues
Line: 78
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b602_subprocess_popen_with_shell_equals_true.html
self.cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
executable=SHELL,
)
def clone(self) -> "_BenchmarkProcess":
return _BenchmarkProcess(self._work_order, self._cpu_list)
Reported by Bandit.
Line: 18
Column: 17
from worker.main import WORKER_PATH, WorkerFailure, WorkerOutput, WorkerTimerArgs, WorkerUnpickler
if TYPE_CHECKING:
PopenType = subprocess.Popen[bytes]
else:
PopenType = subprocess.Popen
# Mitigate https://github.com/pytorch/pytorch/issues/37377
Reported by Pylint.
Line: 176
Column: 16
@property
def work_order(self) -> WorkOrder:
return self._proc._work_order
@property
def cpu_list(self) -> Optional[str]:
return self._proc._cpu_list
Reported by Pylint.
Line: 180
Column: 16
@property
def cpu_list(self) -> Optional[str]:
return self._proc._cpu_list
@property
def proc(self) -> _BenchmarkProcess:
# NB: For cleanup only.
return self._proc
Reported by Pylint.
Line: 5
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b403-import-pickle
import dataclasses
import json
import os
import pickle
import signal
import subprocess
import time
from typing import List, Optional, Union, TYPE_CHECKING
import uuid
Reported by Bandit.
Line: 7
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import os
import pickle
import signal
import subprocess
import time
from typing import List, Optional, Union, TYPE_CHECKING
import uuid
from core.api import AutoLabels
Reported by Bandit.
Line: 53
Column: 1
})
class _BenchmarkProcess:
"""Wraps subprocess.Popen for a given WorkOrder."""
_work_order: WorkOrder
_cpu_list: Optional[str]
_proc: PopenType
Reported by Pylint.
Line: 71
Column: 54
self._cpu_list = cpu_list
self._start_time = time.time()
self._communication_file = os.path.join(get_temp_dir(), f"{uuid.uuid4()}.pkl")
with open(self._communication_file, "wb") as f:
pickle.dump(self._work_order.timer_args, f)
self._proc = subprocess.Popen(
self.cmd,
stdout=subprocess.PIPE,
Reported by Pylint.
Line: 82
Column: 5
executable=SHELL,
)
def clone(self) -> "_BenchmarkProcess":
return _BenchmarkProcess(self._work_order, self._cpu_list)
@property
def cmd(self) -> str:
cmd: List[str] = []
Reported by Pylint.
Line: 86
Column: 5
return _BenchmarkProcess(self._work_order, self._cpu_list)
@property
def cmd(self) -> str:
cmd: List[str] = []
if self._work_order.source_cmd is not None:
cmd.extend([self._work_order.source_cmd, "&&"])
cmd.append(_ENV)
Reported by Pylint.
test/distributed/elastic/timer/api_test.py
22 issues
Line: 9
Column: 1
import unittest
import unittest.mock as mock
from torch.distributed.elastic.timer import TimerServer
from torch.distributed.elastic.timer.api import RequestQueue, TimerRequest
class MockRequestQueue(RequestQueue):
def size(self):
Reported by Pylint.
Line: 10
Column: 1
import unittest.mock as mock
from torch.distributed.elastic.timer import TimerServer
from torch.distributed.elastic.timer.api import RequestQueue, TimerRequest
class MockRequestQueue(RequestQueue):
def size(self):
return 2
Reported by Pylint.
Line: 17
Column: 19
def size(self):
return 2
def get(self, size, timeout):
return [TimerRequest(1, "test_1", 0), TimerRequest(2, "test_2", 0)]
class MockTimerServer(TimerServer):
"""
Reported by Pylint.
Line: 17
Column: 25
def size(self):
return 2
def get(self, size, timeout):
return [TimerRequest(1, "test_1", 0), TimerRequest(2, "test_2", 0)]
class MockTimerServer(TimerServer):
"""
Reported by Pylint.
Line: 33
Column: 5
For each workers 1 - 3 returns 2 expired timers
"""
def __init__(self, request_queue, max_interval):
super().__init__(request_queue, max_interval)
def register_timers(self, timer_requests):
pass
Reported by Pylint.
Line: 42
Column: 34
def clear_timers(self, worker_ids):
pass
def get_expired_timers(self, deadline):
return {
i: [TimerRequest(i, f"test_{i}_0", 0), TimerRequest(i, f"test_{i}_1", 0)]
for i in range(1, 4)
}
Reported by Pylint.
Line: 69
Column: 9
max_interval = 1
request_queue = mock.Mock(wraps=MockRequestQueue())
timer_server = MockTimerServer(request_queue, max_interval)
timer_server._run_watchdog()
request_queue.size.assert_called_once()
request_queue.get.assert_called_with(request_queue.size(), max_interval)
mock_register_timers.assert_called_with(request_queue.get(2, 1))
mock_clear_timers.assert_called_with({1, 2})
Reported by Pylint.
Line: 1
Column: 1
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import unittest
import unittest.mock as mock
from torch.distributed.elastic.timer import TimerServer
Reported by Pylint.
Line: 13
Column: 1
from torch.distributed.elastic.timer.api import RequestQueue, TimerRequest
class MockRequestQueue(RequestQueue):
def size(self):
return 2
def get(self, size, timeout):
return [TimerRequest(1, "test_1", 0), TimerRequest(2, "test_2", 0)]
Reported by Pylint.
Line: 14
Column: 5
class MockRequestQueue(RequestQueue):
def size(self):
return 2
def get(self, size, timeout):
return [TimerRequest(1, "test_1", 0), TimerRequest(2, "test_2", 0)]
Reported by Pylint.
caffe2/python/operator_test/hyperbolic_ops_test.py
22 issues
Line: 9
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
Reported by Pylint.
Line: 13
Column: 1
import numpy as np
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
op = core.CreateOperator(
op_name,
["X"],
["X"] if in_place else ["Y"],
Reported by Pylint.
Line: 14
Column: 5
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
op = core.CreateOperator(
op_name,
["X"],
["X"] if in_place else ["Y"],
engine=engine,)
Reported by Pylint.
Line: 14
Column: 5
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
op = core.CreateOperator(
op_name,
["X"],
["X"] if in_place else ["Y"],
engine=engine,)
Reported by Pylint.
Line: 14
Column: 5
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
op = core.CreateOperator(
op_name,
["X"],
["X"] if in_place else ["Y"],
engine=engine,)
Reported by Pylint.
Line: 14
Column: 5
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
op = core.CreateOperator(
op_name,
["X"],
["X"] if in_place else ["Y"],
engine=engine,)
Reported by Pylint.
Line: 15
Column: 9
class TestHyperbolicOps(serial.SerializedTestCase):
def _test_hyperbolic_op(self, op_name, np_ref, X, in_place, engine, gc, dc):
op = core.CreateOperator(
op_name,
["X"],
["X"] if in_place else ["Y"],
engine=engine,)
Reported by Pylint.
Line: 21
Column: 9
["X"] if in_place else ["Y"],
engine=engine,)
def ref(X):
return [np_ref(X)]
self.assertReferenceChecks(
device_option=gc,
op=op,
Reported by Pylint.
Line: 35
Column: 5
self.assertGradientChecks(gc, op, [X], 0, [0], ensure_outputs_are_inferred=True)
@serial.given(X=hu.tensor(dtype=np.float32), **hu.gcs)
def test_sinh(self, X, gc, dc):
self._test_hyperbolic_op("Sinh", np.sinh, X, False, "", gc, dc)
@serial.given(X=hu.tensor(dtype=np.float32), **hu.gcs)
def test_cosh(self, X, gc, dc):
self._test_hyperbolic_op("Cosh", np.cosh, X, False, "", gc, dc)
Reported by Pylint.
caffe2/python/operator_test/batch_bucketize_op_test.py
22 issues
Line: 11
Column: 1
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given
import hypothesis.strategies as st
class TestBatchBucketize(serial.SerializedTestCase):
@serial.given(**hu.gcs_cpu_only)
Reported by Pylint.
Line: 12
Column: 1
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given
import hypothesis.strategies as st
class TestBatchBucketize(serial.SerializedTestCase):
@serial.given(**hu.gcs_cpu_only)
def test_batch_bucketize_example(self, gc, dc):
Reported by Pylint.
Line: 17
Column: 48
class TestBatchBucketize(serial.SerializedTestCase):
@serial.given(**hu.gcs_cpu_only)
def test_batch_bucketize_example(self, gc, dc):
op = core.CreateOperator('BatchBucketize',
["FEATURE", "INDICES", "BOUNDARIES", "LENGTHS"],
["O"])
float_feature = np.array([[1.42, 2.07, 3.19, 0.55, 4.32],
[4.57, 2.30, 0.84, 4.48, 3.09],
Reported by Pylint.
Line: 30
Column: 53
lengths = np.array([2, 3, 1], dtype=np.int32)
boundaries = np.array([0.5, 1.0, 1.5, 2.5, 3.5, 2.5], dtype=np.float32)
def ref(float_feature, indices, boundaries, lengths):
output = np.array([[2, 1, 1],
[2, 1, 1],
[1, 0, 0],
[0, 2, 1],
[2, 3, 0]], dtype=np.int32)
Reported by Pylint.
Line: 30
Column: 17
lengths = np.array([2, 3, 1], dtype=np.int32)
boundaries = np.array([0.5, 1.0, 1.5, 2.5, 3.5, 2.5], dtype=np.float32)
def ref(float_feature, indices, boundaries, lengths):
output = np.array([[2, 1, 1],
[2, 1, 1],
[1, 0, 0],
[0, 2, 1],
[2, 3, 0]], dtype=np.int32)
Reported by Pylint.
Line: 30
Column: 32
lengths = np.array([2, 3, 1], dtype=np.int32)
boundaries = np.array([0.5, 1.0, 1.5, 2.5, 3.5, 2.5], dtype=np.float32)
def ref(float_feature, indices, boundaries, lengths):
output = np.array([[2, 1, 1],
[2, 1, 1],
[1, 0, 0],
[0, 2, 1],
[2, 3, 0]], dtype=np.int32)
Reported by Pylint.
Line: 30
Column: 41
lengths = np.array([2, 3, 1], dtype=np.int32)
boundaries = np.array([0.5, 1.0, 1.5, 2.5, 3.5, 2.5], dtype=np.float32)
def ref(float_feature, indices, boundaries, lengths):
output = np.array([[2, 1, 1],
[2, 1, 1],
[1, 0, 0],
[0, 2, 1],
[2, 3, 0]], dtype=np.int32)
Reported by Pylint.
Line: 49
Column: 49
min_value=5),
seed=st.integers(min_value=2, max_value=1000),
**hu.gcs_cpu_only)
def test_batch_bucketize(self, x, seed, gc, dc):
op = core.CreateOperator('BatchBucketize',
["FEATURE", "INDICES", "BOUNDARIES", "LENGTHS"],
['O'])
np.random.seed(seed)
d = x.shape[1]
Reported by Pylint.
Line: 1
Column: 1
import numpy as np
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
Reported by Pylint.
Line: 15
Column: 1
import hypothesis.strategies as st
class TestBatchBucketize(serial.SerializedTestCase):
@serial.given(**hu.gcs_cpu_only)
def test_batch_bucketize_example(self, gc, dc):
op = core.CreateOperator('BatchBucketize',
["FEATURE", "INDICES", "BOUNDARIES", "LENGTHS"],
["O"])
Reported by Pylint.
caffe2/python/operator_test/ctc_beam_search_decoder_op_test.py
22 issues
Line: 8
Column: 1
from caffe2.python import core
from collections import defaultdict, Counter
from hypothesis import given, settings
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
Reported by Pylint.
Line: 11
Column: 1
from hypothesis import given, settings
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
import unittest
DEFAULT_BEAM_WIDTH = 10
Reported by Pylint.
Line: 31
Column: 79
)
@settings(deadline=None, max_examples=30)
def test_ctc_beam_search_decoder(
self, batch, max_time, alphabet_size, beam_width, num_candidates, gc, dc
):
if not beam_width:
beam_width = DEFAULT_BEAM_WIDTH
op_seq_len = core.CreateOperator('CTCBeamSearchDecoder',
['INPUTS', 'SEQ_LEN'],
Reported by Pylint.
Line: 1
Column: 1
from caffe2.python import core
from collections import defaultdict, Counter
from hypothesis import given, settings
import caffe2.python.hypothesis_test_util as hu
Reported by Pylint.
Line: 7
Column: 1
from caffe2.python import core
from collections import defaultdict, Counter
from hypothesis import given, settings
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
Reported by Pylint.
Line: 14
Column: 1
import hypothesis.strategies as st
import numpy as np
import unittest
DEFAULT_BEAM_WIDTH = 10
DEFAULT_PRUNE_THRESHOLD = 0.001
Reported by Pylint.
Line: 20
Column: 1
DEFAULT_PRUNE_THRESHOLD = 0.001
class TestCTCBeamSearchDecoderOp(serial.SerializedTestCase):
@given(
batch=st.sampled_from([1, 2, 4]),
max_time=st.sampled_from([1, 8, 64]),
alphabet_size=st.sampled_from([1, 2, 32, 128, 512]),
beam_width=st.sampled_from([1, 2, 16, None]),
Reported by Pylint.
Line: 29
Column: 5
num_candidates=st.sampled_from([1, 2]),
**hu.gcs_cpu_only
)
@settings(deadline=None, max_examples=30)
def test_ctc_beam_search_decoder(
self, batch, max_time, alphabet_size, beam_width, num_candidates, gc, dc
):
if not beam_width:
beam_width = DEFAULT_BEAM_WIDTH
Reported by Pylint.
Line: 29
Column: 5
num_candidates=st.sampled_from([1, 2]),
**hu.gcs_cpu_only
)
@settings(deadline=None, max_examples=30)
def test_ctc_beam_search_decoder(
self, batch, max_time, alphabet_size, beam_width, num_candidates, gc, dc
):
if not beam_width:
beam_width = DEFAULT_BEAM_WIDTH
Reported by Pylint.
Line: 29
Column: 5
num_candidates=st.sampled_from([1, 2]),
**hu.gcs_cpu_only
)
@settings(deadline=None, max_examples=30)
def test_ctc_beam_search_decoder(
self, batch, max_time, alphabet_size, beam_width, num_candidates, gc, dc
):
if not beam_width:
beam_width = DEFAULT_BEAM_WIDTH
Reported by Pylint.
test/distributed/elastic/metrics/api_test.py
22 issues
Line: 12
Column: 1
import unittest
import unittest.mock as mock
from torch.distributed.elastic.metrics.api import (
MetricData,
MetricHandler,
MetricStream,
_get_metric_name,
prof,
Reported by Pylint.
Line: 19
Column: 1
_get_metric_name,
prof,
)
from torch.testing._internal.common_utils import run_tests
def foo_1():
pass
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.abs
import abc
import unittest
Reported by Pylint.
Line: 22
Column: 1
from torch.testing._internal.common_utils import run_tests
def foo_1():
pass
class TestMetricsHandler(MetricHandler):
def __init__(self):
Reported by Pylint.
Line: 26
Column: 1
pass
class TestMetricsHandler(MetricHandler):
def __init__(self):
self.metric_data = {}
def emit(self, metric_data: MetricData):
self.metric_data[metric_data.name] = metric_data
Reported by Pylint.
Line: 26
Column: 1
pass
class TestMetricsHandler(MetricHandler):
def __init__(self):
self.metric_data = {}
def emit(self, metric_data: MetricData):
self.metric_data[metric_data.name] = metric_data
Reported by Pylint.
Line: 30
Column: 5
def __init__(self):
self.metric_data = {}
def emit(self, metric_data: MetricData):
self.metric_data[metric_data.name] = metric_data
class Parent(abc.ABC):
@abc.abstractmethod
Reported by Pylint.
Line: 34
Column: 1
self.metric_data[metric_data.name] = metric_data
class Parent(abc.ABC):
@abc.abstractmethod
def func(self):
raise NotImplementedError()
def base_func(self):
Reported by Pylint.
Line: 36
Column: 5
class Parent(abc.ABC):
@abc.abstractmethod
def func(self):
raise NotImplementedError()
def base_func(self):
self.func()
Reported by Pylint.
Line: 39
Column: 5
def func(self):
raise NotImplementedError()
def base_func(self):
self.func()
class Child(Parent):
# need to decorate the implementation not the abstract method!
Reported by Pylint.
torch/nn/modules/fold.py
22 issues
Line: 2
Column: 1
# -*- coding: utf-8 -*-
from .module import Module
from .. import functional as F
from torch import Tensor
from ..common_types import _size_any_t
class Fold(Module):
Reported by Pylint.
Line: 3
Column: 1
# -*- coding: utf-8 -*-
from .module import Module
from .. import functional as F
from torch import Tensor
from ..common_types import _size_any_t
class Fold(Module):
Reported by Pylint.
Line: 6
Column: 1
from .. import functional as F
from torch import Tensor
from ..common_types import _size_any_t
class Fold(Module):
r"""Combines an array of sliding local blocks into a large containing
tensor.
Reported by Pylint.
Line: 143
Column: 23
self.padding = padding
self.stride = stride
def forward(self, input: Tensor) -> Tensor:
return F.fold(input, self.output_size, self.kernel_size, self.dilation,
self.padding, self.stride)
def extra_repr(self) -> str:
return 'output_size={output_size}, kernel_size={kernel_size}, ' \
Reported by Pylint.
Line: 293
Column: 23
self.padding = padding
self.stride = stride
def forward(self, input: Tensor) -> Tensor:
return F.unfold(input, self.kernel_size, self.dilation,
self.padding, self.stride)
def extra_repr(self) -> str:
return 'kernel_size={kernel_size}, dilation={dilation}, padding={padding},' \
Reported by Pylint.
Line: 1
Column: 1
# -*- coding: utf-8 -*-
from .module import Module
from .. import functional as F
from torch import Tensor
from ..common_types import _size_any_t
class Fold(Module):
Reported by Pylint.
Line: 5
Column: 1
from .module import Module
from .. import functional as F
from torch import Tensor
from ..common_types import _size_any_t
class Fold(Module):
r"""Combines an array of sliding local blocks into a large containing
Reported by Pylint.
Line: 27
Column: 1
.. math::
L = \prod_d \left\lfloor\frac{\text{output\_size}[d] + 2 \times \text{padding}[d] %
- \text{dilation}[d] \times (\text{kernel\_size}[d] - 1) - 1}{\text{stride}[d]} + 1\right\rfloor,
where :math:`d` is over all spatial dimensions.
* :attr:`output_size` describes the spatial shape of the large containing
tensor of the sliding local blocks. It is useful to resolve the ambiguity
Reported by Pylint.
Line: 45
Column: 1
sides for :attr:`padding` number of points for each dimension before
reshaping.
* :attr:`dilation` controls the spacing between the kernel points; also known as the à trous algorithm.
It is harder to describe, but this `link`_ has a nice visualization of what :attr:`dilation` does.
Args:
output_size (int or tuple): the shape of the spatial dimensions of the
output (i.e., ``output.sizes()[2:]``)
Reported by Pylint.
Line: 46
Column: 1
reshaping.
* :attr:`dilation` controls the spacing between the kernel points; also known as the à trous algorithm.
It is harder to describe, but this `link`_ has a nice visualization of what :attr:`dilation` does.
Args:
output_size (int or tuple): the shape of the spatial dimensions of the
output (i.e., ``output.sizes()[2:]``)
kernel_size (int or tuple): the size of the sliding blocks
Reported by Pylint.
torch/nn/modules/adaptive.py
22 issues
Line: 10
Column: 1
from torch import Tensor
from typing import List, Sequence
from . import Sequential, ModuleList, Linear
from .module import Module
from ..functional import log_softmax
_ASMoutput = namedtuple('_ASMoutput', ['output', 'loss'])
Reported by Pylint.
Line: 11
Column: 1
from typing import List, Sequence
from . import Sequential, ModuleList, Linear
from .module import Module
from ..functional import log_softmax
_ASMoutput = namedtuple('_ASMoutput', ['output', 'loss'])
Reported by Pylint.
Line: 12
Column: 1
from . import Sequential, ModuleList, Linear
from .module import Module
from ..functional import log_softmax
_ASMoutput = namedtuple('_ASMoutput', ['output', 'loss'])
Reported by Pylint.
Line: 278
Column: 18
"""
head_output = self.head(input)
output = torch.argmax(head_output, dim=1)
not_in_shortlist = (output >= self.shortlist_size)
all_in_shortlist = not (not_in_shortlist.any())
if all_in_shortlist:
return output
Reported by Pylint.
Line: 287
Column: 20
elif not_in_shortlist.all():
log_prob = self._get_full_log_prob(input, head_output)
return torch.argmax(log_prob, dim=1)
else:
log_prob = self._get_full_log_prob(input[not_in_shortlist],
head_output[not_in_shortlist])
output[not_in_shortlist] = torch.argmax(log_prob, dim=1)
Reported by Pylint.
Line: 292
Column: 40
else:
log_prob = self._get_full_log_prob(input[not_in_shortlist],
head_output[not_in_shortlist])
output[not_in_shortlist] = torch.argmax(log_prob, dim=1)
return output
Reported by Pylint.
Line: 169
Column: 23
i2h.reset_parameters()
h2o.reset_parameters()
def forward(self, input: Tensor, target: Tensor) -> _ASMoutput:
if input.size(0) != target.size(0):
raise RuntimeError('Input and target should have the same size '
'in the batch dimension.')
used_rows = 0
Reported by Pylint.
Line: 224
Column: 34
return _ASMoutput(output, loss)
def _get_full_log_prob(self, input, head_output):
""" Given input tensor, and output of `self.head`,
compute the log of the full distribution """
out = input.new_empty((head_output.size(0), self.n_classes))
head_logprob = log_softmax(head_output, dim=1)
Reported by Pylint.
Line: 242
Column: 24
return out
def log_prob(self, input: Tensor) -> Tensor:
r""" Computes log probabilities for all :math:`\texttt{n\_classes}`
Args:
input (Tensor): a minibatch of examples
Reported by Pylint.
Line: 262
Column: 23
head_output = self.head(input)
return self._get_full_log_prob(input, head_output)
def predict(self, input: Tensor) -> Tensor:
r""" This is equivalent to `self.log_pob(input).argmax(dim=1)`,
but is more efficient in some cases.
Args:
input (Tensor): a minibatch of examples
Reported by Pylint.
torch/nn/quantized/modules/utils.py
22 issues
Line: 8
Column: 29
def _quantize_weight(float_wt, observer):
wt_scale, wt_zp = observer.calculate_qparams()
if observer.qscheme in [torch.per_tensor_symmetric, torch.per_tensor_affine]:
qweight = torch.quantize_per_tensor(
float_wt,
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
Reported by Pylint.
Line: 8
Column: 57
def _quantize_weight(float_wt, observer):
wt_scale, wt_zp = observer.calculate_qparams()
if observer.qscheme in [torch.per_tensor_symmetric, torch.per_tensor_affine]:
qweight = torch.quantize_per_tensor(
float_wt,
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
Reported by Pylint.
Line: 9
Column: 19
def _quantize_weight(float_wt, observer):
wt_scale, wt_zp = observer.calculate_qparams()
if observer.qscheme in [torch.per_tensor_symmetric, torch.per_tensor_affine]:
qweight = torch.quantize_per_tensor(
float_wt,
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
Reported by Pylint.
Line: 11
Column: 42
if observer.qscheme in [torch.per_tensor_symmetric, torch.per_tensor_affine]:
qweight = torch.quantize_per_tensor(
float_wt,
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
Reported by Pylint.
Line: 12
Column: 31
qweight = torch.quantize_per_tensor(
float_wt,
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
elif observer.qscheme in [torch.per_channel_affine_float_qparams]:
Reported by Pylint.
Line: 12
Column: 60
qweight = torch.quantize_per_tensor(
float_wt,
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
elif observer.qscheme in [torch.per_channel_affine_float_qparams]:
Reported by Pylint.
Line: 14
Column: 19
float(wt_scale), int(wt_zp), torch.qint8)
elif observer.qscheme in [torch.per_channel_symmetric, torch.per_channel_affine]:
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
elif observer.qscheme in [torch.per_channel_affine_float_qparams]:
qweight = torch.quantize_per_channel(
float_wt,
Reported by Pylint.
Line: 16
Column: 72
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
elif observer.qscheme in [torch.per_channel_affine_float_qparams]:
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.float), wt_zp.to(torch.float), observer.ch_axis, observer.dtype)
else:
Reported by Pylint.
Line: 16
Column: 25
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
elif observer.qscheme in [torch.per_channel_affine_float_qparams]:
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.float), wt_zp.to(torch.float), observer.ch_axis, observer.dtype)
else:
Reported by Pylint.
Line: 16
Column: 49
wt_axis = observer.ch_axis
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.double), wt_zp.to(torch.int64), wt_axis, torch.qint8)
elif observer.qscheme in [torch.per_channel_affine_float_qparams]:
qweight = torch.quantize_per_channel(
float_wt,
wt_scale.to(torch.float), wt_zp.to(torch.float), observer.ch_axis, observer.dtype)
else:
Reported by Pylint.