The following issues were found:
torch/utils/data/datapipes/iter/readfilesfromzip.py
10 issues
Line: 1
Column: 1
from torch.utils.data import IterDataPipe
from torch.utils.data.datapipes.utils.common import validate_pathname_binary_tuple
from typing import Iterable, Iterator, Tuple, IO, cast
from io import BufferedIOBase
import os
import sys
import zipfile
import warnings
Reported by Pylint.
Line: 3
Column: 1
from torch.utils.data import IterDataPipe
from torch.utils.data.datapipes.utils.common import validate_pathname_binary_tuple
from typing import Iterable, Iterator, Tuple, IO, cast
from io import BufferedIOBase
import os
import sys
import zipfile
import warnings
Reported by Pylint.
Line: 4
Column: 1
from torch.utils.data import IterDataPipe
from torch.utils.data.datapipes.utils.common import validate_pathname_binary_tuple
from typing import Iterable, Iterator, Tuple, IO, cast
from io import BufferedIOBase
import os
import sys
import zipfile
import warnings
Reported by Pylint.
Line: 6
Column: 1
from typing import Iterable, Iterator, Tuple, IO, cast
from io import BufferedIOBase
import os
import sys
import zipfile
import warnings
class ReadFilesFromZipIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
Reported by Pylint.
Line: 7
Column: 1
from io import BufferedIOBase
import os
import sys
import zipfile
import warnings
class ReadFilesFromZipIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
r""" :class:`ReadFilesFromZipIterDataPipe`.
Reported by Pylint.
Line: 8
Column: 1
import os
import sys
import zipfile
import warnings
class ReadFilesFromZipIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
r""" :class:`ReadFilesFromZipIterDataPipe`.
Reported by Pylint.
Line: 9
Column: 1
import os
import sys
import zipfile
import warnings
class ReadFilesFromZipIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
r""" :class:`ReadFilesFromZipIterDataPipe`.
Iterable data pipe to extract zip binary streams from input iterable which contains tuples of
Reported by Pylint.
Line: 54
Column: 1
extracted_fobj = zips.open(zipinfo)
inner_pathname = os.path.normpath(os.path.join(pathname, zipinfo.filename))
# Add a reference of the source zipfile into extracted_fobj, so the source
# zipfile handle won't be released until all the extracted file objs are destroyed.
extracted_fobj.source_ref = zips # type: ignore[attr-defined]
# typing.cast is used here to silence mypy's type checker
yield (inner_pathname, cast(BufferedIOBase, extracted_fobj))
except Exception as e:
warnings.warn(
Reported by Pylint.
Line: 58
Column: 13
extracted_fobj.source_ref = zips # type: ignore[attr-defined]
# typing.cast is used here to silence mypy's type checker
yield (inner_pathname, cast(BufferedIOBase, extracted_fobj))
except Exception as e:
warnings.warn(
"Unable to extract files from corrupted zipfile stream {} due to: {}, abort!".format(pathname, e))
raise e
Reported by Pylint.
Line: 60
Column: 1
yield (inner_pathname, cast(BufferedIOBase, extracted_fobj))
except Exception as e:
warnings.warn(
"Unable to extract files from corrupted zipfile stream {} due to: {}, abort!".format(pathname, e))
raise e
def __len__(self):
if self.length == -1:
Reported by Pylint.
torch/utils/data/datapipes/utils/common.py
10 issues
Line: 34
Column: 15
warnings.warn(err.filename + " : " + err.strerror)
raise err
for path, dirs, files in os.walk(root, onerror=onerror):
if abspath:
path = os.path.abspath(path)
for f in files:
if match_masks(f, masks):
yield os.path.join(path, f)
Reported by Pylint.
Line: 1
Column: 1
import os
import fnmatch
import warnings
from io import BufferedIOBase
from typing import Iterable, List, Union
def match_masks(name : str, masks : Union[str, List[str]]) -> bool:
Reported by Pylint.
Line: 9
Column: 1
from typing import Iterable, List, Union
def match_masks(name : str, masks : Union[str, List[str]]) -> bool:
# empty mask matches any input name
if not masks:
return True
if isinstance(masks, str):
Reported by Pylint.
Line: 23
Column: 1
return False
def get_file_pathnames_from_root(
root: str,
masks: Union[str, List[str]],
recursive: bool = False,
abspath: bool = False) -> Iterable[str]:
Reported by Pylint.
Line: 37
Column: 13
for path, dirs, files in os.walk(root, onerror=onerror):
if abspath:
path = os.path.abspath(path)
for f in files:
if match_masks(f, masks):
yield os.path.join(path, f)
if not recursive:
break
Reported by Pylint.
Line: 44
Column: 1
break
def get_file_binaries_from_pathnames(pathnames: Iterable, mode: str):
if not isinstance(pathnames, Iterable):
pathnames = [pathnames, ]
if mode in ('b', 't'):
mode = 'r' + mode
Reported by Pylint.
Line: 58
Column: 1
yield (pathname, open(pathname, mode))
def validate_pathname_binary_tuple(data):
if not isinstance(data, tuple):
raise TypeError("pathname binary data should be tuple type, but got {}".format(type(data)))
if len(data) != 2:
raise TypeError("pathname binary tuple length should be 2, but got {}".format(str(len(data))))
if not isinstance(data[0], str):
Reported by Pylint.
Line: 62
Column: 1
if not isinstance(data, tuple):
raise TypeError("pathname binary data should be tuple type, but got {}".format(type(data)))
if len(data) != 2:
raise TypeError("pathname binary tuple length should be 2, but got {}".format(str(len(data))))
if not isinstance(data[0], str):
raise TypeError("pathname binary tuple should have string type pathname, but got {}".format(type(data[0])))
if not isinstance(data[1], BufferedIOBase):
raise TypeError("pathname binary tuple should have BufferedIOBase based binary type, but got {}".format(type(data[1])))
Reported by Pylint.
Line: 64
Column: 1
if len(data) != 2:
raise TypeError("pathname binary tuple length should be 2, but got {}".format(str(len(data))))
if not isinstance(data[0], str):
raise TypeError("pathname binary tuple should have string type pathname, but got {}".format(type(data[0])))
if not isinstance(data[1], BufferedIOBase):
raise TypeError("pathname binary tuple should have BufferedIOBase based binary type, but got {}".format(type(data[1])))
Reported by Pylint.
Line: 66
Column: 1
if not isinstance(data[0], str):
raise TypeError("pathname binary tuple should have string type pathname, but got {}".format(type(data[0])))
if not isinstance(data[1], BufferedIOBase):
raise TypeError("pathname binary tuple should have BufferedIOBase based binary type, but got {}".format(type(data[1])))
Reported by Pylint.
torch/utils/tensorboard/_proto_graph.py
10 issues
Line: 1
Column: 1
from tensorboard.compat.proto.node_def_pb2 import NodeDef
from tensorboard.compat.proto.attr_value_pb2 import AttrValue
from tensorboard.compat.proto.tensor_shape_pb2 import TensorShapeProto
def attr_value_proto(dtype, shape, s):
"""Creates a dict of objects matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/attr_value.proto
specifically designed for a NodeDef. The values have been
Reported by Pylint.
Line: 2
Column: 1
from tensorboard.compat.proto.node_def_pb2 import NodeDef
from tensorboard.compat.proto.attr_value_pb2 import AttrValue
from tensorboard.compat.proto.tensor_shape_pb2 import TensorShapeProto
def attr_value_proto(dtype, shape, s):
"""Creates a dict of objects matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/attr_value.proto
specifically designed for a NodeDef. The values have been
Reported by Pylint.
Line: 3
Column: 1
from tensorboard.compat.proto.node_def_pb2 import NodeDef
from tensorboard.compat.proto.attr_value_pb2 import AttrValue
from tensorboard.compat.proto.tensor_shape_pb2 import TensorShapeProto
def attr_value_proto(dtype, shape, s):
"""Creates a dict of objects matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/attr_value.proto
specifically designed for a NodeDef. The values have been
Reported by Pylint.
Line: 6
Column: 22
from tensorboard.compat.proto.tensor_shape_pb2 import TensorShapeProto
def attr_value_proto(dtype, shape, s):
"""Creates a dict of objects matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/attr_value.proto
specifically designed for a NodeDef. The values have been
reverse engineered from standard TensorBoard logged data.
"""
Reported by Pylint.
Line: 30
Column: 16
def node_proto(name,
op='UnSpecified',
input=None,
dtype=None,
shape=None, # type: tuple
outputsize=None,
attributes=''
):
Reported by Pylint.
Line: 32
Column: 16
op='UnSpecified',
input=None,
dtype=None,
shape=None, # type: tuple
outputsize=None,
attributes=''
):
"""Creates an object matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/node_def.proto
Reported by Pylint.
Line: 1
Column: 1
from tensorboard.compat.proto.node_def_pb2 import NodeDef
from tensorboard.compat.proto.attr_value_pb2 import AttrValue
from tensorboard.compat.proto.tensor_shape_pb2 import TensorShapeProto
def attr_value_proto(dtype, shape, s):
"""Creates a dict of objects matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/attr_value.proto
specifically designed for a NodeDef. The values have been
Reported by Pylint.
Line: 6
Column: 1
from tensorboard.compat.proto.tensor_shape_pb2 import TensorShapeProto
def attr_value_proto(dtype, shape, s):
"""Creates a dict of objects matching
https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/attr_value.proto
specifically designed for a NodeDef. The values have been
reverse engineered from standard TensorBoard logged data.
"""
Reported by Pylint.
Line: 28
Column: 1
return TensorShapeProto(dim=[TensorShapeProto.Dim(size=d) for d in outputsize])
def node_proto(name,
op='UnSpecified',
input=None,
dtype=None,
shape=None, # type: tuple
outputsize=None,
Reported by Pylint.
Line: 28
Column: 1
return TensorShapeProto(dim=[TensorShapeProto.Dim(size=d) for d in outputsize])
def node_proto(name,
op='UnSpecified',
input=None,
dtype=None,
shape=None, # type: tuple
outputsize=None,
Reported by Pylint.
torch/distributions/gumbel.py
10 issues
Line: 30
Column: 17
def __init__(self, loc, scale, validate_args=None):
self.loc, self.scale = broadcast_all(loc, scale)
finfo = torch.finfo(self.loc.dtype)
if isinstance(loc, Number) and isinstance(scale, Number):
base_dist = Uniform(finfo.tiny, 1 - finfo.eps)
else:
base_dist = Uniform(torch.full_like(self.loc, finfo.tiny),
torch.full_like(self.loc, 1 - finfo.eps))
Reported by Pylint.
Line: 34
Column: 33
if isinstance(loc, Number) and isinstance(scale, Number):
base_dist = Uniform(finfo.tiny, 1 - finfo.eps)
else:
base_dist = Uniform(torch.full_like(self.loc, finfo.tiny),
torch.full_like(self.loc, 1 - finfo.eps))
transforms = [ExpTransform().inv, AffineTransform(loc=0, scale=-torch.ones_like(self.scale)),
ExpTransform().inv, AffineTransform(loc=loc, scale=-self.scale)]
super(Gumbel, self).__init__(base_dist, transforms, validate_args=validate_args)
Reported by Pylint.
Line: 35
Column: 33
base_dist = Uniform(finfo.tiny, 1 - finfo.eps)
else:
base_dist = Uniform(torch.full_like(self.loc, finfo.tiny),
torch.full_like(self.loc, 1 - finfo.eps))
transforms = [ExpTransform().inv, AffineTransform(loc=0, scale=-torch.ones_like(self.scale)),
ExpTransform().inv, AffineTransform(loc=loc, scale=-self.scale)]
super(Gumbel, self).__init__(base_dist, transforms, validate_args=validate_args)
def expand(self, batch_shape, _instance=None):
Reported by Pylint.
Line: 36
Column: 73
else:
base_dist = Uniform(torch.full_like(self.loc, finfo.tiny),
torch.full_like(self.loc, 1 - finfo.eps))
transforms = [ExpTransform().inv, AffineTransform(loc=0, scale=-torch.ones_like(self.scale)),
ExpTransform().inv, AffineTransform(loc=loc, scale=-self.scale)]
super(Gumbel, self).__init__(base_dist, transforms, validate_args=validate_args)
def expand(self, batch_shape, _instance=None):
new = self._get_checked_instance(Gumbel, _instance)
Reported by Pylint.
Line: 11
Column: 1
from torch.distributions.utils import broadcast_all, euler_constant
class Gumbel(TransformedDistribution):
r"""
Samples from a Gumbel Distribution.
Examples::
Reported by Pylint.
Line: 1
Column: 1
from numbers import Number
import math
import torch
from torch.distributions import constraints
from torch.distributions.uniform import Uniform
from torch.distributions.transformed_distribution import TransformedDistribution
from torch.distributions.transforms import AffineTransform, ExpTransform
from torch.distributions.utils import broadcast_all, euler_constant
Reported by Pylint.
Line: 36
Column: 1
else:
base_dist = Uniform(torch.full_like(self.loc, finfo.tiny),
torch.full_like(self.loc, 1 - finfo.eps))
transforms = [ExpTransform().inv, AffineTransform(loc=0, scale=-torch.ones_like(self.scale)),
ExpTransform().inv, AffineTransform(loc=loc, scale=-self.scale)]
super(Gumbel, self).__init__(base_dist, transforms, validate_args=validate_args)
def expand(self, batch_shape, _instance=None):
new = self._get_checked_instance(Gumbel, _instance)
Reported by Pylint.
Line: 38
Column: 9
torch.full_like(self.loc, 1 - finfo.eps))
transforms = [ExpTransform().inv, AffineTransform(loc=0, scale=-torch.ones_like(self.scale)),
ExpTransform().inv, AffineTransform(loc=loc, scale=-self.scale)]
super(Gumbel, self).__init__(base_dist, transforms, validate_args=validate_args)
def expand(self, batch_shape, _instance=None):
new = self._get_checked_instance(Gumbel, _instance)
new.loc = self.loc.expand(batch_shape)
new.scale = self.scale.expand(batch_shape)
Reported by Pylint.
Line: 44
Column: 16
new = self._get_checked_instance(Gumbel, _instance)
new.loc = self.loc.expand(batch_shape)
new.scale = self.scale.expand(batch_shape)
return super(Gumbel, self).expand(batch_shape, _instance=new)
# Explicitly defining the log probability function for Gumbel due to precision issues
def log_prob(self, value):
if self._validate_args:
self._validate_sample(value)
Reported by Pylint.
Line: 50
Column: 9
def log_prob(self, value):
if self._validate_args:
self._validate_sample(value)
y = (self.loc - value) / self.scale
return (y - y.exp()) - self.scale.log()
@property
def mean(self):
return self.loc + self.scale * euler_constant
Reported by Pylint.
torch/distributed/optim/post_localSGD_optimizer.py
10 issues
Line: 60
Column: 5
`PostLocalSGDOptimizer` is experimental and subject to change.
"""
def __init__(
self,
params: Iterator[torch.nn.Parameter],
optimizer_class: Type[torch.optim.Optimizer],
averager: averagers.ModelAverager,
**defaults: Any,
Reported by Pylint.
Line: 85
Column: 5
def load_state_dict(self, state_dict):
self.optim.load_state_dict(state_dict)
def step(self):
r"""
Performs a single optimization step (parameter update).
"""
self.optim.step()
self.averager.average_parameters(iter(self.params))
Reported by Pylint.
Line: 92
Column: 5
self.optim.step()
self.averager.average_parameters(iter(self.params))
def zero_grad(self):
self.optim.zero_grad()
def add_param_group(self, param_group):
self.optim.add_param_group(param_group)
Reported by Pylint.
Line: 1
Column: 1
from typing import Any, Iterator, Type
import torch
import torch.distributed.algorithms.model_averaging.averagers as averagers
class PostLocalSGDOptimizer(torch.optim.Optimizer):
r"""
Wraps an arbitrary :class:`torch.optim.Optimizer` and runs `post-local SGD <https://arxiv.org/abs/1808.07217>`_,
Reported by Pylint.
Line: 1
Column: 1
from typing import Any, Iterator, Type
import torch
import torch.distributed.algorithms.model_averaging.averagers as averagers
class PostLocalSGDOptimizer(torch.optim.Optimizer):
r"""
Wraps an arbitrary :class:`torch.optim.Optimizer` and runs `post-local SGD <https://arxiv.org/abs/1808.07217>`_,
Reported by Pylint.
Line: 9
Column: 1
class PostLocalSGDOptimizer(torch.optim.Optimizer):
r"""
Wraps an arbitrary :class:`torch.optim.Optimizer` and runs `post-local SGD <https://arxiv.org/abs/1808.07217>`_,
This optimizer runs local optimizer at every step.
After the warm-up stage, it averages parameters periodically afer the local optimizer is applied.
Args:
params: All the parameters.
Reported by Pylint.
Line: 11
Column: 1
r"""
Wraps an arbitrary :class:`torch.optim.Optimizer` and runs `post-local SGD <https://arxiv.org/abs/1808.07217>`_,
This optimizer runs local optimizer at every step.
After the warm-up stage, it averages parameters periodically after the local optimizer is applied.
Args:
params: All the parameters.
optimizer_class: The class of the local optimizer.
averager: A model averager instance to run post-localSGD algorithm.
Reported by Pylint.
Line: 48
Column: 1
>>> )
>>>
>>> # In the first 100 steps, DDP runs global gradient averaging at every step.
>>> # After 100 steps, DDP runs gradient averaging within each subgroup (intra-node by default),
>>> # and post-localSGD optimizer runs global model averaging every 4 steps after applying the local optimizer.
>>> for step in range(0, 20):
>>> opt.zero_grad()
>>> loss = loss_fn(output, labels)
>>> loss.backward()
Reported by Pylint.
Line: 49
Column: 1
>>>
>>> # In the first 100 steps, DDP runs global gradient averaging at every step.
>>> # After 100 steps, DDP runs gradient averaging within each subgroup (intra-node by default),
>>> # and post-localSGD optimizer runs global model averaging every 4 steps after applying the local optimizer.
>>> for step in range(0, 20):
>>> opt.zero_grad()
>>> loss = loss_fn(output, labels)
>>> loss.backward()
>>> opt.step()
Reported by Pylint.
Line: 73
Column: 5
self.averager = averager
@property
def state(self):
return self.optim.state
def __repr__(self):
return self.optim.__repr__()
Reported by Pylint.
test/test_logging.py
10 issues
Line: 1
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase, run_tests
class LoggingTest(TestCase):
def testApiUsage(self):
"""
This test verifies that api usage logging is not triggered via static
initialization. Since it's triggered at first invocation only - we just
Reported by Pylint.
Line: 2
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase, run_tests
class LoggingTest(TestCase):
def testApiUsage(self):
"""
This test verifies that api usage logging is not triggered via static
initialization. Since it's triggered at first invocation only - we just
Reported by Pylint.
Line: 15
Column: 96
s = TestCase.runWithPytorchAPIUsageStderr("import torch")
self.assertRegexpMatches(s, "PYTORCH_API_USAGE.*import")
# import the shared library directly - it triggers static init but doesn't call anything
s = TestCase.runWithPytorchAPIUsageStderr("from ctypes import CDLL; CDLL('{}')".format(torch._C.__file__))
self.assertNotRegexpMatches(s, "PYTORCH_API_USAGE")
if __name__ == '__main__':
run_tests()
Reported by Pylint.
Line: 1
Column: 1
import torch
from torch.testing._internal.common_utils import TestCase, run_tests
class LoggingTest(TestCase):
def testApiUsage(self):
"""
This test verifies that api usage logging is not triggered via static
initialization. Since it's triggered at first invocation only - we just
Reported by Pylint.
Line: 5
Column: 1
from torch.testing._internal.common_utils import TestCase, run_tests
class LoggingTest(TestCase):
def testApiUsage(self):
"""
This test verifies that api usage logging is not triggered via static
initialization. Since it's triggered at first invocation only - we just
subprocess
Reported by Pylint.
Line: 5
Column: 1
from torch.testing._internal.common_utils import TestCase, run_tests
class LoggingTest(TestCase):
def testApiUsage(self):
"""
This test verifies that api usage logging is not triggered via static
initialization. Since it's triggered at first invocation only - we just
subprocess
Reported by Pylint.
Line: 6
Column: 5
class LoggingTest(TestCase):
def testApiUsage(self):
"""
This test verifies that api usage logging is not triggered via static
initialization. Since it's triggered at first invocation only - we just
subprocess
"""
Reported by Pylint.
Line: 12
Column: 9
initialization. Since it's triggered at first invocation only - we just
subprocess
"""
s = TestCase.runWithPytorchAPIUsageStderr("import torch")
self.assertRegexpMatches(s, "PYTORCH_API_USAGE.*import")
# import the shared library directly - it triggers static init but doesn't call anything
s = TestCase.runWithPytorchAPIUsageStderr("from ctypes import CDLL; CDLL('{}')".format(torch._C.__file__))
self.assertNotRegexpMatches(s, "PYTORCH_API_USAGE")
Reported by Pylint.
Line: 15
Column: 1
s = TestCase.runWithPytorchAPIUsageStderr("import torch")
self.assertRegexpMatches(s, "PYTORCH_API_USAGE.*import")
# import the shared library directly - it triggers static init but doesn't call anything
s = TestCase.runWithPytorchAPIUsageStderr("from ctypes import CDLL; CDLL('{}')".format(torch._C.__file__))
self.assertNotRegexpMatches(s, "PYTORCH_API_USAGE")
if __name__ == '__main__':
run_tests()
Reported by Pylint.
Line: 15
Column: 9
s = TestCase.runWithPytorchAPIUsageStderr("import torch")
self.assertRegexpMatches(s, "PYTORCH_API_USAGE.*import")
# import the shared library directly - it triggers static init but doesn't call anything
s = TestCase.runWithPytorchAPIUsageStderr("from ctypes import CDLL; CDLL('{}')".format(torch._C.__file__))
self.assertNotRegexpMatches(s, "PYTORCH_API_USAGE")
if __name__ == '__main__':
run_tests()
Reported by Pylint.
torch/distributed/elastic/agent/server/local_elastic_agent.py
10 issues
Line: 187
Column: 36
return self._pcontext.pids()
def _shutdown(self, death_sig: signal.Signals = signal.SIGTERM) -> None:
if self._pcontext:
self._pcontext.close(death_sig)
# pyre-fixme[56]: Pyre was not able to infer the type of the decorator
# `torch.distributed.elastic.metrics.prof`.
Reported by Pylint.
Line: 116
Column: 9
def _make_log_dir(self, log_dir: Optional[str], rdzv_run_id: str):
base_log_dir = log_dir or tempfile.mkdtemp(prefix="torchelastic_")
os.makedirs(base_log_dir, exist_ok=True)
dir = tempfile.mkdtemp(prefix=f"{rdzv_run_id}_", dir=base_log_dir)
log.info(f"log directory set to: {dir}")
return dir
# pyre-fixme[56]: Pyre was not able to infer the type of the decorator
# `torch.distributed.elastic.metrics.prof`.
Reported by Pylint.
Line: 117
Column: 9
base_log_dir = log_dir or tempfile.mkdtemp(prefix="torchelastic_")
os.makedirs(base_log_dir, exist_ok=True)
dir = tempfile.mkdtemp(prefix=f"{rdzv_run_id}_", dir=base_log_dir)
log.info(f"log directory set to: {dir}")
return dir
# pyre-fixme[56]: Pyre was not able to infer the type of the decorator
# `torch.distributed.elastic.metrics.prof`.
@prof
Reported by Pylint.
Line: 200
Column: 13
assert self._pcontext is not None
pc_pids = set(self._pcontext.pids().values())
if worker_pids != pc_pids:
log.error(
f"[{role}] worker pids do not match process_context pids."
f" Expected: {worker_pids}, actual: {pc_pids}"
)
return RunResult(state=WorkerState.UNKNOWN)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
Reported by Pylint.
Line: 113
Column: 5
rdzv_run_id = spec.rdzv_handler.get_run_id()
self._log_dir = self._make_log_dir(log_dir, rdzv_run_id)
def _make_log_dir(self, log_dir: Optional[str], rdzv_run_id: str):
base_log_dir = log_dir or tempfile.mkdtemp(prefix="torchelastic_")
os.makedirs(base_log_dir, exist_ok=True)
dir = tempfile.mkdtemp(prefix=f"{rdzv_run_id}_", dir=base_log_dir)
log.info(f"log directory set to: {dir}")
return dir
Reported by Pylint.
Line: 132
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
spec = worker_group.spec
store = worker_group.store
assert store is not None
master_addr, master_port = super()._get_master_addr_port(store)
restart_count = spec.max_restarts - self._remaining_restarts
use_agent_store = spec.rdzv_handler.get_backend() == "static"
Reported by Bandit.
Line: 173
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
shutil.rmtree(attempt_log_dir, ignore_errors=True)
os.makedirs(attempt_log_dir)
assert spec.entrypoint is not None
self._pcontext = start_processes(
name=spec.role,
entrypoint=spec.entrypoint,
args=args,
envs=envs,
Reported by Bandit.
Line: 197
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
role = worker_group.spec.role
worker_pids = {w.id for w in worker_group.workers}
assert self._pcontext is not None
pc_pids = set(self._pcontext.pids().values())
if worker_pids != pc_pids:
log.error(
f"[{role}] worker pids do not match process_context pids."
f" Expected: {worker_pids}, actual: {pc_pids}"
Reported by Bandit.
Line: 208
Column: 13
result = self._pcontext.wait(0)
if result:
if result.is_failed():
# map local rank failure to global rank
worker_failures = {}
for local_rank, failure in result.failures.items():
worker = worker_group.workers[local_rank]
worker_failures[worker.global_rank] = failure
Reported by Pylint.
torch/csrc/deploy/test_deploy_python.py
10 issues
Line: 24
Column: 9
def numpy_test(x):
import numpy as np
xs = [np.array([x, x]), np.array([x, x])]
for i in range(10):
xs.append(xs[-1] + xs[-2])
return int(xs[-1][0])
Reported by Pylint.
Line: 1
Column: 1
# this is imported by test_deploy to do some checks in python
import sys
import subprocess
from pathlib import Path
# we've taken steps to clear out the embedded python environment,
# so we have to go searching for real python to figure out where its libraries are installed.
def python_path(cpath):
for maybe in cpath.split(':'):
Reported by Pylint.
Line: 3
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
# this is imported by test_deploy to do some checks in python
import sys
import subprocess
from pathlib import Path
# we've taken steps to clear out the embedded python environment,
# so we have to go searching for real python to figure out where its libraries are installed.
def python_path(cpath):
for maybe in cpath.split(':'):
Reported by Bandit.
Line: 8
Column: 1
# we've taken steps to clear out the embedded python environment,
# so we have to go searching for real python to figure out where its libraries are installed.
def python_path(cpath):
for maybe in cpath.split(':'):
candidate = Path(maybe) / "python"
if candidate.exists():
cmd = [str(candidate), '-c', 'import sys; print(":".join(sys.path))']
return subprocess.check_output(cmd).decode('utf-8').strip('\n').split(':')
Reported by Pylint.
Line: 13
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
candidate = Path(maybe) / "python"
if candidate.exists():
cmd = [str(candidate), '-c', 'import sys; print(":".join(sys.path))']
return subprocess.check_output(cmd).decode('utf-8').strip('\n').split(':')
raise RuntimeError('could not find real python')
def setup(path):
sys.path.extend(python_path(path))
sys.path.append('build/lib') # for our test python extension
Reported by Bandit.
Line: 16
Column: 1
return subprocess.check_output(cmd).decode('utf-8').strip('\n').split(':')
raise RuntimeError('could not find real python')
def setup(path):
sys.path.extend(python_path(path))
sys.path.append('build/lib') # for our test python extension
# smoke test the numpy extension loading works
def numpy_test(x):
Reported by Pylint.
Line: 21
Column: 1
sys.path.append('build/lib') # for our test python extension
# smoke test the numpy extension loading works
def numpy_test(x):
import numpy as np
xs = [np.array([x, x]), np.array([x, x])]
for i in range(10):
xs.append(xs[-1] + xs[-2])
return int(xs[-1][0])
Reported by Pylint.
Line: 21
Column: 1
sys.path.append('build/lib') # for our test python extension
# smoke test the numpy extension loading works
def numpy_test(x):
import numpy as np
xs = [np.array([x, x]), np.array([x, x])]
for i in range(10):
xs.append(xs[-1] + xs[-2])
return int(xs[-1][0])
Reported by Pylint.
Line: 22
Column: 5
# smoke test the numpy extension loading works
def numpy_test(x):
import numpy as np
xs = [np.array([x, x]), np.array([x, x])]
for i in range(10):
xs.append(xs[-1] + xs[-2])
return int(xs[-1][0])
Reported by Pylint.
Line: 23
Column: 5
# smoke test the numpy extension loading works
def numpy_test(x):
import numpy as np
xs = [np.array([x, x]), np.array([x, x])]
for i in range(10):
xs.append(xs[-1] + xs[-2])
return int(xs[-1][0])
Reported by Pylint.
torch/distributed/__init__.py
10 issues
Line: 25
Column: 5
if is_available():
from torch._C._distributed_c10d import (
Store,
FileStore,
TCPStore,
ProcessGroup,
PrefixStore,
Reported by Pylint.
Line: 47
Column: 9
)
if sys.platform != "win32":
from torch._C._distributed_c10d import (
HashStore,
_round_robin_process_groups,
)
from .distributed_c10d import * # noqa: F403
Reported by Pylint.
Line: 52
Column: 5
_round_robin_process_groups,
)
from .distributed_c10d import * # noqa: F403
# Variables prefixed with underscore are not auto imported
# See the comment in `distributed_c10d.py` above `_backend` on why we expose
# this.
Reported by Pylint.
Line: 58
Column: 5
# See the comment in `distributed_c10d.py` above `_backend` on why we expose
# this.
from .distributed_c10d import (
_backend,
_all_gather_base,
_reduce_scatter_base,
_create_process_group_wrapper,
_rank_not_in_group,
Reported by Pylint.
Line: 66
Column: 5
_rank_not_in_group,
)
from .remote_device import _remote_device
Reported by Pylint.
Line: 17
Column: 20
Currently, the default value is ``USE_DISTRIBUTED=1`` for Linux and Windows,
``USE_DISTRIBUTED=0`` for MacOS.
"""
return hasattr(torch._C, "_c10d_init")
if is_available() and not torch._C._c10d_init():
raise RuntimeError("Failed to initialize torch.distributed")
Reported by Pylint.
Line: 20
Column: 27
return hasattr(torch._C, "_c10d_init")
if is_available() and not torch._C._c10d_init():
raise RuntimeError("Failed to initialize torch.distributed")
if is_available():
from torch._C._distributed_c10d import (
Reported by Pylint.
Line: 20
Column: 27
return hasattr(torch._C, "_c10d_init")
if is_available() and not torch._C._c10d_init():
raise RuntimeError("Failed to initialize torch.distributed")
if is_available():
from torch._C._distributed_c10d import (
Reported by Pylint.
Line: 1
Column: 1
import os
import sys
from enum import Enum
import torch
def is_available() -> bool:
"""
Reported by Pylint.
Line: 20
Column: 27
return hasattr(torch._C, "_c10d_init")
if is_available() and not torch._C._c10d_init():
raise RuntimeError("Failed to initialize torch.distributed")
if is_available():
from torch._C._distributed_c10d import (
Reported by Pylint.
tools/extract_scripts.py
10 issues
Line: 10
Column: 1
from typing import Any, Dict, Optional
import yaml
from typing_extensions import TypedDict
Step = Dict[str, Any]
class Script(TypedDict):
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
import argparse
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
Reported by Pylint.
Line: 15
Column: 1
Step = Dict[str, Any]
class Script(TypedDict):
extension: str
script: str
def extract(step: Step) -> Optional[Script]:
Reported by Pylint.
Line: 15
Column: 1
Step = Dict[str, Any]
class Script(TypedDict):
extension: str
script: str
def extract(step: Step) -> Optional[Script]:
Reported by Pylint.
Line: 20
Column: 1
script: str
def extract(step: Step) -> Optional[Script]:
run = step.get('run')
# https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#using-a-specific-shell
shell = step.get('shell', 'bash')
extension = {
Reported by Pylint.
Line: 37
Column: 5
is_gh_script = step.get('uses', '').startswith('actions/github-script@')
gh_script = step.get('with', {}).get('script')
if run is not None and extension is not None:
script = {
'bash': f'#!/usr/bin/env bash\nset -eo pipefail\n{run}',
'sh': f'#!/usr/bin/env sh\nset -e\n{run}',
}.get(shell, run)
return {'extension': extension, 'script': script}
Reported by Pylint.
Line: 49
Column: 1
return None
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--out', required=True)
args = parser.parse_args()
out = Path(args.out)
Reported by Pylint.
Line: 49
Column: 1
return None
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--out', required=True)
args = parser.parse_args()
out = Path(args.out)
Reported by Pylint.
Line: 60
Column: 9
gha_expressions_found = False
for p in Path('.github/workflows').iterdir():
with open(p) as f:
workflow = yaml.safe_load(f)
for job_name, job in workflow['jobs'].items():
job_dir = out / p / job_name
Reported by Pylint.
Line: 61
Column: 25
gha_expressions_found = False
for p in Path('.github/workflows').iterdir():
with open(p) as f:
workflow = yaml.safe_load(f)
for job_name, job in workflow['jobs'].items():
job_dir = out / p / job_name
steps = job['steps']
Reported by Pylint.