The following issues were found
mitmproxy/proxy/layers/tcp.py
8 issues
Line: 1
Column: 1
from dataclasses import dataclass
from typing import Optional
from mitmproxy import flow, tcp
from mitmproxy.proxy import commands, events, layer
from mitmproxy.proxy.commands import StartHook
from mitmproxy.connection import ConnectionState, Connection
from mitmproxy.proxy.context import Context
from mitmproxy.proxy.events import MessageInjected
Reported by Pylint.
Line: 49
Column: 1
flow: tcp.TCPFlow
class TcpMessageInjected(MessageInjected[tcp.TCPMessage]):
"""
The user has injected a custom TCP message.
"""
Reported by Pylint.
Line: 69
Column: 5
self.flow = tcp.TCPFlow(self.context.client, self.context.server, True)
@expect(events.Start)
def start(self, _) -> layer.CommandGenerator[None]:
if self.flow:
yield TcpStartHook(self.flow)
if self.context.server.timestamp_start is None:
err = yield commands.OpenConnection(self.context.server)
Reported by Pylint.
Line: 87
Column: 5
_handle_event = start
@expect(events.DataReceived, events.ConnectionClosed, TcpMessageInjected)
def relay_messages(self, event: events.Event) -> layer.CommandGenerator[None]:
if isinstance(event, TcpMessageInjected):
# we just spoof that we received data here and then process that regularly.
event = events.DataReceived(
self.context.client if event.message.from_client else self.context.server,
Reported by Pylint.
Line: 87
Column: 5
_handle_event = start
@expect(events.DataReceived, events.ConnectionClosed, TcpMessageInjected)
def relay_messages(self, event: events.Event) -> layer.CommandGenerator[None]:
if isinstance(event, TcpMessageInjected):
# we just spoof that we received data here and then process that regularly.
event = events.DataReceived(
self.context.client if event.message.from_client else self.context.server,
Reported by Pylint.
Line: 96
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
event.message.content,
)
assert isinstance(event, events.ConnectionEvent)
from_client = event.connection == self.context.client
send_to: Connection
if from_client:
send_to = self.context.server
Reported by Bandit.
Line: 134
Column: 5
raise AssertionError(f"Unexpected event: {event}")
@expect(events.DataReceived, events.ConnectionClosed, TcpMessageInjected)
def done(self, _) -> layer.CommandGenerator[None]:
yield from ()
Reported by Pylint.
Line: 134
Column: 5
raise AssertionError(f"Unexpected event: {event}")
@expect(events.DataReceived, events.ConnectionClosed, TcpMessageInjected)
def done(self, _) -> layer.CommandGenerator[None]:
yield from ()
Reported by Pylint.
mitmproxy/tools/console/grideditor/col_subgrid.py
8 issues
Line: 1
Column: 1
import urwid
from mitmproxy.tools.console.grideditor import base
from mitmproxy.tools.console import signals
from mitmproxy.net.http import cookies
class Column(base.Column):
def __init__(self, heading, subeditor):
super().__init__(heading)
Reported by Pylint.
Line: 1
Column: 1
import urwid
from mitmproxy.tools.console.grideditor import base
from mitmproxy.tools.console import signals
from mitmproxy.net.http import cookies
class Column(base.Column):
def __init__(self, heading, subeditor):
super().__init__(heading)
Reported by Pylint.
Line: 7
Column: 1
from mitmproxy.net.http import cookies
class Column(base.Column):
def __init__(self, heading, subeditor):
super().__init__(heading)
self.subeditor = subeditor
def Edit(self, data):
Reported by Pylint.
Line: 21
Column: 5
def blank(self):
return []
def keypress(self, key, editor):
if key in "rRe":
signals.status_message.send(
self,
message="Press enter to edit this field.",
expire=1000
Reported by Pylint.
Line: 22
Column: 9
return []
def keypress(self, key, editor):
if key in "rRe":
signals.status_message.send(
self,
message="Press enter to edit this field.",
expire=1000
)
Reported by Pylint.
Line: 36
Column: 1
return key
class Display(base.Cell):
def __init__(self, data):
p = cookies._format_pairs(data, sep="\n")
w = urwid.Text(p)
super().__init__(w)
Reported by Pylint.
Line: 38
Column: 9
class Display(base.Cell):
def __init__(self, data):
p = cookies._format_pairs(data, sep="\n")
w = urwid.Text(p)
super().__init__(w)
def get_data(self):
pass
Reported by Pylint.
Line: 39
Column: 9
class Display(base.Cell):
def __init__(self, data):
p = cookies._format_pairs(data, sep="\n")
w = urwid.Text(p)
super().__init__(w)
def get_data(self):
pass
Reported by Pylint.
mitmproxy/addons/block.py
8 issues
Line: 1
Column: 1
import ipaddress
from mitmproxy import ctx
class Block:
def load(self, loader):
loader.add_option(
"block_global", bool, True,
"""
Reported by Pylint.
Line: 5
Column: 1
from mitmproxy import ctx
class Block:
def load(self, loader):
loader.add_option(
"block_global", bool, True,
"""
Block connections from public IP addresses.
Reported by Pylint.
Line: 6
Column: 5
class Block:
def load(self, loader):
loader.add_option(
"block_global", bool, True,
"""
Block connections from public IP addresses.
"""
Reported by Pylint.
Line: 6
Column: 5
class Block:
def load(self, loader):
loader.add_option(
"block_global", bool, True,
"""
Block connections from public IP addresses.
"""
Reported by Pylint.
Line: 22
Column: 5
"""
)
def client_connected(self, client):
parts = client.peername[0].rsplit("%", 1)
address = ipaddress.ip_address(parts[0])
if isinstance(address, ipaddress.IPv6Address):
address = address.ipv4_mapped or address
Reported by Pylint.
Line: 22
Column: 5
"""
)
def client_connected(self, client):
parts = client.peername[0].rsplit("%", 1)
address = ipaddress.ip_address(parts[0])
if isinstance(address, ipaddress.IPv6Address):
address = address.ipv4_mapped or address
Reported by Pylint.
Line: 32
Column: 1
return
if ctx.options.block_private and address.is_private:
ctx.log.warn(f"Client connection from {client.peername[0]} killed by block_private option.")
client.error = "Connection killed by block_private."
if ctx.options.block_global and address.is_global:
ctx.log.warn(f"Client connection from {client.peername[0]} killed by block_global option.")
client.error = "Connection killed by block_global."
Reported by Pylint.
Line: 36
Column: 1
client.error = "Connection killed by block_private."
if ctx.options.block_global and address.is_global:
ctx.log.warn(f"Client connection from {client.peername[0]} killed by block_global option.")
client.error = "Connection killed by block_global."
Reported by Pylint.
mitmproxy/contentviews/auto.py
8 issues
Line: 2
Column: 1
from mitmproxy import contentviews
from . import base
class ViewAuto(base.View):
name = "Auto"
def __call__(self, data, **metadata):
# TODO: The auto view has little justification now that views implement render_priority,
Reported by Pylint.
Line: 9
Column: 3
name = "Auto"
def __call__(self, data, **metadata):
# TODO: The auto view has little justification now that views implement render_priority,
# but we keep it around for now to not touch more parts.
priority, view = max(
(v.render_priority(data, **metadata), v)
for v in contentviews.views
)
Reported by Pylint.
Line: 19
Column: 1
return "No content", []
return view(data, **metadata)
def render_priority(self, data: bytes, **metadata) -> float:
return -1 # don't recurse.
Reported by Pylint.
Line: 19
Column: 31
return "No content", []
return view(data, **metadata)
def render_priority(self, data: bytes, **metadata) -> float:
return -1 # don't recurse.
Reported by Pylint.
Line: 1
Column: 1
from mitmproxy import contentviews
from . import base
class ViewAuto(base.View):
name = "Auto"
def __call__(self, data, **metadata):
# TODO: The auto view has little justification now that views implement render_priority,
Reported by Pylint.
Line: 5
Column: 1
from . import base
class ViewAuto(base.View):
name = "Auto"
def __call__(self, data, **metadata):
# TODO: The auto view has little justification now that views implement render_priority,
# but we keep it around for now to not touch more parts.
Reported by Pylint.
Line: 19
Column: 5
return "No content", []
return view(data, **metadata)
def render_priority(self, data: bytes, **metadata) -> float:
return -1 # don't recurse.
Reported by Pylint.
Line: 19
Column: 5
return "No content", []
return view(data, **metadata)
def render_priority(self, data: bytes, **metadata) -> float:
return -1 # don't recurse.
Reported by Pylint.
mitmproxy/platform/osx.py
8 issues
Line: 3
Column: 1
import subprocess
from . import pf
"""
Doing this the "right" way by using DIOCNATLOOK on the pf device turns out
to be a pain. Apple has made a number of modifications to the data
structures returned, and compiling userspace tools to test and work with
this turns out to be a pain in the ass. Parsing pfctl output is short,
Reported by Pylint.
Line: 5
Column: 1
from . import pf
"""
Doing this the "right" way by using DIOCNATLOOK on the pf device turns out
to be a pain. Apple has made a number of modifications to the data
structures returned, and compiling userspace tools to test and work with
this turns out to be a pain in the ass. Parsing pfctl output is short,
simple, and works.
Reported by Pylint.
Line: 28
Column: 13
if "sudo: a password is required" in e.output.decode(errors="replace"):
insufficient_priv = True
else:
raise RuntimeError("Error getting pfctl state: " + repr(e))
else:
insufficient_priv = "sudo: a password is required" in stxt.decode(errors="replace")
if insufficient_priv:
raise RuntimeError(
Reported by Pylint.
Line: 1
Column: 1
import subprocess
from . import pf
"""
Doing this the "right" way by using DIOCNATLOOK on the pf device turns out
to be a pain. Apple has made a number of modifications to the data
structures returned, and compiling userspace tools to test and work with
this turns out to be a pain in the ass. Parsing pfctl output is short,
Reported by Pylint.
Line: 1
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
import subprocess
from . import pf
"""
Doing this the "right" way by using DIOCNATLOOK on the pf device turns out
to be a pain. Apple has made a number of modifications to the data
structures returned, and compiling userspace tools to test and work with
this turns out to be a pain in the ass. Parsing pfctl output is short,
Reported by Bandit.
Line: 20
Column: 1
STATECMD = ("sudo", "-n", "/sbin/pfctl", "-s", "state")
def original_addr(csock):
peer = csock.getpeername()
try:
stxt = subprocess.check_output(STATECMD, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
if "sudo: a password is required" in e.output.decode(errors="replace"):
Reported by Pylint.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
def original_addr(csock):
peer = csock.getpeername()
try:
stxt = subprocess.check_output(STATECMD, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
if "sudo: a password is required" in e.output.decode(errors="replace"):
insufficient_priv = True
else:
raise RuntimeError("Error getting pfctl state: " + repr(e))
Reported by Bandit.
Line: 24
Column: 5
peer = csock.getpeername()
try:
stxt = subprocess.check_output(STATECMD, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
if "sudo: a password is required" in e.output.decode(errors="replace"):
insufficient_priv = True
else:
raise RuntimeError("Error getting pfctl state: " + repr(e))
else:
Reported by Pylint.
test/mitmproxy/tools/console/test_defaultkeys.py
8 issues
Line: 1
Column: 1
import pytest
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy.test.tflow import tflow
from mitmproxy.tools.console import defaultkeys
from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import master
Reported by Pylint.
Line: 1
Column: 1
import pytest
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy.test.tflow import tflow
from mitmproxy.tools.console import defaultkeys
from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import master
Reported by Pylint.
Line: 13
Column: 1
@pytest.mark.asyncio
async def test_commands_exist():
command_manager = command.CommandManager(ctx)
km = keymap.Keymap(None)
defaultkeys.map(km)
assert km.bindings
Reported by Pylint.
Line: 16
Column: 5
async def test_commands_exist():
command_manager = command.CommandManager(ctx)
km = keymap.Keymap(None)
defaultkeys.map(km)
assert km.bindings
m = master.ConsoleMaster(None)
await m.load_flow(tflow())
Reported by Pylint.
Line: 18
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
km = keymap.Keymap(None)
defaultkeys.map(km)
assert km.bindings
m = master.ConsoleMaster(None)
await m.load_flow(tflow())
for binding in km.bindings:
try:
Reported by Bandit.
Line: 19
Column: 5
km = keymap.Keymap(None)
defaultkeys.map(km)
assert km.bindings
m = master.ConsoleMaster(None)
await m.load_flow(tflow())
for binding in km.bindings:
try:
parsed, _ = command_manager.parse_partial(binding.command.strip())
Reported by Pylint.
Line: 32
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if a.type != mitmproxy.types.Space
]
assert cmd in m.commands.commands
cmd_obj = m.commands.commands[cmd]
cmd_obj.prepare_args(args)
except Exception as e:
raise ValueError(f"Invalid binding: {binding.command}") from e
Reported by Bandit.
Line: 36
Column: 9
cmd_obj = m.commands.commands[cmd]
cmd_obj.prepare_args(args)
except Exception as e:
raise ValueError(f"Invalid binding: {binding.command}") from e
Reported by Pylint.
examples/contrib/modify_body_inject_iframe.py
8 issues
Line: 2
Column: 1
# (this script works best with --anticache)
from bs4 import BeautifulSoup
from mitmproxy import ctx, http
class Injector:
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
Reported by Pylint.
Line: 3
Column: 1
# (this script works best with --anticache)
from bs4 import BeautifulSoup
from mitmproxy import ctx, http
class Injector:
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
Reported by Pylint.
Line: 1
Column: 1
# (this script works best with --anticache)
from bs4 import BeautifulSoup
from mitmproxy import ctx, http
class Injector:
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy import ctx, http
class Injector:
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
)
Reported by Pylint.
Line: 7
Column: 5
class Injector:
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
)
def response(self, flow: http.HTTPFlow) -> None:
Reported by Pylint.
Line: 7
Column: 5
class Injector:
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
)
def response(self, flow: http.HTTPFlow) -> None:
Reported by Pylint.
Line: 12
Column: 5
"iframe", str, "", "IFrame to inject"
)
def response(self, flow: http.HTTPFlow) -> None:
if ctx.options.iframe:
html = BeautifulSoup(flow.response.content, "html.parser")
if html.body:
iframe = html.new_tag(
"iframe",
Reported by Pylint.
Line: 12
Column: 5
"iframe", str, "", "IFrame to inject"
)
def response(self, flow: http.HTTPFlow) -> None:
if ctx.options.iframe:
html = BeautifulSoup(flow.response.content, "html.parser")
if html.body:
iframe = html.new_tag(
"iframe",
Reported by Pylint.
examples/addons/tcp-simple.py
8 issues
Line: 13
Column: 1
mitmdump --rawtcp --tcp-hosts ".*" -s examples/tcp-simple.py
"""
from mitmproxy.utils import strutils
from mitmproxy import ctx
from mitmproxy import tcp
def tcp_message(flow: tcp.TCPFlow):
Reported by Pylint.
Line: 14
Column: 1
mitmdump --rawtcp --tcp-hosts ".*" -s examples/tcp-simple.py
"""
from mitmproxy.utils import strutils
from mitmproxy import ctx
from mitmproxy import tcp
def tcp_message(flow: tcp.TCPFlow):
message = flow.messages[-1]
Reported by Pylint.
Line: 15
Column: 1
"""
from mitmproxy.utils import strutils
from mitmproxy import ctx
from mitmproxy import tcp
def tcp_message(flow: tcp.TCPFlow):
message = flow.messages[-1]
message.content = message.content.replace(b"foo", b"bar")
Reported by Pylint.
Line: 1
Column: 1
"""
Process individual messages from a TCP connection.
This script replaces full occurrences of "foo" with "bar" and prints various details for each message.
Please note that TCP is stream-based and *not* message-based. mitmproxy splits stream contents into "messages"
as they are received by socket.recv(). This is pretty arbitrary and should not be relied on.
However, it is sometimes good enough as a quick hack.
Example Invocation:
Reported by Pylint.
Line: 4
Column: 1
"""
Process individual messages from a TCP connection.
This script replaces full occurrences of "foo" with "bar" and prints various details for each message.
Please note that TCP is stream-based and *not* message-based. mitmproxy splits stream contents into "messages"
as they are received by socket.recv(). This is pretty arbitrary and should not be relied on.
However, it is sometimes good enough as a quick hack.
Example Invocation:
Reported by Pylint.
Line: 5
Column: 1
Process individual messages from a TCP connection.
This script replaces full occurrences of "foo" with "bar" and prints various details for each message.
Please note that TCP is stream-based and *not* message-based. mitmproxy splits stream contents into "messages"
as they are received by socket.recv(). This is pretty arbitrary and should not be relied on.
However, it is sometimes good enough as a quick hack.
Example Invocation:
Reported by Pylint.
Line: 18
Column: 1
from mitmproxy import tcp
def tcp_message(flow: tcp.TCPFlow):
message = flow.messages[-1]
message.content = message.content.replace(b"foo", b"bar")
ctx.log.info(
f"tcp_message[from_client={message.from_client}), content={strutils.bytes_to_escaped_str(message.content)}]"
Reported by Pylint.
Line: 23
Column: 1
message.content = message.content.replace(b"foo", b"bar")
ctx.log.info(
f"tcp_message[from_client={message.from_client}), content={strutils.bytes_to_escaped_str(message.content)}]"
)
Reported by Pylint.
mitmproxy/stateobject.py
8 issues
Line: 8
Column: 1
from mitmproxy.utils import typecheck
class StateObject(serializable.Serializable):
"""
An object with serializable state.
State attributes can either be serializable types(str, tuple, bool, ...)
or StateObject instances themselves.
Reported by Pylint.
Line: 86
Column: 13
try:
json.dumps(val)
except TypeError:
raise ValueError(f"Data not serializable: {val}")
return val
else:
return typeinfo(val)
Reported by Pylint.
Line: 1
Column: 1
import json
import typing
from mitmproxy.coretypes import serializable
from mitmproxy.utils import typecheck
class StateObject(serializable.Serializable):
"""
Reported by Pylint.
Line: 52
Column: 1
raise RuntimeWarning(f"Unexpected State in __setstate__: {state}")
def _process(typeinfo: typecheck.Type, val: typing.Any, make: bool) -> typing.Any:
if val is None:
return None
elif make and hasattr(typeinfo, "from_state"):
return typeinfo.from_state(val)
elif not make and hasattr(val, "get_state"):
Reported by Pylint.
Line: 53
Column: 5
def _process(typeinfo: typecheck.Type, val: typing.Any, make: bool) -> typing.Any:
if val is None:
return None
elif make and hasattr(typeinfo, "from_state"):
return typeinfo.from_state(val)
elif not make and hasattr(val, "get_state"):
return val.get_state()
Reported by Pylint.
Line: 62
Column: 5
typename = str(typeinfo)
if typename.startswith("typing.List"):
T = typecheck.sequence_type(typeinfo)
return [_process(T, x, make) for x in val]
elif typename.startswith("typing.Tuple"):
Ts = typecheck.tuple_types(typeinfo)
if len(Ts) != len(val):
Reported by Pylint.
Line: 63
Column: 9
typename = str(typeinfo)
if typename.startswith("typing.List"):
T = typecheck.sequence_type(typeinfo)
return [_process(T, x, make) for x in val]
elif typename.startswith("typing.Tuple"):
Ts = typecheck.tuple_types(typeinfo)
if len(Ts) != len(val):
raise ValueError(f"Invalid data. Expected {Ts}, got {val}.")
Reported by Pylint.
Line: 66
Column: 9
T = typecheck.sequence_type(typeinfo)
return [_process(T, x, make) for x in val]
elif typename.startswith("typing.Tuple"):
Ts = typecheck.tuple_types(typeinfo)
if len(Ts) != len(val):
raise ValueError(f"Invalid data. Expected {Ts}, got {val}.")
return tuple(
_process(T, x, make) for T, x in zip(Ts, val)
)
Reported by Pylint.
mitmproxy/addons/disable_h2c.py
8 issues
Line: 1
Column: 1
import mitmproxy
class DisableH2C:
"""
We currently only support HTTP/2 over a TLS connection.
Some clients try to upgrade a connection from HTTP/1.1 to h2c. We need to
Reported by Pylint.
Line: 17
Column: 5
by sending the connection preface. We just kill those flows.
"""
def process_flow(self, f):
if f.request.headers.get('upgrade', '') == 'h2c':
mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
del f.request.headers['upgrade']
if 'connection' in f.request.headers:
del f.request.headers['connection']
Reported by Pylint.
Line: 17
Column: 5
by sending the connection preface. We just kill those flows.
"""
def process_flow(self, f):
if f.request.headers.get('upgrade', '') == 'h2c':
mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
del f.request.headers['upgrade']
if 'connection' in f.request.headers:
del f.request.headers['connection']
Reported by Pylint.
Line: 17
Column: 5
by sending the connection preface. We just kill those flows.
"""
def process_flow(self, f):
if f.request.headers.get('upgrade', '') == 'h2c':
mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
del f.request.headers['upgrade']
if 'connection' in f.request.headers:
del f.request.headers['connection']
Reported by Pylint.
Line: 19
Column: 1
def process_flow(self, f):
if f.request.headers.get('upgrade', '') == 'h2c':
mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
del f.request.headers['upgrade']
if 'connection' in f.request.headers:
del f.request.headers['connection']
if 'http2-settings' in f.request.headers:
del f.request.headers['http2-settings']
Reported by Pylint.
Line: 33
Column: 1
)
if is_connection_preface:
f.kill()
mitmproxy.ctx.log.warn("Initiating HTTP/2 connections with prior knowledge are currently not supported.")
# Handlers
def request(self, f):
self.process_flow(f)
Reported by Pylint.
Line: 37
Column: 5
# Handlers
def request(self, f):
self.process_flow(f)
Reported by Pylint.
Line: 37
Column: 5
# Handlers
def request(self, f):
self.process_flow(f)
Reported by Pylint.