The following issues were found
test/functional/rpc_help.py
27 issues
Line: 10
Column: 1
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
from collections import defaultdict
import os
import re
def parse_string(s):
Reported by Pylint.
Line: 11
Column: 1
from test_framework.util import assert_equal, assert_raises_rpc_error
from collections import defaultdict
import os
import re
def parse_string(s):
assert s[0] == '"'
Reported by Pylint.
Line: 12
Column: 1
from collections import defaultdict
import os
import re
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
Reported by Pylint.
Line: 15
Column: 1
import re
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
Reported by Pylint.
Line: 15
Column: 1
import re
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
Reported by Pylint.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
def process_mapping(fname):
Reported by Bandit.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
def process_mapping(fname):
"""Find and parse conversion table in implementation file `fname`."""
Reported by Bandit.
Line: 25
Column: 47
"""Find and parse conversion table in implementation file `fname`."""
cmds = []
in_rpcs = False
with open(fname, "r", encoding="utf8") as f:
for line in f:
line = line.rstrip()
if not in_rpcs:
if line == 'static const CRPCConvertParam vRPCConvertParams[] =':
in_rpcs = True
Reported by Pylint.
Line: 35
Column: 21
if line.startswith('};'):
in_rpcs = False
elif '{' in line and '"' in line:
m = re.search(r'{ *("[^"]*"), *([0-9]+) *, *("[^"]*") *},', line)
assert m, 'No match to table expression: %s' % line
name = parse_string(m.group(1))
idx = int(m.group(2))
argname = parse_string(m.group(3))
cmds.append((name, idx, argname))
Reported by Pylint.
Line: 36
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
in_rpcs = False
elif '{' in line and '"' in line:
m = re.search(r'{ *("[^"]*"), *([0-9]+) *, *("[^"]*") *},', line)
assert m, 'No match to table expression: %s' % line
name = parse_string(m.group(1))
idx = int(m.group(2))
argname = parse_string(m.group(3))
cmds.append((name, idx, argname))
assert not in_rpcs and cmds
Reported by Bandit.
test/functional/rpc_txoutproof.py
26 issues
Line: 111
Column: 3
for n in self.nodes:
assert not n.verifytxoutproof(tweaked_proof.serialize().hex())
# TODO: try more variants, eg transactions at different depths, and
# verify that the proofs are invalid
if __name__ == '__main__':
MerkleBlockTest().main()
Reported by Pylint.
Line: 20
Column: 1
from test_framework.wallet import MiniWallet
class MerkleBlockTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
self.extra_args = [
[],
Reported by Pylint.
Line: 29
Column: 5
["-txindex"],
]
def run_test(self):
miniwallet = MiniWallet(self.nodes[0])
# Add enough mature utxos to the wallet, so that all txs spend confirmed coins
miniwallet.generate(5)
self.nodes[0].generate(COINBASE_MATURITY)
self.sync_all()
Reported by Pylint.
Line: 42
Column: 1
txid1 = miniwallet.send_self_transfer(from_node=self.nodes[0])['txid']
txid2 = miniwallet.send_self_transfer(from_node=self.nodes[0])['txid']
# This will raise an exception because the transaction is not yet in a block
assert_raises_rpc_error(-5, "Transaction not yet in block", self.nodes[0].gettxoutproof, [txid1])
self.nodes[0].generate(1)
blockhash = self.nodes[0].getblockhash(chain_height + 1)
self.sync_all()
Reported by Pylint.
Line: 54
Column: 1
txlist.append(blocktxn[2])
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid1])), [txid1])
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid1, txid2])), txlist)
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid1, txid2], blockhash)), txlist)
txin_spent = miniwallet.get_utxo() # Get the change from txid2
tx3 = miniwallet.send_self_transfer(from_node=self.nodes[0], utxo_to_spend=txin_spent)
txid3 = tx3['txid']
Reported by Pylint.
Line: 55
Column: 1
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid1])), [txid1])
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid1, txid2])), txlist)
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid1, txid2], blockhash)), txlist)
txin_spent = miniwallet.get_utxo() # Get the change from txid2
tx3 = miniwallet.send_self_transfer(from_node=self.nodes[0], utxo_to_spend=txin_spent)
txid3 = tx3['txid']
self.nodes[0].generate(1)
Reported by Pylint.
Line: 67
Column: 1
txid_unspent = txid1 # Input was change from txid2, so txid1 should be unspent
# Invalid txids
assert_raises_rpc_error(-8, "txid must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["00000000000000000000000000000000"], blockhash)
assert_raises_rpc_error(-8, "txid must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["ZZZ0000000000000000000000000000000000000000000000000000000000000"], blockhash)
# Invalid blockhashes
assert_raises_rpc_error(-8, "blockhash must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "00000000000000000000000000000000")
assert_raises_rpc_error(-8, "blockhash must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "ZZZ0000000000000000000000000000000000000000000000000000000000000")
# We can't find the block from a fully-spent tx
Reported by Pylint.
Line: 68
Column: 1
# Invalid txids
assert_raises_rpc_error(-8, "txid must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["00000000000000000000000000000000"], blockhash)
assert_raises_rpc_error(-8, "txid must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["ZZZ0000000000000000000000000000000000000000000000000000000000000"], blockhash)
# Invalid blockhashes
assert_raises_rpc_error(-8, "blockhash must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "00000000000000000000000000000000")
assert_raises_rpc_error(-8, "blockhash must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "ZZZ0000000000000000000000000000000000000000000000000000000000000")
# We can't find the block from a fully-spent tx
assert_raises_rpc_error(-5, "Transaction not yet in block", self.nodes[0].gettxoutproof, [txid_spent])
Reported by Pylint.
Line: 70
Column: 1
assert_raises_rpc_error(-8, "txid must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["00000000000000000000000000000000"], blockhash)
assert_raises_rpc_error(-8, "txid must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["ZZZ0000000000000000000000000000000000000000000000000000000000000"], blockhash)
# Invalid blockhashes
assert_raises_rpc_error(-8, "blockhash must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "00000000000000000000000000000000")
assert_raises_rpc_error(-8, "blockhash must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "ZZZ0000000000000000000000000000000000000000000000000000000000000")
# We can't find the block from a fully-spent tx
assert_raises_rpc_error(-5, "Transaction not yet in block", self.nodes[0].gettxoutproof, [txid_spent])
# We can get the proof if we specify the block
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid_spent], blockhash)), [txid_spent])
Reported by Pylint.
Line: 71
Column: 1
assert_raises_rpc_error(-8, "txid must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, ["ZZZ0000000000000000000000000000000000000000000000000000000000000"], blockhash)
# Invalid blockhashes
assert_raises_rpc_error(-8, "blockhash must be of length 64 (not 32, for '00000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "00000000000000000000000000000000")
assert_raises_rpc_error(-8, "blockhash must be hexadecimal string (not 'ZZZ0000000000000000000000000000000000000000000000000000000000000')", self.nodes[0].gettxoutproof, [txid_spent], "ZZZ0000000000000000000000000000000000000000000000000000000000000")
# We can't find the block from a fully-spent tx
assert_raises_rpc_error(-5, "Transaction not yet in block", self.nodes[0].gettxoutproof, [txid_spent])
# We can get the proof if we specify the block
assert_equal(self.nodes[0].verifytxoutproof(self.nodes[0].gettxoutproof([txid_spent], blockhash)), [txid_spent])
# We can't get the proof if we specify a non-existent block
Reported by Pylint.
test/lint/lint-files.py
26 issues
Line: 75
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b602_subprocess_popen_with_shell_equals_true.html
Checks every file in the repository against an allowed regexp to make sure only lowercase or uppercase
alphanumerics (a-zA-Z0-9), underscores (_), hyphens (-), at (@) and dots (.) are used in repository filenames.
"""
filenames = check_output(CMD_ALL_FILES, shell=True).decode("utf8").rstrip("\0").split("\0")
filename_regex = re.compile(ALLOWED_FILENAME_REGEXP)
failed_tests = 0
for filename in filenames:
if not filename_regex.match(filename):
print(
Reported by Bandit.
Line: 94
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b602_subprocess_popen_with_shell_equals_true.html
Additionally there is an exception regexp for directories or files which are excepted from matching this regexp.
"""
filenames = check_output(CMD_SOURCE_FILES, shell=True).decode("utf8").rstrip("\0").split("\0")
filename_regex = re.compile(ALLOWED_SOURCE_FILENAME_REGEXP)
filename_exception_regex = re.compile(ALLOWED_SOURCE_FILENAME_EXCEPTION_REGEXP)
failed_tests = 0
for filename in filenames:
if not filename_regex.match(filename) and not filename_exception_regex.match(filename):
Reported by Bandit.
Line: 113
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b602_subprocess_popen_with_shell_equals_true.html
Additionally checks that for executable files, the file contains a shebang line
"""
filenames = check_output(CMD_ALL_FILES, shell=True).decode("utf8").rstrip("\0").split("\0")
failed_tests = 0
for filename in filenames:
file_meta = FileMeta(filename)
if file_meta.permissions == ALLOWED_PERMISSION_EXECUTABLES:
with open(filename, "rb") as f:
Reported by Bandit.
Line: 158
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b602_subprocess_popen_with_shell_equals_true.html
"""
Checks every file that contains a shebang line to ensure it has an executable permission
"""
filenames = check_output(CMD_SHEBANG_FILES, shell=True).decode("utf8").strip().split("\n")
# The git grep command we use returns files which contain a shebang on any line within the file
# so we need to filter the list to only files with the shebang on the first line
filenames = [filename.split(":1:")[0] for filename in filenames if ":1:" in filename]
Reported by Bandit.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2021 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
This checks that all files in the repository have correct filenames and permissions
"""
Reported by Pylint.
Line: 13
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import os
import re
import sys
from subprocess import check_output
from typing import Optional, NoReturn
CMD_ALL_FILES = "git ls-files -z --full-name"
CMD_SOURCE_FILES = 'git ls-files -z --full-name -- "*.[cC][pP][pP]" "*.[hH]" "*.[pP][yY]" "*.[sS][hH]"'
CMD_SHEBANG_FILES = "git grep --full-name --line-number -I '^#!'"
Reported by Bandit.
Line: 17
Column: 1
from typing import Optional, NoReturn
CMD_ALL_FILES = "git ls-files -z --full-name"
CMD_SOURCE_FILES = 'git ls-files -z --full-name -- "*.[cC][pP][pP]" "*.[hH]" "*.[pP][yY]" "*.[sS][hH]"'
CMD_SHEBANG_FILES = "git grep --full-name --line-number -I '^#!'"
ALLOWED_FILENAME_REGEXP = "^[a-zA-Z0-9/_.@][a-zA-Z0-9/_.@-]*$"
ALLOWED_SOURCE_FILENAME_REGEXP = "^[a-z0-9_./-]+$"
ALLOWED_SOURCE_FILENAME_EXCEPTION_REGEXP = (
"^src/(secp256k1/|univalue/|test/fuzz/FuzzedDataProvider.h)"
Reported by Pylint.
Line: 32
Column: 1
}
class FileMeta(object):
def __init__(self, file_path: str):
self.file_path = file_path
@property
def extension(self) -> Optional[str]:
Reported by Pylint.
Line: 32
Column: 1
}
class FileMeta(object):
def __init__(self, file_path: str):
self.file_path = file_path
@property
def extension(self) -> Optional[str]:
Reported by Pylint.
Line: 72
Column: 1
def check_all_filenames() -> int:
"""
Checks every file in the repository against an allowed regexp to make sure only lowercase or uppercase
alphanumerics (a-zA-Z0-9), underscores (_), hyphens (-), at (@) and dots (.) are used in repository filenames.
"""
filenames = check_output(CMD_ALL_FILES, shell=True).decode("utf8").rstrip("\0").split("\0")
filename_regex = re.compile(ALLOWED_FILENAME_REGEXP)
failed_tests = 0
Reported by Pylint.
test/functional/mempool_persist.py
25 issues
Line: 51
Column: 1
)
class MempoolPersistTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 3
self.extra_args = [[], ["-persistmempool=0"], []]
def skip_test_if_missing_module(self):
Reported by Pylint.
Line: 59
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.log.debug("Send 5 transactions from node2 (to its own address)")
tx_creation_time_lower = int(time.time())
for _ in range(5):
last_txid = self.nodes[2].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("10"))
node2_balance = self.nodes[2].getbalance()
Reported by Pylint.
Line: 83
Column: 1
self.log.info('Check the total base fee is unchanged after prioritisetransaction')
assert_equal(total_fee_old, self.nodes[0].getmempoolinfo()['total_fee'])
assert_equal(total_fee_old, sum(v['fees']['base'] for k, v in self.nodes[0].getrawmempool(verbose=True).items()))
tx_creation_time = self.nodes[0].getmempoolentry(txid=last_txid)['time']
assert_greater_than_or_equal(tx_creation_time, tx_creation_time_lower)
assert_greater_than_or_equal(tx_creation_time_higher, tx_creation_time)
Reported by Pylint.
Line: 91
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# disconnect nodes & make a txn that remains in the unbroadcast set.
self.disconnect_nodes(0, 1)
assert(len(self.nodes[0].getpeerinfo()) == 0)
assert(len(self.nodes[0].p2ps) == 0)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
self.connect_nodes(0, 2)
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
Reported by Bandit.
Line: 91
Column: 1
# disconnect nodes & make a txn that remains in the unbroadcast set.
self.disconnect_nodes(0, 1)
assert(len(self.nodes[0].getpeerinfo()) == 0)
assert(len(self.nodes[0].p2ps) == 0)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
self.connect_nodes(0, 2)
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
Reported by Pylint.
Line: 92
Column: 1
# disconnect nodes & make a txn that remains in the unbroadcast set.
self.disconnect_nodes(0, 1)
assert(len(self.nodes[0].getpeerinfo()) == 0)
assert(len(self.nodes[0].p2ps) == 0)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
self.connect_nodes(0, 2)
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
self.stop_nodes()
Reported by Pylint.
Line: 92
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# disconnect nodes & make a txn that remains in the unbroadcast set.
self.disconnect_nodes(0, 1)
assert(len(self.nodes[0].getpeerinfo()) == 0)
assert(len(self.nodes[0].p2ps) == 0)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
self.connect_nodes(0, 2)
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
self.stop_nodes()
Reported by Bandit.
Line: 96
Column: 1
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
self.connect_nodes(0, 2)
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
self.stop_nodes()
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
# Also don't store the mempool, to keep the datadir clean
self.start_node(1, extra_args=["-persistmempool=0"])
self.start_node(0)
Reported by Pylint.
Line: 103
Column: 1
self.start_node(1, extra_args=["-persistmempool=0"])
self.start_node(0)
self.start_node(2)
assert self.nodes[0].getmempoolinfo()["loaded"] # start_node is blocking on the mempool being loaded
assert self.nodes[2].getmempoolinfo()["loaded"]
assert_equal(len(self.nodes[0].getrawmempool()), 6)
assert_equal(len(self.nodes[2].getrawmempool()), 5)
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
assert_equal(len(self.nodes[1].getrawmempool()), 0)
Reported by Pylint.
Line: 103
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.start_node(1, extra_args=["-persistmempool=0"])
self.start_node(0)
self.start_node(2)
assert self.nodes[0].getmempoolinfo()["loaded"] # start_node is blocking on the mempool being loaded
assert self.nodes[2].getmempoolinfo()["loaded"]
assert_equal(len(self.nodes[0].getrawmempool()), 6)
assert_equal(len(self.nodes[2].getrawmempool()), 5)
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
assert_equal(len(self.nodes[1].getrawmempool()), 0)
Reported by Bandit.
test/functional/wallet_hd.py
25 issues
Line: 18
Column: 1
)
class WalletHDTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.extra_args = [[], ['-keypool=0']]
self.supports_cli = False
Reported by Pylint.
Line: 28
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Make sure we use hd, keep masterkeyid
hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
assert_equal(len(hd_fingerprint), 8)
# create an internal key
Reported by Pylint.
Line: 28
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Make sure we use hd, keep masterkeyid
hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
assert_equal(len(hd_fingerprint), 8)
# create an internal key
Reported by Pylint.
Line: 28
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Make sure we use hd, keep masterkeyid
hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
assert_equal(len(hd_fingerprint), 8)
# create an internal key
Reported by Pylint.
Line: 30
Column: 1
def run_test(self):
# Make sure we use hd, keep masterkeyid
hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
assert_equal(len(hd_fingerprint), 8)
# create an internal key
change_addr = self.nodes[1].getrawchangeaddress()
change_addrV = self.nodes[1].getaddressinfo(change_addr)
Reported by Pylint.
Line: 35
Column: 9
# create an internal key
change_addr = self.nodes[1].getrawchangeaddress()
change_addrV = self.nodes[1].getaddressinfo(change_addr)
if self.options.descriptors:
assert_equal(change_addrV["hdkeypath"], "m/84'/1'/0'/1/0")
else:
assert_equal(change_addrV["hdkeypath"], "m/0'/1'/0'") #first internal child key
Reported by Pylint.
Line: 54
Column: 9
# Also send funds to each add
self.nodes[0].generate(COINBASE_MATURITY + 1)
hd_add = None
NUM_HD_ADDS = 10
for i in range(1, NUM_HD_ADDS + 1):
hd_add = self.nodes[1].getnewaddress()
hd_info = self.nodes[1].getaddressinfo(hd_add)
if self.options.descriptors:
assert_equal(hd_info["hdkeypath"], "m/84'/1'/0'/0/" + str(i))
Reported by Pylint.
Line: 70
Column: 9
# create an internal key (again)
change_addr = self.nodes[1].getrawchangeaddress()
change_addrV = self.nodes[1].getaddressinfo(change_addr)
if self.options.descriptors:
assert_equal(change_addrV["hdkeypath"], "m/84'/1'/0'/1/1")
else:
assert_equal(change_addrV["hdkeypath"], "m/0'/1'/1'") #second internal child key
Reported by Pylint.
Line: 87
Column: 1
shutil.rmtree(os.path.join(self.nodes[1].datadir, self.chain, "chainstate"))
shutil.copyfile(
os.path.join(self.nodes[1].datadir, "hd.bak"),
os.path.join(self.nodes[1].datadir, self.chain, 'wallets', self.default_wallet_name, self.wallet_data_filename),
)
self.start_node(1)
# Assert that derivation is deterministic
hd_add_2 = None
Reported by Pylint.
Line: 115
Column: 1
shutil.rmtree(os.path.join(self.nodes[1].datadir, self.chain, "chainstate"))
shutil.copyfile(
os.path.join(self.nodes[1].datadir, "hd.bak"),
os.path.join(self.nodes[1].datadir, self.chain, "wallets", self.default_wallet_name, self.wallet_data_filename),
)
self.start_node(1, extra_args=self.extra_args[1])
self.connect_nodes(0, 1)
self.sync_all()
# Wallet automatically scans blocks older than key on startup
Reported by Pylint.
test/functional/p2p_filter.py
25 issues
Line: 33
Column: 1
from test_framework.test_framework import BitcoinTestFramework
class P2PBloomFilter(P2PInterface):
# This is a P2SH watch-only wallet
watch_script_pubkey = 'a914ffffffffffffffffffffffffffffffffffffffff87'
# The initial filter (n=10, fp=0.000001) with just the above scriptPubKey added
watch_filter_init = msg_filterload(
data=
Reported by Pylint.
Line: 39
Column: 1
# The initial filter (n=10, fp=0.000001) with just the above scriptPubKey added
watch_filter_init = msg_filterload(
data=
b'@\x00\x08\x00\x80\x00\x00 \x00\xc0\x00 \x04\x00\x08$\x00\x04\x80\x00\x00 \x00\x00\x00\x00\x80\x00\x00@\x00\x02@ \x00',
nHashFuncs=19,
nTweak=0,
nFlags=1,
)
Reported by Pylint.
Line: 68
Column: 5
self._tx_received = True
@property
def tx_received(self):
with p2p_lock:
return self._tx_received
@tx_received.setter
def tx_received(self, value):
Reported by Pylint.
Line: 78
Column: 5
self._tx_received = value
@property
def merkleblock_received(self):
with p2p_lock:
return self._merkleblock_received
@merkleblock_received.setter
def merkleblock_received(self, value):
Reported by Pylint.
Line: 88
Column: 1
self._merkleblock_received = value
class FilterTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [[
'-peerbloomfilters',
'-whitelist=noban@127.0.0.1', # immediate tx relay
Reported by Pylint.
Line: 99
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def test_size_limits(self, filter_peer):
self.log.info('Check that too large filter is rejected')
with self.nodes[0].assert_debug_log(['Misbehaving']):
filter_peer.send_and_ping(msg_filterload(data=b'\xbb'*(MAX_BLOOM_FILTER_SIZE+1)))
self.log.info('Check that max size filter is accepted')
Reported by Pylint.
Line: 111
Column: 1
self.log.info('Check that filter with too many hash functions is rejected')
with self.nodes[0].assert_debug_log(['Misbehaving']):
filter_peer.send_and_ping(msg_filterload(data=b'\xaa', nHashFuncs=MAX_BLOOM_HASH_FUNCS+1))
self.log.info('Check that filter with max hash functions is accepted')
with self.nodes[0].assert_debug_log([], unexpected_msgs=['Misbehaving']):
filter_peer.send_and_ping(msg_filterload(data=b'\xaa', nHashFuncs=MAX_BLOOM_HASH_FUNCS))
# Don't send filterclear until next two filteradd checks are done
Reported by Pylint.
Line: 128
Column: 5
filter_peer.send_and_ping(msg_filterclear())
def test_msg_mempool(self):
self.log.info("Check that a node with bloom filters enabled services p2p mempool messages")
filter_peer = P2PBloomFilter()
self.log.debug("Create a tx relevant to the peer before connecting")
filter_address = self.nodes[0].decodescript(filter_peer.watch_script_pubkey)['address']
Reported by Pylint.
Line: 142
Column: 5
filter_peer.send_message(msg_mempool())
filter_peer.wait_for_tx(txid)
def test_frelay_false(self, filter_peer):
self.log.info("Check that a node with fRelay set to false does not receive invs until the filter is set")
filter_peer.tx_received = False
filter_address = self.nodes[0].decodescript(filter_peer.watch_script_pubkey)['address']
self.nodes[0].sendtoaddress(filter_address, 90)
# Sync to make sure the reason filter_peer doesn't receive the tx is not p2p delays
Reported by Pylint.
Line: 143
Column: 1
filter_peer.wait_for_tx(txid)
def test_frelay_false(self, filter_peer):
self.log.info("Check that a node with fRelay set to false does not receive invs until the filter is set")
filter_peer.tx_received = False
filter_address = self.nodes[0].decodescript(filter_peer.watch_script_pubkey)['address']
self.nodes[0].sendtoaddress(filter_address, 90)
# Sync to make sure the reason filter_peer doesn't receive the tx is not p2p delays
filter_peer.sync_with_ping()
Reported by Pylint.
test/functional/feature_includeconf.py
25 issues
Line: 21
Column: 1
from test_framework.test_framework import BitcoinTestFramework
class IncludeConfTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
def setup_chain(self):
super().setup_chain()
Reported by Pylint.
Line: 29
Column: 1
super().setup_chain()
# Create additional config files
# - tmpdir/node0/relative.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative\n")
# - tmpdir/node0/relative2.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative2.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative2\n")
with open(os.path.join(self.options.tmpdir, "node0", "bitcoin.conf"), "a", encoding='utf8') as f:
Reported by Pylint.
Line: 29
Column: 105
super().setup_chain()
# Create additional config files
# - tmpdir/node0/relative.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative\n")
# - tmpdir/node0/relative2.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative2.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative2\n")
with open(os.path.join(self.options.tmpdir, "node0", "bitcoin.conf"), "a", encoding='utf8') as f:
Reported by Pylint.
Line: 32
Column: 106
with open(os.path.join(self.options.tmpdir, "node0", "relative.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative\n")
# - tmpdir/node0/relative2.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative2.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative2\n")
with open(os.path.join(self.options.tmpdir, "node0", "bitcoin.conf"), "a", encoding='utf8') as f:
f.write("uacomment=main\nincludeconf=relative.conf\n")
def run_test(self):
Reported by Pylint.
Line: 32
Column: 1
with open(os.path.join(self.options.tmpdir, "node0", "relative.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative\n")
# - tmpdir/node0/relative2.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative2.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative2\n")
with open(os.path.join(self.options.tmpdir, "node0", "bitcoin.conf"), "a", encoding='utf8') as f:
f.write("uacomment=main\nincludeconf=relative.conf\n")
def run_test(self):
Reported by Pylint.
Line: 34
Column: 104
# - tmpdir/node0/relative2.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative2.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative2\n")
with open(os.path.join(self.options.tmpdir, "node0", "bitcoin.conf"), "a", encoding='utf8') as f:
f.write("uacomment=main\nincludeconf=relative.conf\n")
def run_test(self):
self.log.info("-includeconf works from config file. subversion should end with 'main; relative)/'")
Reported by Pylint.
Line: 34
Column: 1
# - tmpdir/node0/relative2.conf
with open(os.path.join(self.options.tmpdir, "node0", "relative2.conf"), "w", encoding="utf8") as f:
f.write("uacomment=relative2\n")
with open(os.path.join(self.options.tmpdir, "node0", "bitcoin.conf"), "a", encoding='utf8') as f:
f.write("uacomment=main\nincludeconf=relative.conf\n")
def run_test(self):
self.log.info("-includeconf works from config file. subversion should end with 'main; relative)/'")
Reported by Pylint.
Line: 38
Column: 1
f.write("uacomment=main\nincludeconf=relative.conf\n")
def run_test(self):
self.log.info("-includeconf works from config file. subversion should end with 'main; relative)/'")
subversion = self.nodes[0].getnetworkinfo()["subversion"]
assert subversion.endswith("main; relative)/")
self.log.info("-includeconf cannot be used as command-line arg")
Reported by Pylint.
Line: 41
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.log.info("-includeconf works from config file. subversion should end with 'main; relative)/'")
subversion = self.nodes[0].getnetworkinfo()["subversion"]
assert subversion.endswith("main; relative)/")
self.log.info("-includeconf cannot be used as command-line arg")
self.stop_node(0)
self.nodes[0].assert_start_raises_init_error(
extra_args=['-noincludeconf=0'],
Reported by Bandit.
Line: 47
Column: 1
self.stop_node(0)
self.nodes[0].assert_start_raises_init_error(
extra_args=['-noincludeconf=0'],
expected_msg='Error: Error parsing command line arguments: -includeconf cannot be used from commandline; -includeconf=true',
)
self.nodes[0].assert_start_raises_init_error(
extra_args=['-includeconf=relative2.conf', '-includeconf=no_warn.conf'],
expected_msg='Error: Error parsing command line arguments: -includeconf cannot be used from commandline; -includeconf="relative2.conf"',
)
Reported by Pylint.
contrib/linearize/linearize-hashes.py
24 issues
Line: 66
Column: 22
def response_is_error(resp_obj):
return 'error' in resp_obj and resp_obj['error'] is not None
def get_block_hashes(settings, max_blocks_per_call=10000):
rpc = BitcoinRPC(settings['host'], settings['port'],
settings['rpcuser'], settings['rpcpassword'])
height = settings['min_height']
while height < settings['max_height']+1:
Reported by Pylint.
Line: 95
Column: 107
def get_rpc_cookie():
# Open the cookie file
with open(os.path.join(os.path.expanduser(settings['datadir']), '.cookie'), 'r', encoding="ascii") as f:
combined = f.readline()
combined_split = combined.split(":")
settings['rpcuser'] = combined_split[0]
settings['rpcpassword'] = combined_split[1]
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
#
# linearize-hashes.py: List blocks in a linear, no-fork version of the chain.
#
# Copyright (c) 2013-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
#
# linearize-hashes.py: List blocks in a linear, no-fork version of the chain.
#
# Copyright (c) 2013-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
Reported by Pylint.
Line: 20
Column: 1
settings = {}
def hex_switchEndian(s):
""" Switches the endianness of a hex string (in pairs of hex chars) """
pairList = [s[i:i+2].encode() for i in range(0, len(s), 2)]
return b''.join(pairList[::-1]).decode()
class BitcoinRPC:
Reported by Pylint.
Line: 20
Column: 1
settings = {}
def hex_switchEndian(s):
""" Switches the endianness of a hex string (in pairs of hex chars) """
pairList = [s[i:i+2].encode() for i in range(0, len(s), 2)]
return b''.join(pairList[::-1]).decode()
class BitcoinRPC:
Reported by Pylint.
Line: 22
Column: 5
def hex_switchEndian(s):
""" Switches the endianness of a hex string (in pairs of hex chars) """
pairList = [s[i:i+2].encode() for i in range(0, len(s), 2)]
return b''.join(pairList[::-1]).decode()
class BitcoinRPC:
def __init__(self, host, port, username, password):
authpair = "%s:%s" % (username, password)
Reported by Pylint.
Line: 25
Column: 1
pairList = [s[i:i+2].encode() for i in range(0, len(s), 2)]
return b''.join(pairList[::-1]).decode()
class BitcoinRPC:
def __init__(self, host, port, username, password):
authpair = "%s:%s" % (username, password)
authpair = authpair.encode('utf-8')
self.authhdr = b"Basic " + base64.b64encode(authpair)
self.conn = HTTPConnection(host, port=port, timeout=30)
Reported by Pylint.
Line: 32
Column: 5
self.authhdr = b"Basic " + base64.b64encode(authpair)
self.conn = HTTPConnection(host, port=port, timeout=30)
def execute(self, obj):
try:
self.conn.request('POST', '/', json.dumps(obj),
{ 'Authorization' : self.authhdr,
'Content-type' : 'application/json' })
except ConnectionRefusedError:
Reported by Pylint.
Line: 52
Column: 5
return resp_obj
@staticmethod
def build_request(idx, method, params):
obj = { 'version' : '1.1',
'method' : method,
'id' : idx }
if params is None:
obj['params'] = []
Reported by Pylint.
test/functional/wallet_backup.py
24 issues
Line: 46
Column: 1
)
class WalletBackupTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 4
self.setup_clean_chain = True
# nodes 1, 2,3 are spenders, let's give them a keypool=100
# whitelist all peers to speed up tx relay / mempool sync
Reported by Pylint.
Line: 71
Column: 5
self.connect_nodes(2, 0)
self.sync_all()
def one_send(self, from_node, to_address):
if (randint(1,2) == 1):
amount = Decimal(randint(1,10)) / Decimal(10)
self.nodes[from_node].sendtoaddress(to_address, amount)
def do_one_round(self):
Reported by Pylint.
Line: 72
Column: 1
self.sync_all()
def one_send(self, from_node, to_address):
if (randint(1,2) == 1):
amount = Decimal(randint(1,10)) / Decimal(10)
self.nodes[from_node].sendtoaddress(to_address, amount)
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
Reported by Pylint.
Line: 72
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
self.sync_all()
def one_send(self, from_node, to_address):
if (randint(1,2) == 1):
amount = Decimal(randint(1,10)) / Decimal(10)
self.nodes[from_node].sendtoaddress(to_address, amount)
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
Reported by Bandit.
Line: 73
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
def one_send(self, from_node, to_address):
if (randint(1,2) == 1):
amount = Decimal(randint(1,10)) / Decimal(10)
self.nodes[from_node].sendtoaddress(to_address, amount)
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
a1 = self.nodes[1].getnewaddress()
Reported by Bandit.
Line: 76
Column: 5
amount = Decimal(randint(1,10)) / Decimal(10)
self.nodes[from_node].sendtoaddress(to_address, amount)
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
a1 = self.nodes[1].getnewaddress()
a2 = self.nodes[2].getnewaddress()
self.one_send(0, a1)
Reported by Pylint.
Line: 77
Column: 9
self.nodes[from_node].sendtoaddress(to_address, amount)
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
a1 = self.nodes[1].getnewaddress()
a2 = self.nodes[2].getnewaddress()
self.one_send(0, a1)
self.one_send(0, a2)
Reported by Pylint.
Line: 78
Column: 9
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
a1 = self.nodes[1].getnewaddress()
a2 = self.nodes[2].getnewaddress()
self.one_send(0, a1)
self.one_send(0, a2)
self.one_send(1, a0)
Reported by Pylint.
Line: 79
Column: 9
def do_one_round(self):
a0 = self.nodes[0].getnewaddress()
a1 = self.nodes[1].getnewaddress()
a2 = self.nodes[2].getnewaddress()
self.one_send(0, a1)
self.one_send(0, a2)
self.one_send(1, a0)
self.one_send(1, a2)
Reported by Pylint.
Line: 95
Column: 5
self.sync_blocks()
# As above, this mirrors the original bash test.
def start_three(self, args=()):
self.start_node(0, self.extra_args[0] + list(args))
self.start_node(1, self.extra_args[1] + list(args))
self.start_node(2, self.extra_args[2] + list(args))
self.connect_nodes(0, 3)
self.connect_nodes(1, 3)
Reported by Pylint.
test/functional/mocks/signer.py
23 issues
Line: 20
Column: 1
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
Reported by Pylint.
Line: 20
Column: 15
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
Reported by Pylint.
Line: 20
Column: 15
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
Reported by Pylint.
Line: 23
Column: 20
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
sys.stdout.write(json.dumps({
"receive": [
"pkh([00000001/44'/1'/" + args.account + "']" + xpub + "/0/*)#vt6w3l3j",
Reported by Pylint.
Line: 40
Column: 20
}))
def displayaddress(args):
# Several descriptor formats are acceptable, so allowing for potential
# changes to InferDescriptor:
if args.fingerprint != "00000001":
return sys.stdout.write(json.dumps({"error": "Unexpected fingerprint", "fingerprint": args.fingerprint}))
Reported by Pylint.
Line: 54
Column: 12
return sys.stdout.write(json.dumps({"address": "bcrt1qm90ugl4d48jv8n6e5t9ln6t9zlpm5th68x4f8g"}))
def signtx(args):
if args.fingerprint != "00000001":
return sys.stdout.write(json.dumps({"error": "Unexpected fingerprint", "fingerprint": args.fingerprint}))
with open(os.path.join(os.getcwd(), "mock_psbt"), "r", encoding="utf8") as f:
mock_psbt = f.read()
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
import os
import sys
import argparse
import json
Reported by Pylint.
Line: 11
Column: 1
import argparse
import json
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:
Reported by Pylint.
Line: 13
Column: 1
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
Reported by Pylint.
Line: 14
Column: 62
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
Reported by Pylint.