The following issues were found
test/functional/feature_notifications.py
29 issues
Line: 32
Column: 9
self.setup_clean_chain = True
def setup_network(self):
self.wallet = ''.join(chr(i) for i in range(FILE_CHAR_START, FILE_CHAR_END) if chr(i) not in FILE_CHARS_DISALLOWED)
self.alertnotify_dir = os.path.join(self.options.tmpdir, "alertnotify")
self.blocknotify_dir = os.path.join(self.options.tmpdir, "blocknotify")
self.walletnotify_dir = os.path.join(self.options.tmpdir, "walletnotify")
os.mkdir(self.alertnotify_dir)
os.mkdir(self.blocknotify_dir)
Reported by Pylint.
Line: 33
Column: 9
def setup_network(self):
self.wallet = ''.join(chr(i) for i in range(FILE_CHAR_START, FILE_CHAR_END) if chr(i) not in FILE_CHARS_DISALLOWED)
self.alertnotify_dir = os.path.join(self.options.tmpdir, "alertnotify")
self.blocknotify_dir = os.path.join(self.options.tmpdir, "blocknotify")
self.walletnotify_dir = os.path.join(self.options.tmpdir, "walletnotify")
os.mkdir(self.alertnotify_dir)
os.mkdir(self.blocknotify_dir)
os.mkdir(self.walletnotify_dir)
Reported by Pylint.
Line: 34
Column: 9
def setup_network(self):
self.wallet = ''.join(chr(i) for i in range(FILE_CHAR_START, FILE_CHAR_END) if chr(i) not in FILE_CHARS_DISALLOWED)
self.alertnotify_dir = os.path.join(self.options.tmpdir, "alertnotify")
self.blocknotify_dir = os.path.join(self.options.tmpdir, "blocknotify")
self.walletnotify_dir = os.path.join(self.options.tmpdir, "walletnotify")
os.mkdir(self.alertnotify_dir)
os.mkdir(self.blocknotify_dir)
os.mkdir(self.walletnotify_dir)
Reported by Pylint.
Line: 35
Column: 9
self.wallet = ''.join(chr(i) for i in range(FILE_CHAR_START, FILE_CHAR_END) if chr(i) not in FILE_CHARS_DISALLOWED)
self.alertnotify_dir = os.path.join(self.options.tmpdir, "alertnotify")
self.blocknotify_dir = os.path.join(self.options.tmpdir, "blocknotify")
self.walletnotify_dir = os.path.join(self.options.tmpdir, "walletnotify")
os.mkdir(self.alertnotify_dir)
os.mkdir(self.blocknotify_dir)
os.mkdir(self.walletnotify_dir)
# -alertnotify and -blocknotify on node0, walletnotify on node1
Reported by Pylint.
Line: 41
Column: 9
os.mkdir(self.walletnotify_dir)
# -alertnotify and -blocknotify on node0, walletnotify on node1
self.extra_args = [[
"-alertnotify=echo > {}".format(os.path.join(self.alertnotify_dir, '%s')),
"-blocknotify=echo > {}".format(os.path.join(self.blocknotify_dir, '%s')),
], [
"-rescan",
"-walletnotify=echo %h_%b > {}".format(os.path.join(self.walletnotify_dir, notify_outputname('%w', '%s'))),
Reported by Pylint.
Line: 160
Column: 3
self.expect_wallet_notify([(bump2, blockheight2, blockhash2), (tx2, -1, UNCONFIRMED_HASH_STRING)])
assert_equal(self.nodes[1].gettransaction(bump2)["confirmations"], 1)
# TODO: add test for `-alertnotify` large fork notifications
def expect_wallet_notify(self, tx_details):
self.wait_until(lambda: len(os.listdir(self.walletnotify_dir)) >= len(tx_details), timeout=10)
# Should have no more and no less files than expected
assert_equal(sorted(notify_outputname(self.wallet, tx_id) for tx_id, _, _ in tx_details), sorted(os.listdir(self.walletnotify_dir)))
Reported by Pylint.
Line: 170
Column: 53
for tx_id, blockheight, blockhash in tx_details:
fname = os.path.join(self.walletnotify_dir, notify_outputname(self.wallet, tx_id))
# Wait for the cached writes to hit storage
self.wait_until(lambda: os.path.getsize(fname) > 0, timeout=10)
with open(fname, 'rt', encoding='utf-8') as f:
text = f.read()
# Universal newline ensures '\n' on 'nt'
assert_equal(text[-1], '\n')
text = text[:-1]
Reported by Pylint.
Line: 22
Column: 1
FILE_CHARS_DISALLOWED = '/\\?%*:|"<>' if os.name == 'nt' else '/'
UNCONFIRMED_HASH_STRING = 'unconfirmed'
def notify_outputname(walletname, txid):
return txid if os.name == 'nt' else '{}_{}'.format(walletname, txid)
class NotificationsTest(BitcoinTestFramework):
def set_test_params(self):
Reported by Pylint.
Line: 26
Column: 1
return txid if os.name == 'nt' else '{}_{}'.format(walletname, txid)
class NotificationsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
def setup_network(self):
Reported by Pylint.
Line: 26
Column: 1
return txid if os.name == 'nt' else '{}_{}'.format(walletname, txid)
class NotificationsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
def setup_network(self):
Reported by Pylint.
contrib/seeds/makeseeds.py
29 issues
Line: 11
Column: 1
import re
import sys
import dns.resolver
import collections
NSEEDS=512
MAX_SEEDS_PER_ASN=2
Reported by Pylint.
Line: 63
Column: 3
if m.group(1) in ['::']: # Not interested in localhost
return None
ipstr = m.group(1)
sortkey = ipstr # XXX parse IPv6 into number, could use name_to_ipv6 from generate-seeds
port = int(m.group(2))
else:
# Do IPv4 sanity check
ip = 0
for i in range(0,4):
Reported by Pylint.
Line: 143
Column: 12
reversed(ipaddr.split('.'))) + prefix + '.asn.cymru.com',
'TXT').response.answer][0].split('\"')[1].split(' ')[0])
return asn
except Exception:
sys.stderr.write('ERR: Could not resolve ASN for "' + ip + '"\n')
return None
# Based on Greg Maxwell's seed_filter.py
def filterbyasn(ips, max_per_asn, max_per_net):
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2013-2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Generate seeds.txt from Pieter's DNS seeder
#
import re
Reported by Pylint.
Line: 12
Column: 1
import re
import sys
import dns.resolver
import collections
NSEEDS=512
MAX_SEEDS_PER_ASN=2
Reported by Pylint.
Line: 41
Column: 1
r"0.21.99"
r")")
def parseline(line):
sline = line.split()
if len(sline) < 11:
return None
m = PATTERN_IPV4.match(sline[0])
sortkey = None
Reported by Pylint.
Line: 41
Column: 1
r"0.21.99"
r")")
def parseline(line):
sline = line.split()
if len(sline) < 11:
return None
m = PATTERN_IPV4.match(sline[0])
sortkey = None
Reported by Pylint.
Line: 44
Column: 1
def parseline(line):
sline = line.split()
if len(sline) < 11:
return None
m = PATTERN_IPV4.match(sline[0])
sortkey = None
ip = None
if m is None:
m = PATTERN_IPV6.match(sline[0])
Reported by Pylint.
Line: 45
Column: 5
sline = line.split()
if len(sline) < 11:
return None
m = PATTERN_IPV4.match(sline[0])
sortkey = None
ip = None
if m is None:
m = PATTERN_IPV6.match(sline[0])
if m is None:
Reported by Pylint.
Line: 47
Column: 5
return None
m = PATTERN_IPV4.match(sline[0])
sortkey = None
ip = None
if m is None:
m = PATTERN_IPV6.match(sline[0])
if m is None:
m = PATTERN_ONION.match(sline[0])
if m is None:
Reported by Pylint.
test/functional/p2p_invalid_messages.py
29 issues
Line: 163
Column: 9
# will produce unexpected results.
conn.wait_for_sendaddrv2()
self.log.info('Test addrv2: ' + label)
msg = msg_unrecognized(str_data=b'')
msg.msgtype = b'addrv2'
with node.assert_debug_log(required_log_messages):
# override serialize() which would include the length of the data
Reported by Pylint.
Line: 234
Column: 23
def test_oversized_msg(self, msg, size):
msg_type = msg.msgtype.decode('ascii')
self.log.info("Test {} message of size {} is logged as misbehaving".format(msg_type, size))
with self.nodes[0].assert_debug_log(['Misbehaving', '{} message size = {}'.format(msg_type, size)]):
self.nodes[0].add_p2p_connection(P2PInterface()).send_and_ping(msg)
self.nodes[0].disconnect_p2ps()
def test_oversized_inv_msg(self):
Reported by Pylint.
Line: 36
Column: 1
VALID_DATA_LIMIT = MAX_PROTOCOL_MESSAGE_LENGTH - 5 # Account for the 5-byte length prefix
class msg_unrecognized:
"""Nonsensical message. Modeled after similar types in test_framework.messages."""
msgtype = b'badmsg\x01'
def __init__(self, *, str_data):
Reported by Pylint.
Line: 44
Column: 5
def __init__(self, *, str_data):
self.str_data = str_data.encode() if not isinstance(str_data, bytes) else str_data
def serialize(self):
return ser_string(self.str_data)
def __repr__(self):
return "{}(data={})".format(self.msgtype, self.str_data)
Reported by Pylint.
Line: 51
Column: 1
return "{}(data={})".format(self.msgtype, self.str_data)
class SenderOfAddrV2(P2PInterface):
def wait_for_sendaddrv2(self):
self.wait_until(lambda: 'sendaddrv2' in self.last_message)
class InvalidMessagesTest(BitcoinTestFramework):
Reported by Pylint.
Line: 52
Column: 5
class SenderOfAddrV2(P2PInterface):
def wait_for_sendaddrv2(self):
self.wait_until(lambda: 'sendaddrv2' in self.last_message)
class InvalidMessagesTest(BitcoinTestFramework):
def set_test_params(self):
Reported by Pylint.
Line: 56
Column: 1
self.wait_until(lambda: 'sendaddrv2' in self.last_message)
class InvalidMessagesTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.extra_args = [["-whitelist=addr@127.0.0.1"]]
Reported by Pylint.
Line: 78
Column: 5
self.test_oversized_headers_msg()
self.test_resource_exhaustion()
def test_buffer(self):
self.log.info("Test message with header split across two buffers is received")
conn = self.nodes[0].add_p2p_connection(P2PDataStore())
# Create valid message
msg = conn.build_message(msg_ping(nonce=12345))
cut_pos = 12 # Chosen at an arbitrary position within the header
Reported by Pylint.
Line: 97
Column: 5
conn.sync_with_ping(timeout=1)
self.nodes[0].disconnect_p2ps()
def test_duplicate_version_msg(self):
self.log.info("Test duplicate version message is ignored")
conn = self.nodes[0].add_p2p_connection(P2PDataStore())
with self.nodes[0].assert_debug_log(['redundant version message from peer']):
conn.send_and_ping(msg_version())
self.nodes[0].disconnect_p2ps()
Reported by Pylint.
Line: 104
Column: 5
conn.send_and_ping(msg_version())
self.nodes[0].disconnect_p2ps()
def test_magic_bytes(self):
self.log.info("Test message with invalid magic bytes disconnects peer")
conn = self.nodes[0].add_p2p_connection(P2PDataStore())
with self.nodes[0].assert_debug_log(['Header error: Wrong MessageStart ffffffff received']):
msg = conn.build_message(msg_unrecognized(str_data="d"))
# modify magic bytes
Reported by Pylint.
test/functional/wallet_keypool.py
29 issues
Line: 13
Column: 1
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
class KeyPoolTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
Reported by Pylint.
Line: 20
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
nodes = self.nodes
addr_before_encrypting = nodes[0].getnewaddress()
addr_before_encrypting_data = nodes[0].getaddressinfo(addr_before_encrypting)
wallet_info_old = nodes[0].getwalletinfo()
if not self.options.descriptors:
Reported by Pylint.
Line: 20
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
nodes = self.nodes
addr_before_encrypting = nodes[0].getnewaddress()
addr_before_encrypting_data = nodes[0].getaddressinfo(addr_before_encrypting)
wallet_info_old = nodes[0].getwalletinfo()
if not self.options.descriptors:
Reported by Pylint.
Line: 26
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
addr_before_encrypting_data = nodes[0].getaddressinfo(addr_before_encrypting)
wallet_info_old = nodes[0].getwalletinfo()
if not self.options.descriptors:
assert addr_before_encrypting_data['hdseedid'] == wallet_info_old['hdseedid']
# Encrypt wallet and wait to terminate
nodes[0].encryptwallet('test')
if self.options.descriptors:
# Import hardened derivation only descriptors
Reported by Bandit.
Line: 35
Column: 1
nodes[0].walletpassphrase('test', 10)
nodes[0].importdescriptors([
{
"desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h)#y4dfsj7n",
"timestamp": "now",
"range": [0,0],
"active": True
},
{
Reported by Pylint.
Line: 41
Column: 1
"active": True
},
{
"desc": "pkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/1h/*h)#a0nyvl0k",
"timestamp": "now",
"range": [0,0],
"active": True
},
{
Reported by Pylint.
Line: 47
Column: 1
"active": True
},
{
"desc": "sh(wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/2h/*h))#lmeu2axg",
"timestamp": "now",
"range": [0,0],
"active": True
},
{
Reported by Pylint.
Line: 53
Column: 1
"active": True
},
{
"desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/3h/*h)#jkl636gm",
"timestamp": "now",
"range": [0,0],
"active": True,
"internal": True
},
Reported by Pylint.
Line: 60
Column: 1
"internal": True
},
{
"desc": "pkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/4h/*h)#l3crwaus",
"timestamp": "now",
"range": [0,0],
"active": True,
"internal": True
},
Reported by Pylint.
Line: 67
Column: 1
"internal": True
},
{
"desc": "sh(wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/5h/*h))#qg8wa75f",
"timestamp": "now",
"range": [0,0],
"active": True,
"internal": True
}
Reported by Pylint.
test/functional/feature_dbcrash.py
28 issues
Line: 78
Column: 5
self.import_deterministic_coinbase_privkeys()
# Leave them unconnected, we'll use submitblock directly in this test
def restart_node(self, node_index, expected_tip):
"""Start up a given node id, wait for the tip to reach the given block hash, and calculate the utxo hash.
Exceptions on startup should indicate node crash (due to -dbcrashratio), in which case we try again. Give up
after 60 seconds. Returns the utxo hash of the given node."""
Reported by Pylint.
Line: 92
Column: 13
self.nodes[node_index].waitforblock(expected_tip)
utxo_hash = self.nodes[node_index].gettxoutsetinfo()['hash_serialized_2']
return utxo_hash
except:
# An exception here should mean the node is about to crash.
# If bitcoind exits, then try again. wait_for_node_exit()
# should raise an exception if bitcoind doesn't exit.
self.wait_for_node_exit(node_index, timeout=10)
self.crashed_on_restart += 1
Reported by Pylint.
Line: 103
Column: 3
# If we got here, bitcoind isn't coming back up on restart. Could be a
# bug in bitcoind, or we've gotten unlucky with our dbcrash ratio --
# perhaps we generated a test case that blew up our cache?
# TODO: If this happens a lot, we should try to restart without -dbcrashratio
# and make sure that recovery happens.
raise AssertionError("Unable to successfully restart node %d in allotted time", node_index)
def submit_block_catch_error(self, node_index, block):
"""Try submitting a block to the given node.
Reported by Pylint.
Line: 105
Column: 9
# perhaps we generated a test case that blew up our cache?
# TODO: If this happens a lot, we should try to restart without -dbcrashratio
# and make sure that recovery happens.
raise AssertionError("Unable to successfully restart node %d in allotted time", node_index)
def submit_block_catch_error(self, node_index, block):
"""Try submitting a block to the given node.
Catch any exceptions that indicate the node has crashed.
Reported by Pylint.
Line: 150
Column: 3
# Get the block from node3, and submit to node_i
self.log.debug("submitting block %s", block_hash)
if not self.submit_block_catch_error(i, block):
# TODO: more carefully check that the crash is due to -dbcrashratio
# (change the exit code perhaps, and check that here?)
self.wait_for_node_exit(i, timeout=30)
self.log.debug("Restarting node %d after block hash %s", i, block_hash)
nodei_utxo_hash = self.restart_node(i, block_hash)
assert nodei_utxo_hash is not None
Reported by Pylint.
Line: 189
Column: 3
assert_equal(nodei_utxo_hash, node3_utxo_hash)
def generate_small_transactions(self, node, count, utxo_list):
FEE = 1000 # TODO: replace this with node relay fee based calculation
num_transactions = 0
random.shuffle(utxo_list)
while len(utxo_list) >= 2 and num_transactions < count:
tx = CTransaction()
input_amount = 0
Reported by Pylint.
Line: 215
Column: 9
def run_test(self):
# Track test coverage statistics
self.restart_counts = [0, 0, 0] # Track the restarts for nodes 0-2
self.crashed_on_restart = 0 # Track count of crashes during recovery
# Start by creating a lot of utxos on node3
initial_height = self.nodes[3].getblockcount()
utxo_list = create_confirmed_utxos(self.nodes[3].getnetworkinfo()['relayfee'], self.nodes[3], 5000)
Reported by Pylint.
Line: 216
Column: 9
def run_test(self):
# Track test coverage statistics
self.restart_counts = [0, 0, 0] # Track the restarts for nodes 0-2
self.crashed_on_restart = 0 # Track count of crashes during recovery
# Start by creating a lot of utxos on node3
initial_height = self.nodes[3].getblockcount()
utxo_list = create_confirmed_utxos(self.nodes[3].getnetworkinfo()['relayfee'], self.nodes[3], 5000)
self.log.info("Prepped %d utxo entries", len(utxo_list))
Reported by Pylint.
Line: 47
Column: 1
)
class ChainstateWriteCrashTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 4
self.rpc_timeout = 480
self.supports_cli = False
Reported by Pylint.
Line: 47
Column: 1
)
class ChainstateWriteCrashTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 4
self.rpc_timeout = 480
self.supports_cli = False
Reported by Pylint.
test/functional/feature_fee_estimation.py
28 issues
Line: 212
Column: 13
self.confutxo.append(utx)
else:
newmem.append(utx)
self.memutxo = newmem
def run_test(self):
self.log.info("This test is time consuming, please be patient")
self.log.info("Splitting inputs so we can generate tx's")
Reported by Pylint.
Line: 220
Column: 9
# Start node0
self.start_node(0)
self.txouts = []
self.txouts2 = []
# Split a coinbase into two transaction puzzle outputs
split_inputs(self.nodes[0], self.nodes[0].listunspent(0), self.txouts, True)
# Mine
Reported by Pylint.
Line: 221
Column: 9
# Start node0
self.start_node(0)
self.txouts = []
self.txouts2 = []
# Split a coinbase into two transaction puzzle outputs
split_inputs(self.nodes[0], self.nodes[0].listunspent(0), self.txouts, True)
# Mine
while len(self.nodes[0].getrawmempool()) > 0:
Reported by Pylint.
Line: 256
Column: 9
self.sync_all()
self.fees_per_kb = []
self.memutxo = []
self.confutxo = self.txouts # Start with the set of confirmed txouts after splitting
self.log.info("Will output estimates for 1/2/3/6/15/25 blocks")
for _ in range(2):
Reported by Pylint.
Line: 257
Column: 9
self.sync_all()
self.fees_per_kb = []
self.memutxo = []
self.confutxo = self.txouts # Start with the set of confirmed txouts after splitting
self.log.info("Will output estimates for 1/2/3/6/15/25 blocks")
for _ in range(2):
self.log.info("Creating transactions and mining them with a block size that can't keep up")
Reported by Pylint.
Line: 258
Column: 9
self.fees_per_kb = []
self.memutxo = []
self.confutxo = self.txouts # Start with the set of confirmed txouts after splitting
self.log.info("Will output estimates for 1/2/3/6/15/25 blocks")
for _ in range(2):
self.log.info("Creating transactions and mining them with a block size that can't keep up")
# Create transactions and mine 10 small blocks with node 2, but create txs faster than we can mine
Reported by Pylint.
Line: 47
Column: 1
SCRIPT_SIG = [CScript([OP_TRUE, REDEEM_SCRIPT_1]), CScript([OP_TRUE, REDEEM_SCRIPT_2])]
def small_txpuzzle_randfee(from_node, conflist, unconflist, amount, min_fee, fee_increment):
"""Create and send a transaction with a random fee.
The transaction pays to a trivial P2SH script, and assumes that its inputs
are of the same form.
The function takes a list of confirmed outputs and unconfirmed outputs
Reported by Pylint.
Line: 60
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
# It's best to exponentially distribute our random fees
# because the buckets are exponentially spaced.
# Exponentially distributed from 1-128 * fee_increment
rand_fee = float(fee_increment) * (1.1892 ** random.randint(0, 28))
# Total fee ranges from min_fee to min_fee + 127*fee_increment
fee = min_fee - fee_increment + satoshi_round(rand_fee)
tx = CTransaction()
total_in = Decimal("0.00000000")
while total_in <= (amount + fee) and len(conflist) > 0:
Reported by Bandit.
Line: 63
Column: 5
rand_fee = float(fee_increment) * (1.1892 ** random.randint(0, 28))
# Total fee ranges from min_fee to min_fee + 127*fee_increment
fee = min_fee - fee_increment + satoshi_round(rand_fee)
tx = CTransaction()
total_in = Decimal("0.00000000")
while total_in <= (amount + fee) and len(conflist) > 0:
t = conflist.pop(0)
total_in += t["amount"]
tx.vin.append(CTxIn(COutPoint(int(t["txid"], 16), t["vout"]), b""))
Reported by Pylint.
Line: 66
Column: 9
tx = CTransaction()
total_in = Decimal("0.00000000")
while total_in <= (amount + fee) and len(conflist) > 0:
t = conflist.pop(0)
total_in += t["amount"]
tx.vin.append(CTxIn(COutPoint(int(t["txid"], 16), t["vout"]), b""))
if total_in <= amount + fee:
while total_in <= (amount + fee) and len(unconflist) > 0:
t = unconflist.pop(0)
Reported by Pylint.
test/functional/wallet_listtransactions.py
28 issues
Line: 18
Column: 1
assert_equal,
)
class ListTransactionsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
# This test isn't testing txn relay/timing, so set whitelist on the
# peers for instant txn relay. This speeds up the test run time 2-3x.
self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes
Reported by Pylint.
Line: 44
Column: 1
self.sync_all()
assert_array_result(self.nodes[0].listtransactions(),
{"txid": txid},
{"category": "send", "amount": Decimal("-0.1"), "confirmations": 1, "blockhash": blockhash, "blockheight": blockheight})
assert_array_result(self.nodes[1].listtransactions(),
{"txid": txid},
{"category": "receive", "amount": Decimal("0.1"), "confirmations": 1, "blockhash": blockhash, "blockheight": blockheight})
self.log.info("Test send-to-self on node0")
Reported by Pylint.
Line: 47
Column: 1
{"category": "send", "amount": Decimal("-0.1"), "confirmations": 1, "blockhash": blockhash, "blockheight": blockheight})
assert_array_result(self.nodes[1].listtransactions(),
{"txid": txid},
{"category": "receive", "amount": Decimal("0.1"), "confirmations": 1, "blockhash": blockhash, "blockheight": blockheight})
self.log.info("Test send-to-self on node0")
txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.2)
assert_array_result(self.nodes[0].listtransactions(),
{"txid": txid, "category": "send"},
Reported by Pylint.
Line: 99
Column: 1
txid = self.nodes[1].sendtoaddress(multisig["address"], 0.1)
self.nodes[1].generate(1)
self.sync_all()
assert_equal(len(self.nodes[0].listtransactions(label="watchonly", include_watchonly=True)), 1)
assert_equal(len(self.nodes[0].listtransactions(dummy="watchonly", include_watchonly=True)), 1)
assert len(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=False)) == 0
assert_array_result(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=True),
{"category": "receive", "amount": Decimal("0.1")},
{"txid": txid, "label": "watchonly"})
Reported by Pylint.
Line: 100
Column: 1
self.nodes[1].generate(1)
self.sync_all()
assert_equal(len(self.nodes[0].listtransactions(label="watchonly", include_watchonly=True)), 1)
assert_equal(len(self.nodes[0].listtransactions(dummy="watchonly", include_watchonly=True)), 1)
assert len(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=False)) == 0
assert_array_result(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=True),
{"category": "receive", "amount": Decimal("0.1")},
{"txid": txid, "label": "watchonly"})
Reported by Pylint.
Line: 101
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.sync_all()
assert_equal(len(self.nodes[0].listtransactions(label="watchonly", include_watchonly=True)), 1)
assert_equal(len(self.nodes[0].listtransactions(dummy="watchonly", include_watchonly=True)), 1)
assert len(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=False)) == 0
assert_array_result(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=True),
{"category": "receive", "amount": Decimal("0.1")},
{"txid": txid, "label": "watchonly"})
self.run_rbf_opt_in_test()
Reported by Bandit.
Line: 101
Column: 1
self.sync_all()
assert_equal(len(self.nodes[0].listtransactions(label="watchonly", include_watchonly=True)), 1)
assert_equal(len(self.nodes[0].listtransactions(dummy="watchonly", include_watchonly=True)), 1)
assert len(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=False)) == 0
assert_array_result(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=True),
{"category": "receive", "amount": Decimal("0.1")},
{"txid": txid, "label": "watchonly"})
self.run_rbf_opt_in_test()
Reported by Pylint.
Line: 102
Column: 1
assert_equal(len(self.nodes[0].listtransactions(label="watchonly", include_watchonly=True)), 1)
assert_equal(len(self.nodes[0].listtransactions(dummy="watchonly", include_watchonly=True)), 1)
assert len(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=False)) == 0
assert_array_result(self.nodes[0].listtransactions(label="watchonly", count=100, include_watchonly=True),
{"category": "receive", "amount": Decimal("0.1")},
{"txid": txid, "label": "watchonly"})
self.run_rbf_opt_in_test()
Reported by Pylint.
Line: 109
Column: 5
self.run_rbf_opt_in_test()
def run_rbf_opt_in_test(self):
"""Test the opt-in-rbf flag for sent and received transactions."""
def is_opt_in(node, txid):
"""Check whether a transaction signals opt-in RBF itself."""
rawtx = node.getrawtransaction(txid, 1)
Reported by Pylint.
Line: 109
Column: 5
self.run_rbf_opt_in_test()
def run_rbf_opt_in_test(self):
"""Test the opt-in-rbf flag for sent and received transactions."""
def is_opt_in(node, txid):
"""Check whether a transaction signals opt-in RBF itself."""
rawtx = node.getrawtransaction(txid, 1)
Reported by Pylint.
test/functional/wallet_dump.py
28 issues
Line: 17
Column: 1
)
def read_dump(file_name, addrs, script_addrs, hd_master_addr_old):
"""
Read the given dump, count the addrs that match, count change and reserve.
Also check that the old hd_master is inactive
"""
with open(file_name, encoding='utf8') as inputfile:
Reported by Pylint.
Line: 17
Column: 1
)
def read_dump(file_name, addrs, script_addrs, hd_master_addr_old):
"""
Read the given dump, count the addrs that match, count change and reserve.
Also check that the old hd_master is inactive
"""
with open(file_name, encoding='utf8') as inputfile:
Reported by Pylint.
Line: 17
Column: 1
)
def read_dump(file_name, addrs, script_addrs, hd_master_addr_old):
"""
Read the given dump, count the addrs that match, count change and reserve.
Also check that the old hd_master is inactive
"""
with open(file_name, encoding='utf8') as inputfile:
Reported by Pylint.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
keypath = None
if keytype == "inactivehdseed=1":
# ensure the old master is still available
assert hd_master_addr_old == addr
elif keytype == "hdseed=1":
# ensure we have generated a new hd master key
assert hd_master_addr_old != addr
hd_master_addr_ret = addr
elif keytype == "script=1":
Reported by Bandit.
Line: 59
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert hd_master_addr_old == addr
elif keytype == "hdseed=1":
# ensure we have generated a new hd master key
assert hd_master_addr_old != addr
hd_master_addr_ret = addr
elif keytype == "script=1":
# scripts don't have keypaths
keypath = None
else:
Reported by Bandit.
Line: 68
Column: 21
keypath = addr_keypath.rstrip().split("hdkeypath=")[1]
# count key types
for addrObj in addrs:
if addrObj['address'] == addr.split(",")[0] and addrObj['hdkeypath'] == keypath and keytype == "label=":
if addr.startswith('m') or addr.startswith('n'):
# P2PKH address
found_legacy_addr += 1
elif addr.startswith('2'):
Reported by Pylint.
Line: 69
Column: 21
# count key types
for addrObj in addrs:
if addrObj['address'] == addr.split(",")[0] and addrObj['hdkeypath'] == keypath and keytype == "label=":
if addr.startswith('m') or addr.startswith('n'):
# P2PKH address
found_legacy_addr += 1
elif addr.startswith('2'):
# P2SH-segwit address
Reported by Pylint.
Line: 69
Column: 1
# count key types
for addrObj in addrs:
if addrObj['address'] == addr.split(",")[0] and addrObj['hdkeypath'] == keypath and keytype == "label=":
if addr.startswith('m') or addr.startswith('n'):
# P2PKH address
found_legacy_addr += 1
elif addr.startswith('2'):
# P2SH-segwit address
Reported by Pylint.
Line: 92
Column: 1
found_script_addr += 1
break
return found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, hd_master_addr_ret
class WalletDumpTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
Reported by Pylint.
Line: 95
Column: 1
return found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, hd_master_addr_ret
class WalletDumpTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [["-keypool=90", "-addresstype=legacy"]]
self.rpc_timeout = 120
Reported by Pylint.
test/functional/rpc_help.py
27 issues
Line: 10
Column: 1
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
from collections import defaultdict
import os
import re
def parse_string(s):
Reported by Pylint.
Line: 11
Column: 1
from test_framework.util import assert_equal, assert_raises_rpc_error
from collections import defaultdict
import os
import re
def parse_string(s):
assert s[0] == '"'
Reported by Pylint.
Line: 12
Column: 1
from collections import defaultdict
import os
import re
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
Reported by Pylint.
Line: 15
Column: 1
import re
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
Reported by Pylint.
Line: 15
Column: 1
import re
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
Reported by Pylint.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
def process_mapping(fname):
Reported by Bandit.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def parse_string(s):
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
def process_mapping(fname):
"""Find and parse conversion table in implementation file `fname`."""
Reported by Bandit.
Line: 25
Column: 47
"""Find and parse conversion table in implementation file `fname`."""
cmds = []
in_rpcs = False
with open(fname, "r", encoding="utf8") as f:
for line in f:
line = line.rstrip()
if not in_rpcs:
if line == 'static const CRPCConvertParam vRPCConvertParams[] =':
in_rpcs = True
Reported by Pylint.
Line: 35
Column: 21
if line.startswith('};'):
in_rpcs = False
elif '{' in line and '"' in line:
m = re.search(r'{ *("[^"]*"), *([0-9]+) *, *("[^"]*") *},', line)
assert m, 'No match to table expression: %s' % line
name = parse_string(m.group(1))
idx = int(m.group(2))
argname = parse_string(m.group(3))
cmds.append((name, idx, argname))
Reported by Pylint.
Line: 36
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
in_rpcs = False
elif '{' in line and '"' in line:
m = re.search(r'{ *("[^"]*"), *([0-9]+) *, *("[^"]*") *},', line)
assert m, 'No match to table expression: %s' % line
name = parse_string(m.group(1))
idx = int(m.group(2))
argname = parse_string(m.group(3))
cmds.append((name, idx, argname))
assert not in_rpcs and cmds
Reported by Bandit.
test/functional/test_framework/script_util.py
27 issues
Line: 28
Column: 29
DUMMY_P2WPKH_SCRIPT = CScript([b'a' * 21])
DUMMY_2_P2WPKH_SCRIPT = CScript([b'b' * 21])
def keyhash_to_p2pkh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
Reported by Pylint.
Line: 28
Column: 35
DUMMY_P2WPKH_SCRIPT = CScript([b'a' * 21])
DUMMY_2_P2WPKH_SCRIPT = CScript([b'b' * 21])
def keyhash_to_p2pkh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
Reported by Pylint.
Line: 32
Column: 37
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_HASH160, hash, OP_EQUAL])
def key_to_p2pkh_script(key, main = False):
key = check_key(key)
Reported by Pylint.
Line: 32
Column: 31
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_HASH160, hash, OP_EQUAL])
def key_to_p2pkh_script(key, main = False):
key = check_key(key)
Reported by Pylint.
Line: 49
Column: 49
p2shscript = CScript([OP_0, hash160(key)])
return script_to_p2sh_script(p2shscript, main)
def program_to_witness_script(version, program, main = False):
if isinstance(program, str):
program = bytes.fromhex(program)
assert 0 <= version <= 16
assert 2 <= len(program) <= 40
assert version > 0 or len(program) in [20, 32]
Reported by Pylint.
Line: 6
Column: 1
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Useful Script constants and utils."""
from test_framework.script import CScript, hash160, sha256, OP_0, OP_DUP, OP_HASH160, OP_CHECKSIG, OP_EQUAL, OP_EQUALVERIFY
# To prevent a "tx-size-small" policy rule error, a transaction has to have a
# non-witness size of at least 82 bytes (MIN_STANDARD_TX_NONWITNESS_SIZE in
# src/policy/policy.h). Considering a Tx with the smallest possible single
# input (blank, empty scriptSig), and with an output omitting the scriptPubKey,
Reported by Pylint.
Line: 28
Column: 1
DUMMY_P2WPKH_SCRIPT = CScript([b'a' * 21])
DUMMY_2_P2WPKH_SCRIPT = CScript([b'b' * 21])
def keyhash_to_p2pkh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
Reported by Pylint.
Line: 29
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
DUMMY_2_P2WPKH_SCRIPT = CScript([b'b' * 21])
def keyhash_to_p2pkh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_HASH160, hash, OP_EQUAL])
Reported by Bandit.
Line: 32
Column: 1
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_HASH160, hash, OP_EQUAL])
def key_to_p2pkh_script(key, main = False):
key = check_key(key)
Reported by Pylint.
Line: 33
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
def scripthash_to_p2sh_script(hash, main = False):
assert len(hash) == 20
return CScript([OP_HASH160, hash, OP_EQUAL])
def key_to_p2pkh_script(key, main = False):
key = check_key(key)
return keyhash_to_p2pkh_script(hash160(key), main)
Reported by Bandit.