The following issues were found
contrib/devtools/test-symbol-check.py
39 issues
Line: 17
Column: 9
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
def get_machine(cc: List[str]):
Reported by Pylint.
Line: 23
Column: 9
return (p.returncode, p.stdout.rstrip())
def get_machine(cc: List[str]):
p = subprocess.run([*cc,'-dumpmachine'], stdout=subprocess.PIPE, universal_newlines=True)
return p.stdout.rstrip()
class TestSymbolChecks(unittest.TestCase):
def test_ELF(self):
source = 'test1.c'
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
'''
Test script for symbol-check.py
'''
import os
import subprocess
Reported by Pylint.
Line: 9
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
Test script for symbol-check.py
'''
import os
import subprocess
from typing import List
import unittest
from utils import determine_wellknown_cmd
Reported by Bandit.
Line: 15
Column: 1
from utils import determine_wellknown_cmd
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
Reported by Pylint.
Line: 15
Column: 1
from utils import determine_wellknown_cmd
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
Reported by Pylint.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
from utils import determine_wellknown_cmd
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
Reported by Bandit.
Line: 17
Column: 1
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
def get_machine(cc: List[str]):
Reported by Pylint.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
def get_machine(cc: List[str]):
Reported by Bandit.
Line: 17
Column: 5
def call_symbol_check(cc: List[str], source, executable, options):
subprocess.run([*cc,source,'-o',executable] + options, check=True)
p = subprocess.run(['./contrib/devtools/symbol-check.py',executable], stdout=subprocess.PIPE, universal_newlines=True)
os.remove(source)
os.remove(executable)
return (p.returncode, p.stdout.rstrip())
def get_machine(cc: List[str]):
Reported by Pylint.
test/functional/rpc_net.py
37 issues
Line: 54
Column: 9
def run_test(self):
# We need miniwallet to make a transaction
self.wallet = MiniWallet(self.nodes[0])
self.wallet.generate(1)
# Get out of IBD for the minfeefilter and getpeerinfo tests.
self.nodes[0].generate(COINBASE_MATURITY + 1)
# By default, the test framework sets up an addnode connection from
Reported by Pylint.
Line: 126
Column: 89
for peer_before in peer_info_before:
peer_after = lambda: next(p for p in self.nodes[0].getpeerinfo() if p['id'] == peer_before['id'])
self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + 32, timeout=1)
self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + 32, timeout=1)
def test_getnetworkinfo(self):
self.log.info("Test getnetworkinfo")
info = self.nodes[0].getnetworkinfo()
Reported by Pylint.
Line: 126
Column: 37
for peer_before in peer_info_before:
peer_after = lambda: next(p for p in self.nodes[0].getpeerinfo() if p['id'] == peer_before['id'])
self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + 32, timeout=1)
self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + 32, timeout=1)
def test_getnetworkinfo(self):
self.log.info("Test getnetworkinfo")
info = self.nodes[0].getnetworkinfo()
Reported by Pylint.
Line: 127
Column: 37
for peer_before in peer_info_before:
peer_after = lambda: next(p for p in self.nodes[0].getpeerinfo() if p['id'] == peer_before['id'])
self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + 32, timeout=1)
self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + 32, timeout=1)
def test_getnetworkinfo(self):
self.log.info("Test getnetworkinfo")
info = self.nodes[0].getnetworkinfo()
assert_equal(info['networkactive'], True)
Reported by Pylint.
Line: 127
Column: 89
for peer_before in peer_info_before:
peer_after = lambda: next(p for p in self.nodes[0].getpeerinfo() if p['id'] == peer_before['id'])
self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + 32, timeout=1)
self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + 32, timeout=1)
def test_getnetworkinfo(self):
self.log.info("Test getnetworkinfo")
info = self.nodes[0].getnetworkinfo()
assert_equal(info['networkactive'], True)
Reported by Pylint.
Line: 42
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
servicesflag_generated = 0
for servicename in servicenames:
servicesflag_generated |= getattr(test_framework.messages, 'NODE_' + servicename)
assert servicesflag_generated == servicesflag
class NetTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
Reported by Bandit.
Line: 45
Column: 1
assert servicesflag_generated == servicesflag
class NetTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.extra_args = [["-minrelaytxfee=0.00001000"], ["-minrelaytxfee=0.00000500"]]
self.supports_cli = False
Reported by Pylint.
Line: 75
Column: 5
self.test_getnodeaddresses()
self.test_addpeeraddress()
def test_connection_count(self):
self.log.info("Test getconnectioncount")
# After using `connect_nodes` to connect nodes 0 and 1 to each other.
assert_equal(self.nodes[0].getconnectioncount(), 2)
def test_getpeerinfo(self):
Reported by Pylint.
Line: 80
Column: 5
# After using `connect_nodes` to connect nodes 0 and 1 to each other.
assert_equal(self.nodes[0].getconnectioncount(), 2)
def test_getpeerinfo(self):
self.log.info("Test getpeerinfo")
# Create a few getpeerinfo last_block/last_transaction values.
self.wallet.send_self_transfer(from_node=self.nodes[0]) # Make a transaction so we can see it in the getpeerinfo results
self.nodes[1].generate(1)
self.sync_all()
Reported by Pylint.
Line: 83
Column: 1
def test_getpeerinfo(self):
self.log.info("Test getpeerinfo")
# Create a few getpeerinfo last_block/last_transaction values.
self.wallet.send_self_transfer(from_node=self.nodes[0]) # Make a transaction so we can see it in the getpeerinfo results
self.nodes[1].generate(1)
self.sync_all()
time_now = int(time.time())
peer_info = [x.getpeerinfo() for x in self.nodes]
# Verify last_block and last_transaction keys/values.
Reported by Pylint.
test/functional/wallet_signer.py
37 issues
Line: 48
Column: 9
os.remove(os.path.join(node.cwd, "mock_result"))
def run_test(self):
self.log.debug(f"-signer={self.mock_signer_path()}")
# Create new wallets for an external signer.
# disable_private_keys and descriptors must be true:
assert_raises_rpc_error(-4, "Private keys must be disabled when using an external signer", self.nodes[1].createwallet, wallet_name='not_hww', disable_private_keys=False, descriptors=True, external_signer=True)
if self.is_bdb_compiled():
Reported by Pylint.
Line: 68
Column: 3
# assert_raises_rpc_error(-4, "Multiple signers found, please specify which to use", wallet_name='not_hww', disable_private_keys=True, descriptors=True, external_signer=True)
# TODO: Handle error thrown by script
# self.set_mock_result(self.nodes[1], "2")
# assert_raises_rpc_error(-1, 'Unable to parse JSON',
# self.nodes[1].createwallet, wallet_name='not_hww2', disable_private_keys=True, descriptors=True, external_signer=False
# )
# self.clear_mock_result(self.nodes[1])
Reported by Pylint.
Line: 20
Column: 1
)
class WalletSignerTest(BitcoinTestFramework):
def mock_signer_path(self):
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'signer.py')
if platform.system() == "Windows":
return "py " + path
else:
Reported by Pylint.
Line: 21
Column: 5
class WalletSignerTest(BitcoinTestFramework):
def mock_signer_path(self):
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'signer.py')
if platform.system() == "Windows":
return "py " + path
else:
return path
Reported by Pylint.
Line: 21
Column: 5
class WalletSignerTest(BitcoinTestFramework):
def mock_signer_path(self):
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'signer.py')
if platform.system() == "Windows":
return "py " + path
else:
return path
Reported by Pylint.
Line: 23
Column: 9
class WalletSignerTest(BitcoinTestFramework):
def mock_signer_path(self):
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'signer.py')
if platform.system() == "Windows":
return "py " + path
else:
return path
def set_test_params(self):
Reported by Pylint.
Line: 40
Column: 5
self.skip_if_no_external_signer()
self.skip_if_no_wallet()
def set_mock_result(self, node, res):
with open(os.path.join(node.cwd, "mock_result"), "w", encoding="utf8") as f:
f.write(res)
def clear_mock_result(self, node):
os.remove(os.path.join(node.cwd, "mock_result"))
Reported by Pylint.
Line: 40
Column: 5
self.skip_if_no_external_signer()
self.skip_if_no_wallet()
def set_mock_result(self, node, res):
with open(os.path.join(node.cwd, "mock_result"), "w", encoding="utf8") as f:
f.write(res)
def clear_mock_result(self, node):
os.remove(os.path.join(node.cwd, "mock_result"))
Reported by Pylint.
Line: 41
Column: 83
self.skip_if_no_wallet()
def set_mock_result(self, node, res):
with open(os.path.join(node.cwd, "mock_result"), "w", encoding="utf8") as f:
f.write(res)
def clear_mock_result(self, node):
os.remove(os.path.join(node.cwd, "mock_result"))
Reported by Pylint.
Line: 44
Column: 5
with open(os.path.join(node.cwd, "mock_result"), "w", encoding="utf8") as f:
f.write(res)
def clear_mock_result(self, node):
os.remove(os.path.join(node.cwd, "mock_result"))
def run_test(self):
self.log.debug(f"-signer={self.mock_signer_path()}")
Reported by Pylint.
test/functional/feature_nulldummy.py
37 issues
Line: 63
Column: 9
self.nodes[0].createwallet(wallet_name='wmulti', disable_private_keys=True)
wmulti = self.nodes[0].get_wallet_rpc('wmulti')
w0 = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.address = w0.getnewaddress()
self.pubkey = w0.getaddressinfo(self.address)['pubkey']
self.ms_address = wmulti.addmultisigaddress(1, [self.pubkey])['address']
self.wit_address = w0.getnewaddress(address_type='p2sh-segwit')
self.wit_ms_address = wmulti.addmultisigaddress(1, [self.pubkey], '', 'p2sh-segwit')['address']
if not self.options.descriptors:
Reported by Pylint.
Line: 64
Column: 9
wmulti = self.nodes[0].get_wallet_rpc('wmulti')
w0 = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.address = w0.getnewaddress()
self.pubkey = w0.getaddressinfo(self.address)['pubkey']
self.ms_address = wmulti.addmultisigaddress(1, [self.pubkey])['address']
self.wit_address = w0.getnewaddress(address_type='p2sh-segwit')
self.wit_ms_address = wmulti.addmultisigaddress(1, [self.pubkey], '', 'p2sh-segwit')['address']
if not self.options.descriptors:
# Legacy wallets need to import these so that they are watched by the wallet. This is unnecessary (and does not need to be tested) for descriptor wallets
Reported by Pylint.
Line: 65
Column: 9
w0 = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.address = w0.getnewaddress()
self.pubkey = w0.getaddressinfo(self.address)['pubkey']
self.ms_address = wmulti.addmultisigaddress(1, [self.pubkey])['address']
self.wit_address = w0.getnewaddress(address_type='p2sh-segwit')
self.wit_ms_address = wmulti.addmultisigaddress(1, [self.pubkey], '', 'p2sh-segwit')['address']
if not self.options.descriptors:
# Legacy wallets need to import these so that they are watched by the wallet. This is unnecessary (and does not need to be tested) for descriptor wallets
wmulti.importaddress(self.ms_address)
Reported by Pylint.
Line: 66
Column: 9
self.address = w0.getnewaddress()
self.pubkey = w0.getaddressinfo(self.address)['pubkey']
self.ms_address = wmulti.addmultisigaddress(1, [self.pubkey])['address']
self.wit_address = w0.getnewaddress(address_type='p2sh-segwit')
self.wit_ms_address = wmulti.addmultisigaddress(1, [self.pubkey], '', 'p2sh-segwit')['address']
if not self.options.descriptors:
# Legacy wallets need to import these so that they are watched by the wallet. This is unnecessary (and does not need to be tested) for descriptor wallets
wmulti.importaddress(self.ms_address)
wmulti.importaddress(self.wit_ms_address)
Reported by Pylint.
Line: 67
Column: 9
self.pubkey = w0.getaddressinfo(self.address)['pubkey']
self.ms_address = wmulti.addmultisigaddress(1, [self.pubkey])['address']
self.wit_address = w0.getnewaddress(address_type='p2sh-segwit')
self.wit_ms_address = wmulti.addmultisigaddress(1, [self.pubkey], '', 'p2sh-segwit')['address']
if not self.options.descriptors:
# Legacy wallets need to import these so that they are watched by the wallet. This is unnecessary (and does not need to be tested) for descriptor wallets
wmulti.importaddress(self.ms_address)
wmulti.importaddress(self.wit_ms_address)
Reported by Pylint.
Line: 73
Column: 9
wmulti.importaddress(self.ms_address)
wmulti.importaddress(self.wit_ms_address)
self.coinbase_blocks = self.nodes[0].generate(2) # block height = 2
coinbase_txid = []
for i in self.coinbase_blocks:
coinbase_txid.append(self.nodes[0].getblock(i)['tx'][0])
self.nodes[0].generate(COINBASE_MATURITY) # block height = COINBASE_MATURITY + 2
self.lastblockhash = self.nodes[0].getbestblockhash()
Reported by Pylint.
Line: 78
Column: 9
for i in self.coinbase_blocks:
coinbase_txid.append(self.nodes[0].getblock(i)['tx'][0])
self.nodes[0].generate(COINBASE_MATURITY) # block height = COINBASE_MATURITY + 2
self.lastblockhash = self.nodes[0].getbestblockhash()
self.lastblockheight = COINBASE_MATURITY + 2
self.lastblocktime = int(time.time()) + self.lastblockheight
self.log.info(f"Test 1: NULLDUMMY compliant base transactions should be accepted to mempool and mined before activation [{COINBASE_MATURITY + 3}]")
test1txs = [create_transaction(self.nodes[0], coinbase_txid[0], self.ms_address, amount=49)]
Reported by Pylint.
Line: 79
Column: 9
coinbase_txid.append(self.nodes[0].getblock(i)['tx'][0])
self.nodes[0].generate(COINBASE_MATURITY) # block height = COINBASE_MATURITY + 2
self.lastblockhash = self.nodes[0].getbestblockhash()
self.lastblockheight = COINBASE_MATURITY + 2
self.lastblocktime = int(time.time()) + self.lastblockheight
self.log.info(f"Test 1: NULLDUMMY compliant base transactions should be accepted to mempool and mined before activation [{COINBASE_MATURITY + 3}]")
test1txs = [create_transaction(self.nodes[0], coinbase_txid[0], self.ms_address, amount=49)]
txid1 = self.nodes[0].sendrawtransaction(test1txs[0].serialize_with_witness().hex(), 0)
Reported by Pylint.
Line: 80
Column: 9
self.nodes[0].generate(COINBASE_MATURITY) # block height = COINBASE_MATURITY + 2
self.lastblockhash = self.nodes[0].getbestblockhash()
self.lastblockheight = COINBASE_MATURITY + 2
self.lastblocktime = int(time.time()) + self.lastblockheight
self.log.info(f"Test 1: NULLDUMMY compliant base transactions should be accepted to mempool and mined before activation [{COINBASE_MATURITY + 3}]")
test1txs = [create_transaction(self.nodes[0], coinbase_txid[0], self.ms_address, amount=49)]
txid1 = self.nodes[0].sendrawtransaction(test1txs[0].serialize_with_witness().hex(), 0)
test1txs.append(create_transaction(self.nodes[0], txid1, self.ms_address, amount=48))
Reported by Pylint.
Line: 82
Column: 9
self.lastblockheight = COINBASE_MATURITY + 2
self.lastblocktime = int(time.time()) + self.lastblockheight
self.log.info(f"Test 1: NULLDUMMY compliant base transactions should be accepted to mempool and mined before activation [{COINBASE_MATURITY + 3}]")
test1txs = [create_transaction(self.nodes[0], coinbase_txid[0], self.ms_address, amount=49)]
txid1 = self.nodes[0].sendrawtransaction(test1txs[0].serialize_with_witness().hex(), 0)
test1txs.append(create_transaction(self.nodes[0], txid1, self.ms_address, amount=48))
txid2 = self.nodes[0].sendrawtransaction(test1txs[1].serialize_with_witness().hex(), 0)
test1txs.append(create_transaction(self.nodes[0], coinbase_txid[1], self.wit_ms_address, amount=49))
Reported by Pylint.
test/functional/wallet_balance.py
35 issues
Line: 179
Column: 3
assert_equal(self.nodes[0].getbalance(minconf=0), Decimal('9.99'))
assert_equal(self.nodes[1].getbalance(minconf=0), Decimal('0'))
# getbalance with a minconf incorrectly excludes coins that have been spent more recently than the minconf blocks ago
# TODO: fix getbalance tracking of coin spentness depth
assert_equal(self.nodes[0].getbalance(minconf=1), Decimal('0'))
assert_equal(self.nodes[1].getbalance(minconf=1), Decimal('0'))
# getunconfirmedbalance
assert_equal(self.nodes[0].getunconfirmedbalance(), Decimal('60')) # output of node 1's spend
assert_equal(self.nodes[1].getunconfirmedbalance(), Decimal('30') - fee_node_1) # Doesn't include output of node 0's send since it was spent
Reported by Pylint.
Line: 217
Column: 3
self.sync_all()
# getbalance with a minconf incorrectly excludes coins that have been spent more recently than the minconf blocks ago
# TODO: fix getbalance tracking of coin spentness depth
# getbalance with minconf=3 should still show the old balance
assert_equal(self.nodes[1].getbalance(minconf=3), Decimal('0'))
# getbalance with minconf=2 will show the new balance.
assert_equal(self.nodes[1].getbalance(minconf=2), Decimal('0'))
Reported by Pylint.
Line: 18
Column: 1
)
def create_transactions(node, address, amt, fees):
# Create and sign raw transactions from node to address for amt.
# Creates a transaction for each fee and returns an array
# of the raw transactions.
utxos = [u for u in node.listunspent(0) if u['spendable']]
Reported by Pylint.
Line: 33
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if ins_total >= amt + max(fees):
break
# make sure there was enough utxos
assert ins_total >= amt + max(fees)
txs = []
for fee in fees:
outputs = {address: amt}
# prevent 0 change output
Reported by Bandit.
Line: 48
Column: 1
return txs
class WalletTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
self.extra_args = [
['-limitdescendantcount=3'], # Limit mempool descendants as a hack to have wallet txs rejected from the mempool
Reported by Pylint.
Line: 53
Column: 1
self.num_nodes = 2
self.setup_clean_chain = True
self.extra_args = [
['-limitdescendantcount=3'], # Limit mempool descendants as a hack to have wallet txs rejected from the mempool
[],
]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
Reported by Pylint.
Line: 60
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
if not self.options.descriptors:
# Tests legacy watchonly behavior which is not present (and does not need to be tested) in descriptor wallets
self.nodes[0].importaddress(ADDRESS_WATCHONLY)
# Check that nodes don't own any UTXOs
assert_equal(len(self.nodes[0].listunspent()), 0)
Reported by Pylint.
Line: 62
Column: 1
def run_test(self):
if not self.options.descriptors:
# Tests legacy watchonly behavior which is not present (and does not need to be tested) in descriptor wallets
self.nodes[0].importaddress(ADDRESS_WATCHONLY)
# Check that nodes don't own any UTXOs
assert_equal(len(self.nodes[0].listunspent()), 0)
assert_equal(len(self.nodes[1].listunspent()), 0)
Reported by Pylint.
Line: 69
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert_equal(len(self.nodes[1].listunspent()), 0)
self.log.info("Check that only node 0 is watching an address")
assert 'watchonly' in self.nodes[0].getbalances()
assert 'watchonly' not in self.nodes[1].getbalances()
self.log.info("Mining blocks ...")
self.nodes[0].generate(1)
self.sync_all()
Reported by Bandit.
Line: 70
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.log.info("Check that only node 0 is watching an address")
assert 'watchonly' in self.nodes[0].getbalances()
assert 'watchonly' not in self.nodes[1].getbalances()
self.log.info("Mining blocks ...")
self.nodes[0].generate(1)
self.sync_all()
self.nodes[1].generate(1)
Reported by Bandit.
contrib/seeds/generate-seeds.py
35 issues
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2014-2021 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
'''
Script to generate list of seed nodes for chainparams.cpp.
This script expects two text files in the directory that is passed as an
argument:
Reported by Pylint.
Line: 37
Column: 1
import os
import re
class BIP155Network(Enum):
IPV4 = 1
IPV6 = 2
TORV2 = 3 # no longer supported
TORV3 = 4
I2P = 5
Reported by Pylint.
Line: 45
Column: 1
I2P = 5
CJDNS = 6
def name_to_bip155(addr):
'''Convert address string to BIP155 (networkID, addr) tuple.'''
if addr.endswith('.onion'):
vchAddr = b32decode(addr[0:-6], True)
if len(vchAddr) == 35:
assert vchAddr[34] == 3
Reported by Pylint.
Line: 48
Column: 9
def name_to_bip155(addr):
'''Convert address string to BIP155 (networkID, addr) tuple.'''
if addr.endswith('.onion'):
vchAddr = b32decode(addr[0:-6], True)
if len(vchAddr) == 35:
assert vchAddr[34] == 3
return (BIP155Network.TORV3, vchAddr[:32])
elif len(vchAddr) == 10:
return (BIP155Network.TORV2, vchAddr)
Reported by Pylint.
Line: 49
Column: 9
'''Convert address string to BIP155 (networkID, addr) tuple.'''
if addr.endswith('.onion'):
vchAddr = b32decode(addr[0:-6], True)
if len(vchAddr) == 35:
assert vchAddr[34] == 3
return (BIP155Network.TORV3, vchAddr[:32])
elif len(vchAddr) == 10:
return (BIP155Network.TORV2, vchAddr)
else:
Reported by Pylint.
Line: 50
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if addr.endswith('.onion'):
vchAddr = b32decode(addr[0:-6], True)
if len(vchAddr) == 35:
assert vchAddr[34] == 3
return (BIP155Network.TORV3, vchAddr[:32])
elif len(vchAddr) == 10:
return (BIP155Network.TORV2, vchAddr)
else:
raise ValueError('Invalid onion %s' % vchAddr)
Reported by Bandit.
Line: 57
Column: 9
else:
raise ValueError('Invalid onion %s' % vchAddr)
elif addr.endswith('.b32.i2p'):
vchAddr = b32decode(addr[0:-8] + '====', True)
if len(vchAddr) == 32:
return (BIP155Network.I2P, vchAddr)
else:
raise ValueError(f'Invalid I2P {vchAddr}')
elif '.' in addr: # IPv4
Reported by Pylint.
Line: 58
Column: 9
raise ValueError('Invalid onion %s' % vchAddr)
elif addr.endswith('.b32.i2p'):
vchAddr = b32decode(addr[0:-8] + '====', True)
if len(vchAddr) == 32:
return (BIP155Network.I2P, vchAddr)
else:
raise ValueError(f'Invalid I2P {vchAddr}')
elif '.' in addr: # IPv4
return (BIP155Network.IPV4, bytes((int(x) for x in addr.split('.'))))
Reported by Pylint.
Line: 66
Column: 9
return (BIP155Network.IPV4, bytes((int(x) for x in addr.split('.'))))
elif ':' in addr: # IPv6
sub = [[], []] # prefix, suffix
x = 0
addr = addr.split(':')
for i,comp in enumerate(addr):
if comp == '':
if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
continue
Reported by Pylint.
Line: 70
Column: 20
addr = addr.split(':')
for i,comp in enumerate(addr):
if comp == '':
if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
continue
x += 1 # :: skips to suffix
assert(x < 2)
else: # two bytes per component
val = int(comp, 16)
Reported by Pylint.
test/functional/rpc_users.py
34 issues
Line: 44
Column: 9
def setup_chain(self):
super().setup_chain()
#Append rpcauth to bitcoin.conf before initialization
self.rtpassword = "cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM="
rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144"
self.rpcuser = "rpcuser💻"
self.rpcpassword = "rpcpassword🔑"
Reported by Pylint.
Line: 47
Column: 9
self.rtpassword = "cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM="
rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144"
self.rpcuser = "rpcuser💻"
self.rpcpassword = "rpcpassword🔑"
config = configparser.ConfigParser()
config.read_file(open(self.options.configfile))
gen_rpcauth = config['environment']['RPCAUTH']
Reported by Pylint.
Line: 48
Column: 9
rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144"
self.rpcuser = "rpcuser💻"
self.rpcpassword = "rpcpassword🔑"
config = configparser.ConfigParser()
config.read_file(open(self.options.configfile))
gen_rpcauth = config['environment']['RPCAUTH']
Reported by Pylint.
Line: 55
Column: 9
gen_rpcauth = config['environment']['RPCAUTH']
# Generate RPCAUTH with specified password
self.rt2password = "8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI="
p = subprocess.Popen([sys.executable, gen_rpcauth, 'rt2', self.rt2password], stdout=subprocess.PIPE, universal_newlines=True)
lines = p.stdout.read().splitlines()
rpcauth2 = lines[1]
# Generate RPCAUTH without specifying password
Reported by Pylint.
Line: 61
Column: 9
rpcauth2 = lines[1]
# Generate RPCAUTH without specifying password
self.user = ''.join(SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(10))
p = subprocess.Popen([sys.executable, gen_rpcauth, self.user], stdout=subprocess.PIPE, universal_newlines=True)
lines = p.stdout.read().splitlines()
rpcauth3 = lines[1]
self.password = lines[3]
Reported by Pylint.
Line: 65
Column: 9
p = subprocess.Popen([sys.executable, gen_rpcauth, self.user], stdout=subprocess.PIPE, universal_newlines=True)
lines = p.stdout.read().splitlines()
rpcauth3 = lines[1]
self.password = lines[3]
with open(os.path.join(get_datadir_path(self.options.tmpdir, 0), "bitcoin.conf"), 'a', encoding='utf8') as f:
f.write(rpcauth + "\n")
f.write(rpcauth2 + "\n")
f.write(rpcauth3 + "\n")
Reported by Pylint.
Line: 14
Column: 1
str_to_b64str,
)
import os
import http.client
import urllib.parse
import subprocess
from random import SystemRandom
import string
Reported by Pylint.
Line: 15
Column: 1
)
import os
import http.client
import urllib.parse
import subprocess
from random import SystemRandom
import string
import configparser
Reported by Pylint.
Line: 16
Column: 1
import os
import http.client
import urllib.parse
import subprocess
from random import SystemRandom
import string
import configparser
import sys
Reported by Pylint.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import os
import http.client
import urllib.parse
import subprocess
from random import SystemRandom
import string
import configparser
import sys
Reported by Bandit.
test/functional/mempool_packages.py
34 issues
Line: 211
Column: 3
assert set(mempool1).issubset(set(mempool0))
for tx in chain[:MAX_ANCESTORS_CUSTOM]:
assert tx in mempool1
# TODO: more detailed check of node1's mempool (fees etc.)
# check transaction unbroadcast info (should be false if in both mempools)
mempool = self.nodes[0].getrawmempool(True)
for tx in mempool:
assert_equal(mempool[tx]['unbroadcast'], False)
Reported by Pylint.
Line: 217
Column: 3
for tx in mempool:
assert_equal(mempool[tx]['unbroadcast'], False)
# TODO: test ancestor size limits
# Now test descendant chain limits
txid = utxo[1]['txid']
value = utxo[1]['amount']
vout = utxo[1]['vout']
Reported by Pylint.
Line: 268
Column: 3
assert tx in mempool1
for tx in chain[MAX_DESCENDANTS_CUSTOM:]:
assert tx not in mempool1
# TODO: more detailed check of node1's mempool (fees etc.)
# TODO: test descendant size limits
# Test reorg handling
# First, the basics:
Reported by Pylint.
Line: 270
Column: 3
assert tx not in mempool1
# TODO: more detailed check of node1's mempool (fees etc.)
# TODO: test descendant size limits
# Test reorg handling
# First, the basics:
self.nodes[0].generate(1)
self.sync_blocks()
Reported by Pylint.
Line: 26
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# custom limits for node1
MAX_ANCESTORS_CUSTOM = 5
MAX_DESCENDANTS_CUSTOM = 10
assert MAX_DESCENDANTS_CUSTOM >= MAX_ANCESTORS_CUSTOM
class MempoolPackagesTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [
Reported by Bandit.
Line: 28
Column: 1
MAX_DESCENDANTS_CUSTOM = 10
assert MAX_DESCENDANTS_CUSTOM >= MAX_ANCESTORS_CUSTOM
class MempoolPackagesTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [
[
"-maxorphantx=1000",
Reported by Pylint.
Line: 46
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Mine some blocks and have them mature.
peer_inv_store = self.nodes[0].add_p2p_connection(P2PTxInvStore()) # keep track of invs
self.nodes[0].generate(COINBASE_MATURITY + 1)
utxo = self.nodes[0].listunspent(10)
txid = utxo[0]['txid']
Reported by Pylint.
Line: 46
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Mine some blocks and have them mature.
peer_inv_store = self.nodes[0].add_p2p_connection(P2PTxInvStore()) # keep track of invs
self.nodes[0].generate(COINBASE_MATURITY + 1)
utxo = self.nodes[0].listunspent(10)
txid = utxo[0]['txid']
Reported by Pylint.
Line: 46
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Mine some blocks and have them mature.
peer_inv_store = self.nodes[0].add_p2p_connection(P2PTxInvStore()) # keep track of invs
self.nodes[0].generate(COINBASE_MATURITY + 1)
utxo = self.nodes[0].listunspent(10)
txid = utxo[0]['txid']
Reported by Pylint.
Line: 68
Column: 1
witnesstx = self.nodes[0].decoderawtransaction(fulltx, True)
witness_chain.append(witnesstx['hash'])
# Wait until mempool transactions have passed initial broadcast (sent inv and received getdata)
# Otherwise, getrawmempool may be inconsistent with getmempoolentry if unbroadcast changes in between
peer_inv_store.wait_for_broadcast(witness_chain)
# Check mempool has MAX_ANCESTORS transactions in it, and descendant and ancestor
# count and fees should look correct
Reported by Pylint.
test/functional/wallet_abandonconflict.py
34 issues
Line: 96
Column: 3
balance = newbalance
# Restart the node with a higher min relay fee so the parent tx is no longer in mempool
# TODO: redo with eviction
self.restart_node(0, extra_args=["-minrelaytxfee=0.0001"])
assert self.nodes[0].getmempoolinfo()['loaded']
# Verify txs no longer in either node's mempool
assert_equal(len(self.nodes[0].getrawmempool()), 0)
Reported by Pylint.
Line: 179
Column: 9
#assert_equal(newbalance, balance - Decimal("10"))
self.log.info("If balance has not declined after invalidateblock then out of mempool wallet tx which is no longer")
self.log.info("conflicted has not resumed causing its inputs to be seen as spent. See Issue #7315")
self.log.info(str(balance) + " -> " + str(newbalance) + " ?")
if __name__ == '__main__':
AbandonConflictTest().main()
Reported by Pylint.
Line: 23
Column: 1
)
class AbandonConflictTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [["-minrelaytxfee=0.00001"], []]
def skip_test_if_missing_module(self):
Reported by Pylint.
Line: 31
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.nodes[1].generate(COINBASE_MATURITY)
self.sync_blocks()
balance = self.nodes[0].getbalance()
txA = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txB = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
Reported by Pylint.
Line: 31
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.nodes[1].generate(COINBASE_MATURITY)
self.sync_blocks()
balance = self.nodes[0].getbalance()
txA = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txB = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
Reported by Pylint.
Line: 35
Column: 9
self.nodes[1].generate(COINBASE_MATURITY)
self.sync_blocks()
balance = self.nodes[0].getbalance()
txA = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txB = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txC = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
self.sync_mempools()
self.nodes[1].generate(1)
Reported by Pylint.
Line: 36
Column: 9
self.sync_blocks()
balance = self.nodes[0].getbalance()
txA = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txB = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txC = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
self.sync_mempools()
self.nodes[1].generate(1)
# Can not abandon non-wallet transaction
Reported by Pylint.
Line: 37
Column: 9
balance = self.nodes[0].getbalance()
txA = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txB = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
txC = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), Decimal("10"))
self.sync_mempools()
self.nodes[1].generate(1)
# Can not abandon non-wallet transaction
assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', lambda: self.nodes[0].abandontransaction(txid='ff' * 32))
Reported by Pylint.
Line: 42
Column: 1
self.nodes[1].generate(1)
# Can not abandon non-wallet transaction
assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', lambda: self.nodes[0].abandontransaction(txid='ff' * 32))
# Can not abandon confirmed transaction
assert_raises_rpc_error(-5, 'Transaction not eligible for abandonment', lambda: self.nodes[0].abandontransaction(txid=txA))
self.sync_blocks()
newbalance = self.nodes[0].getbalance()
Reported by Pylint.
Line: 44
Column: 1
# Can not abandon non-wallet transaction
assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', lambda: self.nodes[0].abandontransaction(txid='ff' * 32))
# Can not abandon confirmed transaction
assert_raises_rpc_error(-5, 'Transaction not eligible for abandonment', lambda: self.nodes[0].abandontransaction(txid=txA))
self.sync_blocks()
newbalance = self.nodes[0].getbalance()
assert balance - newbalance < Decimal("0.001") #no more than fees lost
balance = newbalance
Reported by Pylint.
test/functional/test_framework/blocktools.py
34 issues
Line: 11
Column: 1
import time
import unittest
from .address import (
key_to_p2sh_p2wpkh,
key_to_p2wpkh,
script_to_p2sh_p2wsh,
script_to_p2wsh,
)
Reported by Pylint.
Line: 17
Column: 1
script_to_p2sh_p2wsh,
script_to_p2wsh,
)
from .messages import (
CBlock,
COIN,
COutPoint,
CTransaction,
CTxIn,
Reported by Pylint.
Line: 30
Column: 1
tx_from_hex,
uint256_from_str,
)
from .script import (
CScript,
CScriptNum,
CScriptOp,
OP_1,
OP_CHECKMULTISIG,
Reported by Pylint.
Line: 40
Column: 1
OP_RETURN,
OP_TRUE,
)
from .script_util import (
key_to_p2wpkh_script,
script_to_p2wsh_script,
)
from .util import assert_equal
Reported by Pylint.
Line: 44
Column: 1
key_to_p2wpkh_script,
script_to_p2wsh_script,
)
from .util import assert_equal
WITNESS_SCALE_FACTOR = 4
MAX_BLOCK_SIGOPS = 20000
MAX_BLOCK_SIGOPS_WEIGHT = MAX_BLOCK_SIGOPS * WITNESS_SCALE_FACTOR
Reported by Pylint.
Line: 217
Column: 9
pkscript = key_to_p2wpkh_script(pubkey)
else:
# 1-of-1 multisig
witness_script = CScript([OP_1, bytes.fromhex(pubkey), OP_1, OP_CHECKMULTISIG])
pkscript = script_to_p2wsh_script(witness_script)
return pkscript.hex()
def create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount):
"""Return a transaction (in hex) that spends the given utxo to a segwit output.
Reported by Pylint.
Line: 84
Column: 13
coinbase = create_coinbase(height=tmpl['height'])
block.vtx.append(coinbase)
if txlist:
for tx in txlist:
if not hasattr(tx, 'calc_sha256'):
tx = tx_from_hex(tx)
block.vtx.append(tx)
block.hashMerkleRoot = block.calc_merkle_root()
block.calc_sha256()
Reported by Pylint.
Line: 86
Column: 17
if txlist:
for tx in txlist:
if not hasattr(tx, 'calc_sha256'):
tx = tx_from_hex(tx)
block.vtx.append(tx)
block.hashMerkleRoot = block.calc_merkle_root()
block.calc_sha256()
return block
Reported by Pylint.
Line: 92
Column: 1
block.calc_sha256()
return block
def get_witness_script(witness_root, witness_nonce):
witness_commitment = uint256_from_str(hash256(ser_uint256(witness_root) + ser_uint256(witness_nonce)))
output_data = WITNESS_COMMITMENT_HEADER + ser_uint256(witness_commitment)
return CScript([OP_RETURN, output_data])
def add_witness_commitment(block, nonce=0):
Reported by Pylint.
Line: 93
Column: 1
return block
def get_witness_script(witness_root, witness_nonce):
witness_commitment = uint256_from_str(hash256(ser_uint256(witness_root) + ser_uint256(witness_nonce)))
output_data = WITNESS_COMMITMENT_HEADER + ser_uint256(witness_commitment)
return CScript([OP_RETURN, output_data])
def add_witness_commitment(block, nonce=0):
"""Add a witness commitment to the block's coinbase transaction.
Reported by Pylint.