The following issues were found:
test/functional/wallet_implicitsegwit.py
13 issues
Line: 10
Column: 3
import test_framework.address as address
from test_framework.test_framework import BitcoinTestFramework
# TODO: Might be nice to test p2pk here too
address_types = ('legacy', 'bech32', 'p2sh-segwit')
def key_to_address(key, address_type):
    if address_type == 'legacy':
        return address.key_to_p2pkh(key)
Reported by Pylint.
Line: 13
Column: 1
# TODO: Might be nice to test p2pk here too
address_types = ('legacy', 'bech32', 'p2sh-segwit')
def key_to_address(key, address_type):
if address_type == 'legacy':
return address.key_to_p2pkh(key)
elif address_type == 'p2sh-segwit':
return address.key_to_p2sh_p2wpkh(key)
elif address_type == 'bech32':
Reported by Pylint.
Line: 13
Column: 1
# TODO: Might be nice to test p2pk here too
address_types = ('legacy', 'bech32', 'p2sh-segwit')
def key_to_address(key, address_type):
if address_type == 'legacy':
return address.key_to_p2pkh(key)
elif address_type == 'p2sh-segwit':
return address.key_to_p2sh_p2wpkh(key)
elif address_type == 'bech32':
Reported by Pylint.
Line: 14
Column: 5
address_types = ('legacy', 'bech32', 'p2sh-segwit')
def key_to_address(key, address_type):
if address_type == 'legacy':
return address.key_to_p2pkh(key)
elif address_type == 'p2sh-segwit':
return address.key_to_p2sh_p2wpkh(key)
elif address_type == 'bech32':
return address.key_to_p2wpkh(key)
Reported by Pylint.
Line: 21
Column: 1
    elif address_type == 'bech32':
        return address.key_to_p2wpkh(key)
def send_a_to_b(receive_node, send_node):
    keys = {}
    for a in address_types:
        a_address = receive_node.getnewaddress(address_type=a)
        pubkey = receive_node.getaddressinfo(a_address)['pubkey']
        keys[a] = pubkey
