The following issues were found
mitmproxy/addons/export.py
42 issues
Line: 4
Column: 1
import shlex
import typing
import pyperclip
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx, http
from mitmproxy import exceptions
Reported by Pylint.
Line: 50
Column: 9
except ValueError:
# shlex.quote doesn't support a bytes object
# see https://github.com/python/cpython/pull/10871
raise exceptions.CommandError("Request content must be valid unicode")
escape_control_chars = {chr(i): f"\\x{i:02x}" for i in range(32)}
return "".join(
escape_control_chars.get(x, x)
for x in text
)
Reported by Pylint.
Line: 90
Column: 3
request = cleanup_request(f)
request = pop_headers(request)
# TODO: Once https://github.com/httpie/httpie/issues/414 is implemented, we
# should ensure we always connect to the IP address specified in the flow,
# similar to how it's done in curl_command.
url = request.pretty_url
args = ["http", request.method, url]
Reported by Pylint.
Line: 163
Column: 33
return list(sorted(formats.keys()))
@command.command("export.file")
def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
if format not in formats:
raise exceptions.CommandError("No such export format: %s" % format)
Reported by Pylint.
Line: 163
Column: 20
return list(sorted(formats.keys()))
@command.command("export.file")
def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
if format not in formats:
raise exceptions.CommandError("No such export format: %s" % format)
Reported by Pylint.
Line: 181
Column: 20
ctx.log.error(str(e))
@command.command("export.clip")
def clip(self, format: str, f: flow.Flow) -> None:
"""
Export a flow to the system clipboard.
"""
if format not in formats:
raise exceptions.CommandError("No such export format: %s" % format)
Reported by Pylint.
Line: 1
Column: 1
import shlex
import typing
import pyperclip
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx, http
from mitmproxy import exceptions
Reported by Pylint.
Line: 15
Column: 1
from mitmproxy.utils import strutils
def cleanup_request(f: flow.Flow) -> http.Request:
if not getattr(f, "request", None):
raise exceptions.CommandError("Can't export flow with no request.")
assert isinstance(f, http.HTTPFlow)
request = f.request.copy()
request.decode(strict=False)
Reported by Pylint.
Line: 15
Column: 1
from mitmproxy.utils import strutils
def cleanup_request(f: flow.Flow) -> http.Request:
if not getattr(f, "request", None):
raise exceptions.CommandError("Can't export flow with no request.")
assert isinstance(f, http.HTTPFlow)
request = f.request.copy()
request.decode(strict=False)
Reported by Pylint.
Line: 18
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def cleanup_request(f: flow.Flow) -> http.Request:
if not getattr(f, "request", None):
raise exceptions.CommandError("Can't export flow with no request.")
assert isinstance(f, http.HTTPFlow)
request = f.request.copy()
request.decode(strict=False)
return request
Reported by Bandit.
mitmproxy/command.py
42 issues
Line: 28
Column: 13
"""
Translates a type to an explanatory string.
"""
if t == inspect._empty: # type: ignore
raise exceptions.CommandError("missing type annotation")
to = mitmproxy.types.CommandTypes.get(t, None)
if not to:
raise exceptions.CommandError("unsupported type: %s" % getattr(t, "__name__", t))
return to.display
Reported by Pylint.
Line: 45
Column: 11
class CommandParameter(typing.NamedTuple):
name: str
type: typing.Type
kind: inspect._ParameterKind = inspect.Parameter.POSITIONAL_OR_KEYWORD
def __str__(self):
if self.kind is inspect.Parameter.VAR_POSITIONAL:
return f"*{self.name}"
else:
Reported by Pylint.
Line: 106
Column: 13
except TypeError:
expected = f'Expected: {str(self.signature.parameters)}'
received = f'Received: {str(args)}'
raise exceptions.CommandError(f"Command argument mismatch: \n {expected}\n {received}")
for name, value in bound_arguments.arguments.items():
param = self.signature.parameters[name]
convert_to = param.annotation
if param.kind == param.VAR_POSITIONAL:
Reported by Pylint.
Line: 161
Column: 24
try:
# hasattr is not enough, see https://github.com/mitmproxy/mitmproxy/issues/3794
is_command = isinstance(getattr(o, "command_name", None), str)
except Exception:
pass # getattr may raise if o implements __getattr__.
else:
if is_command:
try:
self.add(o.command_name, o)
Reported by Pylint.
Line: 311
Column: 20
return decorator
def argument(name, type):
"""
Set the type of a command argument at runtime. This is useful for more
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
Reported by Pylint.
Line: 16
Column: 1
from mitmproxy.command_lexer import unquote
def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
except TypeError as v:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
Reported by Pylint.
Line: 16
Column: 1
from mitmproxy.command_lexer import unquote
def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
except TypeError as v:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
Reported by Pylint.
Line: 16
Column: 1
from mitmproxy.command_lexer import unquote
def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
except TypeError as v:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
Reported by Pylint.
Line: 20
Column: 5
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
except TypeError as v:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
def typename(t: type) -> str:
"""
Reported by Pylint.
Line: 24
Column: 1
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
def typename(t: type) -> str:
"""
Translates a type to an explanatory string.
"""
if t == inspect._empty: # type: ignore
raise exceptions.CommandError("missing type annotation")
Reported by Pylint.
mitmproxy/tools/console/commander/commander.py
41 issues
Line: 4
Column: 1
import abc
import typing
import urwid
from urwid.text_layout import calc_coords
import mitmproxy.command
import mitmproxy.flow
import mitmproxy.master
Reported by Pylint.
Line: 5
Column: 1
import typing
import urwid
from urwid.text_layout import calc_coords
import mitmproxy.command
import mitmproxy.flow
import mitmproxy.master
import mitmproxy.types
Reported by Pylint.
Line: 172
Column: 24
self.cbuf = CommandBuffer(master, text)
self.update()
def keypress(self, size, key) -> None:
if key == "delete":
self.cbuf.delete()
elif key == "ctrl a" or key == 'home':
self.cbuf.cursor = 0
elif key == "ctrl e" or key == 'end':
Reported by Pylint.
Line: 241
Column: 28
def update(self) -> None:
self._w.set_text([self.leader, self.cbuf.render()])
def render(self, size, focus=False) -> urwid.Canvas:
(maxcol,) = size
canv = self._w.render((maxcol,))
canv = urwid.CompositeCanvas(canv)
canv.cursor = self.get_cursor_coords((maxcol,))
return canv
Reported by Pylint.
Line: 1
Column: 1
import abc
import typing
import urwid
from urwid.text_layout import calc_coords
import mitmproxy.command
import mitmproxy.flow
import mitmproxy.master
Reported by Pylint.
Line: 13
Column: 1
import mitmproxy.types
class Completer:
@abc.abstractmethod
def cycle(self, forward: bool = True) -> str:
raise NotImplementedError()
Reported by Pylint.
Line: 13
Column: 1
import mitmproxy.types
class Completer:
@abc.abstractmethod
def cycle(self, forward: bool = True) -> str:
raise NotImplementedError()
Reported by Pylint.
Line: 15
Column: 5
class Completer:
@abc.abstractmethod
def cycle(self, forward: bool = True) -> str:
raise NotImplementedError()
class ListCompleter(Completer):
def __init__(
Reported by Pylint.
Line: 19
Column: 1
raise NotImplementedError()
class ListCompleter(Completer):
def __init__(
self,
start: str,
options: typing.Sequence[str],
) -> None:
Reported by Pylint.
Line: 19
Column: 1
raise NotImplementedError()
class ListCompleter(Completer):
def __init__(
self,
start: str,
options: typing.Sequence[str],
) -> None:
Reported by Pylint.
test/mitmproxy/test_tcp.py
41 issues
Line: 1
Column: 1
import pytest
from mitmproxy import tcp
from mitmproxy import flowfilter
from mitmproxy.test import tflow
class TestTCPFlow:
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy import tcp
from mitmproxy import flowfilter
from mitmproxy.test import tflow
class TestTCPFlow:
Reported by Pylint.
Line: 8
Column: 1
from mitmproxy.test import tflow
class TestTCPFlow:
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
Reported by Pylint.
Line: 10
Column: 5
class TestTCPFlow:
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
Reported by Pylint.
Line: 10
Column: 5
class TestTCPFlow:
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
Reported by Pylint.
Line: 11
Column: 9
class TestTCPFlow:
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
Reported by Pylint.
Line: 13
Column: 9
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
assert a == b
Reported by Pylint.
Line: 14
Column: 9
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
assert a == b
assert not f == f2
Reported by Pylint.
Line: 15
Column: 9
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
assert a == b
assert not f == f2
assert f is not f2
Reported by Pylint.
Line: 18
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
b = f2.get_state()
del a["id"]
del b["id"]
assert a == b
assert not f == f2
assert f is not f2
assert f.messages is not f2.messages
Reported by Bandit.
test/full_coverage_plugin.py
41 issues
Line: 3
Column: 1
import os
import configparser
import pytest
import sys
here = os.path.abspath(os.path.dirname(__file__))
enable_coverage = False
Reported by Pylint.
Line: 30
Column: 5
def pytest_configure(config):
global enable_coverage
global no_full_cov
enable_coverage = (
config.getoption('file_or_dir') and len(config.getoption('file_or_dir')) == 0 and
config.getoption('full_cov') and len(config.getoption('full_cov')) > 0 and
Reported by Pylint.
Line: 31
Column: 5
def pytest_configure(config):
global enable_coverage
global no_full_cov
enable_coverage = (
config.getoption('file_or_dir') and len(config.getoption('file_or_dir')) == 0 and
config.getoption('full_cov') and len(config.getoption('full_cov')) > 0 and
config.pluginmanager.getplugin("_cov") is not None and
Reported by Pylint.
Line: 49
Column: 5
@pytest.hookimpl(hookwrapper=True)
def pytest_runtestloop(session):
global enable_coverage
global coverage_values
global coverage_passed
global no_full_cov
if not enable_coverage:
Reported by Pylint.
Line: 50
Column: 5
@pytest.hookimpl(hookwrapper=True)
def pytest_runtestloop(session):
global enable_coverage
global coverage_values
global coverage_passed
global no_full_cov
if not enable_coverage:
yield
Reported by Pylint.
Line: 51
Column: 5
def pytest_runtestloop(session):
global enable_coverage
global coverage_values
global coverage_passed
global no_full_cov
if not enable_coverage:
yield
return
Reported by Pylint.
Line: 52
Column: 5
global enable_coverage
global coverage_values
global coverage_passed
global no_full_cov
if not enable_coverage:
yield
return
Reported by Pylint.
Line: 86
Column: 9
overall = cov.report(files, ignore_errors=True, file=null)
singles = [(s, cov.report(s, ignore_errors=True, file=null)) for s in files]
coverage_values[name] = (overall, singles)
except:
pass
if any(v < 100 for v, _ in coverage_values.values()):
# make sure we get the EXIT_TESTSFAILED exit code
session.testsfailed += 1
Reported by Pylint.
Line: 95
Column: 47
coverage_passed = False
def pytest_terminal_summary(terminalreporter, exitstatus, config):
global enable_coverage
global coverage_values
global coverage_passed
global no_full_cov
Reported by Pylint.
Line: 96
Column: 5
def pytest_terminal_summary(terminalreporter, exitstatus, config):
global enable_coverage
global coverage_values
global coverage_passed
global no_full_cov
if not enable_coverage:
Reported by Pylint.
test/mitmproxy/net/http/http1/test_assemble.py
40 issues
Line: 1
Column: 1
import pytest
from mitmproxy.http import Headers
from mitmproxy.net.http.http1.assemble import (
assemble_request, assemble_request_head, assemble_response,
assemble_response_head, _assemble_request_line, _assemble_request_headers,
_assemble_response_headers,
assemble_body)
from mitmproxy.test.tutils import treq, tresp
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy.http import Headers
from mitmproxy.net.http.http1.assemble import (
assemble_request, assemble_request_head, assemble_response,
assemble_response_head, _assemble_request_line, _assemble_request_headers,
_assemble_response_headers,
assemble_body)
from mitmproxy.test.tutils import treq, tresp
Reported by Pylint.
Line: 12
Column: 1
from mitmproxy.test.tutils import treq, tresp
def test_assemble_request():
assert assemble_request(treq()) == (
b"GET /path HTTP/1.1\r\n"
b"header: qvalue\r\n"
b"content-length: 7\r\n"
b"\r\n"
Reported by Pylint.
Line: 13
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_assemble_request():
assert assemble_request(treq()) == (
b"GET /path HTTP/1.1\r\n"
b"header: qvalue\r\n"
b"content-length: 7\r\n"
b"\r\n"
b"content"
Reported by Bandit.
Line: 25
Column: 1
assemble_request(treq(content=None))
def test_assemble_request_head():
c = assemble_request_head(treq(content=b"foo"))
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
assert b"foo" not in c
Reported by Pylint.
Line: 26
Column: 5
def test_assemble_request_head():
c = assemble_request_head(treq(content=b"foo"))
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
assert b"foo" not in c
Reported by Pylint.
Line: 27
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_assemble_request_head():
c = assemble_request_head(treq(content=b"foo"))
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
assert b"foo" not in c
Reported by Bandit.
Line: 28
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_assemble_request_head():
c = assemble_request_head(treq(content=b"foo"))
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
assert b"foo" not in c
def test_assemble_response():
Reported by Bandit.
Line: 29
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
c = assemble_request_head(treq(content=b"foo"))
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
assert b"foo" not in c
def test_assemble_response():
assert assemble_response(tresp()) == (
Reported by Bandit.
Line: 30
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
assert b"foo" not in c
def test_assemble_response():
assert assemble_response(tresp()) == (
b"HTTP/1.1 200 OK\r\n"
Reported by Bandit.
mitmproxy/proxy/layers/tls.py
40 issues
Line: 157
Column: 24
tls: SSL.Connection = None # type: ignore
"""The OpenSSL connection object"""
def __init__(self, context: context.Context, conn: connection.Connection):
super().__init__(
context,
tunnel_connection=conn,
conn=conn,
)
Reported by Pylint.
Line: 205
Column: 33
# provide more detailed information for some errors.
last_err = e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1]
if last_err == ('SSL routines', 'tls_process_server_certificate', 'certificate verify failed'):
verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl) # type: ignore
error = SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(verify_result)).decode() # type: ignore
err = f"Certificate verify failed: {error}"
elif last_err in [
('SSL routines', 'ssl3_read_bytes', 'tlsv1 alert unknown ca'),
('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')
Reported by Pylint.
Line: 205
Column: 64
# provide more detailed information for some errors.
last_err = e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1]
if last_err == ('SSL routines', 'tls_process_server_certificate', 'certificate verify failed'):
verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl) # type: ignore
error = SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(verify_result)).decode() # type: ignore
err = f"Certificate verify failed: {error}"
elif last_err in [
('SSL routines', 'ssl3_read_bytes', 'tlsv1 alert unknown ca'),
('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')
Reported by Pylint.
Line: 206
Column: 25
last_err = e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1]
if last_err == ('SSL routines', 'tls_process_server_certificate', 'certificate verify failed'):
verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl) # type: ignore
error = SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(verify_result)).decode() # type: ignore
err = f"Certificate verify failed: {error}"
elif last_err in [
('SSL routines', 'ssl3_read_bytes', 'tlsv1 alert unknown ca'),
('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')
]:
Reported by Pylint.
Line: 206
Column: 41
last_err = e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1]
if last_err == ('SSL routines', 'tls_process_server_certificate', 'certificate verify failed'):
verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl) # type: ignore
error = SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(verify_result)).decode() # type: ignore
err = f"Certificate verify failed: {error}"
elif last_err in [
('SSL routines', 'ssl3_read_bytes', 'tlsv1 alert unknown ca'),
('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')
]:
Reported by Pylint.
Line: 215
Column: 23
assert isinstance(last_err, tuple)
err = last_err[2]
elif last_err == ('SSL routines', 'ssl3_get_record', 'wrong version number') and data[:4].isascii():
err = f"The remote server does not speak TLS."
else: # pragma: no cover
# TODO: Add test case once we find one.
err = f"OpenSSL {e!r}"
self.conn.error = err
return False, err
Reported by Pylint.
Line: 217
Column: 3
elif last_err == ('SSL routines', 'ssl3_get_record', 'wrong version number') and data[:4].isascii():
err = f"The remote server does not speak TLS."
else: # pragma: no cover
# TODO: Add test case once we find one.
err = f"OpenSSL {e!r}"
self.conn.error = err
return False, err
else:
# Here we set all attributes that are only known *after* the handshake.
Reported by Pylint.
Line: 297
Column: 24
"""
wait_for_clienthello: bool = False
def __init__(self, context: context.Context, conn: Optional[connection.Server] = None):
super().__init__(context, conn or context.server)
def start_handshake(self) -> layer.CommandGenerator[None]:
wait_for_clienthello = (
# if command_to_reply_to is set, we've been instructed to open the connection from the child layer.
Reported by Pylint.
Line: 355
Column: 24
server_tls_available: bool
client_hello_parsed: bool = False
def __init__(self, context: context.Context):
if context.client.tls:
# In the case of TLS-over-TLS, we already have client TLS. As the outer TLS connection between client
# and proxy isn't that interesting to us, we just unset the attributes here and keep the inner TLS
# session's attributes.
# Alternatively we could create a new Client instance,
Reported by Pylint.
Line: 1
Column: 1
import struct
import time
from dataclasses import dataclass
from typing import Iterator, Literal, Optional, Tuple
from OpenSSL import SSL
from mitmproxy import certs, connection
from mitmproxy.net import tls as net_tls
from mitmproxy.proxy import commands, events, layer, tunnel
Reported by Pylint.
test/mitmproxy/addons/test_clientplayback.py
40 issues
Line: 4
Column: 1
import asyncio
from contextlib import asynccontextmanager
import pytest
from mitmproxy.addons.clientplayback import ClientPlayback, ReplayHandler
from mitmproxy.addons.proxyserver import Proxyserver
from mitmproxy.exceptions import CommandError, OptionsError
from mitmproxy.connection import Address
Reported by Pylint.
Line: 109
Column: 27
@pytest.mark.asyncio
async def test_start_stop(tdata):
cp = ClientPlayback()
with taddons.context(cp) as tctx:
cp.start_replay([tflow.tflow()])
assert cp.count() == 1
Reported by Pylint.
Line: 1
Column: 1
import asyncio
from contextlib import asynccontextmanager
import pytest
from mitmproxy.addons.clientplayback import ClientPlayback, ReplayHandler
from mitmproxy.addons.proxyserver import Proxyserver
from mitmproxy.exceptions import CommandError, OptionsError
from mitmproxy.connection import Address
Reported by Pylint.
Line: 14
Column: 1
@asynccontextmanager
async def tcp_server(handle_conn) -> Address:
server = await asyncio.start_server(handle_conn, '127.0.0.1', 0)
await server.start_serving()
try:
yield server.sockets[0].getsockname()
finally:
Reported by Pylint.
Line: 25
Column: 1
@pytest.mark.asyncio
@pytest.mark.parametrize("mode", ["regular", "upstream", "err"])
async def test_playback(mode):
handler_ok = asyncio.Event()
async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
if mode == "err":
writer.close()
Reported by Pylint.
Line: 35
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
return
if mode == "upstream":
conn_req = await reader.readuntil(b"\r\n\r\n")
assert conn_req == b'CONNECT address:22 HTTP/1.1\r\n\r\n'
writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
req = await reader.readuntil(b"data")
assert req == (
b'GET /path HTTP/1.1\r\n'
b'header: qvalue\r\n'
Reported by Bandit.
Line: 38
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert conn_req == b'CONNECT address:22 HTTP/1.1\r\n\r\n'
writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
req = await reader.readuntil(b"data")
assert req == (
b'GET /path HTTP/1.1\r\n'
b'header: qvalue\r\n'
b'content-length: 4\r\n'
b'\r\n'
b'data'
Reported by Bandit.
Line: 47
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
)
writer.write(b"HTTP/1.1 204 No Content\r\n\r\n")
await writer.drain()
assert not await reader.read()
handler_ok.set()
cp = ClientPlayback()
ps = Proxyserver()
with taddons.context(cp, ps) as tctx:
Reported by Bandit.
Line: 50
Column: 5
assert not await reader.read()
handler_ok.set()
cp = ClientPlayback()
ps = Proxyserver()
with taddons.context(cp, ps) as tctx:
async with tcp_server(handler) as addr:
cp.running()
Reported by Pylint.
Line: 51
Column: 5
handler_ok.set()
cp = ClientPlayback()
ps = Proxyserver()
with taddons.context(cp, ps) as tctx:
async with tcp_server(handler) as addr:
cp.running()
flow = tflow.tflow()
Reported by Pylint.
mitmproxy/contentviews/xml_html.py
40 issues
Line: 110
Column: 16
token = Tag("")
elif isinstance(token, Tag):
token.data += readuntil(">", i, 1)
if token.done:
yield token
token = Text("")
if token.data.strip():
yield token
Reported by Pylint.
Line: 9
Column: 1
from mitmproxy.contentviews import base
from mitmproxy.utils import sliding_window, strutils
"""
A custom XML/HTML prettifier. Compared to other prettifiers, its main features are:
- Implemented in pure Python.
- Modifies whitespace only.
- Works with any input.
Reported by Pylint.
Line: 220
Column: 3
__content_types = ("text/xml", "text/html")
def __call__(self, data, **metadata):
# TODO:
# We should really have the message text as str here,
# not the message content as bytes.
# https://github.com/mitmproxy/mitmproxy/issues/1662#issuecomment-266192578
data = data.decode("utf8", "xmlcharrefreplace")
tokens = tokenize(data)
Reported by Pylint.
Line: 226
Column: 3
# https://github.com/mitmproxy/mitmproxy/issues/1662#issuecomment-266192578
data = data.decode("utf8", "xmlcharrefreplace")
tokens = tokenize(data)
# TODO:
# Performance: Don't render the whole document right away.
# Let's wait with this until we have a sequence-like interface,
# this thing is reasonably fast right now anyway.
pretty = base.format_text(format_xml(tokens))
if "html" in data.lower():
Reported by Pylint.
Line: 237
Column: 1
t = "XML"
return t, pretty
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
if content_type in self.__content_types:
return 1
elif strutils.is_xml(data):
return 0.4
return float(content_type in self.__content_types)
Reported by Pylint.
Line: 1
Column: 1
import io
import re
import textwrap
from typing import Iterable, Optional
from mitmproxy.contentviews import base
from mitmproxy.utils import sliding_window, strutils
"""
Reported by Pylint.
Line: 31
Column: 1
INDENT = 2
class Token:
def __init__(self, data):
self.data = data
def __repr__(self):
return "{}({})".format(
Reported by Pylint.
Line: 31
Column: 1
INDENT = 2
class Token:
def __init__(self, data):
self.data = data
def __repr__(self):
return "{}({})".format(
Reported by Pylint.
Line: 42
Column: 1
)
class Text(Token):
@property
def text(self):
return self.data.strip()
Reported by Pylint.
Line: 42
Column: 1
)
class Text(Token):
@property
def text(self):
return self.data.strip()
Reported by Pylint.
mitmproxy/contrib/wbxml/ASWBXMLByteQueue.py
39 issues
Line: 47
Column: 9
self.bytesEnqueued += 1
logging.debug("Array byte count: %d, enqueued: %d" % (self.qsize(), self.bytesEnqueued))
"""
Created to debug the dequeueing of bytes
"""
def dequeueAndLog(self):
Reported by Pylint.
Line: 49
Column: 5
logging.debug("Array byte count: %d, enqueued: %d" % (self.qsize(), self.bytesEnqueued))
"""
Created to debug the dequeueing of bytes
"""
def dequeueAndLog(self):
singleByte = self.get()
self.bytesDequeued += 1
Reported by Pylint.
Line: 58
Column: 5
logging.debug("Dequeued byte 0x{0:X} ({1} total)".format(singleByte, self.bytesDequeued))
return singleByte
"""
Return true if the continuation bit is set in the byte
"""
def checkContinuationBit(self, byteval):
continuationBitmask = 0x80
return (continuationBitmask & byteval) != 0
Reported by Pylint.
Line: 83
Column: 17
if ( length != None):
currentByte = 0x00
strReturn = ""
for i in range(0, length):
# TODO: Improve this handling. We are technically UTF-8, meaning
# that characters could be more than one byte long. This will fail if we have
# characters outside of the US-ASCII range
if ( self.qsize() == 0 ):
break
Reported by Pylint.
Line: 84
Column: 3
currentByte = 0x00
strReturn = ""
for i in range(0, length):
# TODO: Improve this handling. We are technically UTF-8, meaning
# that characters could be more than one byte long. This will fail if we have
# characters outside of the US-ASCII range
if ( self.qsize() == 0 ):
break
currentByte = self.dequeueAndLog()
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python3
'''
@author: David Shaw, shawd@vmware.com
Inspired by EAS Inspector for Fiddler
https://easinspectorforfiddler.codeplex.com
----- The MIT License (MIT) -----
Filename: ASWBXMLByteQueue.py
Reported by Pylint.
Line: 33
Column: 1
from queue import Queue
import logging
class ASWBXMLByteQueue(Queue):
def __init__(self, wbxmlBytes):
self.bytesDequeued = 0
self.bytesEnqueued = 0
Reported by Pylint.
Line: 37
Column: 9
def __init__(self, wbxmlBytes):
self.bytesDequeued = 0
self.bytesEnqueued = 0
Queue.__init__(self)
for byte in wbxmlBytes:
Reported by Pylint.
Line: 38
Column: 9
def __init__(self, wbxmlBytes):
self.bytesDequeued = 0
self.bytesEnqueued = 0
Queue.__init__(self)
for byte in wbxmlBytes:
self.put(byte)
Reported by Pylint.
Line: 52
Column: 5
"""
Created to debug the dequeueing of bytes
"""
def dequeueAndLog(self):
singleByte = self.get()
self.bytesDequeued += 1
logging.debug("Dequeued byte 0x{0:X} ({1} total)".format(singleByte, self.bytesDequeued))
return singleByte
Reported by Pylint.