The following issues were found
test/functional/p2p_unrequested_blocks.py
23 issues
Line: 80
Column: 9
min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
# 1. Have nodes mine a block (leave IBD)
[n.generatetoaddress(1, n.get_deterministic_priv_key().address) for n in self.nodes]
tips = [int("0x" + n.getbestblockhash(), 0) for n in self.nodes]
# 2. Send one block that builds on each tip.
# This should be accepted by node0
blocks_h2 = [] # the height 2 blocks on each node's chain
Reported by Pylint.
Line: 66
Column: 1
)
class AcceptBlockTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.extra_args = [[], ["-minimumchainwork=0x10"]]
Reported by Pylint.
Line: 75
Column: 5
def setup_network(self):
self.setup_nodes()
def run_test(self):
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
# 1. Have nodes mine a block (leave IBD)
[n.generatetoaddress(1, n.get_deterministic_priv_key().address) for n in self.nodes]
Reported by Pylint.
Line: 75
Column: 5
def setup_network(self):
self.setup_nodes()
def run_test(self):
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
# 1. Have nodes mine a block (leave IBD)
[n.generatetoaddress(1, n.get_deterministic_priv_key().address) for n in self.nodes]
Reported by Pylint.
Line: 75
Column: 5
def setup_network(self):
self.setup_nodes()
def run_test(self):
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
# 1. Have nodes mine a block (leave IBD)
[n.generatetoaddress(1, n.get_deterministic_priv_key().address) for n in self.nodes]
Reported by Pylint.
Line: 99
Column: 1
self.log.info("First height 2 block accepted by node0; correctly rejected by node1")
# 3. Send another block that builds on genesis.
block_h1f = create_block(int("0x" + self.nodes[0].getblockhash(0), 0), create_coinbase(1), block_time)
block_time += 1
block_h1f.solve()
test_node.send_and_ping(msg_block(block_h1f))
tip_entry_found = False
Reported by Pylint.
Line: 105
Column: 13
test_node.send_and_ping(msg_block(block_h1f))
tip_entry_found = False
for x in self.nodes[0].getchaintips():
if x['hash'] == block_h1f.hash:
assert_equal(x['status'], "headers-only")
tip_entry_found = True
assert tip_entry_found
assert_raises_rpc_error(-1, "Block not found on disk", self.nodes[0].getblock, block_h1f.hash)
Reported by Pylint.
Line: 109
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if x['hash'] == block_h1f.hash:
assert_equal(x['status'], "headers-only")
tip_entry_found = True
assert tip_entry_found
assert_raises_rpc_error(-1, "Block not found on disk", self.nodes[0].getblock, block_h1f.hash)
# 4. Send another two block that build on the fork.
block_h2f = create_block(block_h1f.sha256, create_coinbase(2), block_time)
block_time += 1
Reported by Bandit.
Line: 110
Column: 1
assert_equal(x['status'], "headers-only")
tip_entry_found = True
assert tip_entry_found
assert_raises_rpc_error(-1, "Block not found on disk", self.nodes[0].getblock, block_h1f.hash)
# 4. Send another two block that build on the fork.
block_h2f = create_block(block_h1f.sha256, create_coinbase(2), block_time)
block_time += 1
block_h2f.solve()
Reported by Pylint.
Line: 121
Column: 13
# Since the earlier block was not processed by node, the new block
# can't be fully validated.
tip_entry_found = False
for x in self.nodes[0].getchaintips():
if x['hash'] == block_h2f.hash:
assert_equal(x['status'], "headers-only")
tip_entry_found = True
assert tip_entry_found
Reported by Pylint.
contrib/tracing/p2p_monitor.py
23 issues
Line: 17
Column: 1
import sys
import curses
from curses import wrapper, panel
from bcc import BPF, USDT
# BCC: The C program to be compiled to an eBPF program (by BCC) and loaded into
# a sandboxed Linux kernel VM.
program = """
#include <uapi/linux/ptrace.h>
Reported by Pylint.
Line: 170
Column: 31
info_panel = panel.new_panel(win)
info_panel.hide()
ROWS_AVALIABLE_FOR_LIST = curses.LINES - 5
scroll = 0
while True:
try:
# BCC: poll the perf buffers for new events or timeout after 50ms
Reported by Pylint.
Line: 97
Column: 24
total_outbound_msgs = 0
total_outbound_bytes = 0
def __init__(self, id, address, connection_type):
self.id = id
self.address = address
self.connection_type = connection_type
self.last_messages = list()
Reported by Pylint.
Line: 128
Column: 33
bpf = BPF(text=program, usdt_contexts=[bitcoind_with_usdts])
# BCC: perf buffer handle function for inbound_messages
def handle_inbound(_, data, size):
""" Inbound message handler.
Called each time a message is submitted to the inbound_messages BPF table."""
event = bpf["inbound_messages"].event(data)
if event.peer_id not in peers:
Reported by Pylint.
Line: 141
Column: 34
Message(event.msg_type.decode("utf-8"), event.msg_size, True))
# BCC: perf buffer handle function for outbound_messages
def handle_outbound(_, data, size):
""" Outbound message handler.
Called each time a message is submitted to the outbound_messages BPF table."""
event = bpf["outbound_messages"].event(data)
if event.peer_id not in peers:
Reported by Pylint.
Line: 234
Column: 13
info_window.addstr(
2, 1, f" OUR NODE{peer.connection_type:^54}PEER ",
curses.A_BOLD)
for i, msg in enumerate(peer.last_messages):
if msg.inbound:
info_window.addstr(
i + 3, 1, "%68s" %
(f"<--- {msg.msg_type} ({msg.size} bytes) "), curses.A_NORMAL)
else:
Reported by Pylint.
Line: 21
Column: 1
# BCC: The C program to be compiled to an eBPF program (by BCC) and loaded into
# a sandboxed Linux kernel VM.
program = """
#include <uapi/linux/ptrace.h>
// Tor v3 addresses are 62 chars + 6 chars for the port (':12345').
// I2P addresses are 60 chars + 6 chars for the port (':12345').
#define MAX_PEER_ADDR_LENGTH 62 + 6
Reported by Pylint.
Line: 72
Column: 1
"""
class Message:
""" A P2P network message. """
msg_type = ""
size = 0
data = bytes()
inbound = False
Reported by Pylint.
Line: 85
Column: 1
self.inbound = inbound
class Peer:
""" A P2P network peer. """
id = 0
address = ""
connection_type = ""
last_messages = list()
Reported by Pylint.
Line: 85
Column: 1
self.inbound = inbound
class Peer:
""" A P2P network peer. """
id = 0
address = ""
connection_type = ""
last_messages = list()
Reported by Pylint.
test/functional/wallet_labels.py
23 issues
Line: 20
Column: 1
from test_framework.wallet_util import test_address
class WalletLabelsTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
def skip_test_if_missing_module(self):
Reported by Pylint.
Line: 28
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Check that there's no UTXO on the node
node = self.nodes[0]
assert_equal(len(node.listunspent()), 0)
# Note each time we call generate, all generated coins go into
Reported by Pylint.
Line: 28
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Check that there's no UTXO on the node
node = self.nodes[0]
assert_equal(len(node.listunspent()), 0)
# Note each time we call generate, all generated coins go into
Reported by Pylint.
Line: 28
Column: 5
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Check that there's no UTXO on the node
node = self.nodes[0]
assert_equal(len(node.listunspent()), 0)
# Note each time we call generate, all generated coins go into
Reported by Pylint.
Line: 36
Column: 1
# Note each time we call generate, all generated coins go into
# the same address, so we call twice to get two addresses w/50 each
node.generatetoaddress(nblocks=1, address=node.getnewaddress(label='coinbase'))
node.generatetoaddress(nblocks=COINBASE_MATURITY + 1, address=node.getnewaddress(label='coinbase'))
assert_equal(node.getbalance(), 100)
# there should be 2 address groups
# each with 1 address with a balance of 50 Bitcoins
address_groups = node.listaddressgroupings()
Reported by Pylint.
Line: 65
Column: 22
address_groups = node.listaddressgroupings()
assert_equal(len(address_groups), 1)
assert_equal(len(address_groups[0]), 2)
assert_equal(set([a[0] for a in address_groups[0]]), linked_addresses)
assert_equal([a[1] for a in address_groups[0]], [0, 0])
node.generate(1)
# we want to reset so that the "" label has what's expected.
Reported by Pylint.
Line: 143
Column: 13
self.log.info('Check watchonly labels')
node.createwallet(wallet_name='watch_only', disable_private_keys=True)
wallet_watch_only = node.get_wallet_rpc('watch_only')
BECH32_VALID = {
'✔️_VER15_PROG40': 'bcrt10qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqxkg7fn',
'✔️_VER16_PROG03': 'bcrt1sqqqqq8uhdgr',
'✔️_VER16_PROB02': 'bcrt1sqqqq4wstyw',
}
BECH32_INVALID = {
Reported by Pylint.
Line: 144
Column: 1
node.createwallet(wallet_name='watch_only', disable_private_keys=True)
wallet_watch_only = node.get_wallet_rpc('watch_only')
BECH32_VALID = {
'✔️_VER15_PROG40': 'bcrt10qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqxkg7fn',
'✔️_VER16_PROG03': 'bcrt1sqqqqq8uhdgr',
'✔️_VER16_PROB02': 'bcrt1sqqqq4wstyw',
}
BECH32_INVALID = {
'❌_VER15_PROG41': 'bcrt1sqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqajlxj8',
Reported by Pylint.
Line: 148
Column: 13
'✔️_VER16_PROG03': 'bcrt1sqqqqq8uhdgr',
'✔️_VER16_PROB02': 'bcrt1sqqqq4wstyw',
}
BECH32_INVALID = {
'❌_VER15_PROG41': 'bcrt1sqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqajlxj8',
'❌_VER16_PROB01': 'bcrt1sqq5r4036',
}
for l in BECH32_VALID:
ad = BECH32_VALID[l]
Reported by Pylint.
Line: 149
Column: 1
'✔️_VER16_PROB02': 'bcrt1sqqqq4wstyw',
}
BECH32_INVALID = {
'❌_VER15_PROG41': 'bcrt1sqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqajlxj8',
'❌_VER16_PROB01': 'bcrt1sqq5r4036',
}
for l in BECH32_VALID:
ad = BECH32_VALID[l]
wallet_watch_only.importaddress(label=l, rescan=False, address=ad)
Reported by Pylint.
src/key.cpp
23 issues
Line: 76
Column: 5
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
if (oslen > 32 || end - seckey < oslen) {
return 0;
}
memcpy(out32 + (32 - oslen), seckey, oslen);
if (!secp256k1_ec_seckey_verify(ctx, out32)) {
memset(out32, 0, 32);
return 0;
}
return 1;
Reported by FlawFinder.
Line: 118
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
0x8C,0xD0,0x36,0x41,0x41,0x02,0x01,0x01,0xA1,0x24,0x03,0x22,0x00
};
unsigned char *ptr = seckey;
memcpy(ptr, begin, sizeof(begin)); ptr += sizeof(begin);
memcpy(ptr, key32, 32); ptr += 32;
memcpy(ptr, middle, sizeof(middle)); ptr += sizeof(middle);
pubkeylen = CPubKey::COMPRESSED_SIZE;
secp256k1_ec_pubkey_serialize(ctx, ptr, &pubkeylen, &pubkey, SECP256K1_EC_COMPRESSED);
ptr += pubkeylen;
Reported by FlawFinder.
Line: 119
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
};
unsigned char *ptr = seckey;
memcpy(ptr, begin, sizeof(begin)); ptr += sizeof(begin);
memcpy(ptr, key32, 32); ptr += 32;
memcpy(ptr, middle, sizeof(middle)); ptr += sizeof(middle);
pubkeylen = CPubKey::COMPRESSED_SIZE;
secp256k1_ec_pubkey_serialize(ctx, ptr, &pubkeylen, &pubkey, SECP256K1_EC_COMPRESSED);
ptr += pubkeylen;
*seckeylen = ptr - seckey;
Reported by FlawFinder.
Line: 120
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
unsigned char *ptr = seckey;
memcpy(ptr, begin, sizeof(begin)); ptr += sizeof(begin);
memcpy(ptr, key32, 32); ptr += 32;
memcpy(ptr, middle, sizeof(middle)); ptr += sizeof(middle);
pubkeylen = CPubKey::COMPRESSED_SIZE;
secp256k1_ec_pubkey_serialize(ctx, ptr, &pubkeylen, &pubkey, SECP256K1_EC_COMPRESSED);
ptr += pubkeylen;
*seckeylen = ptr - seckey;
assert(*seckeylen == CKey::COMPRESSED_SIZE);
Reported by FlawFinder.
Line: 144
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
0x8C,0xD0,0x36,0x41,0x41,0x02,0x01,0x01,0xA1,0x44,0x03,0x42,0x00
};
unsigned char *ptr = seckey;
memcpy(ptr, begin, sizeof(begin)); ptr += sizeof(begin);
memcpy(ptr, key32, 32); ptr += 32;
memcpy(ptr, middle, sizeof(middle)); ptr += sizeof(middle);
pubkeylen = CPubKey::SIZE;
secp256k1_ec_pubkey_serialize(ctx, ptr, &pubkeylen, &pubkey, SECP256K1_EC_UNCOMPRESSED);
ptr += pubkeylen;
Reported by FlawFinder.
Line: 145
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
};
unsigned char *ptr = seckey;
memcpy(ptr, begin, sizeof(begin)); ptr += sizeof(begin);
memcpy(ptr, key32, 32); ptr += 32;
memcpy(ptr, middle, sizeof(middle)); ptr += sizeof(middle);
pubkeylen = CPubKey::SIZE;
secp256k1_ec_pubkey_serialize(ctx, ptr, &pubkeylen, &pubkey, SECP256K1_EC_UNCOMPRESSED);
ptr += pubkeylen;
*seckeylen = ptr - seckey;
Reported by FlawFinder.
Line: 146
Column: 9
CWE codes:
120
Suggestion:
Make sure destination can always hold the source data
unsigned char *ptr = seckey;
memcpy(ptr, begin, sizeof(begin)); ptr += sizeof(begin);
memcpy(ptr, key32, 32); ptr += 32;
memcpy(ptr, middle, sizeof(middle)); ptr += sizeof(middle);
pubkeylen = CPubKey::SIZE;
secp256k1_ec_pubkey_serialize(ctx, ptr, &pubkeylen, &pubkey, SECP256K1_EC_UNCOMPRESSED);
ptr += pubkeylen;
*seckeylen = ptr - seckey;
assert(*seckeylen == CKey::SIZE);
Reported by FlawFinder.
Line: 203
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
// Check that the sig has a low R value and will be less than 71 bytes
bool SigHasLowR(const secp256k1_ecdsa_signature* sig)
{
unsigned char compact_sig[64];
secp256k1_ecdsa_signature_serialize_compact(secp256k1_context_sign, compact_sig, sig);
// In DER serialization, all values are interpreted as big-endian, signed integers. The highest bit in the integer indicates
// its signed-ness; 0 is positive, 1 is negative. When the value is interpreted as a negative integer, it must be converted
// to a positive value by prepending a 0x00 byte so that the highest bit is 0. We can avoid this prepending by ensuring that
Reported by FlawFinder.
Line: 218
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
return false;
vchSig.resize(CPubKey::SIGNATURE_SIZE);
size_t nSigLen = CPubKey::SIGNATURE_SIZE;
unsigned char extra_entropy[32] = {0};
WriteLE32(extra_entropy, test_case);
secp256k1_ecdsa_signature sig;
uint32_t counter = 0;
int ret = secp256k1_ecdsa_sign(secp256k1_context_sign, &sig, hash.begin(), begin(), secp256k1_nonce_function_rfc6979, (!grind && test_case) ? extra_entropy : nullptr);
Reported by FlawFinder.
Line: 239
Column: 14
CWE codes:
119
120
Suggestion:
Perform bounds checking, use functions that limit length, or ensure that the size is larger than the maximum possible length
if (pubkey.IsCompressed() != fCompressed) {
return false;
}
unsigned char rnd[8];
std::string str = "Bitcoin key verification\n";
GetRandBytes(rnd, sizeof(rnd));
uint256 hash;
CHash256().Write(MakeUCharSpan(str)).Write(rnd).Finalize(hash);
std::vector<unsigned char> vchSig;
Reported by FlawFinder.
test/functional/mocks/signer.py
23 issues
Line: 20
Column: 1
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
Reported by Pylint.
Line: 20
Column: 15
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
Reported by Pylint.
Line: 20
Column: 15
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
Reported by Pylint.
Line: 23
Column: 20
def enumerate(args):
sys.stdout.write(json.dumps([{"fingerprint": "00000001", "type": "trezor", "model": "trezor_t"}, {"fingerprint": "00000002"}]))
def getdescriptors(args):
xpub = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
sys.stdout.write(json.dumps({
"receive": [
"pkh([00000001/44'/1'/" + args.account + "']" + xpub + "/0/*)#vt6w3l3j",
Reported by Pylint.
Line: 40
Column: 20
}))
def displayaddress(args):
# Several descriptor formats are acceptable, so allowing for potential
# changes to InferDescriptor:
if args.fingerprint != "00000001":
return sys.stdout.write(json.dumps({"error": "Unexpected fingerprint", "fingerprint": args.fingerprint}))
Reported by Pylint.
Line: 54
Column: 12
return sys.stdout.write(json.dumps({"address": "bcrt1qm90ugl4d48jv8n6e5t9ln6t9zlpm5th68x4f8g"}))
def signtx(args):
if args.fingerprint != "00000001":
return sys.stdout.write(json.dumps({"error": "Unexpected fingerprint", "fingerprint": args.fingerprint}))
with open(os.path.join(os.getcwd(), "mock_psbt"), "r", encoding="utf8") as f:
mock_psbt = f.read()
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
import os
import sys
import argparse
import json
Reported by Pylint.
Line: 11
Column: 1
import argparse
import json
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:
Reported by Pylint.
Line: 13
Column: 1
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
Reported by Pylint.
Line: 14
Column: 62
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:
sys.stdout.write(mock_result[2:])
sys.exit(int(mock_result[0]))
Reported by Pylint.
test/functional/rpc_whitelist.py
22 issues
Line: 39
Column: 9
# 1 => Password (Hashed)
# 2 => Permissions
# 3 => Password Plaintext
self.users = [
["user1", "50358aa884c841648e0700b073c32b2e$b73e95fff0748cc0b517859d2ca47d9bac1aa78231f3e48fa9222b612bd2083e", "getbestblockhash,getblockcount,", "12345"],
["user2", "8650ba41296f62092377a38547f361de$4620db7ba063ef4e2f7249853e9f3c5c3592a9619a759e3e6f1c63f2e22f1d21", "getblockcount", "54321"]
]
# For exceptions
self.strange_users = [
Reported by Pylint.
Line: 44
Column: 9
["user2", "8650ba41296f62092377a38547f361de$4620db7ba063ef4e2f7249853e9f3c5c3592a9619a759e3e6f1c63f2e22f1d21", "getblockcount", "54321"]
]
# For exceptions
self.strange_users = [
# Test empty
["strangedude", "62d67dffec03836edd698314f1b2be62$c2fb4be29bb0e3646298661123cf2d8629640979cabc268ef05ea613ab54068d", ":", "s7R4nG3R7H1nGZ"],
["strangedude2", "575c012c7fe4b1e83b9d809412da3ef7$09f448d0acfc19924dd62ecb96004d3c2d4b91f471030dfe43c6ea64a8f658c1", "", "s7R4nG3R7H1nGZ"],
# Test trailing comma
["strangedude3", "23189c561b5975a56f4cf94030495d61$3a2f6aac26351e2257428550a553c4c1979594e36675bbd3db692442387728c0", ":getblockcount,", "s7R4nG3R7H1nGZ"],
Reported by Pylint.
Line: 57
Column: 9
["strangedude5", "d12c6e962d47a454f962eb41225e6ec8$2dd39635b155536d3c1a2e95d05feff87d5ba55f2d5ff975e6e997a836b717c9", ":getblockcount,getblockcount", "s7R4nG3R7H1nGZ"]
]
# These commands shouldn't be allowed for any user to test failures
self.never_allowed = ["getnetworkinfo"]
with open(os.path.join(get_datadir_path(self.options.tmpdir, 0), "bitcoin.conf"), 'a', encoding='utf8') as f:
f.write("\nrpcwhitelistdefault=0\n")
for user in self.users:
f.write("rpcauth=" + user[0] + ":" + user[1] + "\n")
f.write("rpcwhitelist=" + user[0] + ":" + user[2] + "\n")
Reported by Pylint.
Line: 80
Column: 17
i += 1
for permission in permissions:
self.log.info("[" + user[0] + "]: Testing a permitted permission (" + permission + ")")
assert_equal(200, rpccall(self.nodes[0], user, permission).status)
for permission in self.never_allowed:
self.log.info("[" + user[0] + "]: Testing a non permitted permission (" + permission + ")")
assert_equal(403, rpccall(self.nodes[0], user, permission).status)
# Now test the strange users
Reported by Pylint.
Line: 83
Column: 17
self.log.info("[" + user[0] + "]: Testing a permitted permission (" + permission + ")")
assert_equal(200, rpccall(self.nodes[0], user, permission).status)
for permission in self.never_allowed:
self.log.info("[" + user[0] + "]: Testing a non permitted permission (" + permission + ")")
assert_equal(403, rpccall(self.nodes[0], user, permission).status)
# Now test the strange users
for permission in self.never_allowed:
self.log.info("Strange test 1")
assert_equal(403, rpccall(self.nodes[0], self.strange_users[0], permission).status)
Reported by Pylint.
Line: 9
Column: 1
A test for RPC users with restricted permissions
"""
from test_framework.test_framework import BitcoinTestFramework
import os
from test_framework.util import (
get_datadir_path,
assert_equal,
str_to_b64str
)
Reported by Pylint.
Line: 15
Column: 1
assert_equal,
str_to_b64str
)
import http.client
import urllib.parse
def rpccall(node, user, method):
url = urllib.parse.urlparse(node.url)
headers = {"Authorization": "Basic " + str_to_b64str('{}:{}'.format(user[0], user[3]))}
Reported by Pylint.
Line: 16
Column: 1
str_to_b64str
)
import http.client
import urllib.parse
def rpccall(node, user, method):
url = urllib.parse.urlparse(node.url)
headers = {"Authorization": "Basic " + str_to_b64str('{}:{}'.format(user[0], user[3]))}
conn = http.client.HTTPConnection(url.hostname, url.port)
Reported by Pylint.
Line: 18
Column: 1
import http.client
import urllib.parse
def rpccall(node, user, method):
url = urllib.parse.urlparse(node.url)
headers = {"Authorization": "Basic " + str_to_b64str('{}:{}'.format(user[0], user[3]))}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "' + method + '"}', headers)
Reported by Pylint.
Line: 29
Column: 1
return resp
class RPCWhitelistTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
def setup_chain(self):
super().setup_chain()
Reported by Pylint.
contrib/message-capture/message-capture-parser.py
22 issues
Line: 18
Column: 1
sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
from test_framework.messages import ser_uint256 # noqa: E402
from test_framework.p2p import MESSAGEMAP # noqa: E402
TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12
Reported by Pylint.
Line: 19
Column: 1
sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
from test_framework.messages import ser_uint256 # noqa: E402
from test_framework.p2p import MESSAGEMAP # noqa: E402
TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12
Reported by Pylint.
Line: 146
Column: 13
try:
msg.deserialize(msg_ser)
except KeyboardInterrupt:
raise
except Exception:
# Unable to deserialize message body
msg_ser.seek(0, os.SEEK_SET)
msg_dict["body"] = msg_ser.read().hex()
Reported by Pylint.
Line: 148
Column: 20
msg.deserialize(msg_ser)
except KeyboardInterrupt:
raise
except Exception:
# Unable to deserialize message body
msg_ser.seek(0, os.SEEK_SET)
msg_dict["body"] = msg_ser.read().hex()
msg_dict["error"] = "Unable to deserialize message."
messages.append(msg_dict)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Parse message capture binary files. To be used in conjunction with -capturemessages."""
import argparse
import os
import shutil
Reported by Pylint.
Line: 18
Column: 1
sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
from test_framework.messages import ser_uint256 # noqa: E402
from test_framework.p2p import MESSAGEMAP # noqa: E402
TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12
Reported by Pylint.
Line: 19
Column: 1
sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
from test_framework.messages import ser_uint256 # noqa: E402
from test_framework.p2p import MESSAGEMAP # noqa: E402
TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12
Reported by Pylint.
Line: 27
Column: 1
# The test framework classes stores hashes as large ints in many cases.
# These are variables of type uint256 in core.
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
# As such, they are itemized here.
# Any variables with these names that are of type int are actually uint256 variables.
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
HASH_INTS = [
"blockhash",
Reported by Pylint.
Line: 30
Column: 1
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
# As such, they are itemized here.
# Any variables with these names that are of type int are actually uint256 variables.
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
HASH_INTS = [
"blockhash",
"block_hash",
"hash",
"hashMerkleRoot",
Reported by Pylint.
Line: 51
Column: 1
]
class ProgressBar:
def __init__(self, total: float):
self.total = total
self.running = 0
def set_progress(self, progress: float):
Reported by Pylint.
test/functional/interface_http.py
21 issues
Line: 10
Column: 1
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, str_to_b64str
import http.client
import urllib.parse
class HTTPBasicsTest (BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 3
Reported by Pylint.
Line: 11
Column: 1
from test_framework.util import assert_equal, str_to_b64str
import http.client
import urllib.parse
class HTTPBasicsTest (BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 3
self.supports_cli = False
Reported by Pylint.
Line: 13
Column: 1
import http.client
import urllib.parse
class HTTPBasicsTest (BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 3
self.supports_cli = False
def setup_network(self):
Reported by Pylint.
Line: 21
Column: 5
def setup_network(self):
self.setup_nodes()
def run_test(self):
#################################################
# lowlevel check for http persistent connection #
#################################################
url = urllib.parse.urlparse(self.nodes[0].url)
Reported by Pylint.
Line: 34
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1
assert conn.sock is not None #according to http/1.1 connection must still be open!
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
Reported by Bandit.
Line: 35
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1
assert conn.sock is not None #according to http/1.1 connection must still be open!
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1 #must also response with a correct json-rpc message
Reported by Bandit.
Line: 40
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1 #must also response with a correct json-rpc message
assert conn.sock is not None #according to http/1.1 connection must still be open!
conn.close()
#same should be if we add keep-alive because this should be the std. behaviour
headers = {"Authorization": "Basic " + str_to_b64str(authpair), "Connection": "keep-alive"}
Reported by Bandit.
Line: 41
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1 #must also response with a correct json-rpc message
assert conn.sock is not None #according to http/1.1 connection must still be open!
conn.close()
#same should be if we add keep-alive because this should be the std. behaviour
headers = {"Authorization": "Basic " + str_to_b64str(authpair), "Connection": "keep-alive"}
Reported by Bandit.
Line: 51
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1
assert conn.sock is not None #according to http/1.1 connection must still be open!
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
Reported by Bandit.
Line: 52
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1
assert conn.sock is not None #according to http/1.1 connection must still be open!
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
assert b'"error":null' in out1 #must also response with a correct json-rpc message
Reported by Bandit.
test/functional/rpc_getblockstats.py
21 issues
Line: 60
Column: 9
self.sync_all()
self.nodes[0].generate(1)
self.expected_stats = self.get_stats()
blocks = []
tip = self.nodes[0].getbestblockhash()
blockhash = None
height = 0
Reported by Pylint.
Line: 84
Column: 13
d = json.load(f)
blocks = d['blocks']
mocktime = d['mocktime']
self.expected_stats = d['stats']
# Set the timestamps from the file so that the nodes can get out of Initial Block Download
self.nodes[0].setmocktime(mocktime)
self.sync_all()
Reported by Pylint.
Line: 112
Column: 13
assert_equal(stats[self.max_stat_pos]['height'], self.start_height + self.max_stat_pos)
for i in range(self.max_stat_pos+1):
self.log.info('Checking block %d\n' % (i))
assert_equal(stats[i], self.expected_stats[i])
# Check selecting block by hash too
blockhash = self.expected_stats[i]['blockhash']
stats_by_hash = self.nodes[0].getblockstats(hash_or_height=blockhash)
Reported by Pylint.
Line: 126
Column: 21
result = self.nodes[0].getblockstats(hash_or_height=self.start_height + i, stats=[stat])
assert_equal(list(result.keys()), [stat])
if result[stat] != self.expected_stats[i][stat]:
self.log.info('result[%s] (%d) failed, %r != %r' % (
stat, i, result[stat], self.expected_stats[i][stat]))
assert_equal(result[stat], self.expected_stats[i][stat])
# Make sure only the selected statistics are included (more than one)
some_stats = {'minfee', 'maxfee'}
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
# Copyright (c) 2017-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test getblockstats rpc call
#
Reported by Pylint.
Line: 16
Column: 1
assert_equal,
assert_raises_rpc_error,
)
import json
import os
TESTSDIR = os.path.dirname(os.path.realpath(__file__))
class GetblockstatsTest(BitcoinTestFramework):
Reported by Pylint.
Line: 17
Column: 1
assert_raises_rpc_error,
)
import json
import os
TESTSDIR = os.path.dirname(os.path.realpath(__file__))
class GetblockstatsTest(BitcoinTestFramework):
Reported by Pylint.
Line: 21
Column: 1
TESTSDIR = os.path.dirname(os.path.realpath(__file__))
class GetblockstatsTest(BitcoinTestFramework):
start_height = 101
max_stat_pos = 2
def add_options(self, parser):
Reported by Pylint.
Line: 40
Column: 5
self.setup_clean_chain = True
self.supports_cli = False
def get_stats(self):
return [self.nodes[0].getblockstats(hash_or_height=self.start_height + i) for i in range(self.max_stat_pos+1)]
def generate_test_data(self, filename):
mocktime = 1525107225
self.nodes[0].setmocktime(mocktime)
Reported by Pylint.
Line: 41
Column: 1
self.supports_cli = False
def get_stats(self):
return [self.nodes[0].getblockstats(hash_or_height=self.start_height + i) for i in range(self.max_stat_pos+1)]
def generate_test_data(self, filename):
mocktime = 1525107225
self.nodes[0].setmocktime(mocktime)
self.nodes[0].generate(COINBASE_MATURITY + 1)
Reported by Pylint.
test/functional/feature_coinstatsindex.py
21 issues
Line: 37
Column: 1
assert_raises_rpc_error,
)
class CoinStatsIndexTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.supports_cli = False
self.extra_args = [
Reported by Pylint.
Line: 56
Column: 5
self._test_reorg_index()
self._test_index_rejects_hash_serialized()
def block_sanity_check(self, block_info):
block_subsidy = 50
assert_equal(
block_info['prevout_spent'] + block_subsidy,
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
Reported by Pylint.
Line: 56
Column: 5
self._test_reorg_index()
self._test_index_rejects_hash_serialized()
def block_sanity_check(self, block_info):
block_subsidy = 50
assert_equal(
block_info['prevout_spent'] + block_subsidy,
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
Reported by Pylint.
Line: 60
Column: 1
block_subsidy = 50
assert_equal(
block_info['prevout_spent'] + block_subsidy,
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
def _test_coin_stats_index(self):
node = self.nodes[0]
index_node = self.nodes[1]
Reported by Pylint.
Line: 63
Column: 5
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
def _test_coin_stats_index(self):
node = self.nodes[0]
index_node = self.nodes[1]
# Both none and muhash options allow the usage of the index
index_hash_options = ['none', 'muhash']
Reported by Pylint.
Line: 63
Column: 5
block_info['new_outputs_ex_coinbase'] + block_info['coinbase'] + block_info['unspendable']
)
def _test_coin_stats_index(self):
node = self.nodes[0]
index_node = self.nodes[1]
# Both none and muhash options allow the usage of the index
index_hash_options = ['none', 'muhash']
Reported by Pylint.
Line: 77
Column: 1
self.sync_blocks(timeout=120)
self.log.info("Test that gettxoutsetinfo() output is consistent with or without coinstatsindex option")
res0 = node.gettxoutsetinfo('none')
# The fields 'disk_size' and 'transactions' do not exist on the index
del res0['disk_size'], res0['transactions']
Reported by Pylint.
Line: 92
Column: 1
# Everything left should be the same
assert_equal(res1, res0)
self.log.info("Test that gettxoutsetinfo() can get fetch data on specific heights with index")
# Generate a new tip
node.generate(5)
for hash_option in index_hash_options:
Reported by Pylint.
Line: 111
Column: 1
assert_equal(res0, res3)
# It does not work without coinstatsindex
assert_raises_rpc_error(-8, "Querying specific block heights requires coinstatsindex", node.gettxoutsetinfo, hash_option, 102)
self.log.info("Test gettxoutsetinfo() with index and verbose flag")
for hash_option in index_hash_options:
# Genesis block is unspendable
Reported by Pylint.
Line: 162
Column: 17
tx1_final = self.nodes[0].gettransaction(tx1_txid)
for output in tx1_final['details']:
if output['amount'] == Decimal('21.00000000') and output['category'] == 'receive':
n = output['vout']
# Generate and send another tx with an OP_RETURN output (which is unspendable)
tx2 = CTransaction()
tx2.vin.append(CTxIn(COutPoint(int(tx1_txid, 16), n), b''))
tx2.vout.append(CTxOut(int(20.99 * COIN), CScript([OP_RETURN] + [OP_FALSE]*30)))
Reported by Pylint.