The following issues were found
mitmproxy/addonmanager.py
27 issues
Line: 13
Column: 1
from mitmproxy import hooks
from mitmproxy import exceptions
from mitmproxy import flow
from . import ctx
def _get_name(itm):
return getattr(itm, "name", itm.__class__.__name__.lower())
Reported by Pylint.
Line: 44
Column: 5
def safecall():
try:
yield
except (exceptions.AddonHalt, exceptions.OptionsError):
raise
except Exception:
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "invoke_addon")
ctx.log.error(
Reported by Pylint.
Line: 46
Column: 12
yield
except (exceptions.AddonHalt, exceptions.OptionsError):
raise
except Exception:
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "invoke_addon")
ctx.log.error(
"Addon error: %s" % "".join(
traceback.format_exception(etype, value, tb)
Reported by Pylint.
Line: 69
Column: 13
name: str,
typespec: type,
default: typing.Any,
help: str,
choices: typing.Optional[typing.Sequence[str]] = None
) -> None:
"""
Add an option to mitmproxy.
Reported by Pylint.
Line: 80
Column: 24
it will be generated and added by tools as needed.
"""
if name in self.master.options:
existing = self.master.options._options[name]
same_signature = (
existing.name == name and
existing.typespec == typespec and
existing.default == default and
existing.help == help and
Reported by Pylint.
Line: 136
Column: 30
self.master = master
master.options.changed.connect(self._configure_all)
def _configure_all(self, options, updated):
self.trigger(hooks.ConfigureHook(updated))
def clear(self):
"""
Remove all addons.
Reported by Pylint.
Line: 1
Column: 1
import contextlib
import pprint
import sys
import traceback
import types
import typing
from dataclasses import dataclass
from mitmproxy import controller
Reported by Pylint.
Line: 20
Column: 1
return getattr(itm, "name", itm.__class__.__name__.lower())
def cut_traceback(tb, func_name):
"""
Cut off a traceback at the function with the given name.
The func_name's frame is excluded.
Args:
Reported by Pylint.
Line: 41
Column: 1
@contextlib.contextmanager
def safecall():
try:
yield
except (exceptions.AddonHalt, exceptions.OptionsError):
raise
except Exception:
Reported by Pylint.
Line: 47
Column: 23
except (exceptions.AddonHalt, exceptions.OptionsError):
raise
except Exception:
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "invoke_addon")
ctx.log.error(
"Addon error: %s" % "".join(
traceback.format_exception(etype, value, tb)
)
Reported by Pylint.
test/mitmproxy/contentviews/test_api.py
26 issues
Line: 3
Column: 1
from unittest import mock
import pytest
from mitmproxy import contentviews
from mitmproxy.test import tflow
from mitmproxy.test import tutils
Reported by Pylint.
Line: 70
Column: 18
def test_get_message_content_view():
f = tflow.tflow()
r = tutils.treq()
desc, lines, err = contentviews.get_message_content_view("raw", r, f)
assert desc == "Raw"
desc, lines, err = contentviews.get_message_content_view("unknown", r, f)
assert desc == "Raw"
Reported by Pylint.
Line: 1
Column: 1
from unittest import mock
import pytest
from mitmproxy import contentviews
from mitmproxy.test import tflow
from mitmproxy.test import tutils
Reported by Pylint.
Line: 10
Column: 1
from mitmproxy.test import tutils
class TestContentView(contentviews.View):
name = "test"
def __call__(self, *args, **kwargs):
pass
Reported by Pylint.
Line: 16
Column: 5
def __call__(self, *args, **kwargs):
pass
def should_render(self, content_type):
return content_type == "test/123"
def test_add_remove():
tcv = TestContentView()
Reported by Pylint.
Line: 16
Column: 5
def __call__(self, *args, **kwargs):
pass
def should_render(self, content_type):
return content_type == "test/123"
def test_add_remove():
tcv = TestContentView()
Reported by Pylint.
Line: 20
Column: 1
return content_type == "test/123"
def test_add_remove():
tcv = TestContentView()
contentviews.add(tcv)
assert tcv in contentviews.views
# repeated addition causes exception
Reported by Pylint.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_add_remove():
tcv = TestContentView()
contentviews.add(tcv)
assert tcv in contentviews.views
# repeated addition causes exception
with pytest.raises(ValueError, match="Duplicate view"):
contentviews.add(tcv)
Reported by Bandit.
Line: 30
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
contentviews.add(tcv)
contentviews.remove(tcv)
assert tcv not in contentviews.views
def test_get_content_view():
desc, lines, err = contentviews.get_content_view(
contentviews.get("Raw"),
Reported by Bandit.
Line: 33
Column: 1
assert tcv not in contentviews.views
def test_get_content_view():
desc, lines, err = contentviews.get_content_view(
contentviews.get("Raw"),
b"[1, 2, 3]",
)
assert "Raw" in desc
Reported by Pylint.
examples/contrib/tls_passthrough.py
26 issues
Line: 30
Column: 1
from enum import Enum
import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
Reported by Pylint.
Line: 31
Column: 1
from enum import Enum
import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
class InterceptionResult(Enum):
Reported by Pylint.
Line: 32
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
class InterceptionResult(Enum):
success = True
Reported by Pylint.
Line: 33
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
class InterceptionResult(Enum):
success = True
failure = False
Reported by Pylint.
Line: 1
Column: 3
# FIXME: This addon is currently not compatible with mitmproxy 7 and above.
"""
This inline script allows conditional TLS Interception based
on a user-defined strategy.
Example:
> mitmdump -s tls_passthrough.py
Reported by Pylint.
Line: 123
Column: 15
)
def configure(updated):
global tls_strategy
if ctx.options.tlsstrat > 0:
tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
else:
tls_strategy = ConservativeStrategy()
Reported by Pylint.
Line: 124
Column: 5
def configure(updated):
global tls_strategy
if ctx.options.tlsstrat > 0:
tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
else:
tls_strategy = ConservativeStrategy()
Reported by Pylint.
Line: 131
Column: 16
tls_strategy = ConservativeStrategy()
def next_layer(next_layer):
"""
This hook does the actual magic - if the next layer is planned to be a TLS layer,
we check if we want to enter pass-through mode instead.
"""
if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
Reported by Pylint.
Line: 136
Column: 45
This hook does the actual magic - if the next layer is planned to be a TLS layer,
we check if we want to enter pass-through mode instead.
"""
if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
server_address = next_layer.server_conn.address
if tls_strategy.should_intercept(server_address):
# We try to intercept.
# Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
Reported by Pylint.
Line: 36
Column: 1
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
class InterceptionResult(Enum):
success = True
failure = False
skipped = None
Reported by Pylint.
test/mitmproxy/contentviews/test_msgpack.py
26 issues
Line: 1
Column: 1
from hypothesis import given
from hypothesis.strategies import binary
from msgpack import packb
from mitmproxy.contentviews import msgpack
from . import full_eval
Reported by Pylint.
Line: 2
Column: 1
from hypothesis import given
from hypothesis.strategies import binary
from msgpack import packb
from mitmproxy.contentviews import msgpack
from . import full_eval
Reported by Pylint.
Line: 4
Column: 1
from hypothesis import given
from hypothesis.strategies import binary
from msgpack import packb
from mitmproxy.contentviews import msgpack
from . import full_eval
Reported by Pylint.
Line: 7
Column: 1
from msgpack import packb
from mitmproxy.contentviews import msgpack
from . import full_eval
def msgpack_encode(content):
return packb(content, use_bin_type=True)
Reported by Pylint.
Line: 1
Column: 1
from hypothesis import given
from hypothesis.strategies import binary
from msgpack import packb
from mitmproxy.contentviews import msgpack
from . import full_eval
Reported by Pylint.
Line: 10
Column: 1
from . import full_eval
def msgpack_encode(content):
return packb(content, use_bin_type=True)
def test_parse_msgpack():
assert msgpack.parse_msgpack(msgpack_encode({"foo": 1}))
Reported by Pylint.
Line: 14
Column: 1
return packb(content, use_bin_type=True)
def test_parse_msgpack():
assert msgpack.parse_msgpack(msgpack_encode({"foo": 1}))
assert msgpack.parse_msgpack(b"aoesuteoahu") is msgpack.PARSE_ERROR
assert msgpack.parse_msgpack(msgpack_encode({"foo": "\xe4\xb8\x96\xe7\x95\x8c"}))
Reported by Pylint.
Line: 15
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_parse_msgpack():
assert msgpack.parse_msgpack(msgpack_encode({"foo": 1}))
assert msgpack.parse_msgpack(b"aoesuteoahu") is msgpack.PARSE_ERROR
assert msgpack.parse_msgpack(msgpack_encode({"foo": "\xe4\xb8\x96\xe7\x95\x8c"}))
def test_format_msgpack():
Reported by Bandit.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_parse_msgpack():
assert msgpack.parse_msgpack(msgpack_encode({"foo": 1}))
assert msgpack.parse_msgpack(b"aoesuteoahu") is msgpack.PARSE_ERROR
assert msgpack.parse_msgpack(msgpack_encode({"foo": "\xe4\xb8\x96\xe7\x95\x8c"}))
def test_format_msgpack():
assert list(msgpack.format_msgpack({
Reported by Bandit.
Line: 17
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_parse_msgpack():
assert msgpack.parse_msgpack(msgpack_encode({"foo": 1}))
assert msgpack.parse_msgpack(b"aoesuteoahu") is msgpack.PARSE_ERROR
assert msgpack.parse_msgpack(msgpack_encode({"foo": "\xe4\xb8\x96\xe7\x95\x8c"}))
def test_format_msgpack():
assert list(msgpack.format_msgpack({
"data": [
Reported by Bandit.
mitmproxy/contrib/kaitaistruct/google_protobuf.py
26 issues
Line: 4
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 11
Column: 1
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
from .vlq_base128_le import VlqBase128Le
class GoogleProtobuf(KaitaiStruct):
"""Google Protocol Buffers (AKA protobuf) is a popular data
serialization scheme used for communication protocols, data storage,
etc. There are implementations are available for almost every
popular language. The focus points of this scheme are brevity (data
Reported by Pylint.
Line: 95
Column: 24
arbitrary bytes from UTF-8 encoded strings, etc.
"""
if hasattr(self, '_m_wire_type'):
return self._m_wire_type if hasattr(self, '_m_wire_type') else None
self._m_wire_type = self._root.Pair.WireTypes((self.key.value & 7))
return self._m_wire_type if hasattr(self, '_m_wire_type') else None
@property
Reported by Pylint.
Line: 106
Column: 24
field name in a `.proto` file by this field tag.
"""
if hasattr(self, '_m_field_tag'):
return self._m_field_tag if hasattr(self, '_m_field_tag') else None
self._m_field_tag = (self.key.value >> 3)
return self._m_field_tag if hasattr(self, '_m_field_tag') else None
Reported by Pylint.
Line: 4
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 4
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 97
Column: 13
if hasattr(self, '_m_wire_type'):
return self._m_wire_type if hasattr(self, '_m_wire_type') else None
self._m_wire_type = self._root.Pair.WireTypes((self.key.value & 7))
return self._m_wire_type if hasattr(self, '_m_wire_type') else None
@property
def field_tag(self):
"""Identifies a field of protocol. One can look up symbolic
Reported by Pylint.
Line: 108
Column: 13
if hasattr(self, '_m_field_tag'):
return self._m_field_tag if hasattr(self, '_m_field_tag') else None
self._m_field_tag = (self.key.value >> 3)
return self._m_field_tag if hasattr(self, '_m_field_tag') else None
class DelimitedBytes(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
Reported by Pylint.
Line: 1
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 5
Column: 1
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
examples/contrib/webscanner_helper/proxyauth_selenium.py
25 issues
Line: 8
Column: 1
import time
from typing import Dict, List, cast, Any
import mitmproxy.http
from mitmproxy import flowfilter
from mitmproxy import master
from mitmproxy.script import concurrent
from selenium import webdriver
Reported by Pylint.
Line: 9
Column: 1
from typing import Dict, List, cast, Any
import mitmproxy.http
from mitmproxy import flowfilter
from mitmproxy import master
from mitmproxy.script import concurrent
from selenium import webdriver
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 10
Column: 1
import mitmproxy.http
from mitmproxy import flowfilter
from mitmproxy import master
from mitmproxy.script import concurrent
from selenium import webdriver
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 11
Column: 1
import mitmproxy.http
from mitmproxy import flowfilter
from mitmproxy import master
from mitmproxy.script import concurrent
from selenium import webdriver
logger = logging.getLogger(__name__)
cookie_key_name = {
Reported by Pylint.
Line: 12
Column: 1
from mitmproxy import flowfilter
from mitmproxy import master
from mitmproxy.script import concurrent
from selenium import webdriver
logger = logging.getLogger(__name__)
cookie_key_name = {
"path": "Path",
Reported by Pylint.
Line: 78
Column: 13
def request(self, flow: mitmproxy.http.HTTPFlow):
if flow.request.is_replay:
logger.warning("Caught replayed request: " + str(flow))
if (not self.filter or self.filter(flow)) and self.auth_oracle.is_unauthorized_request(flow):
logger.debug("unauthorized request detected, perform login")
self._login(flow)
# has to be concurrent because replay.client is blocking and replayed flows
Reported by Pylint.
Line: 96
Column: 25
cast(Any, master).commands.call("replay.client", [new_flow])
count = 0
while new_flow.response is None and count < 10:
logger.error("waiting since " + str(count) + " ...")
count = count + 1
time.sleep(1)
if new_flow.response:
flow.response = new_flow.response
else:
Reported by Pylint.
Line: 1
Column: 1
import abc
import logging
import random
import string
import time
from typing import Dict, List, cast, Any
import mitmproxy.http
from mitmproxy import flowfilter
Reported by Pylint.
Line: 25
Column: 1
}
def randomString(string_length=10):
"""Generate a random string of fixed length """
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(string_length))
Reported by Pylint.
Line: 28
Suggestion:
https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b311-random
def randomString(string_length=10):
"""Generate a random string of fixed length """
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(string_length))
class AuthorizationOracle(abc.ABC):
"""Abstract class for an authorization oracle which decides if a given request or response is authenticated."""
Reported by Bandit.
mitmproxy/test/taddons.py
25 issues
Line: 5
Column: 1
import asyncio
import sys
import mitmproxy.master
import mitmproxy.options
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
Reported by Pylint.
Line: 6
Column: 1
import sys
import mitmproxy.master
import mitmproxy.options
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
Reported by Pylint.
Line: 7
Column: 1
import mitmproxy.master
import mitmproxy.options
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
Reported by Pylint.
Line: 8
Column: 1
import mitmproxy.master
import mitmproxy.options
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
class TestAddons(addonmanager.AddonManager):
Reported by Pylint.
Line: 9
Column: 1
import mitmproxy.options
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
class TestAddons(addonmanager.AddonManager):
def __init__(self, master):
Reported by Pylint.
Line: 10
Column: 1
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
class TestAddons(addonmanager.AddonManager):
def __init__(self, master):
super().__init__(master)
Reported by Pylint.
Line: 14
Column: 5
class TestAddons(addonmanager.AddonManager):
def __init__(self, master):
super().__init__(master)
def trigger(self, event: hooks.Hook):
if isinstance(event, log.AddLogHook):
self.master.logs.append(event.entry)
Reported by Pylint.
Line: 45
Column: 13
# start with a sleep(0), which lets all other coroutines advance.
# often this is enough to not sleep at all.
await asyncio.sleep(0)
for i in range(int(timeout / 0.01)):
if self.has_log(txt, level):
return True
else:
await asyncio.sleep(0.01)
raise AssertionError(f"Did not find log entry {txt!r} in {self.logs}.")
Reported by Pylint.
Line: 88
Column: 9
Cycles the flow through the events for the flow. Stops if a reply
is taken (as in flow interception).
"""
f.reply._state = "start"
for evt in eventsequence.iterate(f):
self.master.addons.invoke_addon(
addon,
evt
)
Reported by Pylint.
Line: 1
Column: 1
import contextlib
import asyncio
import sys
import mitmproxy.master
import mitmproxy.options
from mitmproxy import addonmanager, hooks, log
from mitmproxy import command
from mitmproxy import eventsequence
Reported by Pylint.
mitmproxy/addons/serverplayback.py
25 issues
Line: 114
Column: 21
def count(self) -> int:
return sum([len(i) for i in self.flowmap.values()])
def _hash(self, flow: http.HTTPFlow) -> typing.Hashable:
"""
Calculates a loose hash of the flow request.
"""
r = flow.request
_, _, path, _, query, _ = urllib.parse.urlparse(r.url)
Reported by Pylint.
Line: 163
Column: 25
repr(key).encode("utf8", "surrogateescape")
).digest()
def next_flow(self, flow: http.HTTPFlow) -> typing.Optional[http.HTTPFlow]:
"""
Returns the next flow object, or None if no matching flow was
found.
"""
hash = self._hash(flow)
Reported by Pylint.
Line: 168
Column: 9
Returns the next flow object, or None if no matching flow was
found.
"""
hash = self._hash(flow)
if hash in self.flowmap:
if ctx.options.server_replay_nopop:
return next((
flow
for flow in self.flowmap[hash]
Reported by Pylint.
Line: 190
Column: 25
else:
return None
def configure(self, updated):
if not self.configured and ctx.options.server_replay:
self.configured = True
try:
flows = io.read_flows_from_paths(ctx.options.server_replay)
except exceptions.FlowReadException as e:
Reported by Pylint.
Line: 1
Column: 1
import hashlib
import typing
import urllib
import mitmproxy.types
from mitmproxy import command, hooks
from mitmproxy import ctx, http
from mitmproxy import exceptions
from mitmproxy import flow
Reported by Pylint.
Line: 13
Column: 1
from mitmproxy import io
class ServerPlayback:
flowmap: typing.Dict[typing.Hashable, typing.List[http.HTTPFlow]]
configured: bool
def __init__(self):
self.flowmap = {}
Reported by Pylint.
Line: 21
Column: 5
self.flowmap = {}
self.configured = False
def load(self, loader):
loader.add_option(
"server_replay_kill_extra", bool, False,
"Kill extra requests during replay."
)
loader.add_option(
Reported by Pylint.
Line: 21
Column: 5
self.flowmap = {}
self.configured = False
def load(self, loader):
loader.add_option(
"server_replay_kill_extra", bool, False,
"Kill extra requests during replay."
)
loader.add_option(
Reported by Pylint.
Line: 88
Column: 13
Replay server responses from flows.
"""
self.flowmap = {}
for f in flows:
if isinstance(f, http.HTTPFlow):
lst = self.flowmap.setdefault(self._hash(f), [])
lst.append(f)
ctx.master.addons.trigger(hooks.UpdateHook([]))
Reported by Pylint.
Line: 95
Column: 5
ctx.master.addons.trigger(hooks.UpdateHook([]))
@command.command("replay.server.file")
def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
raise exceptions.CommandError(str(e))
self.load_flows(flows)
Reported by Pylint.
test/mitmproxy/contentviews/test_xml_html.py
25 issues
Line: 1
Column: 1
import pytest
from mitmproxy.contentviews import xml_html
from . import full_eval
datadir = "mitmproxy/contentviews/test_xml_html_data/"
def test_simple(tdata):
Reported by Pylint.
Line: 4
Column: 1
import pytest
from mitmproxy.contentviews import xml_html
from . import full_eval
datadir = "mitmproxy/contentviews/test_xml_html_data/"
def test_simple(tdata):
Reported by Pylint.
Line: 17
Column: 9
assert v(b"<p") == ('XML', [[('text', '<p')]])
with open(tdata.path(datadir + "simple.html")) as f:
input = f.read()
tokens = xml_html.tokenize(input)
assert str(next(tokens)) == "Tag(<!DOCTYPE html>)"
@pytest.mark.parametrize("filename", [
Reported by Pylint.
Line: 32
Column: 9
def test_format_xml(filename, tdata):
path = tdata.path(datadir + filename)
with open(path) as f:
input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f:
expected = f.read()
tokens = xml_html.tokenize(input)
assert xml_html.format_xml(tokens) == expected
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy.contentviews import xml_html
from . import full_eval
datadir = "mitmproxy/contentviews/test_xml_html_data/"
def test_simple(tdata):
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.contentviews import xml_html
from . import full_eval
datadir = "mitmproxy/contentviews/test_xml_html_data/"
def test_simple(tdata):
v = full_eval(xml_html.ViewXmlHtml())
assert v(b"foo") == ('XML', [[('text', 'foo')]])
Reported by Pylint.
Line: 9
Column: 1
datadir = "mitmproxy/contentviews/test_xml_html_data/"
def test_simple(tdata):
v = full_eval(xml_html.ViewXmlHtml())
assert v(b"foo") == ('XML', [[('text', 'foo')]])
assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]])
assert v(b"<>") == ('XML', [[('text', '<>')]])
assert v(b"<p") == ('XML', [[('text', '<p')]])
Reported by Pylint.
Line: 10
Column: 5
def test_simple(tdata):
v = full_eval(xml_html.ViewXmlHtml())
assert v(b"foo") == ('XML', [[('text', 'foo')]])
assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]])
assert v(b"<>") == ('XML', [[('text', '<>')]])
assert v(b"<p") == ('XML', [[('text', '<p')]])
Reported by Pylint.
Line: 11
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_simple(tdata):
v = full_eval(xml_html.ViewXmlHtml())
assert v(b"foo") == ('XML', [[('text', 'foo')]])
assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]])
assert v(b"<>") == ('XML', [[('text', '<>')]])
assert v(b"<p") == ('XML', [[('text', '<p')]])
with open(tdata.path(datadir + "simple.html")) as f:
Reported by Bandit.
Line: 12
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_simple(tdata):
v = full_eval(xml_html.ViewXmlHtml())
assert v(b"foo") == ('XML', [[('text', 'foo')]])
assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]])
assert v(b"<>") == ('XML', [[('text', '<>')]])
assert v(b"<p") == ('XML', [[('text', '<p')]])
with open(tdata.path(datadir + "simple.html")) as f:
input = f.read()
Reported by Bandit.
mitmproxy/utils/debug.py
24 issues
Line: 36
Column: 13
print("=======")
try:
import psutil
except:
print("(psutil not installed, skipping some debug info)")
else:
p = psutil.Process()
print("num threads: ", p.num_threads())
Reported by Pylint.
Line: 29
Column: 15
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
Reported by Pylint.
Line: 29
Column: 15
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
Reported by Pylint.
Line: 29
Column: 28
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
Reported by Pylint.
Line: 37
Column: 9
try:
import psutil
except:
print("(psutil not installed, skipping some debug info)")
else:
p = psutil.Process()
print("num threads: ", p.num_threads())
if hasattr(p, "num_fds"):
Reported by Pylint.
Line: 67
Column: 37
bthreads.append(i)
else:
print(i.name)
bthreads.sort(key=lambda x: x._thread_started)
for i in bthreads:
print(i._threadinfo())
print()
print("Memory")
Reported by Pylint.
Line: 69
Column: 19
print(i.name)
bthreads.sort(key=lambda x: x._thread_started)
for i in bthreads:
print(i._threadinfo())
print()
print("Memory")
print("=======")
gc.collect()
Reported by Pylint.
Line: 106
Column: 17
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout, testing=False):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append(
"\n# Thread: %s(%d)" % (
Reported by Pylint.
Line: 106
Column: 30
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout, testing=False):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append(
"\n# Thread: %s(%d)" % (
Reported by Pylint.
Line: 106
Column: 17
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout, testing=False):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append(
"\n# Thread: %s(%d)" % (
Reported by Pylint.