The following issues were found:
test/functional/tool_wallet.py
103 issues
Line: 54
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b303-md5
assert_equal(p.poll(), 0)
def wallet_shasum(self):
h = hashlib.sha1()
mv = memoryview(bytearray(BUFFER_SIZE))
with open(self.wallet_path, 'rb', buffering=0) as f:
for n in iter(lambda: f.readinto(mv), 0):
h.update(mv[:n])
return h.hexdigest()
Reported by Bandit.
Line: 69
Column: 24
def log_wallet_timestamp_comparison(self, old, new):
result = 'unchanged' if new == old else 'increased!'
self.log.debug('Wallet file timestamp {}'.format(result))
def get_expected_info_output(self, name="", transactions=0, keypool=2, address=0):
wallet_name = self.default_wallet_name if name == "" else name
output_types = 3 # p2pkh, p2sh, segwit
if self.options.descriptors:
Reported by Pylint.
Line: 210
Column: 3
self.stop_node(0)
self.log.info('Calling wallet tool info, testing output')
#
# TODO: Wallet tool info should work with wallet file permissions set to
# read-only without raising:
# "Error loading wallet.dat. Is wallet being used by another process?"
# The following lines should be uncommented and the tests still succeed:
#
# self.log.debug('Setting wallet file permissions to 400 (read-only)')
Reported by Pylint.
Line: 220
Column: 24
# assert self.wallet_permissions() in ['400', '666'] # Sanity check. 666 because Appveyor.
# shasum_before = self.wallet_shasum()
timestamp_before = self.wallet_timestamp()
self.log.debug('Wallet file timestamp before calling info: {}'.format(timestamp_before))
out = self.get_expected_info_output(address=1)
self.assert_tool_output(out, '-wallet=' + self.default_wallet_name, 'info')
timestamp_after = self.wallet_timestamp()
self.log.debug('Wallet file timestamp after calling info: {}'.format(timestamp_after))
self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after)
Reported by Pylint.
Line: 224
Column: 24
out = self.get_expected_info_output(address=1)
self.assert_tool_output(out, '-wallet=' + self.default_wallet_name, 'info')
timestamp_after = self.wallet_timestamp()
self.log.debug('Wallet file timestamp after calling info: {}'.format(timestamp_after))
self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after)
self.log.debug('Setting wallet file permissions back to 600 (read/write)')
os.chmod(self.wallet_path, stat.S_IRUSR | stat.S_IWUSR)
assert self.wallet_permissions() in ['600', '666'] # Sanity check. 666 because Appveyor.
#
Reported by Pylint.
Line: 230
Column: 3
os.chmod(self.wallet_path, stat.S_IRUSR | stat.S_IWUSR)
assert self.wallet_permissions() in ['600', '666'] # Sanity check. 666 because Appveyor.
#
# TODO: Wallet tool info should not write to the wallet file.
# The following lines should be uncommented and the tests still succeed:
#
# assert_equal(timestamp_before, timestamp_after)
# shasum_after = self.wallet_shasum()
# assert_equal(shasum_before, shasum_after)
Reported by Pylint.
Line: 251
Column: 24
self.log.info('Calling wallet tool info after generating a transaction, testing output')
shasum_before = self.wallet_shasum()
timestamp_before = self.wallet_timestamp()
self.log.debug('Wallet file timestamp before calling info: {}'.format(timestamp_before))
out = self.get_expected_info_output(transactions=1, address=1)
self.assert_tool_output(out, '-wallet=' + self.default_wallet_name, 'info')
shasum_after = self.wallet_shasum()
timestamp_after = self.wallet_timestamp()
self.log.debug('Wallet file timestamp after calling info: {}'.format(timestamp_after))
Reported by Pylint.
Line: 256
Column: 24
self.assert_tool_output(out, '-wallet=' + self.default_wallet_name, 'info')
shasum_after = self.wallet_shasum()
timestamp_after = self.wallet_timestamp()
self.log.debug('Wallet file timestamp after calling info: {}'.format(timestamp_after))
self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after)
#
# TODO: Wallet tool info should not write to the wallet file.
# This assertion should be uncommented and succeed:
# assert_equal(timestamp_before, timestamp_after)
Reported by Pylint.
Line: 259
Column: 3
self.log.debug('Wallet file timestamp after calling info: {}'.format(timestamp_after))
self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after)
#
# TODO: Wallet tool info should not write to the wallet file.
# This assertion should be uncommented and succeed:
# assert_equal(timestamp_before, timestamp_after)
assert_equal(shasum_before, shasum_after)
self.log.debug('Wallet file shasum unchanged\n')
Reported by Pylint.
Line: 269
Column: 24
self.log.info('Calling wallet tool create on an existing wallet, testing output')
shasum_before = self.wallet_shasum()
timestamp_before = self.wallet_timestamp()
self.log.debug('Wallet file timestamp before calling create: {}'.format(timestamp_before))
out = "Topping up keypool...\n" + self.get_expected_info_output(name="foo", keypool=2000)
self.assert_tool_output(out, '-wallet=foo', 'create')
shasum_after = self.wallet_shasum()
timestamp_after = self.wallet_timestamp()
self.log.debug('Wallet file timestamp after calling create: {}'.format(timestamp_after))
Reported by Pylint.
test/functional/feature_csv_activation.py
102 issues
Line: 105
Column: 36
tx = self.miniwallet.create_self_transfer(from_node=self.nodes[0], utxo_to_spend=utxo)['tx']
return tx
def create_bip112special(self, input, txversion):
tx = self.create_self_transfer_from_utxo(input)
tx.nVersion = txversion
self.miniwallet.sign_tx(tx)
tx.vin[0].scriptSig = CScript([-1, OP_CHECKSEQUENCEVERIFY, OP_DROP] + list(CScript(tx.vin[0].scriptSig)))
return tx
Reported by Pylint.
Line: 112
Column: 39
tx.vin[0].scriptSig = CScript([-1, OP_CHECKSEQUENCEVERIFY, OP_DROP] + list(CScript(tx.vin[0].scriptSig)))
return tx
def create_bip112emptystack(self, input, txversion):
tx = self.create_self_transfer_from_utxo(input)
tx.nVersion = txversion
self.miniwallet.sign_tx(tx)
tx.vin[0].scriptSig = CScript([OP_CHECKSEQUENCEVERIFY] + list(CScript(tx.vin[0].scriptSig)))
return tx
Reported by Pylint.
Line: 166
Column: 13
block = self.create_test_block([])
test_blocks.append(block)
self.last_block_time += 600
self.tip = block.sha256
self.tipheight += 1
return test_blocks
def create_test_block(self, txs):
block = create_block(self.tip, create_coinbase(self.tipheight + 1), self.last_block_time + 600)
Reported by Pylint.
Line: 186
Column: 9
self.helper_peer.send_blocks_and_test(blocks, self.nodes[0], success=success, reject_reason=reject_reason)
def run_test(self):
self.helper_peer = self.nodes[0].add_p2p_connection(P2PDataStore())
self.miniwallet = MiniWallet(self.nodes[0], mode=MiniWalletMode.RAW_P2PK)
self.log.info("Generate blocks in the past for coinbase outputs.")
long_past_time = int(time.time()) - 600 * 1000 # enough to build up to 1000 blocks 10 minutes apart without worrying about getting into the future
self.nodes[0].setmocktime(long_past_time - 100) # enough so that the generated blocks will still all be before long_past_time
Reported by Pylint.
Line: 187
Column: 9
def run_test(self):
self.helper_peer = self.nodes[0].add_p2p_connection(P2PDataStore())
self.miniwallet = MiniWallet(self.nodes[0], mode=MiniWalletMode.RAW_P2PK)
self.log.info("Generate blocks in the past for coinbase outputs.")
long_past_time = int(time.time()) - 600 * 1000 # enough to build up to 1000 blocks 10 minutes apart without worrying about getting into the future
self.nodes[0].setmocktime(long_past_time - 100) # enough so that the generated blocks will still all be before long_past_time
self.coinbase_blocks = self.miniwallet.generate(COINBASE_BLOCK_COUNT) # blocks generated for inputs
Reported by Pylint.
Line: 192
Column: 9
self.log.info("Generate blocks in the past for coinbase outputs.")
long_past_time = int(time.time()) - 600 * 1000 # enough to build up to 1000 blocks 10 minutes apart without worrying about getting into the future
self.nodes[0].setmocktime(long_past_time - 100) # enough so that the generated blocks will still all be before long_past_time
self.coinbase_blocks = self.miniwallet.generate(COINBASE_BLOCK_COUNT) # blocks generated for inputs
self.nodes[0].setmocktime(0) # set time back to present so yielded blocks aren't in the future as we advance last_block_time
self.tipheight = COINBASE_BLOCK_COUNT # height of the next block to build
self.last_block_time = long_past_time
self.tip = int(self.nodes[0].getbestblockhash(), 16)
Reported by Pylint.
Line: 194
Column: 9
self.nodes[0].setmocktime(long_past_time - 100) # enough so that the generated blocks will still all be before long_past_time
self.coinbase_blocks = self.miniwallet.generate(COINBASE_BLOCK_COUNT) # blocks generated for inputs
self.nodes[0].setmocktime(0) # set time back to present so yielded blocks aren't in the future as we advance last_block_time
self.tipheight = COINBASE_BLOCK_COUNT # height of the next block to build
self.last_block_time = long_past_time
self.tip = int(self.nodes[0].getbestblockhash(), 16)
# Activation height is hardcoded
# We advance to block height five below BIP112 activation for the following tests
Reported by Pylint.
Line: 195
Column: 9
self.coinbase_blocks = self.miniwallet.generate(COINBASE_BLOCK_COUNT) # blocks generated for inputs
self.nodes[0].setmocktime(0) # set time back to present so yielded blocks aren't in the future as we advance last_block_time
self.tipheight = COINBASE_BLOCK_COUNT # height of the next block to build
self.last_block_time = long_past_time
self.tip = int(self.nodes[0].getbestblockhash(), 16)
# Activation height is hardcoded
# We advance to block height five below BIP112 activation for the following tests
test_blocks = self.generate_blocks(CSV_ACTIVATION_HEIGHT-5 - COINBASE_BLOCK_COUNT)
Reported by Pylint.
Line: 196
Column: 9
self.nodes[0].setmocktime(0) # set time back to present so yielded blocks aren't in the future as we advance last_block_time
self.tipheight = COINBASE_BLOCK_COUNT # height of the next block to build
self.last_block_time = long_past_time
self.tip = int(self.nodes[0].getbestblockhash(), 16)
# Activation height is hardcoded
# We advance to block height five below BIP112 activation for the following tests
test_blocks = self.generate_blocks(CSV_ACTIVATION_HEIGHT-5 - COINBASE_BLOCK_COUNT)
self.send_blocks(test_blocks)
Reported by Pylint.
Line: 240
Column: 9
self.nodes[0].setmocktime(self.last_block_time + 600)
inputblockhash = self.nodes[0].generate(1)[0] # 1 block generated for inputs to be in chain at height 431
self.nodes[0].setmocktime(0)
self.tip = int(inputblockhash, 16)
self.tipheight += 1
self.last_block_time += 600
assert_equal(len(self.nodes[0].getblock(inputblockhash, True)["tx"]), TESTING_TX_COUNT + 1)
# 2 more version 4 blocks
Reported by Pylint.
test/functional/wallet_bumpfee.py
99 issues
Line: 70
Column: 9
self.nodes[1].encryptwallet(WALLET_PASSPHRASE)
self.nodes[1].walletpassphrase(WALLET_PASSPHRASE, WALLET_PASSPHRASE_TIMEOUT)
peer_node, rbf_node = self.nodes
rbf_node_address = rbf_node.getnewaddress()
# fund rbf node with 10 coins of 0.001 btc (100,000 satoshis)
self.log.info("Mining blocks...")
peer_node.generate(110)
Reported by Pylint.
Line: 36
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b105_hardcoded_password_string.html
assert_raises_rpc_error,
)
WALLET_PASSPHRASE = "test"
WALLET_PASSPHRASE_TIMEOUT = 3600
# Fee rates (sat/vB)
INSUFFICIENT = 1
ECONOMICAL = 50
Reported by Bandit.
Line: 47
Column: 1
TOO_HIGH = 100000
class BumpFeeTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
self.extra_args = [[
"-walletrbf={}".format(i),
Reported by Pylint.
Line: 60
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def clear_mempool(self):
# Clear mempool between subtests. The subtests may only depend on chainstate (utxos)
self.nodes[1].generate(1)
self.sync_all()
def run_test(self):
Reported by Pylint.
Line: 107
Column: 5
test_small_output_with_feerate_succeeds(self, rbf_node, dest_address)
test_no_more_inputs_fails(self, rbf_node, dest_address)
def test_invalid_parameters(self, rbf_node, peer_node, dest_address):
self.log.info('Test invalid parameters')
rbfid = spend_one_input(rbf_node, dest_address)
self.sync_mempools((rbf_node, peer_node))
assert rbfid in rbf_node.getrawmempool() and rbfid in peer_node.getrawmempool()
Reported by Pylint.
Line: 111
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.log.info('Test invalid parameters')
rbfid = spend_one_input(rbf_node, dest_address)
self.sync_mempools((rbf_node, peer_node))
assert rbfid in rbf_node.getrawmempool() and rbfid in peer_node.getrawmempool()
for key in ["totalFee", "feeRate"]:
assert_raises_rpc_error(-3, "Unexpected key {}".format(key), rbf_node.bumpfee, rbfid, {key: NORMAL})
# Bumping to just above minrelay should fail to increase the total fee enough.
Reported by Bandit.
Line: 114
Column: 1
assert rbfid in rbf_node.getrawmempool() and rbfid in peer_node.getrawmempool()
for key in ["totalFee", "feeRate"]:
assert_raises_rpc_error(-3, "Unexpected key {}".format(key), rbf_node.bumpfee, rbfid, {key: NORMAL})
# Bumping to just above minrelay should fail to increase the total fee enough.
assert_raises_rpc_error(-8, "Insufficient total fee 0.00000141", rbf_node.bumpfee, rbfid, {"fee_rate": INSUFFICIENT})
self.log.info("Test invalid fee rate settings")
Reported by Pylint.
Line: 117
Column: 1
assert_raises_rpc_error(-3, "Unexpected key {}".format(key), rbf_node.bumpfee, rbfid, {key: NORMAL})
# Bumping to just above minrelay should fail to increase the total fee enough.
assert_raises_rpc_error(-8, "Insufficient total fee 0.00000141", rbf_node.bumpfee, rbfid, {"fee_rate": INSUFFICIENT})
self.log.info("Test invalid fee rate settings")
assert_raises_rpc_error(-4, "Specified or calculated fee 0.141 is too high (cannot be higher than -maxtxfee 0.10",
rbf_node.bumpfee, rbfid, {"fee_rate": TOO_HIGH})
# Test fee_rate with zero values.
Reported by Pylint.
Line: 120
Column: 1
assert_raises_rpc_error(-8, "Insufficient total fee 0.00000141", rbf_node.bumpfee, rbfid, {"fee_rate": INSUFFICIENT})
self.log.info("Test invalid fee rate settings")
assert_raises_rpc_error(-4, "Specified or calculated fee 0.141 is too high (cannot be higher than -maxtxfee 0.10",
rbf_node.bumpfee, rbfid, {"fee_rate": TOO_HIGH})
# Test fee_rate with zero values.
msg = "Insufficient total fee 0.00"
for zero_value in [0, 0.000, 0.00000000, "0", "0.000", "0.00000000"]:
assert_raises_rpc_error(-8, msg, rbf_node.bumpfee, rbfid, {"fee_rate": zero_value})
Reported by Pylint.
Line: 128
Column: 1
assert_raises_rpc_error(-8, msg, rbf_node.bumpfee, rbfid, {"fee_rate": zero_value})
msg = "Invalid amount"
# Test fee_rate values that don't pass fixed-point parsing checks.
for invalid_value in ["", 0.000000001, 1e-09, 1.111111111, 1111111111111111, "31.999999999999999999999"]:
assert_raises_rpc_error(-3, msg, rbf_node.bumpfee, rbfid, {"fee_rate": invalid_value})
# Test fee_rate values that cannot be represented in sat/vB.
for invalid_value in [0.0001, 0.00000001, 0.00099999, 31.99999999, "0.0001", "0.00000001", "0.00099999", "31.99999999"]:
assert_raises_rpc_error(-3, msg, rbf_node.bumpfee, rbfid, {"fee_rate": invalid_value})
# Test fee_rate out of range (negative number).
Reported by Pylint.
test/functional/test_framework/siphash.py
86 issues
Line: 10
Column: 1
This implements SipHash-2-4 for 256-bit integers.
"""
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
Reported by Pylint.
Line: 10
Column: 1
This implements SipHash-2-4 for 256-bit integers.
"""
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
Reported by Pylint.
Line: 10
Column: 1
This implements SipHash-2-4 for 256-bit integers.
"""
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
Reported by Pylint.
Line: 13
Column: 1
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
v1 ^= v0
v0 = rotl64(v0, 32)
v2 = (v2 + v3) & ((1 << 64) - 1)
Reported by Pylint.
Line: 13
Column: 1
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
v1 ^= v0
v0 = rotl64(v0, 32)
v2 = (v2 + v3) & ((1 << 64) - 1)
Reported by Pylint.
Line: 13
Column: 1
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
v1 ^= v0
v0 = rotl64(v0, 32)
v2 = (v2 + v3) & ((1 << 64) - 1)
Reported by Pylint.
Line: 13
Column: 1
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
v1 ^= v0
v0 = rotl64(v0, 32)
v2 = (v2 + v3) & ((1 << 64) - 1)
Reported by Pylint.
Line: 13
Column: 1
def rotl64(n, b):
return n >> (64 - b) | (n & ((1 << (64 - b)) - 1)) << b
def siphash_round(v0, v1, v2, v3):
v0 = (v0 + v1) & ((1 << 64) - 1)
v1 = rotl64(v1, 13)
v1 ^= v0
v0 = rotl64(v0, 32)
v2 = (v2 + v3) & ((1 << 64) - 1)
Reported by Pylint.
Line: 30
Column: 1
v2 = rotl64(v2, 32)
return (v0, v1, v2, v3)
def siphash256(k0, k1, h):
n0 = h & ((1 << 64) - 1)
n1 = (h >> 64) & ((1 << 64) - 1)
n2 = (h >> 128) & ((1 << 64) - 1)
n3 = (h >> 192) & ((1 << 64) - 1)
v0 = 0x736f6d6570736575 ^ k0
Reported by Pylint.
Line: 30
Column: 1
v2 = rotl64(v2, 32)
return (v0, v1, v2, v3)
def siphash256(k0, k1, h):
n0 = h & ((1 << 64) - 1)
n1 = (h >> 64) & ((1 << 64) - 1)
n2 = (h >> 128) & ((1 << 64) - 1)
n3 = (h >> 192) & ((1 << 64) - 1)
v0 = 0x736f6d6570736575 ^ k0
Reported by Pylint.
test/functional/wallet_importdescriptors.py
85 issues
Line: 32
Column: 1
test_address,
)
class ImportDescriptorsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [["-addresstype=legacy"],
["-addresstype=bech32", "-keypool=5"]
]
Reported by Pylint.
Line: 45
Column: 5
self.skip_if_no_wallet()
self.skip_if_no_sqlite()
def test_importdesc(self, req, success, error_code=None, error_message=None, warnings=None, wallet=None):
"""Run importdescriptors and assert success"""
if warnings is None:
warnings = []
wrpc = self.nodes[1].get_wallet_rpc('w1')
if wallet is not None:
Reported by Pylint.
Line: 45
Column: 1
self.skip_if_no_wallet()
self.skip_if_no_sqlite()
def test_importdesc(self, req, success, error_code=None, error_message=None, warnings=None, wallet=None):
"""Run importdescriptors and assert success"""
if warnings is None:
warnings = []
wrpc = self.nodes[1].get_wallet_rpc('w1')
if wallet is not None:
Reported by Pylint.
Line: 56
Column: 1
result = wrpc.importdescriptors([req])
observed_warnings = []
if 'warnings' in result[0]:
observed_warnings = result[0]['warnings']
assert_equal("\n".join(sorted(warnings)), "\n".join(sorted(observed_warnings)))
assert_equal(result[0]['success'], success)
if error_code is not None:
assert_equal(result[0]['error']['code'], error_code)
assert_equal(result[0]['error']['message'], error_message)
Reported by Pylint.
Line: 63
Column: 5
assert_equal(result[0]['error']['code'], error_code)
assert_equal(result[0]['error']['message'], error_message)
def run_test(self):
self.log.info('Setting up wallets')
self.nodes[0].createwallet(wallet_name='w0', disable_private_keys=False, descriptors=True)
w0 = self.nodes[0].get_wallet_rpc('w0')
self.nodes[1].createwallet(wallet_name='w1', disable_private_keys=True, blank=True, descriptors=True)
Reported by Pylint.
Line: 63
Column: 5
assert_equal(result[0]['error']['code'], error_code)
assert_equal(result[0]['error']['message'], error_message)
def run_test(self):
self.log.info('Setting up wallets')
self.nodes[0].createwallet(wallet_name='w0', disable_private_keys=False, descriptors=True)
w0 = self.nodes[0].get_wallet_rpc('w0')
self.nodes[1].createwallet(wallet_name='w1', disable_private_keys=True, blank=True, descriptors=True)
Reported by Pylint.
Line: 66
Column: 9
def run_test(self):
self.log.info('Setting up wallets')
self.nodes[0].createwallet(wallet_name='w0', disable_private_keys=False, descriptors=True)
w0 = self.nodes[0].get_wallet_rpc('w0')
self.nodes[1].createwallet(wallet_name='w1', disable_private_keys=True, blank=True, descriptors=True)
w1 = self.nodes[1].get_wallet_rpc('w1')
assert_equal(w1.getwalletinfo()['keypoolsize'], 0)
Reported by Pylint.
Line: 68
Column: 1
self.nodes[0].createwallet(wallet_name='w0', disable_private_keys=False, descriptors=True)
w0 = self.nodes[0].get_wallet_rpc('w0')
self.nodes[1].createwallet(wallet_name='w1', disable_private_keys=True, blank=True, descriptors=True)
w1 = self.nodes[1].get_wallet_rpc('w1')
assert_equal(w1.getwalletinfo()['keypoolsize'], 0)
self.nodes[1].createwallet(wallet_name="wpriv", disable_private_keys=False, blank=True, descriptors=True)
wpriv = self.nodes[1].get_wallet_rpc("wpriv")
Reported by Pylint.
Line: 69
Column: 9
w0 = self.nodes[0].get_wallet_rpc('w0')
self.nodes[1].createwallet(wallet_name='w1', disable_private_keys=True, blank=True, descriptors=True)
w1 = self.nodes[1].get_wallet_rpc('w1')
assert_equal(w1.getwalletinfo()['keypoolsize'], 0)
self.nodes[1].createwallet(wallet_name="wpriv", disable_private_keys=False, blank=True, descriptors=True)
wpriv = self.nodes[1].get_wallet_rpc("wpriv")
assert_equal(wpriv.getwalletinfo()['keypoolsize'], 0)
Reported by Pylint.
Line: 72
Column: 1
w1 = self.nodes[1].get_wallet_rpc('w1')
assert_equal(w1.getwalletinfo()['keypoolsize'], 0)
self.nodes[1].createwallet(wallet_name="wpriv", disable_private_keys=False, blank=True, descriptors=True)
wpriv = self.nodes[1].get_wallet_rpc("wpriv")
assert_equal(wpriv.getwalletinfo()['keypoolsize'], 0)
self.log.info('Mining coins')
w0.generatetoaddress(COINBASE_MATURITY + 1, w0.getnewaddress())
Reported by Pylint.
test/functional/wallet_address_types.py
85 issues
Line: 259
Column: 27
address_type = 'p2sh-segwit'
else:
address_type = 'legacy'
self.log.info("Sending from node {} ({}) with{} multisig using {}".format(from_node, self.extra_args[from_node], "" if multisig else "out", "default" if address_type is None else address_type))
old_balances = self.get_balances()
self.log.debug("Old balances are {}".format(old_balances))
to_send = (old_balances[from_node] / (COINBASE_MATURITY + 1)).quantize(Decimal("0.00000001"))
sends = {}
addresses = {}
Reported by Pylint.
Line: 261
Column: 28
address_type = 'legacy'
self.log.info("Sending from node {} ({}) with{} multisig using {}".format(from_node, self.extra_args[from_node], "" if multisig else "out", "default" if address_type is None else address_type))
old_balances = self.get_balances()
self.log.debug("Old balances are {}".format(old_balances))
to_send = (old_balances[from_node] / (COINBASE_MATURITY + 1)).quantize(Decimal("0.00000001"))
sends = {}
addresses = {}
self.log.debug("Prepare sends")
Reported by Pylint.
Line: 297
Column: 28
sends[address] = to_send * 10 * (1 + n)
addresses[to_node] = (address, typ)
self.log.debug("Sending: {}".format(sends))
self.nodes[from_node].sendmany("", sends)
self.sync_mempools()
unconf_balances = self.get_balances('untrusted_pending')
self.log.debug("Check unconfirmed balances: {}".format(unconf_balances))
Reported by Pylint.
Line: 302
Column: 28
self.sync_mempools()
unconf_balances = self.get_balances('untrusted_pending')
self.log.debug("Check unconfirmed balances: {}".format(unconf_balances))
assert_equal(unconf_balances[from_node], 0)
for n, to_node in enumerate(range(from_node + 1, from_node + 4)):
to_node %= 4
assert_equal(unconf_balances[to_node], to_send * 10 * (2 + n))
Reported by Pylint.
Line: 324
Column: 28
assert found
new_balances = self.get_balances()
self.log.debug("Check new balances: {}".format(new_balances))
# We don't know what fee was set, so we can only check bounds on the balance of the sending node
assert_greater_than(new_balances[from_node], to_send * 10)
assert_greater_than(to_send * 11, new_balances[from_node])
for n, to_node in enumerate(range(from_node + 1, from_node + 4)):
to_node %= 4
Reported by Pylint.
Line: 378
Column: 3
if self.options.descriptors:
self.log.info("Descriptor wallets do not have bech32m addresses by default yet")
# TODO: Remove this when they do
assert_raises_rpc_error(-12, "Error: No bech32m addresses available", self.nodes[0].getnewaddress, "", "bech32m")
assert_raises_rpc_error(-12, "Error: No bech32m addresses available", self.nodes[0].getrawchangeaddress, "bech32m")
else:
self.log.info("Legacy wallets cannot make bech32m addresses")
assert_raises_rpc_error(-8, "Legacy wallets cannot provide bech32m addresses", self.nodes[0].getnewaddress, "", "bech32m")
Reported by Pylint.
Line: 68
Column: 1
assert_raises_rpc_error,
)
class AddressTypeTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 6
self.extra_args = [
["-addresstype=legacy"],
["-addresstype=p2sh-segwit"],
Reported by Pylint.
Line: 103
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_address(self, node, address, multisig, typ):
"""Run sanity checks on an address."""
info = self.nodes[node].getaddressinfo(address)
assert self.nodes[node].validateaddress(address)['isvalid']
assert_equal(info.get('solvable'), True)
if not multisig and typ == 'legacy':
# P2PKH
assert not info['isscript']
Reported by Bandit.
Line: 108
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if not multisig and typ == 'legacy':
# P2PKH
assert not info['isscript']
assert not info['iswitness']
assert 'pubkey' in info
elif not multisig and typ == 'p2sh-segwit':
# P2SH-P2WPKH
assert info['isscript']
Reported by Bandit.
Line: 109
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if not multisig and typ == 'legacy':
# P2PKH
assert not info['isscript']
assert not info['iswitness']
assert 'pubkey' in info
elif not multisig and typ == 'p2sh-segwit':
# P2SH-P2WPKH
assert info['isscript']
assert not info['iswitness']
Reported by Bandit.
test/functional/test_runner.py
83 issues
Line: 43
Column: 23
CROSS = "x "
CIRCLE = "o "
if os.name != 'nt' or sys.getwindowsversion() >= (10, 0, 14393):
if os.name == 'nt':
import ctypes
kernel32 = ctypes.windll.kernel32 # type: ignore
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 4
STD_OUTPUT_HANDLE = -11
Reported by Pylint.
Line: 349
Column: 9
args, unknown_args = parser.parse_known_args()
if not args.ansi:
global BOLD, GREEN, RED, GREY
BOLD = ("", "")
GREEN = ("", "")
RED = ("", "")
GREY = ("", "")
Reported by Pylint.
Line: 375
Column: 5
os.makedirs(tmpdir)
logging.debug("Temporary test directory at %s" % tmpdir)
enable_bitcoind = config["components"].getboolean("ENABLE_BITCOIND")
if not enable_bitcoind:
print("No functional tests to run.")
Reported by Pylint.
Line: 463
Column: 12
# Warn if bitcoind is already running
try:
# pgrep exits with code zero when one or more matching processes found
if subprocess.run(["pgrep", "-x", "bitcoind"], stdout=subprocess.DEVNULL).returncode == 0:
print("%sWARNING!%s There is already a bitcoind process running on this system. Tests may fail unexpectedly due to resource contention!" % (BOLD[1], BOLD[0]))
except OSError:
# pgrep not supported
pass
Reported by Pylint.
Line: 491
Column: 9
if enable_coverage:
coverage = RPCCoverage()
flags.append(coverage.flag)
logging.debug("Initializing coverage directory at %s" % coverage.dir)
else:
coverage = None
if len(test_list) > 1 and jobs > 1:
# Populate cache
Reported by Pylint.
Line: 518
Column: 39
max_len_name = len(max(test_list, key=len))
test_count = len(test_list)
for i in range(test_count):
test_result, testdir, stdout, stderr = job_queue.get_next()
test_results.append(test_result)
done_str = "{}/{} - {}{}{}".format(i + 1, test_count, BOLD[1], test_result.name, BOLD[0])
if test_result.status == "Passed":
logging.debug("%s passed, Duration: %s s" % (done_str, test_result.time))
elif test_result.status == "Skipped":
Reported by Pylint.
Line: 518
Column: 31
max_len_name = len(max(test_list, key=len))
test_count = len(test_list)
for i in range(test_count):
test_result, testdir, stdout, stderr = job_queue.get_next()
test_results.append(test_result)
done_str = "{}/{} - {}{}{}".format(i + 1, test_count, BOLD[1], test_result.name, BOLD[0])
if test_result.status == "Passed":
logging.debug("%s passed, Duration: %s s" % (done_str, test_result.time))
elif test_result.status == "Skipped":
Reported by Pylint.
Line: 522
Column: 13
test_results.append(test_result)
done_str = "{}/{} - {}{}{}".format(i + 1, test_count, BOLD[1], test_result.name, BOLD[0])
if test_result.status == "Passed":
logging.debug("%s passed, Duration: %s s" % (done_str, test_result.time))
elif test_result.status == "Skipped":
logging.debug("%s skipped" % (done_str))
else:
print("%s failed, Duration: %s s\n" % (done_str, test_result.time))
print(BOLD[1] + 'stdout:\n' + BOLD[0] + stdout + '\n')
Reported by Pylint.
Line: 524
Column: 13
if test_result.status == "Passed":
logging.debug("%s passed, Duration: %s s" % (done_str, test_result.time))
elif test_result.status == "Skipped":
logging.debug("%s skipped" % (done_str))
else:
print("%s failed, Duration: %s s\n" % (done_str, test_result.time))
print(BOLD[1] + 'stdout:\n' + BOLD[0] + stdout + '\n')
print(BOLD[1] + 'stderr:\n' + BOLD[0] + stderr + '\n')
if combined_logs_len and os.path.isdir(testdir):
Reported by Pylint.
Line: 642
Column: 21
for job in self.jobs:
(name, start_time, proc, testdir, log_out, log_err) = job
if proc.poll() is not None:
log_out.seek(0), log_err.seek(0)
[stdout, stderr] = [log_file.read().decode('utf-8') for log_file in (log_out, log_err)]
log_out.close(), log_err.close()
if proc.returncode == TEST_EXIT_PASSED and stderr == "":
status = "Passed"
elif proc.returncode == TEST_EXIT_SKIPPED:
Reported by Pylint.
src/secp256k1/src/modules/schnorrsig/tests_impl.h
83 issues
Line: 16
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
* bytes) changes the hash function
*/
void nonce_function_bip340_bitflip(unsigned char **args, size_t n_flip, size_t n_bytes, size_t msglen, size_t algolen) {
unsigned char nonces[2][32];
CHECK(nonce_function_bip340(nonces[0], args[0], msglen, args[1], args[2], args[3], algolen, args[4]) == 1);
secp256k1_testrand_flip(args[n_flip], n_bytes);
CHECK(nonce_function_bip340(nonces[1], args[0], msglen, args[1], args[2], args[3], algolen, args[4]) == 1);
CHECK(secp256k1_memcmp_var(nonces[0], nonces[1], 32) != 0);
}
Reported by FlawFinder.
Line: 35
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
}
void run_nonce_function_bip340_tests(void) {
unsigned char tag[13] = "BIP0340/nonce";
unsigned char aux_tag[11] = "BIP0340/aux";
unsigned char algo[13] = "BIP0340/nonce";
size_t algolen = sizeof(algo);
secp256k1_sha256 sha;
secp256k1_sha256 sha_optimized;
Reported by FlawFinder.
Line: 36
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
void run_nonce_function_bip340_tests(void) {
unsigned char tag[13] = "BIP0340/nonce";
unsigned char aux_tag[11] = "BIP0340/aux";
unsigned char algo[13] = "BIP0340/nonce";
size_t algolen = sizeof(algo);
secp256k1_sha256 sha;
secp256k1_sha256 sha_optimized;
unsigned char nonce[32];
Reported by FlawFinder.
Line: 37
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
void run_nonce_function_bip340_tests(void) {
unsigned char tag[13] = "BIP0340/nonce";
unsigned char aux_tag[11] = "BIP0340/aux";
unsigned char algo[13] = "BIP0340/nonce";
size_t algolen = sizeof(algo);
secp256k1_sha256 sha;
secp256k1_sha256 sha_optimized;
unsigned char nonce[32];
unsigned char msg[32];
Reported by FlawFinder.
Line: 41
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
size_t algolen = sizeof(algo);
secp256k1_sha256 sha;
secp256k1_sha256 sha_optimized;
unsigned char nonce[32];
unsigned char msg[32];
size_t msglen = sizeof(msg);
unsigned char key[32];
unsigned char pk[32];
unsigned char aux_rand[32];
Reported by FlawFinder.
Line: 42
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
secp256k1_sha256 sha;
secp256k1_sha256 sha_optimized;
unsigned char nonce[32];
unsigned char msg[32];
size_t msglen = sizeof(msg);
unsigned char key[32];
unsigned char pk[32];
unsigned char aux_rand[32];
unsigned char *args[5];
Reported by FlawFinder.
Line: 44
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
unsigned char nonce[32];
unsigned char msg[32];
size_t msglen = sizeof(msg);
unsigned char key[32];
unsigned char pk[32];
unsigned char aux_rand[32];
unsigned char *args[5];
int i;
Reported by FlawFinder.
Line: 45
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
unsigned char msg[32];
size_t msglen = sizeof(msg);
unsigned char key[32];
unsigned char pk[32];
unsigned char aux_rand[32];
unsigned char *args[5];
int i;
/* Check that hash initialized by
Reported by FlawFinder.
Line: 46
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
size_t msglen = sizeof(msg);
unsigned char key[32];
unsigned char pk[32];
unsigned char aux_rand[32];
unsigned char *args[5];
int i;
/* Check that hash initialized by
* secp256k1_nonce_function_bip340_sha256_tagged has the expected
Reported by FlawFinder.
Line: 47
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
unsigned char key[32];
unsigned char pk[32];
unsigned char aux_rand[32];
unsigned char *args[5];
int i;
/* Check that hash initialized by
* secp256k1_nonce_function_bip340_sha256_tagged has the expected
* state. */
Reported by FlawFinder.
contrib/linearize/linearize-data.py
78 issues
Line: 62
Column: 5
return hash2_o
def calc_hash_str(blk_hdr):
hash = calc_hdr_hash(blk_hdr)
hash = bufreverse(hash)
hash = wordreverse(hash)
hash_str = hash.hex()
return hash_str
Reported by Pylint.
Line: 76
Column: 22
return (dt_ym, nTime)
# When getting the list of block hashes, undo any byte reversals.
def get_block_hashes(settings):
blkindex = []
f = open(settings['hashlist'], "r", encoding="utf8")
for line in f:
line = line.rstrip()
if settings['rev_hash_bytes'] == 'true':
Reported by Pylint.
Line: 77
Column: 5
# When getting the list of block hashes, undo any byte reversals.
def get_block_hashes(settings):
blkindex = []
f = open(settings['hashlist'], "r", encoding="utf8")
for line in f:
line = line.rstrip()
if settings['rev_hash_bytes'] == 'true':
line = hex_switchEndian(line)
Reported by Pylint.
Line: 78
Column: 5
# When getting the list of block hashes, undo any byte reversals.
def get_block_hashes(settings):
blkindex = []
f = open(settings['hashlist'], "r", encoding="utf8")
for line in f:
line = line.rstrip()
if settings['rev_hash_bytes'] == 'true':
line = hex_switchEndian(line)
blkindex.append(line)
Reported by Pylint.
Line: 79
Column: 9
def get_block_hashes(settings):
blkindex = []
f = open(settings['hashlist'], "r", encoding="utf8")
for line in f:
line = line.rstrip()
if settings['rev_hash_bytes'] == 'true':
line = hex_switchEndian(line)
blkindex.append(line)
Reported by Pylint.
Line: 90
Column: 16
return blkindex
# The block map shouldn't give or receive byte-reversed hashes.
def mkblockmap(blkindex):
blkmap = {}
for height,hash in enumerate(blkindex):
blkmap[hash] = height
return blkmap
Reported by Pylint.
Line: 91
Column: 5
# The block map shouldn't give or receive byte-reversed hashes.
def mkblockmap(blkindex):
blkmap = {}
for height,hash in enumerate(blkindex):
blkmap[hash] = height
return blkmap
# This gets the first block file ID that exists from the input block
Reported by Pylint.
Line: 92
Column: 16
# The block map shouldn't give or receive byte-reversed hashes.
def mkblockmap(blkindex):
blkmap = {}
for height,hash in enumerate(blkindex):
blkmap[hash] = height
return blkmap
# This gets the first block file ID that exists from the input block
# file directory.
Reported by Pylint.
Line: 124
Column: 44
BlockExtent = namedtuple('BlockExtent', ['fn', 'offset', 'inhdr', 'blkhdr', 'size'])
class BlockDataCopier:
def __init__(self, settings, blkindex, blkmap):
self.settings = settings
self.blkindex = blkindex
self.blkmap = blkmap
# Get first occurring block file id - for pruned nodes this
Reported by Pylint.
Line: 124
Column: 24
BlockExtent = namedtuple('BlockExtent', ['fn', 'offset', 'inhdr', 'blkhdr', 'size'])
class BlockDataCopier:
def __init__(self, settings, blkindex, blkmap):
self.settings = settings
self.blkindex = blkindex
self.blkmap = blkmap
# Get first occurring block file id - for pruned nodes this
Reported by Pylint.
test/functional/wallet_multiwallet.py
78 issues
Line: 30
Column: 5
def test_load_unload(node, name):
global got_loading_error
while True:
if got_loading_error:
return
try:
node.loadwallet(name)
Reported by Pylint.
Line: 65
Column: 18
data_dir = lambda *p: os.path.join(node.datadir, self.chain, *p)
wallet_dir = lambda *p: data_dir('wallets', *p)
wallet = lambda name: node.get_wallet_rpc(name)
def wallet_file(name):
if name == self.default_wallet_name:
return wallet_dir(self.default_wallet_name, self.wallet_data_filename)
if os.path.isdir(wallet_dir(name)):
Reported by Pylint.
Line: 289
Column: 9
threads.append(t)
for t in threads:
t.join()
global got_loading_error
assert_equal(got_loading_error, True)
self.log.info("Load remaining wallets")
for wallet_name in wallet_names[2:]:
loadwallet_name = self.nodes[0].loadwallet(wallet_name)
Reported by Pylint.
Line: 361
Column: 9
assert_raises_rpc_error(-1, "JSON value is not a string as expected", self.nodes[0].unloadwallet)
assert_raises_rpc_error(-18, "Requested wallet does not exist or is not loaded", self.nodes[0].unloadwallet, "dummy")
assert_raises_rpc_error(-18, "Requested wallet does not exist or is not loaded", node.get_wallet_rpc("dummy").unloadwallet)
assert_raises_rpc_error(-8, "RPC endpoint wallet and wallet_name parameter specify different wallets", w1.unloadwallet, "w2"),
# Successfully unload the specified wallet name
self.nodes[0].unloadwallet("w1")
assert 'w1' not in self.nodes[0].listwallets()
Reported by Pylint.
Line: 26
Column: 1
get_rpc_proxy,
)
got_loading_error = False
def test_load_unload(node, name):
global got_loading_error
while True:
Reported by Pylint.
Line: 29
Column: 1
got_loading_error = False
def test_load_unload(node, name):
global got_loading_error
while True:
if got_loading_error:
return
try:
Reported by Pylint.
Line: 30
Column: 5
def test_load_unload(node, name):
global got_loading_error
while True:
if got_loading_error:
return
try:
node.loadwallet(name)
Reported by Pylint.
Line: 37
Column: 9
try:
node.loadwallet(name)
node.unloadwallet(name)
except JSONRPCException as e:
if e.error['code'] == -4 and 'Wallet already loading' in e.error['message']:
got_loading_error = True
return
Reported by Pylint.
Line: 43
Column: 1
return
class MultiWalletTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.rpc_timeout = 120
self.extra_args = [["-nowallet"], []]
Reported by Pylint.
Line: 60
Column: 5
help='Test data with wallet directories (default: %(default)s)',
)
def run_test(self):
node = self.nodes[0]
data_dir = lambda *p: os.path.join(node.datadir, self.chain, *p)
wallet_dir = lambda *p: data_dir('wallets', *p)
wallet = lambda name: node.get_wallet_rpc(name)
Reported by Pylint.