The following issues were found
mitmproxy/net/encoding.py
21 issues
Line: 11
Column: 1
import gzip
import zlib
import brotli
import zstandard as zstd
from typing import Union, Optional, AnyStr, overload # noqa
# We have a shared single-element cache for encoding and decoding.
Reported by Pylint.
Line: 12
Column: 1
import gzip
import zlib
import brotli
import zstandard as zstd
from typing import Union, Optional, AnyStr, overload # noqa
# We have a shared single-element cache for encoding and decoding.
# This is quite useful in practice, e.g.
Reported by Pylint.
Line: 14
Column: 1
import brotli
import zstandard as zstd
from typing import Union, Optional, AnyStr, overload # noqa
# We have a shared single-element cache for encoding and decoding.
# This is quite useful in practice, e.g.
# flow.request.content = flow.request.content.replace(b"foo", b"bar")
# does not require an .encode() call if content does not contain b"foo"
Reported by Pylint.
Line: 14
Column: 1
import brotli
import zstandard as zstd
from typing import Union, Optional, AnyStr, overload # noqa
# We have a shared single-element cache for encoding and decoding.
# This is quite useful in practice, e.g.
# flow.request.content = flow.request.content.replace(b"foo", b"bar")
# does not require an .encode() call if content does not contain b"foo"
Reported by Pylint.
Line: 57
Column: 5
return None
encoding = encoding.lower()
global _cache
cached = (
isinstance(encoded, bytes) and
_cache.encoded == encoded and
_cache.encoding == encoding and
_cache.errors == errors
Reported by Pylint.
Line: 77
Column: 9
except TypeError:
raise
except Exception as e:
raise ValueError("{} when decoding {} with {}: {}".format(
type(e).__name__,
repr(encoded)[:10],
repr(encoding),
repr(e),
))
Reported by Pylint.
Line: 114
Column: 5
return None
encoding = encoding.lower()
global _cache
cached = (
isinstance(decoded, bytes) and
_cache.decoded == decoded and
_cache.encoding == encoding and
_cache.errors == errors
Reported by Pylint.
Line: 134
Column: 9
except TypeError:
raise
except Exception as e:
raise ValueError("{} when encoding {} with {}: {}".format(
type(e).__name__,
repr(decoded)[:10],
repr(encoding),
repr(e),
))
Reported by Pylint.
Line: 14
Column: 1
import brotli
import zstandard as zstd
from typing import Union, Optional, AnyStr, overload # noqa
# We have a shared single-element cache for encoding and decoding.
# This is quite useful in practice, e.g.
# flow.request.content = flow.request.content.replace(b"foo", b"bar")
# does not require an .encode() call if content does not contain b"foo"
Reported by Pylint.
Line: 57
Column: 5
return None
encoding = encoding.lower()
global _cache
cached = (
isinstance(encoded, bytes) and
_cache.encoded == encoded and
_cache.encoding == encoding and
_cache.errors == errors
Reported by Pylint.
examples/contrib/webscanner_helper/urlindex.py
21 issues
Line: 8
Column: 1
from pathlib import Path
from typing import Type, Dict, Union, Optional
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 9
Column: 1
from typing import Type, Dict, Union, Optional
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
logger = logging.getLogger(__name__)
class UrlIndexWriter(abc.ABC):
Reported by Pylint.
Line: 32
Column: 9
@abc.abstractmethod
def load(self):
"""Load existing URL index."""
pass
@abc.abstractmethod
def add_url(self, flow: HTTPFlow):
"""Add new URL to URL index."""
pass
Reported by Pylint.
Line: 37
Column: 9
@abc.abstractmethod
def add_url(self, flow: HTTPFlow):
"""Add new URL to URL index."""
pass
@abc.abstractmethod
def save(self):
pass
Reported by Pylint.
Line: 45
Column: 5
class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
Reported by Pylint.
Line: 149
Column: 13
try:
self.writer = WRITER[index_format.lower()](file_path)
except KeyError:
raise ValueError(f"Format '{index_format}' is not supported.")
if not append and file_path.exists():
file_path.unlink()
self.writer.load()
Reported by Pylint.
Line: 1
Column: 1
import abc
import datetime
import json
import logging
from pathlib import Path
from typing import Type, Dict, Union, Optional
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
Reported by Pylint.
Line: 17
Column: 1
class UrlIndexWriter(abc.ABC):
"""Abstract Add-on to write seen URLs.
For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
The injection can be done using the URLInjection Add-on.
"""
def __init__(self, filename: Path):
"""Initializes the UrlIndexWriter.
Reported by Pylint.
Line: 40
Column: 5
pass
@abc.abstractmethod
def save(self):
pass
class SetEncoder(json.JSONEncoder):
def default(self, obj):
Reported by Pylint.
Line: 44
Column: 1
pass
class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
Reported by Pylint.
test/mitmproxy/contentviews/test_protobuf.py
20 issues
Line: 1
Column: 1
import pytest
from mitmproxy.contentviews import protobuf
from . import full_eval
datadir = "mitmproxy/contentviews/test_protobuf_data/"
def test_view_protobuf_request(tdata):
Reported by Pylint.
Line: 4
Column: 1
import pytest
from mitmproxy.contentviews import protobuf
from . import full_eval
datadir = "mitmproxy/contentviews/test_protobuf_data/"
def test_view_protobuf_request(tdata):
Reported by Pylint.
Line: 26
Column: 9
def test_format_pbuf(filename, tdata):
path = tdata.path(datadir + filename)
with open(path, "rb") as f:
input = f.read()
with open(path.replace(".bin", "-decoded.bin")) as f:
expected = f.read()
assert protobuf.format_pbuf(input) == expected
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy.contentviews import protobuf
from . import full_eval
datadir = "mitmproxy/contentviews/test_protobuf_data/"
def test_view_protobuf_request(tdata):
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.contentviews import protobuf
from . import full_eval
datadir = "mitmproxy/contentviews/test_protobuf_data/"
def test_view_protobuf_request(tdata):
v = full_eval(protobuf.ViewProtobuf())
p = tdata.path(datadir + "protobuf01.bin")
Reported by Pylint.
Line: 9
Column: 1
datadir = "mitmproxy/contentviews/test_protobuf_data/"
def test_view_protobuf_request(tdata):
v = full_eval(protobuf.ViewProtobuf())
p = tdata.path(datadir + "protobuf01.bin")
with open(p, "rb") as f:
raw = f.read()
Reported by Pylint.
Line: 10
Column: 5
def test_view_protobuf_request(tdata):
v = full_eval(protobuf.ViewProtobuf())
p = tdata.path(datadir + "protobuf01.bin")
with open(p, "rb") as f:
raw = f.read()
content_type, output = v(raw)
Reported by Pylint.
Line: 11
Column: 5
def test_view_protobuf_request(tdata):
v = full_eval(protobuf.ViewProtobuf())
p = tdata.path(datadir + "protobuf01.bin")
with open(p, "rb") as f:
raw = f.read()
content_type, output = v(raw)
assert content_type == "Protobuf"
Reported by Pylint.
Line: 13
Column: 27
v = full_eval(protobuf.ViewProtobuf())
p = tdata.path(datadir + "protobuf01.bin")
with open(p, "rb") as f:
raw = f.read()
content_type, output = v(raw)
assert content_type == "Protobuf"
assert output == [[('text', '1: 3bbc333c-e61c-433b-819a-0b9a8cc103b8')]]
with pytest.raises(ValueError, match="Failed to parse input."):
Reported by Pylint.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
with open(p, "rb") as f:
raw = f.read()
content_type, output = v(raw)
assert content_type == "Protobuf"
assert output == [[('text', '1: 3bbc333c-e61c-433b-819a-0b9a8cc103b8')]]
with pytest.raises(ValueError, match="Failed to parse input."):
v(b'foobar')
Reported by Bandit.
test/mitmproxy/tools/test_dump.py
20 issues
Line: 3
Column: 1
from unittest import mock
import pytest
from mitmproxy import controller
from mitmproxy import log
from mitmproxy import options
from mitmproxy.tools import dump
Reported by Pylint.
Line: 1
Column: 1
from unittest import mock
import pytest
from mitmproxy import controller
from mitmproxy import log
from mitmproxy import options
from mitmproxy.tools import dump
Reported by Pylint.
Line: 11
Column: 1
from mitmproxy.tools import dump
class TestDumpMaster:
def mkmaster(self, **opts):
o = options.Options(**opts)
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m
Reported by Pylint.
Line: 12
Column: 5
class TestDumpMaster:
def mkmaster(self, **opts):
o = options.Options(**opts)
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m
def test_has_error(self):
Reported by Pylint.
Line: 12
Column: 5
class TestDumpMaster:
def mkmaster(self, **opts):
o = options.Options(**opts)
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m
def test_has_error(self):
Reported by Pylint.
Line: 13
Column: 9
class TestDumpMaster:
def mkmaster(self, **opts):
o = options.Options(**opts)
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m
def test_has_error(self):
m = self.mkmaster()
Reported by Pylint.
Line: 14
Column: 9
class TestDumpMaster:
def mkmaster(self, **opts):
o = options.Options(**opts)
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m
def test_has_error(self):
m = self.mkmaster()
ent = log.LogEntry("foo", "error")
Reported by Pylint.
Line: 17
Column: 5
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m
def test_has_error(self):
m = self.mkmaster()
ent = log.LogEntry("foo", "error")
ent.reply = controller.DummyReply()
m.addons.trigger(log.AddLogHook(ent))
assert m.errorcheck.has_errored
Reported by Pylint.
Line: 18
Column: 9
return m
def test_has_error(self):
m = self.mkmaster()
ent = log.LogEntry("foo", "error")
ent.reply = controller.DummyReply()
m.addons.trigger(log.AddLogHook(ent))
assert m.errorcheck.has_errored
Reported by Pylint.
Line: 22
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
ent = log.LogEntry("foo", "error")
ent.reply = controller.DummyReply()
m.addons.trigger(log.AddLogHook(ent))
assert m.errorcheck.has_errored
@pytest.mark.parametrize("termlog", [False, True])
def test_addons_termlog(self, termlog):
with mock.patch('sys.stdout'):
o = options.Options()
Reported by Bandit.
mitmproxy/tools/console/searchable.py
20 issues
Line: 1
Column: 1
import urwid
from mitmproxy.tools.console import signals
class Highlight(urwid.AttrMap):
def __init__(self, t):
urwid.AttrMap.__init__(
Reported by Pylint.
Line: 40
Column: 13
self.find_next(True)
elif key == "m_start":
self.set_focus(0)
self.walker._modified()
elif key == "m_end":
self.set_focus(len(self.walker) - 1)
self.walker._modified()
else:
return super().keypress(size, key)
Reported by Pylint.
Line: 43
Column: 13
self.walker._modified()
elif key == "m_end":
self.set_focus(len(self.walker) - 1)
self.walker._modified()
else:
return super().keypress(size, key)
def set_search(self, text):
self.last_search = text
Reported by Pylint.
Line: 89
Column: 17
if txt and self.search_term in txt:
self.set_highlight(off)
self.set_focus(off, coming_from="above")
self.body._modified()
return
else:
self.set_highlight(None)
signals.status_message.send(message="Search not found.", expire=1)
Reported by Pylint.
Line: 91
Column: 9
self.set_focus(off, coming_from="above")
self.body._modified()
return
else:
self.set_highlight(None)
signals.status_message.send(message="Search not found.", expire=1)
Reported by Pylint.
Line: 1
Column: 1
import urwid
from mitmproxy.tools.console import signals
class Highlight(urwid.AttrMap):
def __init__(self, t):
urwid.AttrMap.__init__(
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.tools.console import signals
class Highlight(urwid.AttrMap):
def __init__(self, t):
urwid.AttrMap.__init__(
self,
urwid.Text(t.text),
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.tools.console import signals
class Highlight(urwid.AttrMap):
def __init__(self, t):
urwid.AttrMap.__init__(
self,
urwid.Text(t.text),
Reported by Pylint.
Line: 8
Column: 5
class Highlight(urwid.AttrMap):
def __init__(self, t):
urwid.AttrMap.__init__(
self,
urwid.Text(t.text),
"focusfield",
)
Reported by Pylint.
Line: 17
Column: 1
self.backup = t
class Searchable(urwid.ListBox):
def __init__(self, contents):
self.walker = urwid.SimpleFocusListWalker(contents)
urwid.ListBox.__init__(self, self.walker)
self.search_offset = 0
Reported by Pylint.
mitmproxy/tools/console/tabs.py
19 issues
Line: 1
Column: 1
import urwid
class Tab(urwid.WidgetWrap):
def __init__(self, offset, content, attr, onclick):
"""
onclick is called on click with the tab offset as argument
"""
Reported by Pylint.
Line: 17
Column: 48
self.offset = offset
self.onclick = onclick
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.onclick(self.offset)
return True
Reported by Pylint.
Line: 17
Column: 27
self.offset = offset
self.onclick = onclick
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.onclick(self.offset)
return True
Reported by Pylint.
Line: 17
Column: 58
self.offset = offset
self.onclick = onclick
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.onclick(self.offset)
return True
Reported by Pylint.
Line: 17
Column: 53
self.offset = offset
self.onclick = onclick
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
self.onclick(self.offset)
return True
Reported by Pylint.
Line: 1
Column: 1
import urwid
class Tab(urwid.WidgetWrap):
def __init__(self, offset, content, attr, onclick):
"""
onclick is called on click with the tab offset as argument
"""
Reported by Pylint.
Line: 4
Column: 1
import urwid
class Tab(urwid.WidgetWrap):
def __init__(self, offset, content, attr, onclick):
"""
onclick is called on click with the tab offset as argument
"""
Reported by Pylint.
Line: 4
Column: 1
import urwid
class Tab(urwid.WidgetWrap):
def __init__(self, offset, content, attr, onclick):
"""
onclick is called on click with the tab offset as argument
"""
Reported by Pylint.
Line: 10
Column: 9
"""
onclick is called on click with the tab offset as argument
"""
p = urwid.Text(content, align="center")
p = urwid.Padding(p, align="center", width=("relative", 100))
p = urwid.AttrWrap(p, attr)
urwid.WidgetWrap.__init__(self, p)
self.offset = offset
self.onclick = onclick
Reported by Pylint.
Line: 11
Column: 9
onclick is called on click with the tab offset as argument
"""
p = urwid.Text(content, align="center")
p = urwid.Padding(p, align="center", width=("relative", 100))
p = urwid.AttrWrap(p, attr)
urwid.WidgetWrap.__init__(self, p)
self.offset = offset
self.onclick = onclick
Reported by Pylint.
mitmproxy/addons/modifyheaders.py
19 issues
Line: 39
Column: 13
try:
re.compile(subject)
except re.error as e:
raise ValueError(f"Invalid regular expression {subject!r} ({e})")
spec = ModifySpec(flow_filter, subject, replacement)
try:
spec.read_replacement()
Reported by Pylint.
Line: 46
Column: 9
try:
spec.read_replacement()
except OSError as e:
raise ValueError(f"Invalid file path: {replacement[1:]} ({e})")
return spec
class ModifyHeaders:
Reported by Pylint.
Line: 1
Column: 1
import re
import typing
from pathlib import Path
from mitmproxy import ctx, exceptions, flowfilter, http
from mitmproxy.http import Headers
from mitmproxy.utils import strutils
from mitmproxy.utils.spec import parse_spec
Reported by Pylint.
Line: 11
Column: 1
from mitmproxy.utils.spec import parse_spec
class ModifySpec(typing.NamedTuple):
matches: flowfilter.TFilter
subject: bytes
replacement_str: str
def read_replacement(self) -> bytes:
Reported by Pylint.
Line: 24
Column: 9
Raises:
- IOError if the file cannot be read.
"""
if self.replacement_str.startswith("@"):
return Path(self.replacement_str[1:]).expanduser().read_bytes()
else:
# We could cache this at some point, but unlikely to be a problem.
return strutils.escaped_str_to_bytes(self.replacement_str)
Reported by Pylint.
Line: 31
Column: 1
return strutils.escaped_str_to_bytes(self.replacement_str)
def parse_modify_spec(option: str, subject_is_regex: bool) -> ModifySpec:
flow_filter, subject_str, replacement = parse_spec(option)
subject = strutils.escaped_str_to_bytes(subject_str)
if subject_is_regex:
try:
Reported by Pylint.
Line: 38
Column: 9
if subject_is_regex:
try:
re.compile(subject)
except re.error as e:
raise ValueError(f"Invalid regular expression {subject!r} ({e})")
spec = ModifySpec(flow_filter, subject, replacement)
try:
Reported by Pylint.
Line: 45
Column: 5
try:
spec.read_replacement()
except OSError as e:
raise ValueError(f"Invalid file path: {replacement[1:]} ({e})")
return spec
Reported by Pylint.
Line: 51
Column: 1
return spec
class ModifyHeaders:
def __init__(self):
self.replacements: typing.List[ModifySpec] = []
def load(self, loader):
loader.add_option(
Reported by Pylint.
Line: 55
Column: 5
def __init__(self):
self.replacements: typing.List[ModifySpec] = []
def load(self, loader):
loader.add_option(
"modify_headers", typing.Sequence[str], [],
"""
Header modify pattern of the form "[/flow-filter]/header-name/[@]header-value", where the
separator can be any character. The @ allows to provide a file path that is used to read
Reported by Pylint.
mitmproxy/addons/maplocal.py
19 issues
Line: 20
Column: 5
def parse_map_local_spec(option: str) -> MapLocalSpec:
filter, regex, replacement = parse_spec(option)
try:
re.compile(regex)
except re.error as e:
raise ValueError(f"Invalid regular expression {regex!r} ({e})")
Reported by Pylint.
Line: 25
Column: 9
try:
re.compile(regex)
except re.error as e:
raise ValueError(f"Invalid regular expression {regex!r} ({e})")
try:
path = Path(replacement).expanduser().resolve(strict=True)
except FileNotFoundError as e:
raise ValueError(f"Invalid file path: {replacement} ({e})")
Reported by Pylint.
Line: 30
Column: 9
try:
path = Path(replacement).expanduser().resolve(strict=True)
except FileNotFoundError as e:
raise ValueError(f"Invalid file path: {replacement} ({e})")
return MapLocalSpec(filter, regex, path)
def _safe_path_join(root: Path, untrusted: str) -> Path:
Reported by Pylint.
Line: 1
Column: 1
import mimetypes
import re
import typing
import urllib.parse
from pathlib import Path
from werkzeug.security import safe_join
from mitmproxy import ctx, exceptions, flowfilter, http, version
Reported by Pylint.
Line: 13
Column: 1
from mitmproxy.utils.spec import parse_spec
class MapLocalSpec(typing.NamedTuple):
matches: flowfilter.TFilter
regex: str
local_path: Path
Reported by Pylint.
Line: 19
Column: 1
local_path: Path
def parse_map_local_spec(option: str) -> MapLocalSpec:
filter, regex, replacement = parse_spec(option)
try:
re.compile(regex)
except re.error as e:
Reported by Pylint.
Line: 24
Column: 5
try:
re.compile(regex)
except re.error as e:
raise ValueError(f"Invalid regular expression {regex!r} ({e})")
try:
path = Path(replacement).expanduser().resolve(strict=True)
except FileNotFoundError as e:
Reported by Pylint.
Line: 29
Column: 5
try:
path = Path(replacement).expanduser().resolve(strict=True)
except FileNotFoundError as e:
raise ValueError(f"Invalid file path: {replacement} ({e})")
return MapLocalSpec(filter, regex, path)
Reported by Pylint.
Line: 55
Column: 5
Get all potential file candidates given a URL and a mapping spec ordered by preference.
This function already assumes that the spec regex matches the URL.
"""
m = re.search(spec.regex, url)
assert m
if m.groups():
suffix = m.group(1)
else:
suffix = re.split(spec.regex, url, maxsplit=1)[1]
Reported by Pylint.
Line: 56
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
This function already assumes that the spec regex matches the URL.
"""
m = re.search(spec.regex, url)
assert m
if m.groups():
suffix = m.group(1)
else:
suffix = re.split(spec.regex, url, maxsplit=1)[1]
suffix = suffix.split("?")[0] # remove query string
Reported by Bandit.
test/mitmproxy/io/test_io.py
19 issues
Line: 3
Column: 1
import io
import pytest
from hypothesis import example, given
from hypothesis.strategies import binary
from mitmproxy import exceptions, version
from mitmproxy.io import FlowReader, tnetstring
Reported by Pylint.
Line: 4
Column: 1
import io
import pytest
from hypothesis import example, given
from hypothesis.strategies import binary
from mitmproxy import exceptions, version
from mitmproxy.io import FlowReader, tnetstring
Reported by Pylint.
Line: 5
Column: 1
import pytest
from hypothesis import example, given
from hypothesis.strategies import binary
from mitmproxy import exceptions, version
from mitmproxy.io import FlowReader, tnetstring
Reported by Pylint.
Line: 7
Column: 1
from hypothesis import example, given
from hypothesis.strategies import binary
from mitmproxy import exceptions, version
from mitmproxy.io import FlowReader, tnetstring
class TestFlowReader:
@given(binary())
Reported by Pylint.
Line: 8
Column: 1
from hypothesis.strategies import binary
from mitmproxy import exceptions, version
from mitmproxy.io import FlowReader, tnetstring
class TestFlowReader:
@given(binary())
@example(b'51:11:12345678901#4:this,8:true!0:~,4:true!0:]4:\\x00,~}')
Reported by Pylint.
Line: 1
Column: 1
import io
import pytest
from hypothesis import example, given
from hypothesis.strategies import binary
from mitmproxy import exceptions, version
from mitmproxy.io import FlowReader, tnetstring
Reported by Pylint.
Line: 11
Column: 1
from mitmproxy.io import FlowReader, tnetstring
class TestFlowReader:
@given(binary())
@example(b'51:11:12345678901#4:this,8:true!0:~,4:true!0:]4:\\x00,~}')
@example(b'0:')
def test_fuzz(self, data):
f = io.BytesIO(data)
Reported by Pylint.
Line: 15
Column: 5
@given(binary())
@example(b'51:11:12345678901#4:this,8:true!0:~,4:true!0:]4:\\x00,~}')
@example(b'0:')
def test_fuzz(self, data):
f = io.BytesIO(data)
reader = FlowReader(f)
try:
for _ in reader.stream():
pass
Reported by Pylint.
Line: 15
Column: 5
@given(binary())
@example(b'51:11:12345678901#4:this,8:true!0:~,4:true!0:]4:\\x00,~}')
@example(b'0:')
def test_fuzz(self, data):
f = io.BytesIO(data)
reader = FlowReader(f)
try:
for _ in reader.stream():
pass
Reported by Pylint.
Line: 16
Column: 9
@example(b'51:11:12345678901#4:this,8:true!0:~,4:true!0:]4:\\x00,~}')
@example(b'0:')
def test_fuzz(self, data):
f = io.BytesIO(data)
reader = FlowReader(f)
try:
for _ in reader.stream():
pass
except exceptions.FlowReadException:
Reported by Pylint.
mitmproxy/addons/intercept.py
19 issues
Line: 1
Column: 1
import typing
from mitmproxy import flow, flowfilter
from mitmproxy import exceptions
from mitmproxy import ctx
class Intercept:
filt: typing.Optional[flowfilter.TFilter] = None
Reported by Pylint.
Line: 8
Column: 1
from mitmproxy import ctx
class Intercept:
filt: typing.Optional[flowfilter.TFilter] = None
def load(self, loader):
loader.add_option(
"intercept_active", bool, False,
Reported by Pylint.
Line: 11
Column: 5
class Intercept:
filt: typing.Optional[flowfilter.TFilter] = None
def load(self, loader):
loader.add_option(
"intercept_active", bool, False,
"Intercept toggle"
)
loader.add_option(
Reported by Pylint.
Line: 11
Column: 5
class Intercept:
filt: typing.Optional[flowfilter.TFilter] = None
def load(self, loader):
loader.add_option(
"intercept_active", bool, False,
"Intercept toggle"
)
loader.add_option(
Reported by Pylint.
Line: 21
Column: 5
"Intercept filter expression."
)
def configure(self, updated):
if "intercept" in updated:
if ctx.options.intercept:
self.filt = flowfilter.parse(ctx.options.intercept)
if not self.filt:
raise exceptions.OptionsError(f"Invalid interception filter: {ctx.options.intercept}")
Reported by Pylint.
Line: 26
Column: 1
if ctx.options.intercept:
self.filt = flowfilter.parse(ctx.options.intercept)
if not self.filt:
raise exceptions.OptionsError(f"Invalid interception filter: {ctx.options.intercept}")
ctx.options.intercept_active = True
else:
self.filt = None
ctx.options.intercept_active = False
Reported by Pylint.
Line: 32
Column: 5
self.filt = None
ctx.options.intercept_active = False
def should_intercept(self, f: flow.Flow) -> bool:
return bool(
ctx.options.intercept_active
and self.filt
and self.filt(f)
and not f.is_replay
Reported by Pylint.
Line: 32
Column: 5
self.filt = None
ctx.options.intercept_active = False
def should_intercept(self, f: flow.Flow) -> bool:
return bool(
ctx.options.intercept_active
and self.filt
and self.filt(f)
and not f.is_replay
Reported by Pylint.
Line: 40
Column: 5
and not f.is_replay
)
def process_flow(self, f: flow.Flow) -> None:
if self.should_intercept(f):
assert f.reply
if f.reply.state != "start":
return ctx.log.debug("Cannot intercept request that is already taken by another addon.")
f.intercept()
Reported by Pylint.
Line: 40
Column: 5
and not f.is_replay
)
def process_flow(self, f: flow.Flow) -> None:
if self.should_intercept(f):
assert f.reply
if f.reply.state != "start":
return ctx.log.debug("Cannot intercept request that is already taken by another addon.")
f.intercept()
Reported by Pylint.