The following issues were found
test/mitmproxy/test_addonmanager.py
79 issues
Line: 3
Column: 1
from unittest import mock
import pytest
from mitmproxy import addonmanager
from mitmproxy import addons
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import hooks
Reported by Pylint.
Line: 18
Column: 30
class TAddon:
def __init__(self, name, addons=None):
self.name = name
self.response = True
self.running_called = False
if addons:
self.addons = addons
Reported by Pylint.
Line: 93
Column: 5
f = tflow.tflow()
await a.handle_lifecycle(HttpRequestHook(f))
a._configure_all(o, o.keys())
def test_defaults():
assert addons.default_addons()
Reported by Pylint.
Line: 116
Column: 21
l.add_option("custom_option", bool, True, "help")
assert warn.called
def cmd(a: str) -> str:
return "foo"
l.add_command("test.command", cmd)
Reported by Pylint.
Line: 168
Column: 31
m = master.Master(o)
a = addonmanager.AddonManager(m)
a.add(AOption())
assert "custom_option" in m.options._options
def test_nesting():
o = options.Options()
m = master.Master(o)
Reported by Pylint.
Line: 1
Column: 1
from unittest import mock
import pytest
from mitmproxy import addonmanager
from mitmproxy import addons
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import hooks
Reported by Pylint.
Line: 17
Column: 1
from mitmproxy.test import tflow
class TAddon:
def __init__(self, name, addons=None):
self.name = name
self.response = True
self.running_called = False
if addons:
Reported by Pylint.
Line: 26
Column: 5
self.addons = addons
@command.command("test.command")
def testcommand(self) -> str:
return "here"
def __repr__(self):
return "Addon(%s)" % self.name
Reported by Pylint.
Line: 26
Column: 5
self.addons = addons
@command.command("test.command")
def testcommand(self) -> str:
return "here"
def __repr__(self):
return "Addon(%s)" % self.name
Reported by Pylint.
Line: 32
Column: 5
def __repr__(self):
return "Addon(%s)" % self.name
def done(self):
pass
def running(self):
self.running_called = True
Reported by Pylint.
test/mitmproxy/addons/test_tlsconfig.py
79 issues
Line: 6
Column: 1
from pathlib import Path
from typing import Union
import pytest
from OpenSSL import SSL
from mitmproxy import certs, connection
from mitmproxy.addons import tlsconfig
from mitmproxy.proxy import context
Reported by Pylint.
Line: 25
Column: 82
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b"qux"))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"qux"
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b""))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == SSL.NO_OVERLAPPING_PROTOCOLS
# Test that we try to mirror the server connection's ALPN
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=None))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"h2"
Reported by Pylint.
Line: 37
Column: 71
assert tlsconfig.alpn_select_callback(conn, [b"qux", b"h2", b"http/1.1"]) == b"h2"
# Test no overlap
assert tlsconfig.alpn_select_callback(conn, [b"qux", b"quux"]) == SSL.NO_OVERLAPPING_PROTOCOLS
# Test that we don't select an ALPN if the server refused to select one.
conn.set_app_data(tlsconfig.AppData(server_alpn=b"", http2=True, client_alpn=None))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1"]) == SSL.NO_OVERLAPPING_PROTOCOLS
Reported by Pylint.
Line: 41
Column: 67
# Test that we don't select an ALPN if the server refused to select one.
conn.set_app_data(tlsconfig.AppData(server_alpn=b"", http2=True, client_alpn=None))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1"]) == SSL.NO_OVERLAPPING_PROTOCOLS
here = Path(__file__).parent
Reported by Pylint.
Line: 1
Column: 1
import ssl
import time
from pathlib import Path
from typing import Union
import pytest
from OpenSSL import SSL
from mitmproxy import certs, connection
Reported by Pylint.
Line: 14
Column: 1
from mitmproxy.proxy import context
from mitmproxy.proxy.layers import modes, tls
from mitmproxy.test import taddons
from test.mitmproxy.proxy.layers import test_tls
def test_alpn_select_callback():
ctx = SSL.Context(SSL.SSLv23_METHOD)
conn = SSL.Connection(ctx)
Reported by Pylint.
Line: 17
Column: 1
from test.mitmproxy.proxy.layers import test_tls
def test_alpn_select_callback():
ctx = SSL.Context(SSL.SSLv23_METHOD)
conn = SSL.Connection(ctx)
# Test that we respect addons setting `client.alpn`.
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b"qux"))
Reported by Pylint.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# Test that we respect addons setting `client.alpn`.
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b"qux"))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"qux"
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b""))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == SSL.NO_OVERLAPPING_PROTOCOLS
# Test that we try to mirror the server connection's ALPN
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=None))
Reported by Bandit.
Line: 25
Column: 1
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b"qux"))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"qux"
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b""))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == SSL.NO_OVERLAPPING_PROTOCOLS
# Test that we try to mirror the server connection's ALPN
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=None))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"h2"
Reported by Pylint.
Line: 25
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b"qux"))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"qux"
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b""))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == SSL.NO_OVERLAPPING_PROTOCOLS
# Test that we try to mirror the server connection's ALPN
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=None))
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"h2"
Reported by Bandit.
test/mitmproxy/addons/test_next_layer.py
76 issues
Line: 3
Column: 1
from unittest.mock import MagicMock
import pytest
from mitmproxy import connection
from mitmproxy.addons.next_layer import NextLayer
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy import context, layers
from mitmproxy.test import taddons
Reported by Pylint.
Line: 39
Column: 37
def test_configure(self):
nl = NextLayer()
with taddons.context(nl) as tctx:
with pytest.raises(Exception, match="mutually exclusive"):
tctx.configure(nl, allow_hosts=["example.org"], ignore_hosts=["example.com"])
def test_ignore_connection(self):
nl = NextLayer()
Reported by Pylint.
Line: 45
Column: 37
def test_ignore_connection(self):
nl = NextLayer()
with taddons.context(nl) as tctx:
assert not nl.ignore_connection(("example.com", 443), b"")
tctx.configure(nl, ignore_hosts=["example.com"])
assert nl.ignore_connection(("example.com", 443), b"")
assert nl.ignore_connection(("example.com", 1234), b"")
Reported by Pylint.
Line: 70
Column: 37
def test_make_top_layer(self):
nl = NextLayer()
ctx = MagicMock()
with taddons.context(nl) as tctx:
tctx.configure(nl, mode="regular")
assert isinstance(nl.make_top_layer(ctx), layers.modes.HttpProxy)
tctx.configure(nl, mode="transparent")
assert isinstance(nl.make_top_layer(ctx), layers.modes.TransparentProxy)
Reported by Pylint.
Line: 88
Column: 37
ctx = MagicMock()
ctx.client.alpn = None
ctx.server.address = ("example.com", 443)
with taddons.context(nl) as tctx:
ctx.layers = []
assert isinstance(nl._next_layer(ctx, b"", b""), layers.modes.HttpProxy)
assert nl._next_layer(ctx, b"", b"") is None
Reported by Pylint.
Line: 90
Column: 31
ctx.server.address = ("example.com", 443)
with taddons.context(nl) as tctx:
ctx.layers = []
assert isinstance(nl._next_layer(ctx, b"", b""), layers.modes.HttpProxy)
assert nl._next_layer(ctx, b"", b"") is None
tctx.configure(nl, ignore_hosts=["example.com"])
assert isinstance(nl._next_layer(ctx, b"123", b""), layers.TCPLayer)
Reported by Pylint.
Line: 92
Column: 20
ctx.layers = []
assert isinstance(nl._next_layer(ctx, b"", b""), layers.modes.HttpProxy)
assert nl._next_layer(ctx, b"", b"") is None
tctx.configure(nl, ignore_hosts=["example.com"])
assert isinstance(nl._next_layer(ctx, b"123", b""), layers.TCPLayer)
assert nl._next_layer(ctx, client_hello_no_extensions[:10], b"") is None
Reported by Pylint.
Line: 95
Column: 31
assert nl._next_layer(ctx, b"", b"") is None
tctx.configure(nl, ignore_hosts=["example.com"])
assert isinstance(nl._next_layer(ctx, b"123", b""), layers.TCPLayer)
assert nl._next_layer(ctx, client_hello_no_extensions[:10], b"") is None
tctx.configure(nl, ignore_hosts=[])
assert isinstance(nl._next_layer(ctx, client_hello_no_extensions, b""), layers.ServerTLSLayer)
assert isinstance(ctx.layers[-1], layers.ClientTLSLayer)
Reported by Pylint.
Line: 96
Column: 20
tctx.configure(nl, ignore_hosts=["example.com"])
assert isinstance(nl._next_layer(ctx, b"123", b""), layers.TCPLayer)
assert nl._next_layer(ctx, client_hello_no_extensions[:10], b"") is None
tctx.configure(nl, ignore_hosts=[])
assert isinstance(nl._next_layer(ctx, client_hello_no_extensions, b""), layers.ServerTLSLayer)
assert isinstance(ctx.layers[-1], layers.ClientTLSLayer)
Reported by Pylint.
Line: 99
Column: 31
assert nl._next_layer(ctx, client_hello_no_extensions[:10], b"") is None
tctx.configure(nl, ignore_hosts=[])
assert isinstance(nl._next_layer(ctx, client_hello_no_extensions, b""), layers.ServerTLSLayer)
assert isinstance(ctx.layers[-1], layers.ClientTLSLayer)
ctx.layers = []
assert isinstance(nl._next_layer(ctx, b"", b""), layers.modes.HttpProxy)
assert isinstance(nl._next_layer(ctx, client_hello_no_extensions, b""), layers.ClientTLSLayer)
Reported by Pylint.
test/mitmproxy/addons/test_core.py
75 issues
Line: 7
Column: 1
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import exceptions
import pytest
def test_set():
sa = core.Core()
with taddons.context(loadcore=False) as tctx:
Reported by Pylint.
Line: 185
Column: 27
@mock.patch("mitmproxy.platform.original_addr")
def test_validation_modes(m):
sa = core.Core()
with taddons.context() as tctx:
tctx.configure(sa, mode = "reverse:http://localhost")
with pytest.raises(Exception, match="Invalid server specification"):
tctx.configure(sa, mode = "reverse:")
Reported by Pylint.
Line: 1
Column: 1
from unittest import mock
from mitmproxy.addons import core
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import exceptions
import pytest
Reported by Pylint.
Line: 10
Column: 1
import pytest
def test_set():
sa = core.Core()
with taddons.context(loadcore=False) as tctx:
assert tctx.master.options.server
tctx.command(sa.set, "server", "false")
assert not tctx.master.options.server
Reported by Pylint.
Line: 11
Column: 5
def test_set():
sa = core.Core()
with taddons.context(loadcore=False) as tctx:
assert tctx.master.options.server
tctx.command(sa.set, "server", "false")
assert not tctx.master.options.server
Reported by Pylint.
Line: 13
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_set():
sa = core.Core()
with taddons.context(loadcore=False) as tctx:
assert tctx.master.options.server
tctx.command(sa.set, "server", "false")
assert not tctx.master.options.server
with pytest.raises(exceptions.CommandError):
tctx.command(sa.set, "nonexistent")
Reported by Bandit.
Line: 15
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
with taddons.context(loadcore=False) as tctx:
assert tctx.master.options.server
tctx.command(sa.set, "server", "false")
assert not tctx.master.options.server
with pytest.raises(exceptions.CommandError):
tctx.command(sa.set, "nonexistent")
Reported by Bandit.
Line: 21
Column: 1
tctx.command(sa.set, "nonexistent")
def test_resume():
sa = core.Core()
with taddons.context(loadcore=False):
f = tflow.tflow()
assert not sa.resume([f])
f.intercept()
Reported by Pylint.
Line: 22
Column: 5
def test_resume():
sa = core.Core()
with taddons.context(loadcore=False):
f = tflow.tflow()
assert not sa.resume([f])
f.intercept()
sa.resume([f])
Reported by Pylint.
Line: 24
Column: 9
def test_resume():
sa = core.Core()
with taddons.context(loadcore=False):
f = tflow.tflow()
assert not sa.resume([f])
f.intercept()
sa.resume([f])
assert not f.reply.state == "taken"
Reported by Pylint.
test/mitmproxy/test_flow.py
74 issues
Line: 3
Column: 1
import io
import pytest
import mitmproxy.io
from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy.exceptions import FlowReadException
Reported by Pylint.
Line: 1
Column: 1
import io
import pytest
import mitmproxy.io
from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy.exceptions import FlowReadException
Reported by Pylint.
Line: 15
Column: 1
from mitmproxy.test import taddons, tflow
class State:
def __init__(self):
self.flows = []
def request(self, f):
if f not in self.flows:
Reported by Pylint.
Line: 19
Column: 5
def __init__(self):
self.flows = []
def request(self, f):
if f not in self.flows:
self.flows.append(f)
def response(self, f):
if f not in self.flows:
Reported by Pylint.
Line: 19
Column: 5
def __init__(self):
self.flows = []
def request(self, f):
if f not in self.flows:
self.flows.append(f)
def response(self, f):
if f not in self.flows:
Reported by Pylint.
Line: 23
Column: 5
if f not in self.flows:
self.flows.append(f)
def response(self, f):
if f not in self.flows:
self.flows.append(f)
def websocket_start(self, f):
if f not in self.flows:
Reported by Pylint.
Line: 23
Column: 5
if f not in self.flows:
self.flows.append(f)
def response(self, f):
if f not in self.flows:
self.flows.append(f)
def websocket_start(self, f):
if f not in self.flows:
Reported by Pylint.
Line: 27
Column: 5
if f not in self.flows:
self.flows.append(f)
def websocket_start(self, f):
if f not in self.flows:
self.flows.append(f)
class TestSerialize:
Reported by Pylint.
Line: 27
Column: 5
if f not in self.flows:
self.flows.append(f)
def websocket_start(self, f):
if f not in self.flows:
self.flows.append(f)
class TestSerialize:
Reported by Pylint.
Line: 32
Column: 1
self.flows.append(f)
class TestSerialize:
def test_roundtrip(self):
sio = io.BytesIO()
f = tflow.tflow()
f.marked = ":default:"
Reported by Pylint.
mitmproxy/contrib/kaitaistruct/gif.py
73 issues
Line: 9
Column: 1
from enum import Enum
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 77
Column: 24
@property
def has_color_table(self):
if hasattr(self, '_m_has_color_table'):
return self._m_has_color_table if hasattr(self, '_m_has_color_table') else None
self._m_has_color_table = (self.flags & 128) != 0
return self._m_has_color_table if hasattr(self, '_m_has_color_table') else None
@property
Reported by Pylint.
Line: 85
Column: 24
@property
def color_table_size(self):
if hasattr(self, '_m_color_table_size'):
return self._m_color_table_size if hasattr(self, '_m_color_table_size') else None
self._m_color_table_size = (2 << (self.flags & 7))
return self._m_color_table_size if hasattr(self, '_m_color_table_size') else None
Reported by Pylint.
Line: 111
Column: 24
@property
def has_color_table(self):
if hasattr(self, '_m_has_color_table'):
return self._m_has_color_table if hasattr(self, '_m_has_color_table') else None
self._m_has_color_table = (self.flags & 128) != 0
return self._m_has_color_table if hasattr(self, '_m_has_color_table') else None
@property
Reported by Pylint.
Line: 119
Column: 24
@property
def has_interlace(self):
if hasattr(self, '_m_has_interlace'):
return self._m_has_interlace if hasattr(self, '_m_has_interlace') else None
self._m_has_interlace = (self.flags & 64) != 0
return self._m_has_interlace if hasattr(self, '_m_has_interlace') else None
@property
Reported by Pylint.
Line: 127
Column: 24
@property
def has_sorted_color_table(self):
if hasattr(self, '_m_has_sorted_color_table'):
return self._m_has_sorted_color_table if hasattr(self, '_m_has_sorted_color_table') else None
self._m_has_sorted_color_table = (self.flags & 32) != 0
return self._m_has_sorted_color_table if hasattr(self, '_m_has_sorted_color_table') else None
@property
Reported by Pylint.
Line: 135
Column: 24
@property
def color_table_size(self):
if hasattr(self, '_m_color_table_size'):
return self._m_color_table_size if hasattr(self, '_m_color_table_size') else None
self._m_color_table_size = (2 << (self.flags & 7))
return self._m_color_table_size if hasattr(self, '_m_color_table_size') else None
Reported by Pylint.
Line: 188
Column: 24
@property
def transparent_color_flag(self):
if hasattr(self, '_m_transparent_color_flag'):
return self._m_transparent_color_flag if hasattr(self, '_m_transparent_color_flag') else None
self._m_transparent_color_flag = (self.flags & 1) != 0
return self._m_transparent_color_flag if hasattr(self, '_m_transparent_color_flag') else None
@property
Reported by Pylint.
Line: 196
Column: 24
@property
def user_input_flag(self):
if hasattr(self, '_m_user_input_flag'):
return self._m_user_input_flag if hasattr(self, '_m_user_input_flag') else None
self._m_user_input_flag = (self.flags & 2) != 0
return self._m_user_input_flag if hasattr(self, '_m_user_input_flag') else None
Reported by Pylint.
Line: 3
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
import array
import struct
import zlib
from enum import Enum
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
Reported by Pylint.
mitmproxy/http.py
72 issues
Line: 1221
Column: 31
super().__init__("http", client_conn, server_conn, live)
self.mode = mode
_stateobject_attributes = flow.Flow._stateobject_attributes.copy()
# mypy doesn't support update with kwargs
_stateobject_attributes.update(dict(
request=Request,
response=Response,
websocket=WebSocketData,
Reported by Pylint.
Line: 2
Column: 1
import binascii
import os
import re
import time
import urllib.parse
import json
from dataclasses import dataclass
from dataclasses import fields
from email.utils import formatdate
Reported by Pylint.
Line: 96
Column: 24
- For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
"""
def __init__(self, fields: Iterable[Tuple[bytes, bytes]] = (), **headers):
"""
*Args:*
- *fields:* (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
- *\*\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
Reported by Pylint.
Line: 101
Column: 15
*Args:*
- *fields:* (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
- *\*\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
For convenience, underscores in header names will be transformed to dashes -
this behaviour does not extend to other methods.
If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
the behavior is undefined.
Reported by Pylint.
Line: 101
Column: 13
*Args:*
- *fields:* (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
- *\*\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
For convenience, underscores in header names will be transformed to dashes -
this behaviour does not extend to other methods.
If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
the behavior is undefined.
Reported by Pylint.
Line: 495
Column: 22
self.headers.pop("content-encoding", None)
self.content = decoded
def encode(self, encoding: str) -> None:
"""
Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
Any existing content-encodings are overwritten, the content is not decoded beforehand.
*Raises:*
Reported by Pylint.
Line: 597
Column: 9
def make(
cls,
method: str,
url: str,
content: Union[bytes, str] = "",
headers: Union[Headers, Dict[Union[str, bytes], Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = ()
) -> "Request":
"""
Simplified API for creating request objects.
Reported by Pylint.
Line: 975
Column: 13
def _set_multipart_form(self, value):
is_valid_content_type = self.headers.get("content-type", "").lower().startswith("multipart/form-data")
if not is_valid_content_type:
"""
Generate a random boundary here.
See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
on generating the boundary.
"""
Reported by Pylint.
Line: 1066
Column: 13
Simplified API for creating response objects.
"""
if isinstance(headers, Headers):
headers = headers
elif isinstance(headers, dict):
headers = Headers(
(always_bytes(k, "utf-8", "surrogateescape"), # type: ignore
always_bytes(v, "utf-8", "surrogateescape"))
for k, v in headers.items()
Reported by Pylint.
Line: 1
Column: 1
import binascii
import os
import re
import time
import urllib.parse
import json
from dataclasses import dataclass
from dataclasses import fields
from email.utils import formatdate
Reported by Pylint.
test/mitmproxy/proxy/test_tutils.py
72 issues
Line: 4
Column: 1
import typing
from dataclasses import dataclass
import pytest
from mitmproxy.proxy import commands, events, layer
from . import tutils
Reported by Pylint.
Line: 7
Column: 1
import pytest
from mitmproxy.proxy import commands, events, layer
from . import tutils
class TEvent(events.Event):
commands: typing.Iterable[typing.Any]
Reported by Pylint.
Line: 45
Column: 17
return tutils.Playbook(TLayer(tctx), expected=[])
def test_simple(tplaybook):
assert (
tplaybook
>> TEvent()
<< TCommand()
>> TEvent([])
Reported by Pylint.
Line: 55
Column: 19
)
def test_mismatch(tplaybook):
with pytest.raises(AssertionError, match="Playbook mismatch"):
assert (
tplaybook
>> TEvent([])
<< TCommand()
Reported by Pylint.
Line: 64
Column: 25
)
def test_partial_assert(tplaybook):
"""Developers can assert parts of a playbook and the continue later on."""
assert (
tplaybook
>> TEvent()
<< TCommand()
Reported by Pylint.
Line: 80
Column: 22
@pytest.mark.parametrize("typed", [True, False])
def test_placeholder(tplaybook, typed):
"""Developers can specify placeholders for yet unknown attributes."""
if typed:
f = tutils.Placeholder(int)
else:
f = tutils.Placeholder()
Reported by Pylint.
Line: 94
Column: 36
assert f() == 42
def test_placeholder_type_mismatch(tplaybook):
"""Developers can specify placeholders for yet unknown attributes."""
f = tutils.Placeholder(str)
with pytest.raises(TypeError, match="Placeholder type error for TCommand.x: expected str, got int."):
assert (
tplaybook
Reported by Pylint.
Line: 105
Column: 21
)
def test_unfinished(tplaybook):
"""We show a warning when playbooks aren't asserted."""
tplaybook >> TEvent()
with pytest.raises(RuntimeError, match="Unfinished playbook"):
tplaybook.__del__()
tplaybook._errored = True
Reported by Pylint.
Line: 107
Column: 5
def test_unfinished(tplaybook):
"""We show a warning when playbooks aren't asserted."""
tplaybook >> TEvent()
with pytest.raises(RuntimeError, match="Unfinished playbook"):
tplaybook.__del__()
tplaybook._errored = True
tplaybook.__del__()
Reported by Pylint.
Line: 110
Column: 5
tplaybook >> TEvent()
with pytest.raises(RuntimeError, match="Unfinished playbook"):
tplaybook.__del__()
tplaybook._errored = True
tplaybook.__del__()
def test_command_reply(tplaybook):
"""CommandReplies can use relative offsets to point to the matching command."""
Reported by Pylint.
test/mitmproxy/addons/test_maplocal.py
72 issues
Line: 4
Column: 1
import sys
from pathlib import Path
import pytest
from mitmproxy.addons.maplocal import MapLocal, MapLocalSpec, file_candidates
from mitmproxy.utils.spec import parse_spec
from mitmproxy.test import taddons
from mitmproxy.test import tflow
Reported by Pylint.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
"url,spec,expected_candidates",
[
# trailing slashes
("https://example.com/foo", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo/", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo", ":example.com/foo:/tmp/", ["/tmp/index.html"]),
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
Reported by Bandit.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
[
# trailing slashes
("https://example.com/foo", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo/", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo", ":example.com/foo:/tmp/", ["/tmp/index.html"]),
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
Reported by Bandit.
Line: 18
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
# trailing slashes
("https://example.com/foo", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo/", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo", ":example.com/foo:/tmp/", ["/tmp/index.html"]),
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
Reported by Bandit.
Line: 21
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
("https://example.com/foo", ":example.com/foo:/tmp/", ["/tmp/index.html"]),
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar/baz.jpg", ":example.com/foo:/tmp",
["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":/foo/bar.jpg:/tmp", ["/tmp/index.html"]),
Reported by Bandit.
Line: 21
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
("https://example.com/foo", ":example.com/foo:/tmp/", ["/tmp/index.html"]),
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar/baz.jpg", ":example.com/foo:/tmp",
["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":/foo/bar.jpg:/tmp", ["/tmp/index.html"]),
Reported by Bandit.
Line: 22
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar/baz.jpg", ":example.com/foo:/tmp",
["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":/foo/bar.jpg:/tmp", ["/tmp/index.html"]),
] + [
Reported by Bandit.
Line: 22
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
] + [
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar/baz.jpg", ":example.com/foo:/tmp",
["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":/foo/bar.jpg:/tmp", ["/tmp/index.html"]),
] + [
Reported by Bandit.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar/baz.jpg", ":example.com/foo:/tmp",
["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":/foo/bar.jpg:/tmp", ["/tmp/index.html"]),
] + [
# URL decode and special characters
Reported by Bandit.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
# simple prefixes
("http://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar.jpg?query", ":example.com/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example.com/foo/bar/baz.jpg", ":example.com/foo:/tmp",
["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
("https://example.com/foo/bar.jpg", ":/foo/bar.jpg:/tmp", ["/tmp/index.html"]),
] + [
# URL decode and special characters
Reported by Bandit.
mitmproxy/platform/windows.py
70 issues
Line: 17
Column: 1
import click
import collections
import collections.abc
import pydivert
import pydivert.consts
if typing.TYPE_CHECKING:
class WindowsError(OSError):
@property
Reported by Pylint.
Line: 18
Column: 1
import collections
import collections.abc
import pydivert
import pydivert.consts
if typing.TYPE_CHECKING:
class WindowsError(OSError):
@property
def winerror(self) -> int:
Reported by Pylint.
Line: 335
Column: 5
self.redirect_request = redirect_request
super().__init__(self.handle, filter)
def handle(self, packet):
client = (packet.src_addr, packet.src_port)
if client not in self.tcp_connections:
self.tcp_connections.refresh()
Reported by Pylint.
Line: 578
Column: 25
proxy = TransparentProxy(**options)
proxy.start()
print(f" * Redirection active.")
print(f" Filter: {proxy.request_filter}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print(" * Shutting down...")
Reported by Pylint.
Line: 62
Column: 9
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((REDIRECT_API_HOST, REDIRECT_API_PORT))
self.wfile = self.sock.makefile('wb')
self.rfile = self.sock.makefile('rb')
write(os.getpid(), self.wfile)
def original_addr(self, csock: socket.socket):
ip, port = csock.getpeername()[:2]
Reported by Pylint.
Line: 63
Column: 9
self.sock.connect((REDIRECT_API_HOST, REDIRECT_API_PORT))
self.wfile = self.sock.makefile('wb')
self.rfile = self.sock.makefile('rb')
write(os.getpid(), self.wfile)
def original_addr(self, csock: socket.socket):
ip, port = csock.getpeername()[:2]
ip = re.sub(r"^::ffff:(?=\d+.\d+.\d+.\d+$)", "", ip)
Reported by Pylint.
Line: 281
Column: 9
def __init__(
self,
handle: typing.Callable[[pydivert.Packet], None],
filter: str,
layer: pydivert.Layer = pydivert.Layer.NETWORK,
flags: pydivert.Flag = 0
) -> None:
self.handle = handle
self.windivert = pydivert.WinDivert(filter, layer, flags=flags)
Reported by Pylint.
Line: 328
Column: 9
def __init__(
self,
redirect_request: typing.Callable[[pydivert.Packet], None],
filter: str
) -> None:
self.tcp_connections = TcpConnectionTable()
self.trusted_pids = set()
self.redirect_request = redirect_request
super().__init__(self.handle, filter)
Reported by Pylint.
Line: 432
Column: 9
local: bool = True,
forward: bool = True,
proxy_port: int = 8080,
filter: typing.Optional[str] = "tcp.DstPort == 80 or tcp.DstPort == 443",
) -> None:
self.proxy_port = proxy_port
self.filter = (
filter
or
Reported by Pylint.
Line: 481
Column: 3
@classmethod
def setup(cls):
# TODO: Make sure that server can be killed cleanly. That's a bit difficult as we don't have access to
# controller.should_exit when this is called.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_unavailable = s.connect_ex((REDIRECT_API_HOST, REDIRECT_API_PORT))
if server_unavailable:
proxifier = TransparentProxy()
Reported by Pylint.