The following issues were found
test/functional/feature_coinstatsindex.py
21 issues
Line: 37
Column: 1
assert_raises_rpc_error,
)
class CoinStatsIndexTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.supports_cli = False
self.extra_args = [
Reported by Pylint.
Line: 56
Column: 5
self._test_reorg_index()
self._test_index_rejects_hash_serialized()
def block_sanity_check(self, block_info):
block_subsidy = 50
assert_equal(
block_info['prevout_spent'] + block_subsidy,
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
Reported by Pylint.
Line: 56
Column: 5
self._test_reorg_index()
self._test_index_rejects_hash_serialized()
def block_sanity_check(self, block_info):
block_subsidy = 50
assert_equal(
block_info['prevout_spent'] + block_subsidy,
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
Reported by Pylint.
Line: 60
Column: 1
block_subsidy = 50
assert_equal(
block_info['prevout_spent'] + block_subsidy,
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
def _test_coin_stats_index(self):
node = self.nodes[0]
index_node = self.nodes[1]
Reported by Pylint.
Line: 63
Column: 5
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
def _test_coin_stats_index(self):
node = self.nodes[0]
index_node = self.nodes[1]
# Both none and muhash options allow the usage of the index
index_hash_options = ['none', 'muhash']
Reported by Pylint.
Line: 63
Column: 5
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
def _test_coin_stats_index(self):
node = self.nodes[0]
index_node = self.nodes[1]
# Both none and muhash options allow the usage of the index
index_hash_options = ['none', 'muhash']
Reported by Pylint.
Line: 77
Column: 1
self.sync_blocks(timeout=120)
self.log.info("Test that gettxoutsetinfo() output is consistent with or without coinstatsindex option")
res0 = node.gettxoutsetinfo('none')
# The fields 'disk_size' and 'transactions' do not exist on the index
del res0['disk_size'], res0['transactions']
Reported by Pylint.
Line: 92
Column: 1
# Everything left should be the same
assert_equal(res1, res0)
self.log.info("Test that gettxoutsetinfo() can get fetch data on specific heights with index")
# Generate a new tip
node.generate(5)
for hash_option in index_hash_options:
Reported by Pylint.
Line: 111
Column: 1
assert_equal(res0, res3)
# It does not work without coinstatsindex
assert_raises_rpc_error(-8, "Querying specific block heights requires coinstatsindex", node.gettxoutsetinfo, hash_option, 102)
self.log.info("Test gettxoutsetinfo() with index and verbose flag")
for hash_option in index_hash_options:
# Genesis block is unspendable
Reported by Pylint.
Line: 162
Column: 17
tx1_final = self.nodes[0].gettransaction(tx1_txid)
for output in tx1_final['details']:
if output['amount'] == Decimal('21.00000000') and output['category'] == 'receive':
n = output['vout']
# Generate and send another tx with an OP_RETURN output (which is unspendable)
tx2 = CTransaction()
tx2.vin.append(CTxIn(COutPoint(int(tx1_txid, 16), n), b''))
tx2.vout.append(CTxOut(int(20.99 * COIN), CScript([OP_RETURN] + [OP_FALSE]*30)))
Reported by Pylint.
test/functional/wallet_groups.py
19 issues
Line: 45
Column: 9
addrs = addr1 + addr2
# Send 1 + 0.5 coin to each address
[self.nodes[0].sendtoaddress(addr, 1.0) for addr in addrs]
[self.nodes[0].sendtoaddress(addr, 0.5) for addr in addrs]
self.nodes[0].generate(1)
self.sync_all()
Reported by Pylint.
Line: 46
Column: 9
# Send 1 + 0.5 coin to each address
[self.nodes[0].sendtoaddress(addr, 1.0) for addr in addrs]
[self.nodes[0].sendtoaddress(addr, 0.5) for addr in addrs]
self.nodes[0].generate(1)
self.sync_all()
# For each node, send 0.2 coins back to 0;
Reported by Pylint.
Line: 127
Column: 9
assert_equal(2, len(tx4["vout"]))
addr_aps2 = self.nodes[3].getnewaddress()
[self.nodes[0].sendtoaddress(addr_aps2, 1.0) for _ in range(5)]
self.nodes[0].generate(1)
self.sync_all()
with self.nodes[3].assert_debug_log(['Fee non-grouped = 5520, grouped = 8240, using non-grouped']):
txid5 = self.nodes[3].sendtoaddress(self.nodes[0].getnewaddress(), 2.95)
tx5 = self.nodes[3].getrawtransaction(txid5, True)
Reported by Pylint.
Line: 141
Column: 9
# 1 sat higher, crossing the threshold from non-grouped to grouped.
self.log.info("Test wallet option maxapsfee threshold from non-grouped to grouped")
addr_aps3 = self.nodes[4].getnewaddress()
[self.nodes[0].sendtoaddress(addr_aps3, 1.0) for _ in range(5)]
self.nodes[0].generate(1)
self.sync_all()
with self.nodes[4].assert_debug_log(['Fee non-grouped = 5520, grouped = 8240, using grouped']):
txid6 = self.nodes[4].sendtoaddress(self.nodes[0].getnewaddress(), 2.95)
tx6 = self.nodes[4].getrawtransaction(txid6, True)
Reported by Pylint.
Line: 18
Column: 1
)
class WalletGroupTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 5
self.extra_args = [
[],
Reported by Pylint.
Line: 34
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.log.info("Setting up")
# Mine some coins
self.nodes[0].generate(COINBASE_MATURITY + 1)
# Get some addresses from the two nodes
Reported by Pylint.
Line: 34
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.log.info("Setting up")
# Mine some coins
self.nodes[0].generate(COINBASE_MATURITY + 1)
# Get some addresses from the two nodes
Reported by Pylint.
Line: 62
Column: 9
assert_equal(1, len(tx1["vin"]))
assert_equal(2, len(tx1["vout"]))
# one output should be 0.2, the other should be ~0.3
v = [vout["value"] for vout in tx1["vout"]]
v.sort()
assert_approx(v[0], vexp=0.2, vspan=0.0001)
assert_approx(v[1], vexp=0.3, vspan=0.0001)
txid2 = self.nodes[2].sendtoaddress(self.nodes[0].getnewaddress(), 0.2)
Reported by Pylint.
Line: 73
Column: 9
assert_equal(2, len(tx2["vin"]))
assert_equal(2, len(tx2["vout"]))
# one output should be 0.2, the other should be ~1.3
v = [vout["value"] for vout in tx2["vout"]]
v.sort()
assert_approx(v[0], vexp=0.2, vspan=0.0001)
assert_approx(v[1], vexp=1.3, vspan=0.0001)
self.log.info("Test avoiding partial spends if warranted, even if avoidpartialspends is disabled")
Reported by Pylint.
Line: 78
Column: 1
assert_approx(v[0], vexp=0.2, vspan=0.0001)
assert_approx(v[1], vexp=1.3, vspan=0.0001)
self.log.info("Test avoiding partial spends if warranted, even if avoidpartialspends is disabled")
self.sync_all()
self.nodes[0].generate(1)
# Nodes 1-2 now have confirmed UTXOs (letters denote destinations):
# Node #1: Node #2:
# - A 1.0 - D0 1.0
Reported by Pylint.
test/functional/rpc_getdescriptorinfo.py
19 issues
Line: 16
Column: 1
)
class DescriptorTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [["-disablewallet"]]
self.wallet_names = []
Reported by Pylint.
Line: 22
Column: 5
self.extra_args = [["-disablewallet"]]
self.wallet_names = []
def test_desc(self, desc, isrange, issolvable, hasprivatekeys):
info = self.nodes[0].getdescriptorinfo(desc)
assert_equal(info, self.nodes[0].getdescriptorinfo(descsum_create(desc)))
assert_equal(info['descriptor'], descsum_create(desc))
assert_equal(info['isrange'], isrange)
assert_equal(info['issolvable'], issolvable)
Reported by Pylint.
Line: 33
Column: 1
def run_test(self):
assert_raises_rpc_error(-1, 'getdescriptorinfo', self.nodes[0].getdescriptorinfo)
assert_raises_rpc_error(-3, 'Expected type string', self.nodes[0].getdescriptorinfo, 1)
assert_raises_rpc_error(-5, 'is not a valid descriptor function', self.nodes[0].getdescriptorinfo, '')
# P2PK output with the specified public key.
self.test_desc('pk(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2PKH output with the specified public key.
self.test_desc('pkh(02c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee5)', isrange=False, issolvable=True, hasprivatekeys=False)
Reported by Pylint.
Line: 36
Column: 1
assert_raises_rpc_error(-5, 'is not a valid descriptor function', self.nodes[0].getdescriptorinfo, '')
# P2PK output with the specified public key.
self.test_desc('pk(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2PKH output with the specified public key.
self.test_desc('pkh(02c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee5)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2WPKH output with the specified public key.
self.test_desc('wpkh(02f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2SH-P2WPKH output with the specified public key.
Reported by Pylint.
Line: 38
Column: 1
# P2PK output with the specified public key.
self.test_desc('pk(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2PKH output with the specified public key.
self.test_desc('pkh(02c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee5)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2WPKH output with the specified public key.
self.test_desc('wpkh(02f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2SH-P2WPKH output with the specified public key.
self.test_desc('sh(wpkh(03fff97bd5755eeea420453a14355235d382f6472f8568a18b2f057a1460297556))', isrange=False, issolvable=True, hasprivatekeys=False)
# Any P2PK, P2PKH, P2WPKH, or P2SH-P2WPKH output with the specified public key.
Reported by Pylint.
Line: 40
Column: 1
# P2PKH output with the specified public key.
self.test_desc('pkh(02c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee5)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2WPKH output with the specified public key.
self.test_desc('wpkh(02f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2SH-P2WPKH output with the specified public key.
self.test_desc('sh(wpkh(03fff97bd5755eeea420453a14355235d382f6472f8568a18b2f057a1460297556))', isrange=False, issolvable=True, hasprivatekeys=False)
# Any P2PK, P2PKH, P2WPKH, or P2SH-P2WPKH output with the specified public key.
self.test_desc('combo(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# An (overly complicated) P2SH-P2WSH-P2PKH output with the specified public key.
Reported by Pylint.
Line: 42
Column: 1
# P2WPKH output with the specified public key.
self.test_desc('wpkh(02f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9)', isrange=False, issolvable=True, hasprivatekeys=False)
# P2SH-P2WPKH output with the specified public key.
self.test_desc('sh(wpkh(03fff97bd5755eeea420453a14355235d382f6472f8568a18b2f057a1460297556))', isrange=False, issolvable=True, hasprivatekeys=False)
# Any P2PK, P2PKH, P2WPKH, or P2SH-P2WPKH output with the specified public key.
self.test_desc('combo(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# An (overly complicated) P2SH-P2WSH-P2PKH output with the specified public key.
self.test_desc('sh(wsh(pkh(02e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd13)))', isrange=False, issolvable=True, hasprivatekeys=False)
# A bare *1-of-2* multisig output with keys in the specified order.
Reported by Pylint.
Line: 44
Column: 1
# P2SH-P2WPKH output with the specified public key.
self.test_desc('sh(wpkh(03fff97bd5755eeea420453a14355235d382f6472f8568a18b2f057a1460297556))', isrange=False, issolvable=True, hasprivatekeys=False)
# Any P2PK, P2PKH, P2WPKH, or P2SH-P2WPKH output with the specified public key.
self.test_desc('combo(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# An (overly complicated) P2SH-P2WSH-P2PKH output with the specified public key.
self.test_desc('sh(wsh(pkh(02e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd13)))', isrange=False, issolvable=True, hasprivatekeys=False)
# A bare *1-of-2* multisig output with keys in the specified order.
self.test_desc('multi(1,022f8bde4d1a07209355b4a7250a5c5128e88b84bddc619ab7cba8d569b240efe4,025cbdf0646e5db4eaa398f365f2ea7a0e3d419b7e0330e39ce92bddedcac4f9bc)', isrange=False, issolvable=True, hasprivatekeys=False)
# A P2SH *2-of-2* multisig output with keys in the specified order.
Reported by Pylint.
Line: 46
Column: 1
# Any P2PK, P2PKH, P2WPKH, or P2SH-P2WPKH output with the specified public key.
self.test_desc('combo(0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798)', isrange=False, issolvable=True, hasprivatekeys=False)
# An (overly complicated) P2SH-P2WSH-P2PKH output with the specified public key.
self.test_desc('sh(wsh(pkh(02e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd13)))', isrange=False, issolvable=True, hasprivatekeys=False)
# A bare *1-of-2* multisig output with keys in the specified order.
self.test_desc('multi(1,022f8bde4d1a07209355b4a7250a5c5128e88b84bddc619ab7cba8d569b240efe4,025cbdf0646e5db4eaa398f365f2ea7a0e3d419b7e0330e39ce92bddedcac4f9bc)', isrange=False, issolvable=True, hasprivatekeys=False)
# A P2SH *2-of-2* multisig output with keys in the specified order.
self.test_desc('sh(multi(2,022f01e5e15cca351daff3843fb70f3c2f0a1bdd05e5af888a67784ef3e10a2a01,03acd484e2f0c7f65309ad178a9f559abde09796974c57e714c35f110dfc27ccbe))', isrange=False, issolvable=True, hasprivatekeys=False)
# A P2WSH *2-of-3* multisig output with keys in the specified order.
Reported by Pylint.
Line: 48
Column: 1
# An (overly complicated) P2SH-P2WSH-P2PKH output with the specified public key.
self.test_desc('sh(wsh(pkh(02e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd13)))', isrange=False, issolvable=True, hasprivatekeys=False)
# A bare *1-of-2* multisig output with keys in the specified order.
self.test_desc('multi(1,022f8bde4d1a07209355b4a7250a5c5128e88b84bddc619ab7cba8d569b240efe4,025cbdf0646e5db4eaa398f365f2ea7a0e3d419b7e0330e39ce92bddedcac4f9bc)', isrange=False, issolvable=True, hasprivatekeys=False)
# A P2SH *2-of-2* multisig output with keys in the specified order.
self.test_desc('sh(multi(2,022f01e5e15cca351daff3843fb70f3c2f0a1bdd05e5af888a67784ef3e10a2a01,03acd484e2f0c7f65309ad178a9f559abde09796974c57e714c35f110dfc27ccbe))', isrange=False, issolvable=True, hasprivatekeys=False)
# A P2WSH *2-of-3* multisig output with keys in the specified order.
self.test_desc('wsh(multi(2,03a0434d9e47f3c86235477c7b1ae6ae5d3442d49b1943c2b752a68e2a47e247c7,03774ae7f858a9411e5ef4246b70c65aac5649980be5c17891bbec17895da008cb,03d01115d548e7561b15c38f004d734633687cf4419620095bc5b0f47070afe85a))', isrange=False, issolvable=True, hasprivatekeys=False)
# A P2SH-P2WSH *1-of-3* multisig output with keys in the specified order.
Reported by Pylint.
test/functional/feature_cltv.py
19 issues
Line: 39
Column: 1
# Helper function to modify a transaction by
# 1) prepending a given script to the scriptSig of vin 0 and
# 2) (optionally) modify the nSequence of vin 0 and the tx's nLockTime
def cltv_modify_tx(tx, prepend_scriptsig, nsequence=None, nlocktime=None):
assert_equal(len(tx.vin), 1)
if nsequence is not None:
tx.vin[0].nSequence = nsequence
tx.nLockTime = nlocktime
Reported by Pylint.
Line: 39
Column: 1
# Helper function to modify a transaction by
# 1) prepending a given script to the scriptSig of vin 0 and
# 2) (optionally) modify the nSequence of vin 0 and the tx's nLockTime
def cltv_modify_tx(tx, prepend_scriptsig, nsequence=None, nlocktime=None):
assert_equal(len(tx.vin), 1)
if nsequence is not None:
tx.vin[0].nSequence = nsequence
tx.nLockTime = nlocktime
Reported by Pylint.
Line: 49
Column: 1
tx.rehash()
def cltv_invalidate(tx, failure_reason):
# Modify the signature in vin 0 and nSequence/nLockTime of the tx to fail CLTV
#
# According to BIP65, OP_CHECKLOCKTIMEVERIFY can fail due the following reasons:
# 1) the stack is empty
# 2) the top item on the stack is less than 0
Reported by Pylint.
Line: 49
Column: 1
tx.rehash()
def cltv_invalidate(tx, failure_reason):
# Modify the signature in vin 0 and nSequence/nLockTime of the tx to fail CLTV
#
# According to BIP65, OP_CHECKLOCKTIMEVERIFY can fail due the following reasons:
# 1) the stack is empty
# 2) the top item on the stack is less than 0
Reported by Pylint.
Line: 59
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# nLockTime field are not the same
# 4) the top stack item is greater than the transaction's nLockTime field
# 5) the nSequence field of the txin is 0xffffffff
assert failure_reason in range(5)
scheme = [
# | Script to prepend to scriptSig | nSequence | nLockTime |
# +-------------------------------------------------+------------+--------------+
[[OP_CHECKLOCKTIMEVERIFY], None, None],
[[OP_1NEGATE, OP_CHECKLOCKTIMEVERIFY, OP_DROP], None, None],
Reported by Bandit.
Line: 65
Column: 1
# +-------------------------------------------------+------------+--------------+
[[OP_CHECKLOCKTIMEVERIFY], None, None],
[[OP_1NEGATE, OP_CHECKLOCKTIMEVERIFY, OP_DROP], None, None],
[[CScriptNum(1000), OP_CHECKLOCKTIMEVERIFY, OP_DROP], 0, 1296688602], # timestamp of genesis block
[[CScriptNum(1000), OP_CHECKLOCKTIMEVERIFY, OP_DROP], 0, 500],
[[CScriptNum(500), OP_CHECKLOCKTIMEVERIFY, OP_DROP], 0xffffffff, 500],
][failure_reason]
cltv_modify_tx(tx, prepend_scriptsig=scheme[0], nsequence=scheme[1], nlocktime=scheme[2])
Reported by Pylint.
Line: 73
Column: 1
cltv_modify_tx(tx, prepend_scriptsig=scheme[0], nsequence=scheme[1], nlocktime=scheme[2])
def cltv_validate(tx, height):
# Modify the signature in vin 0 and nSequence/nLockTime of the tx to pass CLTV
scheme = [[CScriptNum(height), OP_CHECKLOCKTIMEVERIFY, OP_DROP], 0, height]
cltv_modify_tx(tx, prepend_scriptsig=scheme[0], nsequence=scheme[1], nlocktime=scheme[2])
Reported by Pylint.
Line: 73
Column: 1
cltv_modify_tx(tx, prepend_scriptsig=scheme[0], nsequence=scheme[1], nlocktime=scheme[2])
def cltv_validate(tx, height):
# Modify the signature in vin 0 and nSequence/nLockTime of the tx to pass CLTV
scheme = [[CScriptNum(height), OP_CHECKLOCKTIMEVERIFY, OP_DROP], 0, height]
cltv_modify_tx(tx, prepend_scriptsig=scheme[0], nsequence=scheme[1], nlocktime=scheme[2])
Reported by Pylint.
Line: 80
Column: 1
cltv_modify_tx(tx, prepend_scriptsig=scheme[0], nsequence=scheme[1], nlocktime=scheme[2])
class BIP65Test(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [[
'-whitelist=noban@127.0.0.1',
'-par=1', # Use only one script thread to get the exact reject reason for testing
Reported by Pylint.
Line: 91
Column: 5
self.setup_clean_chain = True
self.rpc_timeout = 480
def test_cltv_info(self, *, is_active):
assert_equal(self.nodes[0].getblockchaininfo()['softforks']['bip65'], {
"active": is_active,
"height": CLTV_HEIGHT,
"type": "buried",
},
Reported by Pylint.
test/functional/mempool_unbroadcast.py
18 issues
Line: 19
Column: 1
MAX_INITIAL_BROADCAST_DELAY = 15 * 60 # 15 minutes in seconds
class MempoolUnbroadcastTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
Reported by Pylint.
Line: 30
Column: 5
self.test_broadcast()
self.test_txn_removal()
def test_broadcast(self):
self.log.info("Test that mempool reattempts delivery of locally submitted transaction")
node = self.nodes[0]
min_relay_fee = node.getnetworkinfo()["relayfee"]
utxos = create_confirmed_utxos(min_relay_fee, node, 10)
Reported by Pylint.
Line: 30
Column: 5
self.test_broadcast()
self.test_txn_removal()
def test_broadcast(self):
self.log.info("Test that mempool reattempts delivery of locally submitted transaction")
node = self.nodes[0]
min_relay_fee = node.getnetworkinfo()["relayfee"]
utxos = create_confirmed_utxos(min_relay_fee, node, 10)
Reported by Pylint.
Line: 49
Column: 9
us0 = utxos.pop()
inputs = [{"txid": us0["txid"], "vout": us0["vout"]}]
outputs = {addr: 0.0001}
tx = node.createrawtransaction(inputs, outputs)
node.settxfee(min_relay_fee)
txF = node.fundrawtransaction(tx)
txFS = node.signrawtransactionwithwallet(txF["hex"])
rpc_tx_hsh = node.sendrawtransaction(txFS["hex"])
Reported by Pylint.
Line: 51
Column: 9
outputs = {addr: 0.0001}
tx = node.createrawtransaction(inputs, outputs)
node.settxfee(min_relay_fee)
txF = node.fundrawtransaction(tx)
txFS = node.signrawtransactionwithwallet(txF["hex"])
rpc_tx_hsh = node.sendrawtransaction(txFS["hex"])
# check transactions are in unbroadcast using rpc
mempoolinfo = self.nodes[0].getmempoolinfo()
Reported by Pylint.
Line: 52
Column: 9
tx = node.createrawtransaction(inputs, outputs)
node.settxfee(min_relay_fee)
txF = node.fundrawtransaction(tx)
txFS = node.signrawtransactionwithwallet(txF["hex"])
rpc_tx_hsh = node.sendrawtransaction(txFS["hex"])
# check transactions are in unbroadcast using rpc
mempoolinfo = self.nodes[0].getmempoolinfo()
assert_equal(mempoolinfo['unbroadcastcount'], 2)
Reported by Pylint.
Line: 59
Column: 13
mempoolinfo = self.nodes[0].getmempoolinfo()
assert_equal(mempoolinfo['unbroadcastcount'], 2)
mempool = self.nodes[0].getrawmempool(True)
for tx in mempool:
assert_equal(mempool[tx]['unbroadcast'], True)
# check that second node doesn't have these two txns
mempool = self.nodes[1].getrawmempool()
assert rpc_tx_hsh not in mempool
Reported by Pylint.
Line: 64
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# check that second node doesn't have these two txns
mempool = self.nodes[1].getrawmempool()
assert rpc_tx_hsh not in mempool
assert wallet_tx_hsh not in mempool
# ensure that unbroadcast txs are persisted to mempool.dat
self.restart_node(0)
Reported by Bandit.
Line: 65
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# check that second node doesn't have these two txns
mempool = self.nodes[1].getrawmempool()
assert rpc_tx_hsh not in mempool
assert wallet_tx_hsh not in mempool
# ensure that unbroadcast txs are persisted to mempool.dat
self.restart_node(0)
self.log.info("Reconnect nodes & check if they are sent to node 1")
Reported by Bandit.
Line: 77
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY)
self.sync_mempools(timeout=30)
mempool = self.nodes[1].getrawmempool()
assert rpc_tx_hsh in mempool
assert wallet_tx_hsh in mempool
# check that transactions are no longer in first node's unbroadcast set
mempool = self.nodes[0].getrawmempool(True)
for tx in mempool:
Reported by Bandit.
test/functional/test_framework/muhash.py
18 issues
Line: 9
Column: 1
import hashlib
import unittest
from .util import modinv
def rot32(v, bits):
"""Rotate the 32-bit value v left by bits bits."""
bits %= 32 # Make sure the term below does not throw an exception
return ((v << bits) & 0xffffffff) | (v >> (32 - bits))
Reported by Pylint.
Line: 11
Column: 1
from .util import modinv
def rot32(v, bits):
"""Rotate the 32-bit value v left by bits bits."""
bits %= 32 # Make sure the term below does not throw an exception
return ((v << bits) & 0xffffffff) | (v >> (32 - bits))
def chacha20_doubleround(s):
Reported by Pylint.
Line: 16
Column: 1
bits %= 32 # Make sure the term below does not throw an exception
return ((v << bits) & 0xffffffff) | (v >> (32 - bits))
def chacha20_doubleround(s):
"""Apply a ChaCha20 double round to 16-element state array s.
See https://cr.yp.to/chacha/chacha-20080128.pdf and https://tools.ietf.org/html/rfc8439
"""
QUARTER_ROUNDS = [(0, 4, 8, 12),
Reported by Pylint.
Line: 21
Column: 5
See https://cr.yp.to/chacha/chacha-20080128.pdf and https://tools.ietf.org/html/rfc8439
"""
QUARTER_ROUNDS = [(0, 4, 8, 12),
(1, 5, 9, 13),
(2, 6, 10, 14),
(3, 7, 11, 15),
(0, 5, 10, 15),
(1, 6, 11, 12),
Reported by Pylint.
Line: 30
Column: 18
(2, 7, 8, 13),
(3, 4, 9, 14)]
for a, b, c, d in QUARTER_ROUNDS:
s[a] = (s[a] + s[b]) & 0xffffffff
s[d] = rot32(s[d] ^ s[a], 16)
s[c] = (s[c] + s[d]) & 0xffffffff
s[b] = rot32(s[b] ^ s[c], 12)
s[a] = (s[a] + s[b]) & 0xffffffff
Reported by Pylint.
Line: 30
Column: 9
(2, 7, 8, 13),
(3, 4, 9, 14)]
for a, b, c, d in QUARTER_ROUNDS:
s[a] = (s[a] + s[b]) & 0xffffffff
s[d] = rot32(s[d] ^ s[a], 16)
s[c] = (s[c] + s[d]) & 0xffffffff
s[b] = rot32(s[b] ^ s[c], 12)
s[a] = (s[a] + s[b]) & 0xffffffff
Reported by Pylint.
Line: 30
Column: 12
(2, 7, 8, 13),
(3, 4, 9, 14)]
for a, b, c, d in QUARTER_ROUNDS:
s[a] = (s[a] + s[b]) & 0xffffffff
s[d] = rot32(s[d] ^ s[a], 16)
s[c] = (s[c] + s[d]) & 0xffffffff
s[b] = rot32(s[b] ^ s[c], 12)
s[a] = (s[a] + s[b]) & 0xffffffff
Reported by Pylint.
Line: 30
Column: 15
(2, 7, 8, 13),
(3, 4, 9, 14)]
for a, b, c, d in QUARTER_ROUNDS:
s[a] = (s[a] + s[b]) & 0xffffffff
s[d] = rot32(s[d] ^ s[a], 16)
s[c] = (s[c] + s[d]) & 0xffffffff
s[b] = rot32(s[b] ^ s[c], 12)
s[a] = (s[a] + s[b]) & 0xffffffff
Reported by Pylint.
Line: 43
Column: 5
def chacha20_32_to_384(key32):
"""Specialized ChaCha20 implementation with 32-byte key, 0 IV, 384-byte output."""
# See RFC 8439 section 2.3 for chacha20 parameters
CONSTANTS = [0x61707865, 0x3320646e, 0x79622d32, 0x6b206574]
key_bytes = [0]*8
for i in range(8):
key_bytes[i] = int.from_bytes(key32[(4 * i):(4 * (i+1))], 'little')
Reported by Pylint.
Line: 49
Column: 5
for i in range(8):
key_bytes[i] = int.from_bytes(key32[(4 * i):(4 * (i+1))], 'little')
INITIALIZATION_VECTOR = [0] * 4
init = CONSTANTS + key_bytes + INITIALIZATION_VECTOR
out = bytearray()
for counter in range(6):
init[12] = counter
s = init.copy()
Reported by Pylint.
test/functional/rpc_invalid_address_message.py
18 issues
Line: 18
Column: 1
BECH32_INVALID_BECH32 = 'bcrt1p0xlxvlhemja6c4dqv22uapctqupfhlxm9h8z3k2e72q4k9hcz7vqdmchcc'
BECH32_INVALID_BECH32M = 'bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7k35mrzd'
BECH32_INVALID_VERSION = 'bcrt130xlxvlhemja6c4dqv22uapctqupfhlxm9h8z3k2e72q4k9hcz7vqynjegk'
BECH32_INVALID_SIZE = 'bcrt1s0xlxvlhemja6c4dqv22uapctqupfhlxm9h8z3k2e72q4k9hcz7v8n0nx0muaewav25430mtr'
BECH32_INVALID_V0_SIZE = 'bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kqqq5k3my'
BECH32_INVALID_PREFIX = 'bc1pw508d6qejxtdg4y5r3zarvary0c5xw7kw508d6qejxtdg4y5r3zarvary0c5xw7k7grplx'
BASE58_VALID = 'mipcBbFg9gMiCh81Kj8tqqdgoZub1ZJRfn'
BASE58_INVALID_PREFIX = '17VZNX1SN5NtKa8UQFxwQbFeFc3iqRYhem'
Reported by Pylint.
Line: 27
Column: 1
INVALID_ADDRESS = 'asfah14i8fajz0123f'
class InvalidAddressErrorMessageTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
def skip_test_if_missing_module(self):
Reported by Pylint.
Line: 35
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def test_validateaddress(self):
node = self.nodes[0]
# Bech32
info = node.validateaddress(BECH32_INVALID_SIZE)
assert not info['isvalid']
Reported by Pylint.
Line: 40
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# Bech32
info = node.validateaddress(BECH32_INVALID_SIZE)
assert not info['isvalid']
assert_equal(info['error'], 'Invalid Bech32 address data size')
info = node.validateaddress(BECH32_INVALID_PREFIX)
assert not info['isvalid']
assert_equal(info['error'], 'Invalid prefix for Bech32 address')
Reported by Bandit.
Line: 44
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert_equal(info['error'], 'Invalid Bech32 address data size')
info = node.validateaddress(BECH32_INVALID_PREFIX)
assert not info['isvalid']
assert_equal(info['error'], 'Invalid prefix for Bech32 address')
info = node.validateaddress(BECH32_INVALID_BECH32)
assert not info['isvalid']
assert_equal(info['error'], 'Version 1+ witness address must use Bech32m checksum')
Reported by Bandit.
Line: 48
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert_equal(info['error'], 'Invalid prefix for Bech32 address')
info = node.validateaddress(BECH32_INVALID_BECH32)
assert not info['isvalid']
assert_equal(info['error'], 'Version 1+ witness address must use Bech32m checksum')
info = node.validateaddress(BECH32_INVALID_BECH32M)
assert not info['isvalid']
assert_equal(info['error'], 'Version 0 witness address must use Bech32 checksum')
Reported by Bandit.
Line: 52
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert_equal(info['error'], 'Version 1+ witness address must use Bech32m checksum')
info = node.validateaddress(BECH32_INVALID_BECH32M)
assert not info['isvalid']
assert_equal(info['error'], 'Version 0 witness address must use Bech32 checksum')
info = node.validateaddress(BECH32_INVALID_V0_SIZE)
assert not info['isvalid']
assert_equal(info['error'], 'Invalid Bech32 v0 address data size')
Reported by Bandit.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert_equal(info['error'], 'Version 0 witness address must use Bech32 checksum')
info = node.validateaddress(BECH32_INVALID_V0_SIZE)
assert not info['isvalid']
assert_equal(info['error'], 'Invalid Bech32 v0 address data size')
info = node.validateaddress(BECH32_VALID)
assert info['isvalid']
assert 'error' not in info
Reported by Bandit.
Line: 60
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert_equal(info['error'], 'Invalid Bech32 v0 address data size')
info = node.validateaddress(BECH32_VALID)
assert info['isvalid']
assert 'error' not in info
# Base58
info = node.validateaddress(BASE58_INVALID_PREFIX)
assert not info['isvalid']
Reported by Bandit.
Line: 61
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
info = node.validateaddress(BECH32_VALID)
assert info['isvalid']
assert 'error' not in info
# Base58
info = node.validateaddress(BASE58_INVALID_PREFIX)
assert not info['isvalid']
assert_equal(info['error'], 'Invalid prefix for Base58-encoded address')
Reported by Bandit.
test/functional/p2p_permissions.py
18 issues
Line: 28
Column: 1
)
class P2PPermissionsTests(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
def run_test(self):
Reported by Pylint.
Line: 91
Column: 1
["forcerelay", "noban", "mempool", "bloomfilter", "relay", "download", "addr"])
self.stop_node(1)
self.nodes[1].assert_start_raises_init_error(["-whitelist=oopsie@127.0.0.1"], "Invalid P2P permission", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitelist=noban@127.0.0.1:230"], "Invalid netmask specified in", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2WSH_OP_TRUE)[0])
Reported by Pylint.
Line: 92
Column: 1
self.stop_node(1)
self.nodes[1].assert_start_raises_init_error(["-whitelist=oopsie@127.0.0.1"], "Invalid P2P permission", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitelist=noban@127.0.0.1:230"], "Invalid netmask specified in", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2WSH_OP_TRUE)[0])
self.sync_all()
Reported by Pylint.
Line: 93
Column: 1
self.stop_node(1)
self.nodes[1].assert_start_raises_init_error(["-whitelist=oopsie@127.0.0.1"], "Invalid P2P permission", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitelist=noban@127.0.0.1:230"], "Invalid netmask specified in", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2WSH_OP_TRUE)[0])
self.sync_all()
Reported by Pylint.
Line: 95
Column: 5
self.nodes[1].assert_start_raises_init_error(["-whitelist=noban@127.0.0.1:230"], "Invalid netmask specified in", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2WSH_OP_TRUE)[0])
self.sync_all()
self.log.debug("Create a connection from a forcerelay peer that rebroadcasts raw txs")
# A test framework p2p connection is needed to send the raw transaction directly. If a full node was used, it could only
Reported by Pylint.
Line: 96
Column: 1
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2WSH_OP_TRUE)[0])
self.sync_all()
self.log.debug("Create a connection from a forcerelay peer that rebroadcasts raw txs")
# A test framework p2p connection is needed to send the raw transaction directly. If a full node was used, it could only
# rebroadcast via the inv-getdata mechanism. However, even for forcerelay connections, a full node would
Reported by Pylint.
Line: 100
Column: 1
self.sync_all()
self.log.debug("Create a connection from a forcerelay peer that rebroadcasts raw txs")
# A test framework p2p connection is needed to send the raw transaction directly. If a full node was used, it could only
# rebroadcast via the inv-getdata mechanism. However, even for forcerelay connections, a full node would
# currently not request a txid that is already in the mempool.
self.restart_node(1, extra_args=["-whitelist=forcerelay@127.0.0.1"])
p2p_rebroadcast_wallet = self.nodes[1].add_p2p_connection(P2PDataStore())
Reported by Pylint.
Line: 101
Column: 1
self.log.debug("Create a connection from a forcerelay peer that rebroadcasts raw txs")
# A test framework p2p connection is needed to send the raw transaction directly. If a full node was used, it could only
# rebroadcast via the inv-getdata mechanism. However, even for forcerelay connections, a full node would
# currently not request a txid that is already in the mempool.
self.restart_node(1, extra_args=["-whitelist=forcerelay@127.0.0.1"])
p2p_rebroadcast_wallet = self.nodes[1].add_p2p_connection(P2PDataStore())
self.log.debug("Send a tx from the wallet initially")
Reported by Pylint.
Line: 107
Column: 9
p2p_rebroadcast_wallet = self.nodes[1].add_p2p_connection(P2PDataStore())
self.log.debug("Send a tx from the wallet initially")
tx = tx_from_hex(
self.nodes[0].createrawtransaction(
inputs=[{
'txid': block_op_true['tx'][0],
'vout': 0,
}], outputs=[{
Reported by Pylint.
Line: 123
Column: 1
self.log.debug("Wait until tx is in node[1]'s mempool")
p2p_rebroadcast_wallet.send_txs_and_test([tx], self.nodes[1])
self.log.debug("Check that node[1] will send the tx to node[0] even though it is already in the mempool")
self.connect_nodes(1, 0)
with self.nodes[1].assert_debug_log(["Force relaying tx {} from peer=0".format(txid)]):
p2p_rebroadcast_wallet.send_txs_and_test([tx], self.nodes[1])
self.wait_until(lambda: txid in self.nodes[0].getrawmempool())
Reported by Pylint.
test/functional/test_framework/bdb.py
18 issues
Line: 46
Column: 1
# Deserializes a leaf page into a dict.
# Btree internal pages have the same header, for those, return None.
# For the btree leaf pages, deserialize them and put all the data into a dict
def dump_leaf_page(data):
page_info = {}
page_header = data[0:26]
_, pgno, prev_pgno, next_pgno, entries, hf_offset, level, pg_type = struct.unpack('QIIIHHBB', page_header)
page_info['pgno'] = pgno
page_info['prev_pgno'] = prev_pgno
Reported by Pylint.
Line: 46
Column: 1
# Deserializes a leaf page into a dict.
# Btree internal pages have the same header, for those, return None.
# For the btree leaf pages, deserialize them and put all the data into a dict
def dump_leaf_page(data):
page_info = {}
page_header = data[0:26]
_, pgno, prev_pgno, next_pgno, entries, hf_offset, level, pg_type = struct.unpack('QIIIHHBB', page_header)
page_info['pgno'] = pgno
page_info['prev_pgno'] = prev_pgno
Reported by Pylint.
Line: 49
Column: 1
def dump_leaf_page(data):
page_info = {}
page_header = data[0:26]
_, pgno, prev_pgno, next_pgno, entries, hf_offset, level, pg_type = struct.unpack('QIIIHHBB', page_header)
page_info['pgno'] = pgno
page_info['prev_pgno'] = prev_pgno
page_info['next_pgno'] = next_pgno
page_info['hf_offset'] = hf_offset
page_info['level'] = level
Reported by Pylint.
Line: 60
Column: 1
page_info['entries'] = []
if pg_type == BTREE_INTERNAL:
# Skip internal pages. These are the internal nodes of the btree and don't contain anything relevant to us
return None
assert pg_type == BTREE_LEAF, 'A non-btree leaf page has been encountered while dumping leaves'
for i in range(0, entries):
Reported by Pylint.
Line: 63
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# Skip internal pages. These are the internal nodes of the btree and don't contain anything relevant to us
return None
assert pg_type == BTREE_LEAF, 'A non-btree leaf page has been encountered while dumping leaves'
for i in range(0, entries):
offset = page_info['entry_offsets'][i]
entry = {'offset': offset}
page_data_header = data[offset:offset + 3]
Reported by Bandit.
Line: 79
Column: 1
# Deserializes a btree metadata page into a dict.
# Does a simple sanity check on the magic value, type, and version
def dump_meta_page(page):
# metadata page
# general metadata
metadata = {}
meta_page = page[0:72]
_, pgno, magic, version, pagesize, encrypt_alg, pg_type, metaflags, _, free, last_pgno, nparts, key_count, record_count, flags, uid = struct.unpack('QIIIIBBBBIIIIII20s', meta_page)
Reported by Pylint.
Line: 79
Column: 1
# Deserializes a btree metadata page into a dict.
# Does a simple sanity check on the magic value, type, and version
def dump_meta_page(page):
# metadata page
# general metadata
metadata = {}
meta_page = page[0:72]
_, pgno, magic, version, pagesize, encrypt_alg, pg_type, metaflags, _, free, last_pgno, nparts, key_count, record_count, flags, uid = struct.unpack('QIIIIBBBBIIIIII20s', meta_page)
Reported by Pylint.
Line: 84
Column: 1
# general metadata
metadata = {}
meta_page = page[0:72]
_, pgno, magic, version, pagesize, encrypt_alg, pg_type, metaflags, _, free, last_pgno, nparts, key_count, record_count, flags, uid = struct.unpack('QIIIIBBBBIIIIII20s', meta_page)
metadata['pgno'] = pgno
metadata['magic'] = magic
metadata['version'] = version
metadata['pagesize'] = pagesize
metadata['encrypt_alg'] = encrypt_alg
Reported by Pylint.
Line: 100
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
metadata['flags'] = flags
metadata['uid'] = uid.hex().encode()
assert magic == BTREE_MAGIC, 'bdb magic does not match bdb btree magic'
assert pg_type == BTREE_META, 'Metadata page is not a btree metadata page'
assert version == DB_VERSION, 'Database too new'
# btree metadata
btree_meta_page = page[72:512]
Reported by Bandit.
Line: 101
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
metadata['uid'] = uid.hex().encode()
assert magic == BTREE_MAGIC, 'bdb magic does not match bdb btree magic'
assert pg_type == BTREE_META, 'Metadata page is not a btree metadata page'
assert version == DB_VERSION, 'Database too new'
# btree metadata
btree_meta_page = page[72:512]
_, minkey, re_len, re_pad, root, _, crypto_magic, _, iv, chksum = struct.unpack('IIIII368sI12s16s20s', btree_meta_page)
Reported by Bandit.
src/leveldb/benchmarks/db_bench.cc
18 issues
Line: 295
CWE codes:
665
bool start GUARDED_BY(mu);
SharedState(int total)
: cv(&mu), total(total), num_initialized(0), num_done(0), start(false) {}
};
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
int tid; // 0..n-1 when running in n threads
Reported by Cppcheck.
Line: 262
Column: 7
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
// Rate is computed on actual elapsed time, not the sum of per-thread
// elapsed times.
double elapsed = (finish_ - start_) * 1e-6;
char rate[100];
snprintf(rate, sizeof(rate), "%6.1f MB/s",
(bytes_ / 1048576.0) / elapsed);
extra = rate;
}
AppendWithSpace(&extra, message_);
Reported by FlawFinder.
Line: 369
Column: 21
CWE codes:
362
time_t now = time(nullptr);
fprintf(stderr, "Date: %s", ctime(&now)); // ctime() adds newline
FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
if (cpuinfo != nullptr) {
char line[1000];
int num_cpus = 0;
std::string cpu_type;
std::string cache_size;
Reported by FlawFinder.
Line: 371
Column: 7
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
if (cpuinfo != nullptr) {
char line[1000];
int num_cpus = 0;
std::string cpu_type;
std::string cache_size;
while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
const char* sep = strchr(line, ':');
Reported by FlawFinder.
Line: 650
Column: 7
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
char buf[100];
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
Reported by FlawFinder.
Line: 713
Column: 7
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
void DoWrite(ThreadState* thread, bool seq) {
if (num_ != FLAGS_num) {
char msg[100];
snprintf(msg, sizeof(msg), "(%d ops)", num_);
thread->stats.AddMessage(msg);
}
RandomGenerator gen;
Reported by FlawFinder.
Line: 726
Column: 9
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
char key[100];
snprintf(key, sizeof(key), "%016d", k);
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + strlen(key);
thread->stats.FinishedSingleOp();
}
Reported by FlawFinder.
Line: 772
Column: 7
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
std::string value;
int found = 0;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = thread->rand.Next() % FLAGS_num;
snprintf(key, sizeof(key), "%016d", k);
if (db_->Get(options, key, &value).ok()) {
found++;
}
Reported by FlawFinder.
Line: 780
Column: 5
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
}
thread->stats.FinishedSingleOp();
}
char msg[100];
snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
thread->stats.AddMessage(msg);
}
void ReadMissing(ThreadState* thread) {
Reported by FlawFinder.
Line: 789
Column: 7
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
ReadOptions options;
std::string value;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = thread->rand.Next() % FLAGS_num;
snprintf(key, sizeof(key), "%016d.", k);
db_->Get(options, key, &value);
thread->stats.FinishedSingleOp();
}
Reported by FlawFinder.