Reported by Pylint.
Line: 23
Column: 9
def send_a_to_b(receive_node, send_node):
keys = {}
for a in address_types:
a_address = receive_node.getnewaddress(address_type=a)
pubkey = receive_node.getaddressinfo(a_address)['pubkey']
keys[a] = pubkey
for b in address_types:
b_address = key_to_address(pubkey, b)
Reported by Pylint.
Line: 27
Column: 13
a_address = receive_node.getnewaddress(address_type=a)
pubkey = receive_node.getaddressinfo(a_address)['pubkey']
keys[a] = pubkey
for b in address_types:
b_address = key_to_address(pubkey, b)
send_node.sendtoaddress(address=b_address, amount=1)
return keys
def check_implicit_transactions(implicit_keys, implicit_node):
Reported by Pylint.
Line: 32
Column: 1
send_node.sendtoaddress(address=b_address, amount=1)
return keys
def check_implicit_transactions(implicit_keys, implicit_node):
# The implicit segwit node allows conversion all possible ways
txs = implicit_node.listtransactions(None, 99999)
for a in address_types:
pubkey = implicit_keys[a]
for b in address_types:
Reported by Pylint.
Line: 35
Column: 9
def check_implicit_transactions(implicit_keys, implicit_node):
# The implicit segwit node allows conversion all possible ways
txs = implicit_node.listtransactions(None, 99999)
for a in address_types:
pubkey = implicit_keys[a]
for b in address_types:
b_address = key_to_address(pubkey, b)
assert(('receive', b_address) in tuple((tx['category'], tx['address']) for tx in txs))
Reported by Pylint.
Line: 37
Column: 13
txs = implicit_node.listtransactions(None, 99999)
for a in address_types:
pubkey = implicit_keys[a]
for b in address_types:
b_address = key_to_address(pubkey, b)
assert(('receive', b_address) in tuple((tx['category'], tx['address']) for tx in txs))
class ImplicitSegwitTest(BitcoinTestFramework):
def set_test_params(self):
Reported by Pylint.
src/secp256k1/src/modules/ecdh/tests_impl.h
13 issues
Line: 22
Column: 5
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
    (void)data;
    /* Save x and y as uncompressed public key */
    output[0] = 0x04;
    memcpy(output + 1, x, 32);
    memcpy(output + 33, y, 32);
    return 1;
}
void test_ecdh_api(void) {
Reported by FlawFinder.
Line: 23
Column: 5
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
/* Save x and y as uncompressed public key */
output[0] = 0x04;
memcpy(output + 1, x, 32);
memcpy(output + 33, y, 32);
return 1;
}
void test_ecdh_api(void) {
/* Setup context that just counts errors */
Reported by FlawFinder.
Line: 31
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
/* Setup context that just counts errors */
secp256k1_context *tctx = secp256k1_context_create(SECP256K1_CONTEXT_SIGN);
secp256k1_pubkey point;
unsigned char res[32];
unsigned char s_one[32] = { 0 };
int32_t ecount = 0;
s_one[31] = 1;
secp256k1_context_set_error_callback(tctx, counting_illegal_callback_fn, &ecount);
Reported by FlawFinder.
Line: 32
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
secp256k1_context *tctx = secp256k1_context_create(SECP256K1_CONTEXT_SIGN);
secp256k1_pubkey point;
unsigned char res[32];
unsigned char s_one[32] = { 0 };
int32_t ecount = 0;
s_one[31] = 1;
secp256k1_context_set_error_callback(tctx, counting_illegal_callback_fn, &ecount);
secp256k1_context_set_illegal_callback(tctx, counting_illegal_callback_fn, &ecount);
Reported by FlawFinder.
Line: 57
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
}
void test_ecdh_generator_basepoint(void) {
unsigned char s_one[32] = { 0 };
secp256k1_pubkey point[2];
int i;
s_one[31] = 1;
/* Check against pubkey creation when the basepoint is the generator */
Reported by FlawFinder.
Line: 65
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
/* Check against pubkey creation when the basepoint is the generator */
for (i = 0; i < 100; ++i) {
secp256k1_sha256 sha;
unsigned char s_b32[32];
unsigned char output_ecdh[65];
unsigned char output_ser[32];
unsigned char point_ser[65];
size_t point_ser_len = sizeof(point_ser);
secp256k1_scalar s;
Reported by FlawFinder.
Line: 66
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
for (i = 0; i < 100; ++i) {
secp256k1_sha256 sha;
unsigned char s_b32[32];
unsigned char output_ecdh[65];
unsigned char output_ser[32];
unsigned char point_ser[65];
size_t point_ser_len = sizeof(point_ser);
secp256k1_scalar s;
Reported by FlawFinder.
Line: 67
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
secp256k1_sha256 sha;
unsigned char s_b32[32];
unsigned char output_ecdh[65];
unsigned char output_ser[32];
unsigned char point_ser[65];
size_t point_ser_len = sizeof(point_ser);
secp256k1_scalar s;
random_scalar_order(&s);
Reported by FlawFinder.
Line: 68
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
unsigned char s_b32[32];
unsigned char output_ecdh[65];
unsigned char output_ser[32];
unsigned char point_ser[65];
size_t point_ser_len = sizeof(point_ser);
secp256k1_scalar s;
random_scalar_order(&s);
secp256k1_scalar_get_b32(s_b32, &s);
Reported by FlawFinder.
Line: 98
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
}
void test_bad_scalar(void) {
unsigned char s_zero[32] = { 0 };
unsigned char s_overflow[32] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfe,
0xba, 0xae, 0xdc, 0xe6, 0xaf, 0x48, 0xa0, 0x3b,
0xbf, 0xd2, 0x5e, 0x8c, 0xd0, 0x36, 0x41, 0x41
Reported by FlawFinder.
test/functional/p2p_ping.py
13 issues
Line: 50
Column: 9
self.nodes[0].setmocktime(self.mock_time)
def run_test(self):
self.mock_time = int(time.time())
self.mock_forward(0)
self.log.info('Check that ping is sent after connection is established')
no_pong_node = self.nodes[0].add_p2p_connection(NodeNoPong())
self.mock_forward(3)
Reported by Pylint.
Line: 18
Column: 1
PING_INTERVAL = 2 * 60
class msg_pong_corrupt(msg_pong):
    def serialize(self):
        return b""
class NodePongAdd1(P2PInterface):
Reported by Pylint.
Line: 18
Column: 1
PING_INTERVAL = 2 * 60
class msg_pong_corrupt(msg_pong):
def serialize(self):
return b""
class NodePongAdd1(P2PInterface):
Reported by Pylint.
Line: 23
Column: 1
return b""
class NodePongAdd1(P2PInterface):
def on_ping(self, message):
self.send_message(msg_pong(message.nonce + 1))
class NodeNoPong(P2PInterface):
Reported by Pylint.
Line: 28
Column: 1
self.send_message(msg_pong(message.nonce + 1))
class NodeNoPong(P2PInterface):
def on_ping(self, message):
pass
class PingPongTest(BitcoinTestFramework):
Reported by Pylint.
Line: 33
Column: 1
pass
class PingPongTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [['-peertimeout=3']]
Reported by Pylint.
Line: 39
Column: 5
self.num_nodes = 1
self.extra_args = [['-peertimeout=3']]
def check_peer_info(self, *, pingtime, minping, pingwait):
stats = self.nodes[0].getpeerinfo()[0]
assert_equal(stats.pop('pingtime', None), pingtime)
assert_equal(stats.pop('minping', None), minping)
assert_equal(stats.pop('pingwait', None), pingwait)
Reported by Pylint.
Line: 45
Column: 5
assert_equal(stats.pop('minping', None), minping)
assert_equal(stats.pop('pingwait', None), pingwait)
def mock_forward(self, delta):
self.mock_time += delta
self.nodes[0].setmocktime(self.mock_time)
def run_test(self):
self.mock_time = int(time.time())
Reported by Pylint.
Line: 49
Column: 5
self.mock_time += delta
self.nodes[0].setmocktime(self.mock_time)
def run_test(self):
self.mock_time = int(time.time())
self.mock_forward(0)
self.log.info('Check that ping is sent after connection is established')
no_pong_node = self.nodes[0].add_p2p_connection(NodeNoPong())
Reported by Pylint.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.log.info('Check that ping is sent after connection is established')
no_pong_node = self.nodes[0].add_p2p_connection(NodeNoPong())
self.mock_forward(3)
assert no_pong_node.last_message.pop('ping').nonce != 0
self.check_peer_info(pingtime=None, minping=None, pingwait=3)
self.log.info('Reply without nonce cancels ping')
with self.nodes[0].assert_debug_log(['pong peer=0: Short payload']):
no_pong_node.send_and_ping(msg_pong_corrupt())
Reported by Bandit.
contrib/signet/getcoins.py
13 issues
Line: 33
Column: 1
data = {'address': args.addr, 'password': args.password}
try:
    res = requests.post(args.faucet, data=data)
except:
    print('Unexpected error when contacting faucet:', sys.exc_info()[0])
    exit()
print(res.text)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
import argparse
import subprocess
import requests
import sys
Reported by Pylint.
Line: 7
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
import argparse
import subprocess
import requests
import sys
parser = argparse.ArgumentParser(description='Script to get coins from a faucet.', epilog='You may need to start with double-dash (--) when providing bitcoin-cli arguments.')
parser.add_argument('-c', '--cmd', dest='cmd', default='bitcoin-cli', help='bitcoin-cli command to use')
Reported by Bandit.
Line: 9
Column: 1
import argparse
import subprocess
import requests
import sys
parser = argparse.ArgumentParser(description='Script to get coins from a faucet.', epilog='You may need to start with double-dash (--) when providing bitcoin-cli arguments.')
parser.add_argument('-c', '--cmd', dest='cmd', default='bitcoin-cli', help='bitcoin-cli command to use')
parser.add_argument('-f', '--faucet', dest='faucet', default='https://signetfaucet.com/claim', help='URL of the faucet')
parser.add_argument('-a', '--addr', dest='addr', default='', help='Bitcoin address to which the faucet should send')
Reported by Pylint.
Line: 11
Column: 1
import requests
import sys
parser = argparse.ArgumentParser(description='Script to get coins from a faucet.', epilog='You may need to start with double-dash (--) when providing bitcoin-cli arguments.')
parser.add_argument('-c', '--cmd', dest='cmd', default='bitcoin-cli', help='bitcoin-cli command to use')
parser.add_argument('-f', '--faucet', dest='faucet', default='https://signetfaucet.com/claim', help='URL of the faucet')
parser.add_argument('-a', '--addr', dest='addr', default='', help='Bitcoin address to which the faucet should send')
parser.add_argument('-p', '--password', dest='password', default='', help='Faucet password, if any')
parser.add_argument('bitcoin_cli_args', nargs='*', help='Arguments to pass on to bitcoin-cli (default: -signet)')
Reported by Pylint.
Line: 12
Column: 1
import sys
parser = argparse.ArgumentParser(description='Script to get coins from a faucet.', epilog='You may need to start with double-dash (--) when providing bitcoin-cli arguments.')
parser.add_argument('-c', '--cmd', dest='cmd', default='bitcoin-cli', help='bitcoin-cli command to use')
parser.add_argument('-f', '--faucet', dest='faucet', default='https://signetfaucet.com/claim', help='URL of the faucet')
parser.add_argument('-a', '--addr', dest='addr', default='', help='Bitcoin address to which the faucet should send')
parser.add_argument('-p', '--password', dest='password', default='', help='Faucet password, if any')
parser.add_argument('bitcoin_cli_args', nargs='*', help='Arguments to pass on to bitcoin-cli (default: -signet)')
Reported by Pylint.
Line: 13
Column: 1
parser = argparse.ArgumentParser(description='Script to get coins from a faucet.', epilog='You may need to start with double-dash (--) when providing bitcoin-cli arguments.')
parser.add_argument('-c', '--cmd', dest='cmd', default='bitcoin-cli', help='bitcoin-cli command to use')
parser.add_argument('-f', '--faucet', dest='faucet', default='https://signetfaucet.com/claim', help='URL of the faucet')
parser.add_argument('-a', '--addr', dest='addr', default='', help='Bitcoin address to which the faucet should send')
parser.add_argument('-p', '--password', dest='password', default='', help='Faucet password, if any')
parser.add_argument('bitcoin_cli_args', nargs='*', help='Arguments to pass on to bitcoin-cli (default: -signet)')
args = parser.parse_args()
Reported by Pylint.
Line: 14
Column: 1
parser = argparse.ArgumentParser(description='Script to get coins from a faucet.', epilog='You may need to start with double-dash (--) when providing bitcoin-cli arguments.')
parser.add_argument('-c', '--cmd', dest='cmd', default='bitcoin-cli', help='bitcoin-cli command to use')
parser.add_argument('-f', '--faucet', dest='faucet', default='https://signetfaucet.com/claim', help='URL of the faucet')
parser.add_argument('-a', '--addr', dest='addr', default='', help='Bitcoin address to which the faucet should send')
parser.add_argument('-p', '--password', dest='password', default='', help='Faucet password, if any')
parser.add_argument('bitcoin_cli_args', nargs='*', help='Arguments to pass on to bitcoin-cli (default: -signet)')
args = parser.parse_args()
Reported by Pylint.
Line: 16
Column: 1
parser.add_argument('-f', '--faucet', dest='faucet', default='https://signetfaucet.com/claim', help='URL of the faucet')
parser.add_argument('-a', '--addr', dest='addr', default='', help='Bitcoin address to which the faucet should send')
parser.add_argument('-p', '--password', dest='password', default='', help='Faucet password, if any')
parser.add_argument('bitcoin_cli_args', nargs='*', help='Arguments to pass on to bitcoin-cli (default: -signet)')
args = parser.parse_args()
if args.addr == '':
if args.bitcoin_cli_args == []:
Reported by Pylint.
Line: 25
Column: 1
args.bitcoin_cli_args = ['-signet']
# get address for receiving coins
try:
args.addr = subprocess.check_output([args.cmd] + args.bitcoin_cli_args + ['getnewaddress', 'faucet', 'bech32']).strip()
except FileNotFoundError:
print('The binary', args.cmd, 'could not be found.')
exit()
data = {'address': args.addr, 'password': args.password}
Reported by Pylint.
test/functional/mining_getblocktemplate_longpoll.py
12 issues
Line: 17
Column: 1
from test_framework.wallet import MiniWallet
class LongpollThread(threading.Thread):
    def __init__(self, node):
        threading.Thread.__init__(self)
        # query current longpollid
        template = node.getblocktemplate({'rules': ['segwit']})
        self.longpollid = template['longpollid']
Reported by Pylint.
Line: 30
Column: 1
def run(self):
self.node.getblocktemplate({'longpollid': self.longpollid, 'rules': ['segwit']})
class GetBlockTemplateLPTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.supports_cli = False
def run_test(self):
Reported by Pylint.
Line: 37
Column: 1
def run_test(self):
self.log.info("Warning: this test will take about 70 seconds in the best case. Be patient.")
self.log.info("Test that longpollid doesn't change between successive getblocktemplate() invocations if nothing else happens")
self.nodes[0].generate(10)
template = self.nodes[0].getblocktemplate({'rules': ['segwit']})
longpollid = template['longpollid']
template2 = self.nodes[0].getblocktemplate({'rules': ['segwit']})
assert template2['longpollid'] == longpollid
Reported by Pylint.
Line: 42
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
template = self.nodes[0].getblocktemplate({'rules': ['segwit']})
longpollid = template['longpollid']
template2 = self.nodes[0].getblocktemplate({'rules': ['segwit']})
assert template2['longpollid'] == longpollid
self.log.info("Test that longpoll waits if we do nothing")
thr = LongpollThread(self.nodes[0])
thr.start()
# check that thread still lives
Reported by Bandit.
Line: 49
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
thr.start()
# check that thread still lives
thr.join(5) # wait 5 seconds or until thread exits
assert thr.is_alive()
miniwallets = [ MiniWallet(node) for node in self.nodes ]
self.log.info("Test that longpoll will terminate if another node generates a block")
miniwallets[1].generate(1) # generate a block on another node
# check that thread will exit now that new transaction entered mempool
Reported by Bandit.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
miniwallets[1].generate(1) # generate a block on another node
# check that thread will exit now that new transaction entered mempool
thr.join(5) # wait 5 seconds or until thread exits
assert not thr.is_alive()
self.log.info("Test that longpoll will terminate if we generate a block ourselves")
thr = LongpollThread(self.nodes[0])
thr.start()
miniwallets[0].generate(1) # generate a block on own node
Reported by Bandit.
Line: 63
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
thr.start()
miniwallets[0].generate(1) # generate a block on own node
thr.join(5) # wait 5 seconds or until thread exits
assert not thr.is_alive()
# Add enough mature utxos to the wallets, so that all txs spend confirmed coins
self.nodes[0].generate(COINBASE_MATURITY)
self.sync_blocks()
Reported by Bandit.
Line: 69
Column: 1
self.nodes[0].generate(COINBASE_MATURITY)
self.sync_blocks()
self.log.info("Test that introducing a new transaction into the mempool will terminate the longpoll")
thr = LongpollThread(self.nodes[0])
thr.start()
# generate a random transaction and submit it
min_relay_fee = self.nodes[0].getnetworkinfo()["relayfee"]
fee_rate = min_relay_fee + Decimal('0.00000010') * random.randint(0,20)
Reported by Pylint.
Line: 74
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
thr.start()
# generate a random transaction and submit it
min_relay_fee = self.nodes[0].getnetworkinfo()["relayfee"]
fee_rate = min_relay_fee + Decimal('0.00000010') * random.randint(0,20)
miniwallets[0].send_self_transfer(from_node=random.choice(self.nodes),
fee_rate=fee_rate)
# after one minute, every 10 seconds the mempool is probed, so in 80 seconds it should have returned
thr.join(60 + 20)
assert not thr.is_alive()
Reported by Bandit.
Line: 75
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
# generate a random transaction and submit it
min_relay_fee = self.nodes[0].getnetworkinfo()["relayfee"]
fee_rate = min_relay_fee + Decimal('0.00000010') * random.randint(0,20)
miniwallets[0].send_self_transfer(from_node=random.choice(self.nodes),
fee_rate=fee_rate)
# after one minute, every 10 seconds the mempool is probed, so in 80 seconds it should have returned
thr.join(60 + 20)
assert not thr.is_alive()
Reported by Bandit.
src/crypto/aes.h
12 issues
Line: 24
Column: 43
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
AES256_ctx ctx;
public:
explicit AES256Encrypt(const unsigned char key[32]);
~AES256Encrypt();
void Encrypt(unsigned char ciphertext[16], const unsigned char plaintext[16]) const;
};
/** A decryption class for AES-256. */
Reported by FlawFinder.
Line: 26
Column: 27
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
public:
explicit AES256Encrypt(const unsigned char key[32]);
~AES256Encrypt();
void Encrypt(unsigned char ciphertext[16], const unsigned char plaintext[16]) const;
};
/** A decryption class for AES-256. */
class AES256Decrypt
{
Reported by FlawFinder.
Line: 26
Column: 63
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
public:
explicit AES256Encrypt(const unsigned char key[32]);
~AES256Encrypt();
void Encrypt(unsigned char ciphertext[16], const unsigned char plaintext[16]) const;
};
/** A decryption class for AES-256. */
class AES256Decrypt
{
Reported by FlawFinder.
Line: 36
Column: 43
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
AES256_ctx ctx;
public:
explicit AES256Decrypt(const unsigned char key[32]);
~AES256Decrypt();
void Decrypt(unsigned char plaintext[16], const unsigned char ciphertext[16]) const;
};
class AES256CBCEncrypt
Reported by FlawFinder.
Line: 38
Column: 27
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
public:
explicit AES256Decrypt(const unsigned char key[32]);
~AES256Decrypt();
void Decrypt(unsigned char plaintext[16], const unsigned char ciphertext[16]) const;
};
class AES256CBCEncrypt
{
public:
Reported by FlawFinder.
Line: 38
Column: 62
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
public:
explicit AES256Decrypt(const unsigned char key[32]);
~AES256Decrypt();
void Decrypt(unsigned char plaintext[16], const unsigned char ciphertext[16]) const;
};
class AES256CBCEncrypt
{
public:
Reported by FlawFinder.
Line: 44
Column: 78
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
class AES256CBCEncrypt
{
public:
AES256CBCEncrypt(const unsigned char key[AES256_KEYSIZE], const unsigned char ivIn[AES_BLOCKSIZE], bool padIn);
~AES256CBCEncrypt();
int Encrypt(const unsigned char* data, int size, unsigned char* out) const;
private:
const AES256Encrypt enc;
Reported by FlawFinder.
Line: 44
Column: 37
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
class AES256CBCEncrypt
{
public:
AES256CBCEncrypt(const unsigned char key[AES256_KEYSIZE], const unsigned char ivIn[AES_BLOCKSIZE], bool padIn);
~AES256CBCEncrypt();
int Encrypt(const unsigned char* data, int size, unsigned char* out) const;
private:
const AES256Encrypt enc;
Reported by FlawFinder.
Line: 51
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
private:
const AES256Encrypt enc;
const bool pad;
unsigned char iv[AES_BLOCKSIZE];
};
class AES256CBCDecrypt
{
public:
Reported by FlawFinder.
Line: 57
Column: 78
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
class AES256CBCDecrypt
{
public:
AES256CBCDecrypt(const unsigned char key[AES256_KEYSIZE], const unsigned char ivIn[AES_BLOCKSIZE], bool padIn);
~AES256CBCDecrypt();
int Decrypt(const unsigned char* data, int size, unsigned char* out) const;
private:
const AES256Decrypt dec;
Reported by FlawFinder.
test/functional/p2p_node_network_limited.py
12 issues
Line: 89
Column: 9
self.connect_nodes(0, 2)
try:
self.sync_blocks([self.nodes[0], self.nodes[2]], timeout=5)
except:
pass
# node2 must remain at height 0
assert_equal(self.nodes[2].getblockheader(self.nodes[2].getbestblockhash())['height'], 0)
# now connect also to node 1 (non pruned)
Reported by Pylint.
Line: 11
Column: 1
and that it responds to getdata requests for blocks correctly:
- send a block within 288 + 2 of the tip
- disconnect peers who request blocks older than that."""
from test_framework.messages import CInv, MSG_BLOCK, msg_getdata, msg_verack, NODE_NETWORK_LIMITED, NODE_WITNESS
from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
)
Reported by Pylint.
Line: 19
Column: 1
)
class P2PIgnoreInv(P2PInterface):
firstAddrnServices = 0
def on_inv(self, message):
# The node will send us invs for other blocks. Ignore them.
pass
def on_addr(self, message):
Reported by Pylint.
Line: 25
Column: 9
# The node will send us invs for other blocks. Ignore them.
pass
def on_addr(self, message):
self.firstAddrnServices = message.addrs[0].nServices
def wait_for_addr(self, timeout=5):
test_function = lambda: self.last_message.get("addr")
self.wait_until(test_function, timeout=timeout)
def send_getdata_for_block(self, blockhash):
getdata_request = msg_getdata()
Reported by Pylint.
Line: 26
Column: 5
pass
def on_addr(self, message):
self.firstAddrnServices = message.addrs[0].nServices
def wait_for_addr(self, timeout=5):
test_function = lambda: self.last_message.get("addr")
self.wait_until(test_function, timeout=timeout)
def send_getdata_for_block(self, blockhash):
getdata_request = msg_getdata()
getdata_request.inv.append(CInv(MSG_BLOCK, int(blockhash, 16)))
Reported by Pylint.
Line: 29
Column: 5
def wait_for_addr(self, timeout=5):
test_function = lambda: self.last_message.get("addr")
self.wait_until(test_function, timeout=timeout)
def send_getdata_for_block(self, blockhash):
getdata_request = msg_getdata()
getdata_request.inv.append(CInv(MSG_BLOCK, int(blockhash, 16)))
self.send_message(getdata_request)
class NodeNetworkLimitedTest(BitcoinTestFramework):
Reported by Pylint.
Line: 34
Column: 1
getdata_request.inv.append(CInv(MSG_BLOCK, int(blockhash, 16)))
self.send_message(getdata_request)
class NodeNetworkLimitedTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 3
self.extra_args = [['-prune=550', '-addrmantest'], [], []]
Reported by Pylint.
Line: 40
Column: 5
self.num_nodes = 3
self.extra_args = [['-prune=550', '-addrmantest'], [], []]
def disconnect_all(self):
self.disconnect_nodes(0, 1)
self.disconnect_nodes(0, 2)
self.disconnect_nodes(1, 2)
def setup_network(self):
Reported by Pylint.
Line: 62
Column: 1
self.log.info("Mine enough blocks to reach the NODE_NETWORK_LIMITED range.")
self.connect_nodes(0, 1)
blocks = self.nodes[1].generatetoaddress(292, self.nodes[1].get_deterministic_priv_key().address)
self.sync_blocks([self.nodes[0], self.nodes[1]])
self.log.info("Make sure we can max retrieve block at tip-288.")
node.send_getdata_for_block(blocks[1]) # last block in valid range
node.wait_for_block(int(blocks[1], 16), timeout=3)
Reported by Pylint.
Line: 85
Column: 1
self.nodes[0].disconnect_p2ps()
# connect unsynced node 2 with pruned NODE_NETWORK_LIMITED peer
# because node 2 is in IBD and node 0 is a NODE_NETWORK_LIMITED peer, sync must not be possible
self.connect_nodes(0, 2)
try:
self.sync_blocks([self.nodes[0], self.nodes[2]], timeout=5)
except:
pass
Reported by Pylint.
test/functional/mempool_expiry.py
12 issues
Line: 36
Column: 9
"""Tests that a transaction expires after the expiry timeout and its
children are removed as well."""
node = self.nodes[0]
self.wallet = MiniWallet(node)
# Add enough mature utxos to the wallet so that all txs spend confirmed coins.
self.wallet.generate(4)
node.generate(COINBASE_MATURITY)
Reported by Pylint.
Line: 61
Column: 23
node.setmocktime(half_expiry_time)
child_txid = self.wallet.send_self_transfer(from_node=node, utxo_to_spend=parent_utxo)['txid']
assert_equal(parent_txid, node.getmempoolentry(child_txid)['depends'][0])
self.log.info('Broadcast child transaction after {} hours.'.format(
timedelta(seconds=(half_expiry_time-entry_time))))
# Broadcast another (independent) transaction.
independent_txid = self.wallet.send_self_transfer(from_node=node, utxo_to_spend=independent_utxo)['txid']
Reported by Pylint.
Line: 74
Column: 23
# Broadcast a transaction as the expiry of transactions in the mempool is only checked
# when a new transaction is added to the mempool.
self.wallet.send_self_transfer(from_node=node, utxo_to_spend=trigger_utxo1)
self.log.info('Test parent tx not expired after {} hours.'.format(
timedelta(seconds=(nearly_expiry_time-entry_time))))
assert_equal(entry_time, node.getmempoolentry(parent_txid)['time'])
# Transaction should be evicted from the mempool after the expiry time
# has passed.
Reported by Pylint.
Line: 84
Column: 23
node.setmocktime(expiry_time)
# Again, broadcast a transaction so the expiry of transactions in the mempool is checked.
self.wallet.send_self_transfer(from_node=node, utxo_to_spend=trigger_utxo2)
self.log.info('Test parent tx expiry after {} hours.'.format(
timedelta(seconds=(expiry_time-entry_time))))
assert_raises_rpc_error(-5, 'Transaction not in mempool',
node.getmempoolentry, parent_txid)
# The child transaction should be removed from the mempool as well.
Reported by Pylint.
Line: 95
Column: 23
node.getmempoolentry, child_txid)
# Check that the independent tx is still in the mempool.
self.log.info('Test the independent tx not expired after {} hours.'.format(
timedelta(seconds=(expiry_time-half_expiry_time))))
assert_equal(half_expiry_time, node.getmempoolentry(independent_txid)['time'])
def run_test(self):
self.log.info('Test default mempool expiry timeout of %d hours.' %
Reported by Pylint.
Line: 100
Column: 9
assert_equal(half_expiry_time, node.getmempoolentry(independent_txid)['time'])
def run_test(self):
self.log.info('Test default mempool expiry timeout of %d hours.' %
DEFAULT_MEMPOOL_EXPIRY)
self.test_transaction_expiry(DEFAULT_MEMPOOL_EXPIRY)
self.log.info('Test custom mempool expiry timeout of %d hours.' %
CUSTOM_MEMPOOL_EXPIRY)
Reported by Pylint.
Line: 104
Column: 9
DEFAULT_MEMPOOL_EXPIRY)
self.test_transaction_expiry(DEFAULT_MEMPOOL_EXPIRY)
self.log.info('Test custom mempool expiry timeout of %d hours.' %
CUSTOM_MEMPOOL_EXPIRY)
self.restart_node(0, ['-mempoolexpiry=%d' % CUSTOM_MEMPOOL_EXPIRY])
self.test_transaction_expiry(CUSTOM_MEMPOOL_EXPIRY)
Reported by Pylint.
Line: 27
Column: 1
CUSTOM_MEMPOOL_EXPIRY = 10 # hours
class MempoolExpiryTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
def test_transaction_expiry(self, timeout):
Reported by Pylint.
Line: 47
Column: 1
parent_utxo = self.wallet.get_utxo(txid=parent_txid)
independent_utxo = self.wallet.get_utxo()
# Ensure the transactions we send to trigger the mempool check spend utxos that are independent of
# the transactions being tested for expiration.
trigger_utxo1 = self.wallet.get_utxo()
trigger_utxo2 = self.wallet.get_utxo()
# Set the mocktime to the arrival time of the parent transaction.
Reported by Pylint.
Line: 56
Column: 1
entry_time = node.getmempoolentry(parent_txid)['time']
node.setmocktime(entry_time)
# Let half of the timeout elapse and broadcast the child transaction spending the parent transaction.
half_expiry_time = entry_time + int(60 * 60 * timeout/2)
node.setmocktime(half_expiry_time)
child_txid = self.wallet.send_self_transfer(from_node=node, utxo_to_spend=parent_utxo)['txid']
assert_equal(parent_txid, node.getmempoolentry(child_txid)['depends'][0])
self.log.info('Broadcast child transaction after {} hours.'.format(
Reported by Pylint.
test/functional/mempool_limit.py
12 issues
Line: 10
Column: 1
from decimal import Decimal
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_greater_than, assert_raises_rpc_error, create_confirmed_utxos, create_lots_of_big_transactions, gen_return_txouts
class MempoolLimitTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
Reported by Pylint.
Line: 12
Column: 1
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_greater_than, assert_raises_rpc_error, create_confirmed_utxos, create_lots_of_big_transactions, gen_return_txouts
class MempoolLimitTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [[
"-acceptnonstdtxn=1",
Reported by Pylint.
Line: 41
Column: 9
us0 = utxos.pop()
inputs = [{ "txid" : us0["txid"], "vout" : us0["vout"]}]
outputs = {self.nodes[0].getnewaddress() : 0.0001}
tx = self.nodes[0].createrawtransaction(inputs, outputs)
self.nodes[0].settxfee(relayfee) # specifically fund this tx with low fee
txF = self.nodes[0].fundrawtransaction(tx)
self.nodes[0].settxfee(0) # return to automatic fee selection
txFS = self.nodes[0].signrawtransactionwithwallet(txF['hex'])
txid = self.nodes[0].sendrawtransaction(txFS['hex'])
Reported by Pylint.
Line: 43
Column: 9
outputs = {self.nodes[0].getnewaddress() : 0.0001}
tx = self.nodes[0].createrawtransaction(inputs, outputs)
self.nodes[0].settxfee(relayfee) # specifically fund this tx with low fee
txF = self.nodes[0].fundrawtransaction(tx)
self.nodes[0].settxfee(0) # return to automatic fee selection
txFS = self.nodes[0].signrawtransactionwithwallet(txF['hex'])
txid = self.nodes[0].sendrawtransaction(txFS['hex'])
relayfee = self.nodes[0].getnetworkinfo()['relayfee']
Reported by Pylint.
Line: 45
Column: 9
self.nodes[0].settxfee(relayfee) # specifically fund this tx with low fee
txF = self.nodes[0].fundrawtransaction(tx)
self.nodes[0].settxfee(0) # return to automatic fee selection
txFS = self.nodes[0].signrawtransactionwithwallet(txF['hex'])
txid = self.nodes[0].sendrawtransaction(txFS['hex'])
relayfee = self.nodes[0].getnetworkinfo()['relayfee']
base_fee = relayfee*100
for i in range (3):
Reported by Pylint.
Line: 52
Column: 1
base_fee = relayfee*100
for i in range (3):
txids.append([])
txids[i] = create_lots_of_big_transactions(self.nodes[0], txouts, utxos[30*i:30*i+30], 30, (i+1)*base_fee)
self.log.info('The tx should be evicted by now')
assert txid not in self.nodes[0].getrawmempool()
txdata = self.nodes[0].gettransaction(txid)
assert txdata['confirmations'] == 0 #confirmation should still be 0
Reported by Pylint.
Line: 55
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
txids[i] = create_lots_of_big_transactions(self.nodes[0], txouts, utxos[30*i:30*i+30], 30, (i+1)*base_fee)
self.log.info('The tx should be evicted by now')
assert txid not in self.nodes[0].getrawmempool()
txdata = self.nodes[0].gettransaction(txid)
assert txdata['confirmations'] == 0 #confirmation should still be 0
self.log.info('Check that mempoolminfee is larger than minrelytxfee')
assert_equal(self.nodes[0].getmempoolinfo()['minrelaytxfee'], Decimal('0.00001000'))
Reported by Bandit.
Line: 57
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.log.info('The tx should be evicted by now')
assert txid not in self.nodes[0].getrawmempool()
txdata = self.nodes[0].gettransaction(txid)
assert txdata['confirmations'] == 0 #confirmation should still be 0
self.log.info('Check that mempoolminfee is larger than minrelytxfee')
assert_equal(self.nodes[0].getmempoolinfo()['minrelaytxfee'], Decimal('0.00001000'))
assert_greater_than(self.nodes[0].getmempoolinfo()['mempoolminfee'], Decimal('0.00001000'))
Reported by Bandit.
Line: 67
Column: 9
us0 = utxos.pop()
inputs = [{ "txid" : us0["txid"], "vout" : us0["vout"]}]
outputs = {self.nodes[0].getnewaddress() : 0.0001}
tx = self.nodes[0].createrawtransaction(inputs, outputs)
# specifically fund this tx with a fee < mempoolminfee, >= than minrelaytxfee
txF = self.nodes[0].fundrawtransaction(tx, {'feeRate': relayfee})
txFS = self.nodes[0].signrawtransactionwithwallet(txF['hex'])
assert_raises_rpc_error(-26, "mempool min fee not met", self.nodes[0].sendrawtransaction, txFS['hex'])
Reported by Pylint.
Line: 69
Column: 9
outputs = {self.nodes[0].getnewaddress() : 0.0001}
tx = self.nodes[0].createrawtransaction(inputs, outputs)
# specifically fund this tx with a fee < mempoolminfee, >= than minrelaytxfee
txF = self.nodes[0].fundrawtransaction(tx, {'feeRate': relayfee})
txFS = self.nodes[0].signrawtransactionwithwallet(txF['hex'])
assert_raises_rpc_error(-26, "mempool min fee not met", self.nodes[0].sendrawtransaction, txFS['hex'])
if __name__ == '__main__':
MempoolLimitTest().main()
Reported by Pylint.
src/random.cpp
12 issues
Line: 231
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
inner_hasher.Write(seed, sizeof(seed));
// Hash loop
unsigned char buffer[64];
int64_t stop = GetTimeMicros() + microseconds;
do {
for (int i = 0; i < 1000; ++i) {
inner_hasher.Finalize(buffer);
inner_hasher.Reset();
Reported by FlawFinder.
Line: 258
Column: 13
CWE codes:
362
*/
static void GetDevURandom(unsigned char *ent32)
{
int f = open("/dev/urandom", O_RDONLY);
if (f == -1) {
RandFailure();
}
int have = 0;
do {
Reported by FlawFinder.
Line: 362
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
* observe the RNG's state, fresh entropy is always mixed when
* GetStrongRandBytes is called.
*/
unsigned char m_state[32] GUARDED_BY(m_mutex) = {0};
uint64_t m_counter GUARDED_BY(m_mutex) = 0;
bool m_strongly_seeded GUARDED_BY(m_mutex) = false;
Mutex m_events_mutex;
CSHA256 m_events_hasher GUARDED_BY(m_events_mutex);
Reported by FlawFinder.
Line: 399
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
// since we want it to be fast as network peers may be able to trigger it repeatedly.
LOCK(m_events_mutex);
unsigned char events_hash[32];
m_events_hasher.Finalize(events_hash);
hasher.Write(events_hash, 32);
// Re-initialize the hasher with the finalized state to use later.
m_events_hasher.Reset();
Reported by FlawFinder.
Line: 415
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
bool MixExtract(unsigned char* out, size_t num, CSHA512&& hasher, bool strong_seed) noexcept
{
assert(num <= 32);
unsigned char buf[64];
static_assert(sizeof(buf) == CSHA512::OUTPUT_SIZE, "Buffer needs to have hasher's output size");
bool ret;
{
LOCK(m_mutex);
ret = (m_strongly_seeded |= strong_seed);
Reported by FlawFinder.
Line: 429
Column: 13
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
// Finalize the hasher
hasher.Finalize(buf);
// Store the last 32 bytes of the hash output as new RNG state.
memcpy(m_state, buf + 32, 32);
}
// If desired, copy (up to) the first 32 bytes of the hash output as output.
if (num) {
assert(out != nullptr);
memcpy(out, buf, num);
Reported by FlawFinder.
Line: 434
Column: 13
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
// If desired, copy (up to) the first 32 bytes of the hash output as output.
if (num) {
assert(out != nullptr);
memcpy(out, buf, num);
}
// Best effort cleanup of internal state
hasher.Reset();
memory_cleanse(buf, 64);
return ret;
Reported by FlawFinder.
Line: 465
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
static void SeedFast(CSHA512& hasher) noexcept
{
unsigned char buffer[32];
// Stack pointer to indirectly commit to thread/callstack
const unsigned char* ptr = buffer;
hasher.Write((const unsigned char*)&ptr, sizeof(ptr));
Reported by FlawFinder.
Line: 480
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
static void SeedSlow(CSHA512& hasher, RNGState& rng) noexcept
{
unsigned char buffer[32];
// Everything that the 'fast' seeder includes
SeedFast(hasher);
// OS randomness
Reported by FlawFinder.
Line: 503
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
static void SeedStrengthen(CSHA512& hasher, RNGState& rng, int microseconds) noexcept
{
// Generate 32 bytes of entropy from the RNG, and a copy of the entropy already in hasher.
unsigned char strengthen_seed[32];
rng.MixExtract(strengthen_seed, sizeof(strengthen_seed), CSHA512(hasher), false);
// Strengthen the seed, and feed it into hasher.
Strengthen(strengthen_seed, microseconds, hasher);
}
Reported by FlawFinder.