The following issues were found
tools/linter/trailing_newlines.py
8 issues
Line: 1
Column: 1
#!/usr/bin/env python3
import fileinput
import os
import sys
NEWLINE, = b'\n'
Reported by Pylint.
Line: 10
Column: 1
NEWLINE, = b'\n'
def correct_trailing_newlines(filename: str) -> bool:
with open(filename, 'rb') as f:
a = len(f.read(2))
if a == 0:
return True
elif a == 1:
Reported by Pylint.
Line: 11
Column: 34
def correct_trailing_newlines(filename: str) -> bool:
with open(filename, 'rb') as f:
a = len(f.read(2))
if a == 0:
return True
elif a == 1:
# file is wrong whether or not the only byte is a newline
Reported by Pylint.
Line: 12
Column: 9
def correct_trailing_newlines(filename: str) -> bool:
with open(filename, 'rb') as f:
a = len(f.read(2))
if a == 0:
return True
elif a == 1:
# file is wrong whether or not the only byte is a newline
return False
Reported by Pylint.
Line: 13
Column: 9
def correct_trailing_newlines(filename: str) -> bool:
with open(filename, 'rb') as f:
a = len(f.read(2))
if a == 0:
return True
elif a == 1:
# file is wrong whether or not the only byte is a newline
return False
else:
Reported by Pylint.
Line: 20
Column: 16
return False
else:
f.seek(-2, os.SEEK_END)
b, c = f.read(2)
# no ASCII byte is part of any non-ASCII character in UTF-8
return b != NEWLINE and c == NEWLINE
def main() -> int:
Reported by Pylint.
Line: 20
Column: 13
return False
else:
f.seek(-2, os.SEEK_END)
b, c = f.read(2)
# no ASCII byte is part of any non-ASCII character in UTF-8
return b != NEWLINE and c == NEWLINE
def main() -> int:
Reported by Pylint.
Line: 25
Column: 1
return b != NEWLINE and c == NEWLINE
def main() -> int:
# mimic git grep exit code behavior
exit_code = 1
for line in fileinput.input():
stripped = line.rstrip()
if not correct_trailing_newlines(stripped):
Reported by Pylint.
tools/linter/clang_tidy/__main__.py
8 issues
Line: 1
Column: 1
import argparse
import pathlib
import os
import shutil
import subprocess
import re
import sys
from typing import List
Reported by Pylint.
Line: 5
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import pathlib
import os
import shutil
import subprocess
import re
import sys
from typing import List
Reported by Bandit.
Line: 16
Column: 1
from tools.linter.install.clang_tidy import INSTALLATION_PATH
def clang_search_dirs() -> List[str]:
# Compilers are ordered based on fallback preference
# We pick the first one that is available on the system
compilers = ["clang", "gcc", "cpp", "cc"]
compilers = [c for c in compilers if shutil.which(c) is not None]
if len(compilers) == 0:
Reported by Pylint.
Line: 25
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
raise RuntimeError(f"None of {compilers} were found")
compiler = compilers[0]
result = subprocess.run(
[compiler, "-E", "-x", "c++", "-", "-v"],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
Reported by Bandit.
Line: 40
Column: 13
search_paths = []
for line in stderr:
if re.match(search_start, line):
if append_path:
continue
else:
append_path = True
elif re.match(search_end, line):
break
Reported by Pylint.
Line: 87
Column: 1
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="clang-tidy wrapper script")
parser.add_argument(
"-e",
"--clang-tidy-exe",
default=DEFAULTS["clang-tidy-exe"],
Reported by Pylint.
Line: 109
Column: 1
"--regex",
action="append",
default=[],
help="Only lint files that match these regular expressions (from the start of the filename). "
"If a pattern starts with a - the search is negated for that pattern.",
)
parser.add_argument(
"-c",
"--compile-commands-dir",
Reported by Pylint.
Line: 172
Column: 1
return parser.parse_args()
def main() -> None:
options = parse_args()
if not pathlib.Path("build").exists():
generate_build_files()
Reported by Pylint.
test/test_ao_sparsity.py
8 issues
Line: 3
Column: 1
# -*- coding: utf-8 -*-
from torch.testing._internal.common_utils import run_tests
# Kernels
from ao.sparsity.test_kernels import TestQuantizedSparseKernels # noqa: F401
from ao.sparsity.test_kernels import TestQuantizedSparseLayers # noqa: F401
# Parametrizations
Reported by Pylint.
Line: 6
Column: 1
from torch.testing._internal.common_utils import run_tests
# Kernels
from ao.sparsity.test_kernels import TestQuantizedSparseKernels # noqa: F401
from ao.sparsity.test_kernels import TestQuantizedSparseLayers # noqa: F401
# Parametrizations
from ao.sparsity.test_parametrization import TestFakeSparsity # noqa: F401
Reported by Pylint.
Line: 7
Column: 1
# Kernels
from ao.sparsity.test_kernels import TestQuantizedSparseKernels # noqa: F401
from ao.sparsity.test_kernels import TestQuantizedSparseLayers # noqa: F401
# Parametrizations
from ao.sparsity.test_parametrization import TestFakeSparsity # noqa: F401
# Sparsifier
Reported by Pylint.
Line: 10
Column: 1
from ao.sparsity.test_kernels import TestQuantizedSparseLayers # noqa: F401
# Parametrizations
from ao.sparsity.test_parametrization import TestFakeSparsity # noqa: F401
# Sparsifier
from ao.sparsity.test_sparsifier import TestBaseSparsifier # noqa: F401
from ao.sparsity.test_sparsifier import TestWeightNormSparsifier # noqa: F401
Reported by Pylint.
Line: 13
Column: 1
from ao.sparsity.test_parametrization import TestFakeSparsity # noqa: F401
# Sparsifier
from ao.sparsity.test_sparsifier import TestBaseSparsifier # noqa: F401
from ao.sparsity.test_sparsifier import TestWeightNormSparsifier # noqa: F401
# Scheduler
from ao.sparsity.test_scheduler import TestScheduler # noqa: F401
Reported by Pylint.
Line: 14
Column: 1
# Sparsifier
from ao.sparsity.test_sparsifier import TestBaseSparsifier # noqa: F401
from ao.sparsity.test_sparsifier import TestWeightNormSparsifier # noqa: F401
# Scheduler
from ao.sparsity.test_scheduler import TestScheduler # noqa: F401
if __name__ == '__main__':
Reported by Pylint.
Line: 17
Column: 1
from ao.sparsity.test_sparsifier import TestWeightNormSparsifier # noqa: F401
# Scheduler
from ao.sparsity.test_scheduler import TestScheduler # noqa: F401
if __name__ == '__main__':
run_tests()
Reported by Pylint.
Line: 1
Column: 1
# -*- coding: utf-8 -*-
from torch.testing._internal.common_utils import run_tests
# Kernels
from ao.sparsity.test_kernels import TestQuantizedSparseKernels # noqa: F401
from ao.sparsity.test_kernels import TestQuantizedSparseLayers # noqa: F401
# Parametrizations
Reported by Pylint.
tools/autograd/gen_annotated_fn_args.py
8 issues
Line: 27
Column: 1
from tools.codegen.context import with_native_function
from tools.codegen.model import BaseOperatorName, NativeFunction
import tools.codegen.api.python as python
from .gen_python_functions import should_generate_py_binding, is_py_torch_function, \
is_py_nn_function, is_py_linalg_function, is_py_variable_method, is_py_special_function, \
is_py_fft_function
def gen_annotated(native_yaml_path: str, out: str, autograd_dir: str) -> None:
native_functions = parse_native_yaml(native_yaml_path).native_functions
Reported by Pylint.
Line: 31
Column: 1
is_py_nn_function, is_py_linalg_function, is_py_variable_method, is_py_special_function, \
is_py_fft_function
def gen_annotated(native_yaml_path: str, out: str, autograd_dir: str) -> None:
native_functions = parse_native_yaml(native_yaml_path).native_functions
mappings = (
(is_py_torch_function, 'torch._C._VariableFunctions'),
(is_py_nn_function, 'torch._C._nn'),
(is_py_linalg_function, 'torch._C._linalg'),
Reported by Pylint.
Line: 44
Column: 13
annotated_args: List[str] = []
for pred, namespace in mappings:
groups: Dict[BaseOperatorName, List[NativeFunction]] = defaultdict(list)
for f in native_functions:
if not should_generate_py_binding(f) or not pred(f):
continue
groups[f.func.name.name].append(f)
for group in groups.values():
for f in group:
Reported by Pylint.
Line: 49
Column: 17
continue
groups[f.func.name.name].append(f)
for group in groups.values():
for f in group:
annotated_args.append(f'{namespace}.{gen_annotated_args(f)}')
template_path = os.path.join(autograd_dir, 'templates')
fm = FileManager(install_dir=out, template_dir=template_path, dry_run=False)
fm.write_with_template('annotated_fn_args.py', 'annotated_fn_args.py.in', lambda: {
Reported by Pylint.
Line: 53
Column: 5
annotated_args.append(f'{namespace}.{gen_annotated_args(f)}')
template_path = os.path.join(autograd_dir, 'templates')
fm = FileManager(install_dir=out, template_dir=template_path, dry_run=False)
fm.write_with_template('annotated_fn_args.py', 'annotated_fn_args.py.in', lambda: {
'annotated_args': textwrap.indent('\n'.join(annotated_args), ' '),
})
@with_native_function
Reported by Pylint.
Line: 59
Column: 1
})
@with_native_function
def gen_annotated_args(f: NativeFunction) -> str:
out_args: List[Dict[str, Any]] = []
for arg in f.func.arguments.flat_positional:
if arg.default is not None:
continue
out_arg: Dict[str, Any] = {}
Reported by Pylint.
Line: 59
Column: 1
})
@with_native_function
def gen_annotated_args(f: NativeFunction) -> str:
out_args: List[Dict[str, Any]] = []
for arg in f.func.arguments.flat_positional:
if arg.default is not None:
continue
out_arg: Dict[str, Any] = {}
Reported by Pylint.
Line: 74
Column: 1
return f'{f.func.name.name}: {repr(out_args)},'
def main() -> None:
parser = argparse.ArgumentParser(
description='Generate annotated_fn_args script')
parser.add_argument('native_functions', metavar='NATIVE',
help='path to native_functions.yaml')
parser.add_argument('out', metavar='OUT',
Reported by Pylint.
test/simulate_nccl_errors.py
8 issues
Line: 2
Column: 1
import torch.distributed as c10d
import torch
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
Reported by Pylint.
Line: 3
Column: 1
import torch.distributed as c10d
import torch
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
Reported by Pylint.
Line: 34
Column: 22
work = process_group.allreduce(torch.rand(10).cuda(rank))
logging.info('Waiting for allreduce to complete...')
work.wait()
logging.info('Second allreduce successful: {}'.format(work.is_success()))
else:
logging.info('Aborting all other ranks.')
os.abort()
Reported by Pylint.
Line: 1
Column: 1
import torch.distributed as c10d
import torch
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
Reported by Pylint.
Line: 4
Column: 1
import torch.distributed as c10d
import torch
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
Reported by Pylint.
Line: 5
Column: 1
import torch.distributed as c10d
import torch
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
Reported by Pylint.
Line: 6
Column: 1
import torch
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Simple script to simulate NCCL errors. The script is '
Reported by Pylint.
Line: 7
Column: 1
import argparse
import os
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Simple script to simulate NCCL errors. The script is '
'supposed to be run on multiple different nodes simultaneously with '
Reported by Pylint.
tools/download_mnist.py
8 issues
Line: 42
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b310-urllib-urlopen
print('Downloading {} ...'.format(url))
try:
hook = None if quiet else report_download_progress
urlretrieve(url, destination_path, reporthook=hook)
except (URLError, ConnectionError) as e:
print('Failed to download (trying next):\n{}'.format(e))
continue
finally:
if not quiet:
Reported by Bandit.
Line: 1
Column: 1
import argparse
import gzip
import os
from urllib.error import URLError
from urllib.request import urlretrieve
import sys
MIRRORS = [
'http://yann.lecun.com/exdb/mnist/',
Reported by Pylint.
Line: 21
Column: 1
]
def report_download_progress(
chunk_number: int,
chunk_size: int,
file_size: int,
) -> None:
if file_size != -1:
Reported by Pylint.
Line: 28
Column: 9
) -> None:
if file_size != -1:
percent = min(1, (chunk_number * chunk_size) / file_size)
bar = '#' * int(64 * percent)
sys.stdout.write('\r0% |{:<64}| {}%'.format(bar, int(percent * 100)))
def download(destination_path: str, resource: str, quiet: bool) -> None:
if os.path.exists(destination_path):
Reported by Pylint.
Line: 32
Column: 1
sys.stdout.write('\r0% |{:<64}| {}%'.format(bar, int(percent * 100)))
def download(destination_path: str, resource: str, quiet: bool) -> None:
if os.path.exists(destination_path):
if not quiet:
print('{} already exists, skipping ...'.format(destination_path))
else:
for mirror in MIRRORS:
Reported by Pylint.
Line: 43
Column: 13
try:
hook = None if quiet else report_download_progress
urlretrieve(url, destination_path, reporthook=hook)
except (URLError, ConnectionError) as e:
print('Failed to download (trying next):\n{}'.format(e))
continue
finally:
if not quiet:
# Just a newline.
Reported by Pylint.
Line: 55
Column: 1
raise RuntimeError('Error downloading resource!')
def unzip(zipped_path: str, quiet: bool) -> None:
unzipped_path = os.path.splitext(zipped_path)[0]
if os.path.exists(unzipped_path):
if not quiet:
print('{} already exists, skipping ... '.format(unzipped_path))
return
Reported by Pylint.
Line: 68
Column: 1
print('Unzipped {} ...'.format(zipped_path))
def main() -> None:
parser = argparse.ArgumentParser(
description='Download the MNIST dataset from the internet')
parser.add_argument(
'-d', '--destination', default='.', help='Destination directory')
parser.add_argument(
Reported by Pylint.
test/test_license.py
8 issues
Line: 6
Column: 1
import os
import unittest
import torch
from torch.testing._internal.common_utils import TestCase, run_tests
try:
from third_party.build_bundled import create_bundled
Reported by Pylint.
Line: 7
Column: 1
import unittest
import torch
from torch.testing._internal.common_utils import TestCase, run_tests
try:
from third_party.build_bundled import create_bundled
except ImportError:
Reported by Pylint.
Line: 1
Column: 1
import glob
import io
import os
import unittest
import torch
from torch.testing._internal.common_utils import TestCase, run_tests
Reported by Pylint.
Line: 15
Column: 1
except ImportError:
create_bundled = None
license_file = 'third_party/LICENSES_BUNDLED.txt'
starting_txt = 'The Pytorch repository and source distributions bundle'
site_packages = os.path.dirname(os.path.dirname(torch.__file__))
distinfo = glob.glob(os.path.join(site_packages, 'torch-*dist-info'))
class TestLicense(TestCase):
Reported by Pylint.
Line: 16
Column: 1
create_bundled = None
license_file = 'third_party/LICENSES_BUNDLED.txt'
starting_txt = 'The Pytorch repository and source distributions bundle'
site_packages = os.path.dirname(os.path.dirname(torch.__file__))
distinfo = glob.glob(os.path.join(site_packages, 'torch-*dist-info'))
class TestLicense(TestCase):
Reported by Pylint.
Line: 20
Column: 1
site_packages = os.path.dirname(os.path.dirname(torch.__file__))
distinfo = glob.glob(os.path.join(site_packages, 'torch-*dist-info'))
class TestLicense(TestCase):
@unittest.skipIf(not create_bundled, "can only be run in a source tree")
def test_license_for_wheel(self):
current = io.StringIO()
create_bundled('third_party', current)
Reported by Pylint.
Line: 23
Column: 5
class TestLicense(TestCase):
@unittest.skipIf(not create_bundled, "can only be run in a source tree")
def test_license_for_wheel(self):
current = io.StringIO()
create_bundled('third_party', current)
with open(license_file) as fid:
src_tree = fid.read()
if not src_tree == current.getvalue():
Reported by Pylint.
Line: 23
Column: 5
class TestLicense(TestCase):
@unittest.skipIf(not create_bundled, "can only be run in a source tree")
def test_license_for_wheel(self):
current = io.StringIO()
create_bundled('third_party', current)
with open(license_file) as fid:
src_tree = fid.read()
if not src_tree == current.getvalue():
Reported by Pylint.
test/test_jit_simple.py
8 issues
Line: 3
Column: 1
import sys
sys.argv.append("--jit_executor=simple")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 6
Column: 5
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 7
Column: 5
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 8
Column: 13
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 9
Column: 5
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 3
Column: 1
import sys
sys.argv.append("--jit_executor=simple")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 1
Column: 1
import sys
sys.argv.append("--jit_executor=simple")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 3
Column: 1
import sys
sys.argv.append("--jit_executor=simple")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
torch/cuda/profiler.py
8 issues
Line: 3
Column: 1
import tempfile
import contextlib
from . import cudart, check_error
DEFAULT_FLAGS = [
"gpustarttimestamp",
"gpuendtimestamp",
"gridsize3d",
Reported by Pylint.
Line: 1
Column: 1
import tempfile
import contextlib
from . import cudart, check_error
DEFAULT_FLAGS = [
"gpustarttimestamp",
"gpuendtimestamp",
"gridsize3d",
Reported by Pylint.
Line: 17
Column: 1
]
def init(output_file, flags=None, output_mode='key_value'):
rt = cudart()
if not hasattr(rt, 'cudaOutputMode'):
raise AssertionError("HIP does not support profiler initialization!")
flags = DEFAULT_FLAGS if flags is None else flags
if output_mode == 'key_value':
Reported by Pylint.
Line: 18
Column: 5
def init(output_file, flags=None, output_mode='key_value'):
rt = cudart()
if not hasattr(rt, 'cudaOutputMode'):
raise AssertionError("HIP does not support profiler initialization!")
flags = DEFAULT_FLAGS if flags is None else flags
if output_mode == 'key_value':
output_mode_enum = rt.cudaOutputMode.KeyValuePair
Reported by Pylint.
Line: 28
Column: 54
output_mode_enum = rt.cudaOutputMode.CSV
else:
raise RuntimeError("supported CUDA profiler output modes are: key_value and csv")
with tempfile.NamedTemporaryFile(delete=True) as f:
f.write(b'\n'.join(f.encode('ascii') for f in flags))
f.flush()
check_error(rt.cudaProfilerInitialize(f.name, output_file, output_mode_enum))
Reported by Pylint.
Line: 34
Column: 1
check_error(rt.cudaProfilerInitialize(f.name, output_file, output_mode_enum))
def start():
check_error(cudart().cudaProfilerStart())
def stop():
check_error(cudart().cudaProfilerStop())
Reported by Pylint.
Line: 38
Column: 1
check_error(cudart().cudaProfilerStart())
def stop():
check_error(cudart().cudaProfilerStop())
@contextlib.contextmanager
def profile():
Reported by Pylint.
Line: 43
Column: 1
@contextlib.contextmanager
def profile():
try:
start()
yield
finally:
stop()
Reported by Pylint.
test/test_jit_profiling.py
8 issues
Line: 3
Column: 1
import sys
sys.argv.append("--jit_executor=profiling")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 6
Column: 5
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 7
Column: 5
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 8
Column: 13
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 9
Column: 5
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 3
Column: 1
import sys
sys.argv.append("--jit_executor=profiling")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 1
Column: 1
import sys
sys.argv.append("--jit_executor=profiling")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.
Line: 3
Column: 1
import sys
sys.argv.append("--jit_executor=profiling")
from test_jit import * # noqa: F403
if __name__ == '__main__':
run_tests()
import test_jit_py3
suite = unittest.findTestCases(test_jit_py3)
unittest.TextTestRunner().run(suite)
Reported by Pylint.