The following issues were found
test/functional/p2p_timeouts.py
17 issues
Line: 31
Column: 1
from test_framework.test_framework import BitcoinTestFramework
class TestP2PConn(P2PInterface):
def on_version(self, message):
# Don't send a verack in response
pass
Reported by Pylint.
Line: 37
Column: 1
pass
class TimeoutsTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
# set timeout to receive version/verack to 3 seconds
self.extra_args = [["-peertimeout=3"]]
Reported by Pylint.
Line: 47
Column: 1
def run_test(self):
# Setup the p2p connections
no_verack_node = self.nodes[0].add_p2p_connection(TestP2PConn(), wait_for_verack=False)
no_version_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False, wait_for_verack=False)
no_send_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False, wait_for_verack=False)
# Wait until we got the verack in response to the version. Though, don't wait for the other node to receive the
# verack, since we never sent one
no_verack_node.wait_for_verack()
Reported by Pylint.
Line: 48
Column: 1
# Setup the p2p connections
no_verack_node = self.nodes[0].add_p2p_connection(TestP2PConn(), wait_for_verack=False)
no_version_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False, wait_for_verack=False)
no_send_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False, wait_for_verack=False)
# Wait until we got the verack in response to the version. Though, don't wait for the other node to receive the
# verack, since we never sent one
no_verack_node.wait_for_verack()
Reported by Pylint.
Line: 50
Column: 1
no_version_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False, wait_for_verack=False)
no_send_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False, wait_for_verack=False)
# Wait until we got the verack in response to the version. Though, don't wait for the other node to receive the
# verack, since we never sent one
no_verack_node.wait_for_verack()
sleep(1)
Reported by Pylint.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
sleep(1)
assert no_verack_node.is_connected
assert no_version_node.is_connected
assert no_send_node.is_connected
with self.nodes[0].assert_debug_log(['Unsupported message "ping" prior to verack from peer=0']):
no_verack_node.send_message(msg_ping())
Reported by Bandit.
Line: 57
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
sleep(1)
assert no_verack_node.is_connected
assert no_version_node.is_connected
assert no_send_node.is_connected
with self.nodes[0].assert_debug_log(['Unsupported message "ping" prior to verack from peer=0']):
no_verack_node.send_message(msg_ping())
with self.nodes[0].assert_debug_log(['non-version message before version handshake. Message "ping" from peer=1']):
Reported by Bandit.
Line: 58
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert no_verack_node.is_connected
assert no_version_node.is_connected
assert no_send_node.is_connected
with self.nodes[0].assert_debug_log(['Unsupported message "ping" prior to verack from peer=0']):
no_verack_node.send_message(msg_ping())
with self.nodes[0].assert_debug_log(['non-version message before version handshake. Message "ping" from peer=1']):
no_version_node.send_message(msg_ping())
Reported by Bandit.
Line: 60
Column: 1
assert no_version_node.is_connected
assert no_send_node.is_connected
with self.nodes[0].assert_debug_log(['Unsupported message "ping" prior to verack from peer=0']):
no_verack_node.send_message(msg_ping())
with self.nodes[0].assert_debug_log(['non-version message before version handshake. Message "ping" from peer=1']):
no_version_node.send_message(msg_ping())
sleep(1)
Reported by Pylint.
Line: 62
Column: 1
with self.nodes[0].assert_debug_log(['Unsupported message "ping" prior to verack from peer=0']):
no_verack_node.send_message(msg_ping())
with self.nodes[0].assert_debug_log(['non-version message before version handshake. Message "ping" from peer=1']):
no_version_node.send_message(msg_ping())
sleep(1)
assert "version" in no_verack_node.last_message
Reported by Pylint.
test/functional/p2p_invalid_tx.py
17 issues
Line: 24
Column: 1
from data import invalid_txs
class InvalidTxRequestTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [[
"-acceptnonstdtxn=1",
]]
Reported by Pylint.
Line: 47
Column: 5
self.nodes[0].disconnect_p2ps()
self.bootstrap_p2p(**kwargs)
def run_test(self):
node = self.nodes[0] # convenience reference to the node
self.bootstrap_p2p() # Add one p2p connection to the node
best_block = self.nodes[0].getbestblockhash()
Reported by Pylint.
Line: 47
Column: 5
self.nodes[0].disconnect_p2ps()
self.bootstrap_p2p(**kwargs)
def run_test(self):
node = self.nodes[0] # convenience reference to the node
self.bootstrap_p2p() # Add one p2p connection to the node
best_block = self.nodes[0].getbestblockhash()
Reported by Pylint.
Line: 74
Column: 13
for BadTxTemplate in invalid_txs.iter_all_templates():
self.log.info("Testing invalid transaction: %s", BadTxTemplate.__name__)
template = BadTxTemplate(spend_block=block1)
tx = template.get_tx()
node.p2ps[0].send_txs_and_test(
[tx], node, success=False,
expect_disconnect=template.expect_disconnect,
reject_reason=template.reject_reason,
)
Reported by Pylint.
Line: 93
Column: 9
self.log.info('Test orphan transaction handling ... ')
# Create a root transaction that we withhold until all dependent transactions
# are sent out and in the orphan cache
SCRIPT_PUB_KEY_OP_TRUE = b'\x51\x75' * 15 + b'\x51'
tx_withhold = CTransaction()
tx_withhold.vin.append(CTxIn(outpoint=COutPoint(block1.vtx[0].sha256, 0)))
tx_withhold.vout.append(CTxOut(nValue=50 * COIN - 12000, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE))
tx_withhold.calc_sha256()
Reported by Pylint.
Line: 96
Column: 1
SCRIPT_PUB_KEY_OP_TRUE = b'\x51\x75' * 15 + b'\x51'
tx_withhold = CTransaction()
tx_withhold.vin.append(CTxIn(outpoint=COutPoint(block1.vtx[0].sha256, 0)))
tx_withhold.vout.append(CTxOut(nValue=50 * COIN - 12000, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE))
tx_withhold.calc_sha256()
# Our first orphan tx with some outputs to create further orphan txs
tx_orphan_1 = CTransaction()
tx_orphan_1.vin.append(CTxIn(outpoint=COutPoint(tx_withhold.sha256, 0)))
Reported by Pylint.
Line: 108
Column: 1
# A valid transaction with low fee
tx_orphan_2_no_fee = CTransaction()
tx_orphan_2_no_fee.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_1.sha256, 0)))
tx_orphan_2_no_fee.vout.append(CTxOut(nValue=10 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE))
# A valid transaction with sufficient fee
tx_orphan_2_valid = CTransaction()
tx_orphan_2_valid.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_1.sha256, 1)))
tx_orphan_2_valid.vout.append(CTxOut(nValue=10 * COIN - 12000, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE))
Reported by Pylint.
Line: 113
Column: 1
# A valid transaction with sufficient fee
tx_orphan_2_valid = CTransaction()
tx_orphan_2_valid.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_1.sha256, 1)))
tx_orphan_2_valid.vout.append(CTxOut(nValue=10 * COIN - 12000, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE))
tx_orphan_2_valid.calc_sha256()
# An invalid transaction with negative fee
tx_orphan_2_invalid = CTransaction()
tx_orphan_2_invalid.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_1.sha256, 2)))
Reported by Pylint.
Line: 119
Column: 1
# An invalid transaction with negative fee
tx_orphan_2_invalid = CTransaction()
tx_orphan_2_invalid.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_1.sha256, 2)))
tx_orphan_2_invalid.vout.append(CTxOut(nValue=11 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE))
tx_orphan_2_invalid.calc_sha256()
self.log.info('Send the orphans ... ')
# Send valid orphan txs from p2ps[0]
node.p2ps[0].send_txs_and_test([tx_orphan_1, tx_orphan_2_no_fee, tx_orphan_2_valid], node, success=False)
Reported by Pylint.
Line: 124
Column: 1
self.log.info('Send the orphans ... ')
# Send valid orphan txs from p2ps[0]
node.p2ps[0].send_txs_and_test([tx_orphan_1, tx_orphan_2_no_fee, tx_orphan_2_valid], node, success=False)
# Send invalid tx from p2ps[1]
node.p2ps[1].send_txs_and_test([tx_orphan_2_invalid], node, success=False)
assert_equal(0, node.getmempoolinfo()['size']) # Mempool should be empty
assert_equal(2, len(node.getpeerinfo())) # p2ps[1] is still connected
Reported by Pylint.
test/functional/p2p_feefilter.py
16 issues
Line: 17
Column: 1
from test_framework.wallet import MiniWallet
class FeefilterConn(P2PInterface):
feefilter_received = False
def on_feefilter(self, message):
self.feefilter_received = True
Reported by Pylint.
Line: 23
Column: 5
def on_feefilter(self, message):
self.feefilter_received = True
def assert_feefilter_received(self, recv: bool):
with p2p_lock:
assert_equal(self.feefilter_received, recv)
class TestP2PConn(P2PInterface):
Reported by Pylint.
Line: 28
Column: 1
assert_equal(self.feefilter_received, recv)
class TestP2PConn(P2PInterface):
def __init__(self):
super().__init__()
self.txinvs = []
def on_inv(self, message):
Reported by Pylint.
Line: 38
Column: 5
if (i.type == MSG_TX) or (i.type == MSG_WTX):
self.txinvs.append('{:064x}'.format(i.hash))
def wait_for_invs_to_match(self, invs_expected):
invs_expected.sort()
self.wait_until(lambda: invs_expected == sorted(self.txinvs))
def clear_invs(self):
with p2p_lock:
Reported by Pylint.
Line: 42
Column: 5
invs_expected.sort()
self.wait_until(lambda: invs_expected == sorted(self.txinvs))
def clear_invs(self):
with p2p_lock:
self.txinvs = []
class FeeFilterTest(BitcoinTestFramework):
Reported by Pylint.
Line: 47
Column: 1
self.txinvs = []
class FeeFilterTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
# We lower the various required feerates for this test
# to catch a corner-case where feefilter used to slightly undercut
# mempool and wallet feerate calculation based on GetFee
Reported by Pylint.
Line: 67
Column: 5
self.test_feefilter()
self.test_feefilter_blocksonly()
def test_feefilter_forcerelay(self):
self.log.info('Check that peers without forcerelay permission (default) get a feefilter message')
self.nodes[0].add_p2p_connection(FeefilterConn()).assert_feefilter_received(True)
self.log.info('Check that peers with forcerelay permission do not get a feefilter message')
self.restart_node(0, extra_args=['-whitelist=forcerelay@127.0.0.1'])
Reported by Pylint.
Line: 68
Column: 1
self.test_feefilter_blocksonly()
def test_feefilter_forcerelay(self):
self.log.info('Check that peers without forcerelay permission (default) get a feefilter message')
self.nodes[0].add_p2p_connection(FeefilterConn()).assert_feefilter_received(True)
self.log.info('Check that peers with forcerelay permission do not get a feefilter message')
self.restart_node(0, extra_args=['-whitelist=forcerelay@127.0.0.1'])
self.nodes[0].add_p2p_connection(FeefilterConn()).assert_feefilter_received(False)
Reported by Pylint.
Line: 79
Column: 5
self.restart_node(0)
self.connect_nodes(1, 0)
def test_feefilter(self):
node1 = self.nodes[1]
node0 = self.nodes[0]
miniwallet = MiniWallet(node1)
# Add enough mature utxos to the wallet, so that all txs spend confirmed coins
miniwallet.generate(5)
Reported by Pylint.
Line: 90
Column: 1
conn = self.nodes[0].add_p2p_connection(TestP2PConn())
self.log.info("Test txs paying 0.2 sat/byte are received by test connection")
txids = [miniwallet.send_self_transfer(fee_rate=Decimal('0.00000200'), from_node=node1)['wtxid'] for _ in range(3)]
conn.wait_for_invs_to_match(txids)
conn.clear_invs()
# Set a fee filter of 0.15 sat/byte on test connection
conn.send_and_ping(msg_feefilter(150))
Reported by Pylint.
test/functional/example_test.py
16 issues
Line: 58
Column: 9
def on_inv(self, message):
"""Override the standard on_inv callback"""
pass
def custom_function():
"""Do some custom behaviour
If this function is more generally useful for other tests, consider
Reported by Pylint.
Line: 66
Column: 5
If this function is more generally useful for other tests, consider
moving it to a module in test_framework."""
# self.log.info("running custom_function") # Oops! Can't run self.log outside the BitcoinTestFramework
pass
class ExampleTest(BitcoinTestFramework):
# Each functional test is a subclass of the BitcoinTestFramework class.
Reported by Pylint.
Line: 165
Column: 9
self.custom_method()
self.log.info("Create some blocks")
self.tip = int(self.nodes[0].getbestblockhash(), 16)
self.block_time = self.nodes[0].getblock(self.nodes[0].getbestblockhash())['time'] + 1
height = self.nodes[0].getblockcount()
for _ in range(10):
Reported by Pylint.
Line: 166
Column: 9
self.log.info("Create some blocks")
self.tip = int(self.nodes[0].getbestblockhash(), 16)
self.block_time = self.nodes[0].getblock(self.nodes[0].getbestblockhash())['time'] + 1
height = self.nodes[0].getblockcount()
for _ in range(10):
# Use the blocktools functionality to manually build a block.
Reported by Pylint.
Line: 179
Column: 13
block_message = msg_block(block)
# Send message is used to send a P2P message to the node over our P2PInterface
peer_messaging.send_message(block_message)
self.tip = block.sha256
blocks.append(self.tip)
self.block_time += 1
height += 1
self.log.info("Wait for node1 to reach current tip (height 11) using RPC")
Reported by Pylint.
Line: 33
Column: 1
# P2PInterface is a class containing callbacks to be executed when a P2P
# message is received from the node-under-test. Subclass P2PInterface and
# override the on_*() methods if you need custom behaviour.
class BaseNode(P2PInterface):
def __init__(self):
"""Initialize the P2PInterface
Used to initialize custom properties for the Node that aren't
included by default in the base class. Be aware that the P2PInterface
Reported by Pylint.
Line: 65
Column: 1
If this function is more generally useful for other tests, consider
moving it to a module in test_framework."""
# self.log.info("running custom_function") # Oops! Can't run self.log outside the BitcoinTestFramework
pass
class ExampleTest(BitcoinTestFramework):
# Each functional test is a subclass of the BitcoinTestFramework class.
Reported by Pylint.
Line: 69
Column: 1
pass
class ExampleTest(BitcoinTestFramework):
# Each functional test is a subclass of the BitcoinTestFramework class.
# Override the set_test_params(), skip_test_if_missing_module(), add_options(), setup_chain(), setup_network()
# and setup_nodes() methods to customize the test setup as required.
Reported by Pylint.
Line: 72
Column: 1
class ExampleTest(BitcoinTestFramework):
# Each functional test is a subclass of the BitcoinTestFramework class.
# Override the set_test_params(), skip_test_if_missing_module(), add_options(), setup_chain(), setup_network()
# and setup_nodes() methods to customize the test setup as required.
def set_test_params(self):
"""Override test parameters for your individual test.
Reported by Pylint.
Line: 87
Column: 1
# Use self.extra_args to change command-line arguments for the nodes
self.extra_args = [[], ["-logips"], []]
# self.log.info("I've finished set_test_params") # Oops! Can't run self.log before run_test()
# Use skip_test_if_missing_module() to skip the test if your test requires certain modules to be present.
# This test uses generate which requires wallet to be compiled
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
Reported by Pylint.
src/randomenv.cpp
16 issues
Line: 138
Column: 13
CWE codes:
362
void AddFile(CSHA512& hasher, const char *path)
{
struct stat sb = {};
int f = open(path, O_RDONLY);
size_t total = 0;
if (f != -1) {
unsigned char fbuf[4096];
int n;
hasher.Write((const unsigned char*)&f, sizeof(f));
Reported by FlawFinder.
Line: 141
Column: 18
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
int f = open(path, O_RDONLY);
size_t total = 0;
if (f != -1) {
unsigned char fbuf[4096];
int n;
hasher.Write((const unsigned char*)&f, sizeof(f));
if (fstat(f, &sb) == 0) hasher << sb;
do {
n = read(f, fbuf, sizeof(fbuf));
Reported by FlawFinder.
Line: 170
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
void AddSysctl(CSHA512& hasher)
{
int CTL[sizeof...(S)] = {S...};
unsigned char buffer[65536];
size_t siz = 65536;
int ret = sysctl(CTL, sizeof...(S), buffer, &siz, nullptr, 0);
if (ret == 0 || (ret == -1 && errno == ENOMEM)) {
hasher << sizeof(CTL);
hasher.Write((const unsigned char*)CTL, sizeof(CTL));
Reported by FlawFinder.
Line: 359
Column: 5
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
hasher << &hasher << &RandAddStaticEnv << &malloc << &errno << &environ;
// Hostname
char hname[256];
if (gethostname(hname, 256) == 0) {
hasher.Write((const unsigned char*)hname, strnlen(hname, 256));
}
#if HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS
Reported by FlawFinder.
Line: 146
Column: 17
CWE codes:
120
20
hasher.Write((const unsigned char*)&f, sizeof(f));
if (fstat(f, &sb) == 0) hasher << sb;
do {
n = read(f, fbuf, sizeof(fbuf));
if (n > 0) hasher.Write(fbuf, n);
total += n;
/* not bothering with EINTR handling. */
} while (n == sizeof(fbuf) && total < 1048576); // Read only the first 1 Mbyte
close(f);
Reported by FlawFinder.
Line: 159
Column: 50
CWE codes:
126
{
struct stat sb = {};
if (stat(path, &sb) == 0) {
hasher.Write((const unsigned char*)path, strlen(path) + 1);
hasher << sb;
}
}
#endif
Reported by FlawFinder.
Line: 323
Column: 58
CWE codes:
126
#endif
#ifdef __VERSION__
const char* COMPILER_VERSION = __VERSION__;
hasher.Write((const unsigned char*)COMPILER_VERSION, strlen(COMPILER_VERSION) + 1);
#endif
// Bitcoin client version
hasher << CLIENT_VERSION;
Reported by FlawFinder.
Line: 343
Column: 72
CWE codes:
126
# endif
# ifdef AT_PLATFORM
const char* platform_str = (const char*)getauxval(AT_PLATFORM);
if (platform_str) hasher.Write((const unsigned char*)platform_str, strlen(platform_str) + 1);
# endif
# ifdef AT_EXECFN
const char* exec_str = (const char*)getauxval(AT_EXECFN);
if (exec_str) hasher.Write((const unsigned char*)exec_str, strlen(exec_str) + 1);
# endif
Reported by FlawFinder.
Line: 347
Column: 64
CWE codes:
126
# endif
# ifdef AT_EXECFN
const char* exec_str = (const char*)getauxval(AT_EXECFN);
if (exec_str) hasher.Write((const unsigned char*)exec_str, strlen(exec_str) + 1);
# endif
#endif // HAVE_STRONG_GETAUXVAL || HAVE_WEAK_GETAUXVAL
#ifdef HAVE_GETCPUID
AddAllCPUID(hasher);
Reported by FlawFinder.
Line: 371
Column: 60
CWE codes:
126
struct ifaddrs *ifit = ifad;
while (ifit != NULL) {
hasher.Write((const unsigned char*)&ifit, sizeof(ifit));
hasher.Write((const unsigned char*)ifit->ifa_name, strlen(ifit->ifa_name) + 1);
hasher.Write((const unsigned char*)&ifit->ifa_flags, sizeof(ifit->ifa_flags));
AddSockaddr(hasher, ifit->ifa_addr);
AddSockaddr(hasher, ifit->ifa_netmask);
AddSockaddr(hasher, ifit->ifa_dstaddr);
ifit = ifit->ifa_next;
Reported by FlawFinder.
test/functional/p2p_disconnect_ban.py
16 issues
Line: 14
Column: 1
assert_raises_rpc_error,
)
class DisconnectBanTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.supports_cli = False
def run_test(self):
Reported by Pylint.
Line: 19
Column: 5
self.num_nodes = 2
self.supports_cli = False
def run_test(self):
self.log.info("Connect nodes both way")
# By default, the test framework sets up an addnode connection from
# node 1 --> node0. By connecting node0 --> node 1, we're left with
# the two nodes being connected both ways.
# Topology will look like: node0 <--> node1
Reported by Pylint.
Line: 30
Column: 1
self.log.info("Test setban and listbanned RPCs")
self.log.info("setban: successfully ban single IP address")
assert_equal(len(self.nodes[1].getpeerinfo()), 2) # node1 should have 2 connections to node0 at this point
self.nodes[1].setban(subnet="127.0.0.1", command="add")
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 0, timeout=10)
assert_equal(len(self.nodes[1].getpeerinfo()), 0) # all nodes must be disconnected at this point
assert_equal(len(self.nodes[1].listbanned()), 1)
Reported by Pylint.
Line: 33
Column: 1
assert_equal(len(self.nodes[1].getpeerinfo()), 2) # node1 should have 2 connections to node0 at this point
self.nodes[1].setban(subnet="127.0.0.1", command="add")
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 0, timeout=10)
assert_equal(len(self.nodes[1].getpeerinfo()), 0) # all nodes must be disconnected at this point
assert_equal(len(self.nodes[1].listbanned()), 1)
self.log.info("clearbanned: successfully clear ban list")
self.nodes[1].clearbanned()
assert_equal(len(self.nodes[1].listbanned()), 0)
Reported by Pylint.
Line: 43
Column: 1
self.log.info("setban: fail to ban an already banned subnet")
assert_equal(len(self.nodes[1].listbanned()), 1)
assert_raises_rpc_error(-23, "IP/Subnet already banned", self.nodes[1].setban, "127.0.0.1", "add")
self.log.info("setban: fail to ban an invalid subnet")
assert_raises_rpc_error(-30, "Error: Invalid IP/Subnet", self.nodes[1].setban, "127.0.0.1/42", "add")
assert_equal(len(self.nodes[1].listbanned()), 1) # still only one banned ip because 127.0.0.1 is within the range of 127.0.0.0/24
Reported by Pylint.
Line: 46
Column: 1
assert_raises_rpc_error(-23, "IP/Subnet already banned", self.nodes[1].setban, "127.0.0.1", "add")
self.log.info("setban: fail to ban an invalid subnet")
assert_raises_rpc_error(-30, "Error: Invalid IP/Subnet", self.nodes[1].setban, "127.0.0.1/42", "add")
assert_equal(len(self.nodes[1].listbanned()), 1) # still only one banned ip because 127.0.0.1 is within the range of 127.0.0.0/24
self.log.info("setban remove: fail to unban a non-banned subnet")
assert_raises_rpc_error(-30, "Error: Unban failed", self.nodes[1].setban, "127.0.0.1", "remove")
assert_equal(len(self.nodes[1].listbanned()), 1)
Reported by Pylint.
Line: 47
Column: 1
self.log.info("setban: fail to ban an invalid subnet")
assert_raises_rpc_error(-30, "Error: Invalid IP/Subnet", self.nodes[1].setban, "127.0.0.1/42", "add")
assert_equal(len(self.nodes[1].listbanned()), 1) # still only one banned ip because 127.0.0.1 is within the range of 127.0.0.0/24
self.log.info("setban remove: fail to unban a non-banned subnet")
assert_raises_rpc_error(-30, "Error: Unban failed", self.nodes[1].setban, "127.0.0.1", "remove")
assert_equal(len(self.nodes[1].listbanned()), 1)
Reported by Pylint.
Line: 50
Column: 1
assert_equal(len(self.nodes[1].listbanned()), 1) # still only one banned ip because 127.0.0.1 is within the range of 127.0.0.0/24
self.log.info("setban remove: fail to unban a non-banned subnet")
assert_raises_rpc_error(-30, "Error: Unban failed", self.nodes[1].setban, "127.0.0.1", "remove")
assert_equal(len(self.nodes[1].listbanned()), 1)
self.log.info("setban remove: successfully unban subnet")
self.nodes[1].setban("127.0.0.0/24", "remove")
assert_equal(len(self.nodes[1].listbanned()), 0)
Reported by Pylint.
Line: 66
Column: 1
old_time = int(time.time())
self.nodes[1].setmocktime(old_time)
self.nodes[1].setban("192.168.0.1", "add", 1) # ban for 1 seconds
self.nodes[1].setban("2001:4d48:ac57:400:cacf:e9ff:fe1d:9c63/19", "add", 1000) # ban for 1000 seconds
listBeforeShutdown = self.nodes[1].listbanned()
assert_equal("192.168.0.1/32", listBeforeShutdown[2]['address'])
# Move time forward by 3 seconds so the third ban has expired
self.nodes[1].setmocktime(old_time + 3)
assert_equal(len(self.nodes[1].listbanned()), 3)
Reported by Pylint.
Line: 67
Column: 9
self.nodes[1].setmocktime(old_time)
self.nodes[1].setban("192.168.0.1", "add", 1) # ban for 1 seconds
self.nodes[1].setban("2001:4d48:ac57:400:cacf:e9ff:fe1d:9c63/19", "add", 1000) # ban for 1000 seconds
listBeforeShutdown = self.nodes[1].listbanned()
assert_equal("192.168.0.1/32", listBeforeShutdown[2]['address'])
# Move time forward by 3 seconds so the third ban has expired
self.nodes[1].setmocktime(old_time + 3)
assert_equal(len(self.nodes[1].listbanned()), 3)
Reported by Pylint.
test/functional/rpc_setban.py
16 issues
Line: 12
Column: 1
p2p_port
)
class SetBanTests(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
self.extra_args = [[],[]]
Reported by Pylint.
Line: 18
Column: 5
self.setup_clean_chain = True
self.extra_args = [[],[]]
def is_banned(self, node, addr):
return any(e['address'] == addr for e in node.listbanned())
def run_test(self):
# Node 0 connects to Node 1, check that the noban permission is not granted
self.connect_nodes(0, 1)
Reported by Pylint.
Line: 18
Column: 5
self.setup_clean_chain = True
self.extra_args = [[],[]]
def is_banned(self, node, addr):
return any(e['address'] == addr for e in node.listbanned())
def run_test(self):
# Node 0 connects to Node 1, check that the noban permission is not granted
self.connect_nodes(0, 1)
Reported by Pylint.
Line: 25
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# Node 0 connects to Node 1, check that the noban permission is not granted
self.connect_nodes(0, 1)
peerinfo = self.nodes[1].getpeerinfo()[0]
assert not "noban" in peerinfo["permissions"]
# Node 0 get banned by Node 1
self.nodes[1].setban("127.0.0.1", "add")
# Node 0 should not be able to reconnect
Reported by Bandit.
Line: 39
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.restart_node(1, ['-whitelist=127.0.0.1'])
self.connect_nodes(0, 1)
peerinfo = self.nodes[1].getpeerinfo()[0]
assert "noban" in peerinfo["permissions"]
# If we remove the ban, Node 0 should be able to reconnect even without noban permission
self.nodes[1].setban("127.0.0.1", "remove")
self.restart_node(1, [])
self.connect_nodes(0, 1)
Reported by Bandit.
Line: 46
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
self.restart_node(1, [])
self.connect_nodes(0, 1)
peerinfo = self.nodes[1].getpeerinfo()[0]
assert not "noban" in peerinfo["permissions"]
self.log.info("Test that a non-IP address can be banned/unbanned")
node = self.nodes[1]
tor_addr = "pg6mmjiyjmcrsslvykfwnntlaru7p5svn6y2ymmju6nubxndf4pscryd.onion"
ip_addr = "1.2.3.4"
Reported by Bandit.
Line: 52
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
node = self.nodes[1]
tor_addr = "pg6mmjiyjmcrsslvykfwnntlaru7p5svn6y2ymmju6nubxndf4pscryd.onion"
ip_addr = "1.2.3.4"
assert not self.is_banned(node, tor_addr)
assert not self.is_banned(node, ip_addr)
node.setban(tor_addr, "add")
assert self.is_banned(node, tor_addr)
assert not self.is_banned(node, ip_addr)
Reported by Bandit.
Line: 53
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
tor_addr = "pg6mmjiyjmcrsslvykfwnntlaru7p5svn6y2ymmju6nubxndf4pscryd.onion"
ip_addr = "1.2.3.4"
assert not self.is_banned(node, tor_addr)
assert not self.is_banned(node, ip_addr)
node.setban(tor_addr, "add")
assert self.is_banned(node, tor_addr)
assert not self.is_banned(node, ip_addr)
Reported by Bandit.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert not self.is_banned(node, ip_addr)
node.setban(tor_addr, "add")
assert self.is_banned(node, tor_addr)
assert not self.is_banned(node, ip_addr)
self.log.info("Test the ban list is preserved through restart")
self.restart_node(1)
Reported by Bandit.
Line: 57
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
node.setban(tor_addr, "add")
assert self.is_banned(node, tor_addr)
assert not self.is_banned(node, ip_addr)
self.log.info("Test the ban list is preserved through restart")
self.restart_node(1)
assert self.is_banned(node, tor_addr)
Reported by Bandit.
test/functional/p2p_blocksonly.py
16 issues
Line: 24
Column: 9
self.extra_args = [["-blocksonly"]]
def run_test(self):
self.miniwallet = MiniWallet(self.nodes[0])
# Add enough mature utxos to the wallet, so that all txs spend confirmed coins
self.miniwallet.generate(2)
self.nodes[0].generate(COINBASE_MATURITY)
self.blocksonly_mode_tests()
Reported by Pylint.
Line: 17
Column: 1
from test_framework.wallet import MiniWallet
class P2PBlocksOnly(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [["-blocksonly"]]
Reported by Pylint.
Line: 32
Column: 5
self.blocksonly_mode_tests()
self.blocks_relay_conn_tests()
def blocksonly_mode_tests(self):
self.log.info("Tests with node running in -blocksonly mode")
assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], False)
self.nodes[0].add_p2p_connection(P2PInterface())
tx, txid, wtxid, tx_hex = self.check_p2p_tx_violation()
Reported by Pylint.
Line: 37
Column: 9
assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], False)
self.nodes[0].add_p2p_connection(P2PInterface())
tx, txid, wtxid, tx_hex = self.check_p2p_tx_violation()
self.log.info('Check that txs from rpc are not rejected and relayed to other peers')
tx_relay_peer = self.nodes[0].add_p2p_connection(P2PInterface())
assert_equal(self.nodes[0].getpeerinfo()[0]['relaytxes'], True)
Reported by Pylint.
Line: 60
Column: 1
assert_equal(peer_2_info['permissions'], ['relay'])
assert_equal(self.nodes[0].testmempoolaccept([tx_hex])[0]['allowed'], True)
self.log.info('Check that the tx from first_peer with relay-permission is relayed to others (ie.second_peer)')
with self.nodes[0].assert_debug_log(["received getdata"]):
# Note that normally, first_peer would never send us transactions since we're a blocksonly node.
# By activating blocksonly, we explicitly tell our peers that they should not send us transactions,
# and Bitcoin Core respects that choice and will not send transactions.
# But if, for some reason, first_peer decides to relay transactions to us anyway, we should relay them to
Reported by Pylint.
Line: 62
Column: 1
self.log.info('Check that the tx from first_peer with relay-permission is relayed to others (ie.second_peer)')
with self.nodes[0].assert_debug_log(["received getdata"]):
# Note that normally, first_peer would never send us transactions since we're a blocksonly node.
# By activating blocksonly, we explicitly tell our peers that they should not send us transactions,
# and Bitcoin Core respects that choice and will not send transactions.
# But if, for some reason, first_peer decides to relay transactions to us anyway, we should relay them to
# second_peer since we gave relay permission to first_peer.
# See https://github.com/bitcoin/bitcoin/issues/19943 for details.
Reported by Pylint.
Line: 63
Column: 1
self.log.info('Check that the tx from first_peer with relay-permission is relayed to others (ie.second_peer)')
with self.nodes[0].assert_debug_log(["received getdata"]):
# Note that normally, first_peer would never send us transactions since we're a blocksonly node.
# By activating blocksonly, we explicitly tell our peers that they should not send us transactions,
# and Bitcoin Core respects that choice and will not send transactions.
# But if, for some reason, first_peer decides to relay transactions to us anyway, we should relay them to
# second_peer since we gave relay permission to first_peer.
# See https://github.com/bitcoin/bitcoin/issues/19943 for details.
first_peer.send_message(msg_tx(tx))
Reported by Pylint.
Line: 65
Column: 1
# Note that normally, first_peer would never send us transactions since we're a blocksonly node.
# By activating blocksonly, we explicitly tell our peers that they should not send us transactions,
# and Bitcoin Core respects that choice and will not send transactions.
# But if, for some reason, first_peer decides to relay transactions to us anyway, we should relay them to
# second_peer since we gave relay permission to first_peer.
# See https://github.com/bitcoin/bitcoin/issues/19943 for details.
first_peer.send_message(msg_tx(tx))
self.log.info('Check that the peer with relay-permission is still connected after sending the transaction')
assert_equal(first_peer.is_connected, True)
Reported by Pylint.
Line: 69
Column: 1
# second_peer since we gave relay permission to first_peer.
# See https://github.com/bitcoin/bitcoin/issues/19943 for details.
first_peer.send_message(msg_tx(tx))
self.log.info('Check that the peer with relay-permission is still connected after sending the transaction')
assert_equal(first_peer.is_connected, True)
second_peer.wait_for_tx(txid)
assert_equal(self.nodes[0].getmempoolinfo()['size'], 1)
self.log.info("Relay-permission peer's transaction is accepted and relayed")
Reported by Pylint.
Line: 78
Column: 5
self.nodes[0].disconnect_p2ps()
self.nodes[0].generate(1)
def blocks_relay_conn_tests(self):
self.log.info('Tests with node in normal mode with block-relay-only connections')
self.restart_node(0, ["-noblocksonly"]) # disables blocks only mode
assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], True)
# Ensure we disconnect if a block-relay-only connection sends us a transaction
Reported by Pylint.
test/functional/p2p_add_connections.py
16 issues
Line: 29
Column: 13
def run_test(self):
self.log.info("Add 8 outbounds to node 0")
for i in range(8):
self.log.info(f"outbound: {i}")
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i, connection_type="outbound-full-relay")
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
Reported by Pylint.
Line: 34
Column: 13
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
# set p2p_idx based on the outbound connections already open to the
# node, so add 8 to account for the previous full-relay connections
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 8, connection_type="block-relay-only")
self.log.info("Add 2 block-relay-only connections to node 1")
Reported by Pylint.
Line: 41
Column: 13
self.log.info("Add 2 block-relay-only connections to node 1")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
self.nodes[1].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i, connection_type="block-relay-only")
self.log.info("Add 5 inbound connections to node 1")
for i in range(5):
self.log.info(f"inbound: {i}")
Reported by Pylint.
Line: 46
Column: 13
self.log.info("Add 5 inbound connections to node 1")
for i in range(5):
self.log.info(f"inbound: {i}")
self.nodes[1].add_p2p_connection(P2PInterface())
self.log.info("Add 8 outbounds to node 1")
for i in range(8):
self.log.info(f"outbound: {i}")
Reported by Pylint.
Line: 51
Column: 13
self.log.info("Add 8 outbounds to node 1")
for i in range(8):
self.log.info(f"outbound: {i}")
# bump p2p_idx to account for the 2 existing outbounds on node 1
self.nodes[1].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 2)
self.log.info("Check the connections opened as expected")
check_node_connections(node=self.nodes[0], num_in=0, num_out=10)
Reported by Pylint.
Line: 65
Column: 13
self.log.info("Add 8 outbounds to node 0")
for i in range(8):
self.log.info(f"outbound: {i}")
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i)
check_node_connections(node=self.nodes[0], num_in=0, num_out=8)
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
Reported by Pylint.
Line: 71
Column: 13
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
# bump p2p_idx to account for the 8 existing outbounds on node 0
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 8, connection_type="block-relay-only")
check_node_connections(node=self.nodes[0], num_in=0, num_out=10)
self.log.info("Restart node 0 and try to reconnect to p2ps")
Reported by Pylint.
Line: 81
Column: 13
self.log.info("Add 4 outbounds to node 0")
for i in range(4):
self.log.info(f"outbound: {i}")
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i)
check_node_connections(node=self.nodes[0], num_in=0, num_out=4)
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
Reported by Pylint.
Line: 87
Column: 13
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
# bump p2p_idx to account for the 4 existing outbounds on node 0
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 4, connection_type="block-relay-only")
check_node_connections(node=self.nodes[0], num_in=0, num_out=6)
check_node_connections(node=self.nodes[1], num_in=5, num_out=10)
Reported by Pylint.
Line: 12
Column: 1
from test_framework.util import assert_equal
def check_node_connections(*, node, num_in, num_out):
info = node.getnetworkinfo()
assert_equal(info["connections_in"], num_in)
assert_equal(info["connections_out"], num_out)
Reported by Pylint.
src/pubkey.cpp
16 issues
Line: 39
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
size_t rpos, rlen, spos, slen;
size_t pos = 0;
size_t lenbyte;
unsigned char tmpsig[64] = {0};
int overflow = 0;
/* Hack to initialize sig with a correctly-parsed but invalid signature. */
secp256k1_ecdsa_signature_parse_compact(ctx, sig, tmpsig);
Reported by FlawFinder.
Line: 150
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
if (rlen > 32) {
overflow = 1;
} else {
memcpy(tmpsig + 32 - rlen, input + rpos, rlen);
}
/* Ignore leading zeroes in S */
while (slen > 0 && input[spos] == 0) {
slen--;
Reported by FlawFinder.
Line: 162
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
if (slen > 32) {
overflow = 1;
} else {
memcpy(tmpsig + 64 - slen, input + spos, slen);
}
if (!overflow) {
overflow = !secp256k1_ecdsa_signature_parse_compact(ctx, sig, tmpsig);
}
Reported by FlawFinder.
Line: 268
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
if (!secp256k1_ecdsa_recover(secp256k1_context_verify, &pubkey, &sig, hash.begin())) {
return false;
}
unsigned char pub[SIZE];
size_t publen = SIZE;
secp256k1_ec_pubkey_serialize(secp256k1_context_verify, pub, &publen, &pubkey, fComp ? SECP256K1_EC_COMPRESSED : SECP256K1_EC_UNCOMPRESSED);
Set(pub, pub + publen);
return true;
}
Reported by FlawFinder.
Line: 291
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
if (!secp256k1_ec_pubkey_parse(secp256k1_context_verify, &pubkey, vch, size())) {
return false;
}
unsigned char pub[SIZE];
size_t publen = SIZE;
secp256k1_ec_pubkey_serialize(secp256k1_context_verify, pub, &publen, &pubkey, SECP256K1_EC_UNCOMPRESSED);
Set(pub, pub + publen);
return true;
}
Reported by FlawFinder.
Line: 302
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
assert(IsValid());
assert((nChild >> 31) == 0);
assert(size() == COMPRESSED_SIZE);
unsigned char out[64];
BIP32Hash(cc, nChild, *begin(), begin()+1, out);
memcpy(ccChild.begin(), out+32, 32);
secp256k1_pubkey pubkey;
assert(secp256k1_context_verify && "secp256k1_context_verify must be initialized to use CPubKey.");
if (!secp256k1_ec_pubkey_parse(secp256k1_context_verify, &pubkey, vch, size())) {
Reported by FlawFinder.
Line: 304
Column: 5
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
assert(size() == COMPRESSED_SIZE);
unsigned char out[64];
BIP32Hash(cc, nChild, *begin(), begin()+1, out);
memcpy(ccChild.begin(), out+32, 32);
secp256k1_pubkey pubkey;
assert(secp256k1_context_verify && "secp256k1_context_verify must be initialized to use CPubKey.");
if (!secp256k1_ec_pubkey_parse(secp256k1_context_verify, &pubkey, vch, size())) {
return false;
}
Reported by FlawFinder.
Line: 313
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
if (!secp256k1_ec_pubkey_tweak_add(secp256k1_context_verify, &pubkey, out)) {
return false;
}
unsigned char pub[COMPRESSED_SIZE];
size_t publen = COMPRESSED_SIZE;
secp256k1_ec_pubkey_serialize(secp256k1_context_verify, pub, &publen, &pubkey, SECP256K1_EC_COMPRESSED);
pubkeyChild.Set(pub, pub + publen);
return true;
}
Reported by FlawFinder.
Line: 320
Column: 34
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
return true;
}
void CExtPubKey::Encode(unsigned char code[BIP32_EXTKEY_SIZE]) const {
code[0] = nDepth;
memcpy(code+1, vchFingerprint, 4);
code[5] = (nChild >> 24) & 0xFF; code[6] = (nChild >> 16) & 0xFF;
code[7] = (nChild >> 8) & 0xFF; code[8] = (nChild >> 0) & 0xFF;
memcpy(code+9, chaincode.begin(), 32);
Reported by FlawFinder.
Line: 322
Column: 5
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
void CExtPubKey::Encode(unsigned char code[BIP32_EXTKEY_SIZE]) const {
code[0] = nDepth;
memcpy(code+1, vchFingerprint, 4);
code[5] = (nChild >> 24) & 0xFF; code[6] = (nChild >> 16) & 0xFF;
code[7] = (nChild >> 8) & 0xFF; code[8] = (nChild >> 0) & 0xFF;
memcpy(code+9, chaincode.begin(), 32);
assert(pubkey.size() == CPubKey::COMPRESSED_SIZE);
memcpy(code+41, pubkey.begin(), CPubKey::COMPRESSED_SIZE);
Reported by FlawFinder.