The following issues were found
mitmproxy/tools/console/flowlist.py
33 issues
Line: 1
Column: 1
import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import layoutwidget
import mitmproxy.tools.console.master # noqa
class FlowItem(urwid.WidgetWrap):
Reported by Pylint.
Line: 5
Column: 1
from mitmproxy.tools.console import common
from mitmproxy.tools.console import layoutwidget
import mitmproxy.tools.console.master # noqa
class FlowItem(urwid.WidgetWrap):
def __init__(self, master, flow):
Reported by Pylint.
Line: 33
Column: 58
def selectable(self):
return True
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.master.commands.execute("console.view.flow @focus")
return True
def keypress(self, size, key):
Reported by Pylint.
Line: 33
Column: 53
def selectable(self):
return True
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.master.commands.execute("console.view.flow @focus")
return True
def keypress(self, size, key):
Reported by Pylint.
Line: 33
Column: 27
def selectable(self):
return True
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.master.commands.execute("console.view.flow @focus")
return True
def keypress(self, size, key):
Reported by Pylint.
Line: 33
Column: 48
def selectable(self):
return True
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.master.commands.execute("console.view.flow @focus")
return True
def keypress(self, size, key):
Reported by Pylint.
Line: 38
Column: 24
self.master.commands.execute("console.view.flow @focus")
return True
def keypress(self, size, key):
return key
class FlowListWalker(urwid.ListWalker):
Reported by Pylint.
Line: 109
Column: 35
def view_changed(self):
self.body.view_changed()
def set_flowlist_layout(self, opts, updated):
self.master.ui.clear()
Reported by Pylint.
Line: 109
Column: 41
def view_changed(self):
self.body.view_changed()
def set_flowlist_layout(self, opts, updated):
self.master.ui.clear()
Reported by Pylint.
Line: 1
Column: 1
import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import layoutwidget
import mitmproxy.tools.console.master # noqa
class FlowItem(urwid.WidgetWrap):
Reported by Pylint.
mitmproxy/tools/console/overlay.py
31 issues
Line: 3
Column: 1
import math
import urwid
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import grideditor
from mitmproxy.tools.console import layoutwidget
from mitmproxy.tools.console import keymap
Reported by Pylint.
Line: 63
Column: 24
def selectable(self):
return True
def keypress(self, size, key):
return key
class ChooserListWalker(urwid.ListWalker):
shortcuts = "123456789abcdefghijklmnoprstuvwxyz"
Reported by Pylint.
Line: 1
Column: 1
import math
import urwid
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import grideditor
from mitmproxy.tools.console import layoutwidget
from mitmproxy.tools.console import keymap
Reported by Pylint.
Line: 11
Column: 1
from mitmproxy.tools.console import keymap
class SimpleOverlay(urwid.Overlay, layoutwidget.LayoutWidget):
def __init__(self, master, widget, parent, width, valign="middle"):
self.widget = widget
self.master = master
super().__init__(
Reported by Pylint.
Line: 13
Column: 5
class SimpleOverlay(urwid.Overlay, layoutwidget.LayoutWidget):
def __init__(self, master, widget, parent, width, valign="middle"):
self.widget = widget
self.master = master
super().__init__(
widget,
parent,
Reported by Pylint.
Line: 26
Column: 5
)
@property
def keyctx(self):
return getattr(self.widget, "keyctx")
def key_responder(self):
return self.widget.key_responder()
Reported by Pylint.
Line: 42
Column: 1
return self.widget.layout_popping()
class Choice(urwid.WidgetWrap):
def __init__(self, txt, focus, current, shortcut):
if shortcut:
selection_type = "option_selected_key" if focus else "key"
txt = [(selection_type, shortcut), ") ", txt]
else:
Reported by Pylint.
Line: 50
Column: 13
else:
txt = " " + txt
if current:
s = "option_active_selected" if focus else "option_active"
else:
s = "option_selected" if focus else "text"
super().__init__(
urwid.AttrWrap(
urwid.Padding(urwid.Text(txt)),
Reported by Pylint.
Line: 52
Column: 13
if current:
s = "option_active_selected" if focus else "option_active"
else:
s = "option_selected" if focus else "text"
super().__init__(
urwid.AttrWrap(
urwid.Padding(urwid.Text(txt)),
s,
)
Reported by Pylint.
Line: 60
Column: 5
)
)
def selectable(self):
return True
def keypress(self, size, key):
return key
Reported by Pylint.
test/mitmproxy/proxy/layers/http/hyper_h2_test_helpers.py
31 issues
Line: 11
Column: 1
This module contains helpers for the h2 tests.
"""
from hpack.hpack import Encoder
from hyperframe.frame import (
HeadersFrame, DataFrame, SettingsFrame, WindowUpdateFrame, PingFrame,
GoAwayFrame, RstStreamFrame, PushPromiseFrame, PriorityFrame,
ContinuationFrame, AltSvcFrame
)
Reported by Pylint.
Line: 12
Column: 1
This module contains helpers for the h2 tests.
"""
from hpack.hpack import Encoder
from hyperframe.frame import (
HeadersFrame, DataFrame, SettingsFrame, WindowUpdateFrame, PingFrame,
GoAwayFrame, RstStreamFrame, PushPromiseFrame, PriorityFrame,
ContinuationFrame, AltSvcFrame
)
Reported by Pylint.
Line: 41
Column: 5
def preamble(self):
return b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
def build_headers_frame(self,
headers,
flags=[],
stream_id=1,
**priority_kwargs):
"""
Reported by Pylint.
Line: 60
Column: 5
return f
def build_continuation_frame(self, header_block, flags=[], stream_id=1):
"""
Builds a single continuation frame out of the binary header block.
"""
f = ContinuationFrame(stream_id)
f.data = header_block
Reported by Pylint.
Line: 136
Column: 5
f.error_code = error_code
return f
def build_push_promise_frame(self,
stream_id,
promised_stream_id,
headers,
flags=[]):
"""
Reported by Pylint.
Line: 1
Column: 1
# This file has been copied from https://github.com/python-hyper/hyper-h2/blob/master/test/helpers.py,
# MIT License
# -*- coding: utf-8 -*-
"""
helpers
~~~~~~~
This module contains helpers for the h2 tests.
Reported by Pylint.
Line: 25
Column: 1
}
class FrameFactory(object):
"""
A class containing lots of helper methods and state to build frames. This
allows test cases to easily build correct HTTP/2 frames to feed to
hyper-h2.
"""
Reported by Pylint.
Line: 35
Column: 5
def __init__(self):
self.encoder = Encoder()
def refresh_encoder(self):
self.encoder = Encoder()
def preamble(self):
return b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
Reported by Pylint.
Line: 38
Column: 5
def refresh_encoder(self):
self.encoder = Encoder()
def preamble(self):
return b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
def build_headers_frame(self,
headers,
flags=[],
Reported by Pylint.
Line: 38
Column: 5
def refresh_encoder(self):
self.encoder = Encoder()
def preamble(self):
return b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
def build_headers_frame(self,
headers,
flags=[],
Reported by Pylint.
mitmproxy/connection.py
31 issues
Line: 120
Column: 48
def __repr__(self):
attrs = repr({
k: {
"cipher_list": lambda: f"<{len(v)} ciphers>",
"id": lambda: f"…{v[-6:]}"
}.get(k, lambda: v)()
for k, v in self.__dict__.items()
})
return f"{type(self).__name__}({attrs})"
Reported by Pylint.
Line: 121
Column: 37
attrs = repr({
k: {
"cipher_list": lambda: f"<{len(v)} ciphers>",
"id": lambda: f"…{v[-6:]}"
}.get(k, lambda: v)()
for k, v in self.__dict__.items()
})
return f"{type(self).__name__}({attrs})"
Reported by Pylint.
Line: 122
Column: 30
k: {
"cipher_list": lambda: f"<{len(v)} ciphers>",
"id": lambda: f"…{v[-6:]}"
}.get(k, lambda: v)()
for k, v in self.__dict__.items()
})
return f"{type(self).__name__}({attrs})"
@property
Reported by Pylint.
Line: 1
Column: 1
import uuid
import warnings
from abc import ABCMeta
from enum import Flag
from typing import Optional, Sequence, Tuple
from mitmproxy import certs
from mitmproxy.coretypes import serializable
from mitmproxy.net import server_spec
Reported by Pylint.
Line: 31
Column: 1
"""
Base class for client and server connections.
The connection object only exposes metadata about the connection, but not the underlying socket object.
This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.
"""
# all connections have a unique id. While
# f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
# we also want these semantics for recorded flows.
Reported by Pylint.
Line: 35
Column: 1
This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.
"""
# all connections have a unique id. While
# f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
# we also want these semantics for recorded flows.
id: str
"""A unique UUID to identify the connection."""
state: ConnectionState
"""The current connection state."""
Reported by Pylint.
Line: 49
Column: 1
"""
A string describing a general error with connections to this address.
The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
property. This property is only reused per client connection.
"""
tls: bool = False
Reported by Pylint.
Line: 50
Column: 1
A string describing a general error with connections to this address.
The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
property. This property is only reused per client connection.
"""
tls: bool = False
"""
Reported by Pylint.
Line: 81
Column: 1
[ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation)."""
alpn_offers: Sequence[bytes] = ()
"""The ALPN offers as sent in the ClientHello."""
# we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography
cipher: Optional[str] = None
"""The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`."""
cipher_list: Sequence[str] = ()
"""Ciphers accepted by the proxy server on this connection."""
tls_version: Optional[str] = None
Reported by Pylint.
Line: 90
Column: 1
"""The active TLS version."""
sni: Optional[str] = None
"""
The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello.
"""
timestamp_start: Optional[float]
timestamp_end: Optional[float] = None
"""*Timestamp:* Connection has been closed."""
Reported by Pylint.
test/mitmproxy/test_controller.py
31 issues
Line: 3
Column: 1
import asyncio
import pytest
import mitmproxy.ctx
from mitmproxy import controller
from mitmproxy.exceptions import ControlException
from mitmproxy.test import taddons
Reported by Pylint.
Line: 1
Column: 1
import asyncio
import pytest
import mitmproxy.ctx
from mitmproxy import controller
from mitmproxy.exceptions import ControlException
from mitmproxy.test import taddons
Reported by Pylint.
Line: 12
Column: 1
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
Reported by Pylint.
Line: 13
Column: 5
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
Reported by Pylint.
Line: 13
Column: 5
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
Reported by Pylint.
Line: 13
Column: 5
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
Reported by Pylint.
Line: 14
Column: 9
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
Reported by Pylint.
Line: 14
Column: 9
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
Reported by Pylint.
Line: 18
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
async def test():
mitmproxy.ctx.log("test")
asyncio.ensure_future(test())
Reported by Bandit.
Line: 25
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
asyncio.ensure_future(test())
await tctx.master.await_log("test")
assert tctx.master.should_exit.is_set()
class TestReply:
@pytest.mark.asyncio
async def test_simple(self):
Reported by Bandit.
examples/contrib/mitmproxywrapper.py
31 issues
Line: 53
Column: 47
re.MULTILINE)
return {b: a for (a, b) in mapping}
def run_command_with_input(self, command, input):
popen = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
(stdout, stderr) = popen.communicate(input)
Reported by Pylint.
Line: 58
Column: 18
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
(stdout, stderr) = popen.communicate(input)
return stdout
def primary_interace_name(self):
scutil_script = 'get State:/Network/Global/IPv4\nd.show\n'
stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script)
Reported by Pylint.
Line: 1
Column: 1
#!/usr/bin/env python
#
# Helper tool to enable/disable OS X proxy and wrap mitmproxy
#
# Get usage information with:
#
# mitmproxywrapper.py -h
#
Reported by Pylint.
Line: 10
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_imports.html#b404-import-subprocess
# mitmproxywrapper.py -h
#
import subprocess
import re
import argparse
import contextlib
import os
import sys
Reported by Bandit.
Line: 18
Column: 1
import sys
class Wrapper:
def __init__(self, port, extra_arguments=None):
self.port = port
self.extra_arguments = extra_arguments
def run_networksetup_command(self, *arguments):
Reported by Pylint.
Line: 23
Column: 5
self.port = port
self.extra_arguments = extra_arguments
def run_networksetup_command(self, *arguments):
return subprocess.check_output(
['sudo', 'networksetup'] + list(arguments))
def proxy_state_for_service(self, service):
state = self.run_networksetup_command(
Reported by Pylint.
Line: 23
Column: 5
self.port = port
self.extra_arguments = extra_arguments
def run_networksetup_command(self, *arguments):
return subprocess.check_output(
['sudo', 'networksetup'] + list(arguments))
def proxy_state_for_service(self, service):
state = self.run_networksetup_command(
Reported by Pylint.
Line: 24
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b603_subprocess_without_shell_equals_true.html
self.extra_arguments = extra_arguments
def run_networksetup_command(self, *arguments):
return subprocess.check_output(
['sudo', 'networksetup'] + list(arguments))
def proxy_state_for_service(self, service):
state = self.run_networksetup_command(
'-getwebproxy',
Reported by Bandit.
Line: 27
Column: 5
return subprocess.check_output(
['sudo', 'networksetup'] + list(arguments))
def proxy_state_for_service(self, service):
state = self.run_networksetup_command(
'-getwebproxy',
service).splitlines()
return dict([re.findall(r'([^:]+): (.*)', line)[0] for line in state])
Reported by Pylint.
Line: 31
Column: 16
state = self.run_networksetup_command(
'-getwebproxy',
service).splitlines()
return dict([re.findall(r'([^:]+): (.*)', line)[0] for line in state])
def enable_proxy_for_service(self, service):
print(f'Enabling proxy on {service}...')
for subcommand in ['-setwebproxy', '-setsecurewebproxy']:
self.run_networksetup_command(
Reported by Pylint.
mitmproxy/addons/next_layer.py
31 issues
Line: 31
Column: 9
def stack_match(
context: context.Context,
layers: Sequence[Union[LayerCls, Tuple[LayerCls, ...]]]
) -> bool:
if len(context.layers) != len(layers):
return False
return all(
Reported by Pylint.
Line: 32
Column: 9
def stack_match(
context: context.Context,
layers: Sequence[Union[LayerCls, Tuple[LayerCls, ...]]]
) -> bool:
if len(context.layers) != len(layers):
return False
return all(
expected is Any or isinstance(actual, expected)
Reported by Pylint.
Line: 113
Column: 27
nextlayer.data_server(),
)
def _next_layer(self, context: context.Context, data_client: bytes, data_server: bytes) -> Optional[layer.Layer]:
if len(context.layers) == 0:
return self.make_top_layer(context)
if len(data_client) < 3 and not data_server:
return None # not enough data yet to make a decision
Reported by Pylint.
Line: 121
Column: 1
return None # not enough data yet to make a decision
# helper function to quickly check if the existing layer stack matches a particular configuration.
def s(*layers):
return stack_match(context, layers)
# 1. check for --ignore/--allow
ignore = self.ignore_connection(context.server.address, data_client)
if ignore is True:
Reported by Pylint.
Line: 183
Column: 30
# 6. Assume HTTP by default.
return layers.HttpLayer(context, HTTPMode.transparent)
def make_top_layer(self, context: context.Context) -> layer.Layer:
if ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:"):
return layers.modes.HttpProxy(context)
elif ctx.options.mode == "transparent":
return layers.modes.TransparentProxy(context)
Reported by Pylint.
Line: 3
Column: 1
"""
This addon determines the next protocol layer in our proxy stack.
Whenever a protocol layer in the proxy wants to pass a connection to a child layer and isn't sure which protocol comes
next, it calls the `next_layer` hook, which ends up here.
For example, if mitmproxy runs as a regular proxy, we first need to determine if
new clients start with a TLS handshake right away (Secure Web Proxy) or send a plaintext HTTP CONNECT request.
This addon here peeks at the incoming bytes and then makes a decision based on proxy mode, mitmproxy options, etc.
For a typical HTTPS request, this addon is called a couple of times: First to determine that we start with an HTTP layer
Reported by Pylint.
Line: 6
Column: 1
Whenever a protocol layer in the proxy wants to pass a connection to a child layer and isn't sure which protocol comes
next, it calls the `next_layer` hook, which ends up here.
For example, if mitmproxy runs as a regular proxy, we first need to determine if
new clients start with a TLS handshake right away (Secure Web Proxy) or send a plaintext HTTP CONNECT request.
This addon here peeks at the incoming bytes and then makes a decision based on proxy mode, mitmproxy options, etc.
For a typical HTTPS request, this addon is called a couple of times: First to determine that we start with an HTTP layer
which processes the `CONNECT` request, a second time to determine that the client then starts negotiating TLS, and a
third time where we check if the protocol within that TLS stream is actually HTTP or something else.
Reported by Pylint.
Line: 7
Column: 1
next, it calls the `next_layer` hook, which ends up here.
For example, if mitmproxy runs as a regular proxy, we first need to determine if
new clients start with a TLS handshake right away (Secure Web Proxy) or send a plaintext HTTP CONNECT request.
This addon here peeks at the incoming bytes and then makes a decision based on proxy mode, mitmproxy options, etc.
For a typical HTTPS request, this addon is called a couple of times: First to determine that we start with an HTTP layer
which processes the `CONNECT` request, a second time to determine that the client then starts negotiating TLS, and a
third time where we check if the protocol within that TLS stream is actually HTTP or something else.
Reported by Pylint.
Line: 9
Column: 1
new clients start with a TLS handshake right away (Secure Web Proxy) or send a plaintext HTTP CONNECT request.
This addon here peeks at the incoming bytes and then makes a decision based on proxy mode, mitmproxy options, etc.
For a typical HTTPS request, this addon is called a couple of times: First to determine that we start with an HTTP layer
which processes the `CONNECT` request, a second time to determine that the client then starts negotiating TLS, and a
third time where we check if the protocol within that TLS stream is actually HTTP or something else.
Sometimes it's useful to hardcode specific logic in next_layer when one wants to do fancy things.
In that case it's not necessary to modify mitmproxy's source, adding a custom addon with a next_layer event hook
Reported by Pylint.
Line: 10
Column: 1
This addon here peeks at the incoming bytes and then makes a decision based on proxy mode, mitmproxy options, etc.
For a typical HTTPS request, this addon is called a couple of times: First to determine that we start with an HTTP layer
which processes the `CONNECT` request, a second time to determine that the client then starts negotiating TLS, and a
third time where we check if the protocol within that TLS stream is actually HTTP or something else.
Sometimes it's useful to hardcode specific logic in next_layer when one wants to do fancy things.
In that case it's not necessary to modify mitmproxy's source, adding a custom addon with a next_layer event hook
that sets nextlayer.layer works just as well.
Reported by Pylint.
test/mitmproxy/addons/test_intercept.py
31 issues
Line: 1
Column: 1
import pytest
from mitmproxy.addons import intercept
from mitmproxy import exceptions
from mitmproxy.proxy import layers
from mitmproxy.test import taddons
from mitmproxy.test import tflow
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy.addons import intercept
from mitmproxy import exceptions
from mitmproxy.proxy import layers
from mitmproxy.test import taddons
from mitmproxy.test import tflow
Reported by Pylint.
Line: 10
Column: 1
from mitmproxy.test import tflow
def test_simple():
r = intercept.Intercept()
with taddons.context(r) as tctx:
assert not r.filt
tctx.configure(r, intercept="~q")
assert r.filt
Reported by Pylint.
Line: 11
Column: 5
def test_simple():
r = intercept.Intercept()
with taddons.context(r) as tctx:
assert not r.filt
tctx.configure(r, intercept="~q")
assert r.filt
assert tctx.options.intercept_active
Reported by Pylint.
Line: 13
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_simple():
r = intercept.Intercept()
with taddons.context(r) as tctx:
assert not r.filt
tctx.configure(r, intercept="~q")
assert r.filt
assert tctx.options.intercept_active
with pytest.raises(exceptions.OptionsError):
tctx.configure(r, intercept="~~")
Reported by Bandit.
Line: 15
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
with taddons.context(r) as tctx:
assert not r.filt
tctx.configure(r, intercept="~q")
assert r.filt
assert tctx.options.intercept_active
with pytest.raises(exceptions.OptionsError):
tctx.configure(r, intercept="~~")
tctx.configure(r, intercept=None)
assert not r.filt
Reported by Bandit.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert not r.filt
tctx.configure(r, intercept="~q")
assert r.filt
assert tctx.options.intercept_active
with pytest.raises(exceptions.OptionsError):
tctx.configure(r, intercept="~~")
tctx.configure(r, intercept=None)
assert not r.filt
assert not tctx.options.intercept_active
Reported by Bandit.
Line: 20
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
with pytest.raises(exceptions.OptionsError):
tctx.configure(r, intercept="~~")
tctx.configure(r, intercept=None)
assert not r.filt
assert not tctx.options.intercept_active
tctx.configure(r, intercept="~s")
f = tflow.tflow(resp=True)
Reported by Bandit.
Line: 21
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
tctx.configure(r, intercept="~~")
tctx.configure(r, intercept=None)
assert not r.filt
assert not tctx.options.intercept_active
tctx.configure(r, intercept="~s")
f = tflow.tflow(resp=True)
tctx.cycle(r, f)
Reported by Bandit.
Line: 25
Column: 9
tctx.configure(r, intercept="~s")
f = tflow.tflow(resp=True)
tctx.cycle(r, f)
assert f.intercepted
f = tflow.tflow(resp=False)
tctx.cycle(r, f)
Reported by Pylint.
examples/contrib/webscanner_helper/urlinjection.py
30 issues
Line: 6
Column: 1
import json
import logging
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 7
Column: 1
import logging
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
logger = logging.getLogger(__name__)
class InjectionGenerator:
Reported by Pylint.
Line: 19
Column: 9
@abc.abstractmethod
def inject(self, index, flow: HTTPFlow):
"""Injects the given URL index into the given flow."""
pass
class HTMLInjection(InjectionGenerator):
"""Injects the URL index either by creating a new HTML page or by appending is to an existing page."""
Reported by Pylint.
Line: 66
Column: 17
def inject(self, index, flow: HTTPFlow):
if flow.response is not None:
if flow.response.status_code != 404 and not self.insert:
logger.warning(
f"URL '{flow.request.url}' didn't return 404 status, "
f"index page would overwrite valid page.")
elif self.insert:
content = (flow.response
.content
Reported by Pylint.
Line: 92
Column: 13
@classmethod
def robots_txt(cls, index, directive="Allow"):
lines = ["User-agent: *"]
for scheme_netloc, paths in index.items():
for path, methods in paths.items():
lines.append(directive + ": " + path)
return "\n".join(lines)
def inject(self, index, flow: HTTPFlow):
Reported by Pylint.
Line: 93
Column: 23
def robots_txt(cls, index, directive="Allow"):
lines = ["User-agent: *"]
for scheme_netloc, paths in index.items():
for path, methods in paths.items():
lines.append(directive + ": " + path)
return "\n".join(lines)
def inject(self, index, flow: HTTPFlow):
if flow.response is not None:
Reported by Pylint.
Line: 100
Column: 17
def inject(self, index, flow: HTTPFlow):
if flow.response is not None:
if flow.response.status_code != 404:
logger.warning(
f"URL '{flow.request.url}' didn't return 404 status, "
f"index page would overwrite valid page.")
else:
flow.response.content = self.robots_txt(index,
self.directive).encode(
Reported by Pylint.
Line: 117
Column: 23
lines = [
"<?xml version=\"1.0\" encoding=\"UTF-8\"?><urlset xmlns=\"http://www.sitemaps.org/schemas/sitemap/0.9\">"]
for scheme_netloc, paths in index.items():
for path, methods in paths.items():
url = scheme_netloc + path
lines.append(f"<url><loc>{html.escape(url)}</loc></url>")
lines.append("</urlset>")
return "\n".join(lines)
Reported by Pylint.
Line: 126
Column: 17
def inject(self, index, flow: HTTPFlow):
if flow.response is not None:
if flow.response.status_code != 404:
logger.warning(
f"URL '{flow.request.url}' didn't return 404 status, "
f"index page would overwrite valid page.")
else:
flow.response.content = self.sitemap(index).encode(self.ENCODING)
Reported by Pylint.
Line: 172
Column: 17
self.injection_gen.inject(self.url_store, flow)
flow.response.status_code = 200
flow.response.headers["content-type"] = "text/html"
logger.debug(f"Set status code to 200 and set content to logged "
f"urls. Method: {self.injection_gen}")
Reported by Pylint.
mitmproxy/addons/script.py
29 issues
Line: 37
Column: 12
loader.exec_module(m)
if not getattr(m, "name", None):
m.name = path # type: ignore
except Exception as e:
script_error_handler(path, e, msg=str(e))
finally:
sys.path[:] = oldpath
return m
Reported by Pylint.
Line: 41
Column: 9
script_error_handler(path, e, msg=str(e))
finally:
sys.path[:] = oldpath
return m
def script_error_handler(path, exc, msg="", tb=False):
"""
Handles all the user's script errors with
Reported by Pylint.
Line: 1
Column: 1
import asyncio
import os
import importlib.util
import importlib.machinery
import sys
import types
import typing
import traceback
Reported by Pylint.
Line: 19
Column: 1
import mitmproxy.types as mtypes
def load_script(path: str) -> typing.Optional[types.ModuleType]:
fullname = "__mitmproxy_script__.{}".format(
os.path.splitext(os.path.basename(path))[0]
)
# the fullname is not unique among scripts, so if there already is an existing script with said
# fullname, remove it.
Reported by Pylint.
Line: 28
Column: 5
sys.modules.pop(fullname, None)
oldpath = sys.path
sys.path.insert(0, os.path.dirname(path))
m = None
try:
loader = importlib.machinery.SourceFileLoader(fullname, path)
spec = importlib.util.spec_from_loader(fullname, loader=loader)
assert spec
m = importlib.util.module_from_spec(spec)
Reported by Pylint.
Line: 32
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
try:
loader = importlib.machinery.SourceFileLoader(fullname, path)
spec = importlib.util.spec_from_loader(fullname, loader=loader)
assert spec
m = importlib.util.module_from_spec(spec)
loader.exec_module(m)
if not getattr(m, "name", None):
m.name = path # type: ignore
except Exception as e:
Reported by Bandit.
Line: 33
Column: 9
loader = importlib.machinery.SourceFileLoader(fullname, path)
spec = importlib.util.spec_from_loader(fullname, loader=loader)
assert spec
m = importlib.util.module_from_spec(spec)
loader.exec_module(m)
if not getattr(m, "name", None):
m.name = path # type: ignore
except Exception as e:
script_error_handler(path, e, msg=str(e))
Reported by Pylint.
Line: 37
Column: 5
loader.exec_module(m)
if not getattr(m, "name", None):
m.name = path # type: ignore
except Exception as e:
script_error_handler(path, e, msg=str(e))
finally:
sys.path[:] = oldpath
return m
Reported by Pylint.
Line: 44
Column: 1
return m
def script_error_handler(path, exc, msg="", tb=False):
"""
Handles all the user's script errors with
an optional traceback
"""
exception = type(exc).__name__
Reported by Pylint.
Line: 63
Column: 1
ctx.log.error(log_msg)
ReloadInterval = 1
class Script:
"""
An addon that manages a single script.
Reported by Pylint.