The following issues were found
test/mitmproxy/contentviews/test_css.py
17 issues
Line: 1
Column: 1
import pytest
from mitmproxy.contentviews import css
from . import full_eval
@pytest.mark.parametrize("filename", [
"animation-keyframe.css",
"blank-lines-and-spaces.css",
Reported by Pylint.
Line: 4
Column: 1
import pytest
from mitmproxy.contentviews import css
from . import full_eval
@pytest.mark.parametrize("filename", [
"animation-keyframe.css",
"blank-lines-and-spaces.css",
Reported by Pylint.
Line: 22
Column: 9
def test_beautify(filename, tdata):
path = tdata.path("mitmproxy/contentviews/test_css_data/" + filename)
with open(path) as f:
input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f:
expected = f.read()
formatted = css.beautify(input)
assert formatted == expected
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy.contentviews import css
from . import full_eval
@pytest.mark.parametrize("filename", [
"animation-keyframe.css",
"blank-lines-and-spaces.css",
Reported by Pylint.
Line: 18
Column: 1
"quoted-string.css",
"selectors.css",
"simple.css",
])
def test_beautify(filename, tdata):
path = tdata.path("mitmproxy/contentviews/test_css_data/" + filename)
with open(path) as f:
input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f:
Reported by Pylint.
Line: 21
Column: 24
])
def test_beautify(filename, tdata):
path = tdata.path("mitmproxy/contentviews/test_css_data/" + filename)
with open(path) as f:
input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f:
expected = f.read()
formatted = css.beautify(input)
assert formatted == expected
Reported by Pylint.
Line: 23
Column: 59
path = tdata.path("mitmproxy/contentviews/test_css_data/" + filename)
with open(path) as f:
input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f:
expected = f.read()
formatted = css.beautify(input)
assert formatted == expected
Reported by Pylint.
Line: 26
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
with open("-formatted.".join(path.rsplit(".", 1))) as f:
expected = f.read()
formatted = css.beautify(input)
assert formatted == expected
def test_simple():
v = full_eval(css.ViewCSS())
assert v(b"#foo{color:red}") == ('CSS', [
Reported by Bandit.
Line: 29
Column: 1
assert formatted == expected
def test_simple():
v = full_eval(css.ViewCSS())
assert v(b"#foo{color:red}") == ('CSS', [
[('text', '#foo {')],
[('text', ' color: red')],
[('text', '}')]
Reported by Pylint.
Line: 30
Column: 5
def test_simple():
v = full_eval(css.ViewCSS())
assert v(b"#foo{color:red}") == ('CSS', [
[('text', '#foo {')],
[('text', ' color: red')],
[('text', '}')]
])
Reported by Pylint.
test/mitmproxy/addons/test_eventstore.py
17 issues
Line: 1
Column: 1
from unittest import mock
from mitmproxy import log
from mitmproxy.addons import eventstore
def test_simple():
store = eventstore.EventStore()
assert not store.data
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.addons import eventstore
def test_simple():
store = eventstore.EventStore()
assert not store.data
sig_add = mock.Mock(spec=lambda: 42)
sig_refresh = mock.Mock(spec=lambda: 42)
Reported by Pylint.
Line: 8
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
def test_simple():
store = eventstore.EventStore()
assert not store.data
sig_add = mock.Mock(spec=lambda: 42)
sig_refresh = mock.Mock(spec=lambda: 42)
store.sig_add.connect(sig_add)
store.sig_refresh.connect(sig_refresh)
Reported by Bandit.
Line: 15
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
store.sig_add.connect(sig_add)
store.sig_refresh.connect(sig_refresh)
assert not sig_add.called
assert not sig_refresh.called
# test .log()
store.add_log(log.LogEntry("test", "info"))
assert store.data
Reported by Bandit.
Line: 16
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
store.sig_refresh.connect(sig_refresh)
assert not sig_add.called
assert not sig_refresh.called
# test .log()
store.add_log(log.LogEntry("test", "info"))
assert store.data
Reported by Bandit.
Line: 20
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
# test .log()
store.add_log(log.LogEntry("test", "info"))
assert store.data
assert sig_add.called
assert not sig_refresh.called
# test .clear()
Reported by Bandit.
Line: 22
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
store.add_log(log.LogEntry("test", "info"))
assert store.data
assert sig_add.called
assert not sig_refresh.called
# test .clear()
sig_add.reset_mock()
Reported by Bandit.
Line: 23
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
assert store.data
assert sig_add.called
assert not sig_refresh.called
# test .clear()
sig_add.reset_mock()
store.clear()
Reported by Bandit.
Line: 29
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
sig_add.reset_mock()
store.clear()
assert not store.data
assert not sig_add.called
assert sig_refresh.called
Reported by Bandit.
Line: 31
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
store.clear()
assert not store.data
assert not sig_add.called
assert sig_refresh.called
def test_max_size():
store = eventstore.EventStore(3)
Reported by Bandit.
examples/contrib/check_ssl_pinning.py
17 issues
Line: 1
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.certs import Cert
import ipaddress
import OpenSSL
import time
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
Reported by Pylint.
Line: 2
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.certs import Cert
import ipaddress
import OpenSSL
import time
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
Reported by Pylint.
Line: 3
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.certs import Cert
import ipaddress
import OpenSSL
import time
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
Reported by Pylint.
Line: 83
Column: 29
"""
)
def clientconnect(self, layer):
mitmproxy.certs.dummy_cert = monkey_dummy_cert
Reported by Pylint.
Line: 1
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.certs import Cert
import ipaddress
import OpenSSL
import time
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
Reported by Pylint.
Line: 4
Column: 1
import mitmproxy
from mitmproxy import ctx
from mitmproxy.certs import Cert
import ipaddress
import OpenSSL
import time
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.certs import Cert
import ipaddress
import OpenSSL
import time
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
# the function to generate test cases for SSL Pinning.
Reported by Pylint.
Line: 12
Column: 1
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
# the function to generate test cases for SSL Pinning.
def monkey_dummy_cert(privkey, cacert, commonname, sans):
ss = []
for i in sans:
try:
ipaddress.ip_address(i.decode("ascii"))
except ValueError:
Reported by Pylint.
Line: 12
Column: 1
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
# the function to generate test cases for SSL Pinning.
def monkey_dummy_cert(privkey, cacert, commonname, sans):
ss = []
for i in sans:
try:
ipaddress.ip_address(i.decode("ascii"))
except ValueError:
Reported by Pylint.
Line: 12
Column: 1
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
# the function to generate test cases for SSL Pinning.
def monkey_dummy_cert(privkey, cacert, commonname, sans):
ss = []
for i in sans:
try:
ipaddress.ip_address(i.decode("ascii"))
except ValueError:
Reported by Pylint.
mitmproxy/addons/asgiapp.py
17 issues
Line: 5
Column: 1
import traceback
import urllib.parse
import asgiref.compatibility
import asgiref.wsgi
from mitmproxy import ctx, http
from mitmproxy.controller import DummyReply
Reported by Pylint.
Line: 6
Column: 1
import urllib.parse
import asgiref.compatibility
import asgiref.wsgi
from mitmproxy import ctx, http
from mitmproxy.controller import DummyReply
class ASGIApp:
Reported by Pylint.
Line: 133
Column: 32
try:
await app(scope, receive, send)
if not sent_response:
raise RuntimeError(f"no response sent.")
except Exception:
ctx.log.error(f"Error in asgi app:\n{traceback.format_exc(limit=-5)}")
flow.response = http.Response.make(500, b"ASGI Error.")
finally:
flow.reply.commit()
Reported by Pylint.
Line: 134
Column: 12
await app(scope, receive, send)
if not sent_response:
raise RuntimeError(f"no response sent.")
except Exception:
ctx.log.error(f"Error in asgi app:\n{traceback.format_exc(limit=-5)}")
flow.response = http.Response.make(500, b"ASGI Error.")
finally:
flow.reply.commit()
done.set()
Reported by Pylint.
Line: 1
Column: 1
import asyncio
import traceback
import urllib.parse
import asgiref.compatibility
import asgiref.wsgi
from mitmproxy import ctx, http
from mitmproxy.controller import DummyReply
Reported by Pylint.
Line: 16
Column: 1
An addon that hosts an ASGI/WSGI HTTP app within mitmproxy, at a specified hostname and port.
Some important caveats:
- This implementation will block and wait until the entire HTTP response is completed before sending out data.
- It currently only implements the HTTP protocol (Lifespan and WebSocket are unimplemented).
"""
def __init__(self, asgi_app, host: str, port: int):
asgi_app = asgiref.compatibility.guarantee_single_callable(asgi_app)
Reported by Pylint.
Line: 25
Column: 5
self.asgi_app, self.host, self.port = asgi_app, host, port
@property
def name(self) -> str:
return f"asgiapp:{self.host}:{self.port}"
def should_serve(self, flow: http.HTTPFlow) -> bool:
assert flow.reply
return bool(
Reported by Pylint.
Line: 28
Column: 5
def name(self) -> str:
return f"asgiapp:{self.host}:{self.port}"
def should_serve(self, flow: http.HTTPFlow) -> bool:
assert flow.reply
return bool(
(flow.request.pretty_host, flow.request.port) == (self.host, self.port)
and flow.reply.state == "start" and not flow.error and not flow.response
and not isinstance(flow.reply, DummyReply) # ignore the HTTP flows of this app loaded from somewhere
Reported by Pylint.
Line: 29
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
return f"asgiapp:{self.host}:{self.port}"
def should_serve(self, flow: http.HTTPFlow) -> bool:
assert flow.reply
return bool(
(flow.request.pretty_host, flow.request.port) == (self.host, self.port)
and flow.reply.state == "start" and not flow.error and not flow.response
and not isinstance(flow.reply, DummyReply) # ignore the HTTP flows of this app loaded from somewhere
)
Reported by Bandit.
Line: 33
Column: 1
return bool(
(flow.request.pretty_host, flow.request.port) == (self.host, self.port)
and flow.reply.state == "start" and not flow.error and not flow.response
and not isinstance(flow.reply, DummyReply) # ignore the HTTP flows of this app loaded from somewhere
)
def request(self, flow: http.HTTPFlow) -> None:
assert flow.reply
if self.should_serve(flow):
Reported by Pylint.
mitmproxy/contrib/kaitaistruct/ico.py
17 issues
Line: 4
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
import struct
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 58
Column: 24
relevant parser, if needed to parse image data further.
"""
if hasattr(self, '_m_img'):
return self._m_img if hasattr(self, '_m_img') else None
_pos = self._io.pos()
self._io.seek(self.ofs_img)
self._m_img = self._io.read_bytes(self.len_img)
self._io.seek(_pos)
Reported by Pylint.
Line: 72
Column: 24
embedded PNG file.
"""
if hasattr(self, '_m_png_header'):
return self._m_png_header if hasattr(self, '_m_png_header') else None
_pos = self._io.pos()
self._io.seek(self.ofs_img)
self._m_png_header = self._io.read_bytes(8)
self._io.seek(_pos)
Reported by Pylint.
Line: 84
Column: 24
def is_png(self):
"""True if this image is in PNG format."""
if hasattr(self, '_m_is_png'):
return self._m_is_png if hasattr(self, '_m_is_png') else None
self._m_is_png = self.png_header == struct.pack('8b', -119, 80, 78, 71, 13, 10, 26, 10)
return self._m_is_png if hasattr(self, '_m_is_png') else None
Reported by Pylint.
Line: 4
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
import struct
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 4
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
import struct
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
Line: 62
Column: 13
_pos = self._io.pos()
self._io.seek(self.ofs_img)
self._m_img = self._io.read_bytes(self.len_img)
self._io.seek(_pos)
return self._m_img if hasattr(self, '_m_img') else None
@property
def png_header(self):
Reported by Pylint.
Line: 76
Column: 13
_pos = self._io.pos()
self._io.seek(self.ofs_img)
self._m_png_header = self._io.read_bytes(8)
self._io.seek(_pos)
return self._m_png_header if hasattr(self, '_m_png_header') else None
@property
def is_png(self):
Reported by Pylint.
Line: 86
Column: 13
if hasattr(self, '_m_is_png'):
return self._m_is_png if hasattr(self, '_m_is_png') else None
self._m_is_png = self.png_header == struct.pack('8b', -119, 80, 78, 71, 13, 10, 26, 10)
return self._m_is_png if hasattr(self, '_m_is_png') else None
Reported by Pylint.
Line: 1
Column: 1
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
import struct
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
Reported by Pylint.
examples/contrib/webscanner_helper/watchdog.py
17 issues
Line: 7
Column: 1
import logging
from datetime import datetime
import mitmproxy.connections
import mitmproxy.http
from mitmproxy.addons.export import curl_command, raw
from mitmproxy.exceptions import HttpSyntaxException
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 8
Column: 1
from datetime import datetime
import mitmproxy.connections
import mitmproxy.http
from mitmproxy.addons.export import curl_command, raw
from mitmproxy.exceptions import HttpSyntaxException
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 9
Column: 1
import mitmproxy.connections
import mitmproxy.http
from mitmproxy.addons.export import curl_command, raw
from mitmproxy.exceptions import HttpSyntaxException
logger = logging.getLogger(__name__)
Reported by Pylint.
Line: 10
Column: 1
import mitmproxy.connections
import mitmproxy.http
from mitmproxy.addons.export import curl_command, raw
from mitmproxy.exceptions import HttpSyntaxException
logger = logging.getLogger(__name__)
class WatchdogAddon():
Reported by Pylint.
Line: 63
Column: 13
and flow.error is not None and not isinstance(flow.error, HttpSyntaxException)):
self.last_trigger = time.time()
logger.error(f"Watchdog triggered! Cause: {flow}")
self.error_event.set()
# save the request which might have caused the problem
if flow.request:
with (self.flow_dir / f"{datetime.utcnow().isoformat()}.curl").open("w") as f:
Reported by Pylint.
Line: 1
Column: 1
import pathlib
import time
import typing
import logging
from datetime import datetime
import mitmproxy.connections
import mitmproxy.http
from mitmproxy.addons.export import curl_command, raw
Reported by Pylint.
Line: 16
Column: 1
class WatchdogAddon():
""" The Watchdog Add-on can be used in combination with web application scanners in oder to check if the device
under test responds correctls to the scanner's responses.
The Watchdog Add-on checks if the device under test responds correctly to the scanner's responses.
If the Watchdog sees that the DUT is no longer responding correctly, an multiprocessing event is set.
This information can be used to restart the device under test if necessary.
Reported by Pylint.
Line: 19
Column: 1
""" The Watchdog Add-on can be used in combination with web application scanners in oder to check if the device
under test responds correctls to the scanner's responses.
The Watchdog Add-on checks if the device under test responds correctly to the scanner's responses.
If the Watchdog sees that the DUT is no longer responding correctly, an multiprocessing event is set.
This information can be used to restart the device under test if necessary.
"""
def __init__(self, event, outdir: pathlib.Path, timeout=None):
Reported by Pylint.
Line: 20
Column: 1
under test responds correctls to the scanner's responses.
The Watchdog Add-on checks if the device under test responds correctly to the scanner's responses.
If the Watchdog sees that the DUT is no longer responding correctly, an multiprocessing event is set.
This information can be used to restart the device under test if necessary.
"""
def __init__(self, event, outdir: pathlib.Path, timeout=None):
"""Initializes the Watchdog.
Reported by Pylint.
Line: 29
Column: 1
Args:
event: multiprocessing.Event that will be set if the watchdog is triggered.
outdir: path to a directory in which the triggering requests will be saved (curl and raw).
timeout_conn: float that specifies the timeout for the server connection
"""
self.error_event = event
self.flow_dir = outdir
if self.flow_dir.exists() and not self.flow_dir.is_dir():
Reported by Pylint.
mitmproxy/contentviews/image/image_parser.py
16 issues
Line: 4
Column: 1
import io
import typing
from kaitaistruct import KaitaiStream
from mitmproxy.contrib.kaitaistruct import png
from mitmproxy.contrib.kaitaistruct import gif
from mitmproxy.contrib.kaitaistruct import jpeg
from mitmproxy.contrib.kaitaistruct import ico
Reported by Pylint.
Line: 51
Column: 12
ext_blocks.append(block)
comment_blocks = []
for block in ext_blocks:
if block.body.label._name_ == 'comment':
comment_blocks.append(block)
for block in comment_blocks:
entries = block.body.body.entries
for entry in entries:
comment = entry.bytes
Reported by Pylint.
Line: 68
Column: 12
('Format', 'JPEG (ISO 10918)')
]
for segment in img.segments:
if segment.marker._name_ == 'sof0':
parts.append(('Size', f"{segment.data.image_width} x {segment.data.image_height} px"))
if segment.marker._name_ == 'app0':
parts.append(('jfif_version', f"({segment.data.version_major}, {segment.data.version_minor})"))
parts.append(('jfif_density', f"({segment.data.density_x}, {segment.data.density_y})"))
parts.append(('jfif_unit', str(segment.data.density_units._value_)))
Reported by Pylint.
Line: 70
Column: 12
for segment in img.segments:
if segment.marker._name_ == 'sof0':
parts.append(('Size', f"{segment.data.image_width} x {segment.data.image_height} px"))
if segment.marker._name_ == 'app0':
parts.append(('jfif_version', f"({segment.data.version_major}, {segment.data.version_minor})"))
parts.append(('jfif_density', f"({segment.data.density_x}, {segment.data.density_y})"))
parts.append(('jfif_unit', str(segment.data.density_units._value_)))
if segment.marker._name_ == 'com':
parts.append(('comment', str(segment.data)))
Reported by Pylint.
Line: 73
Column: 44
if segment.marker._name_ == 'app0':
parts.append(('jfif_version', f"({segment.data.version_major}, {segment.data.version_minor})"))
parts.append(('jfif_density', f"({segment.data.density_x}, {segment.data.density_y})"))
parts.append(('jfif_unit', str(segment.data.density_units._value_)))
if segment.marker._name_ == 'com':
parts.append(('comment', str(segment.data)))
if segment.marker._name_ == 'app1':
if hasattr(segment.data, 'body'):
for field in segment.data.body.data.body.ifd0.fields:
Reported by Pylint.
Line: 74
Column: 12
parts.append(('jfif_version', f"({segment.data.version_major}, {segment.data.version_minor})"))
parts.append(('jfif_density', f"({segment.data.density_x}, {segment.data.density_y})"))
parts.append(('jfif_unit', str(segment.data.density_units._value_)))
if segment.marker._name_ == 'com':
parts.append(('comment', str(segment.data)))
if segment.marker._name_ == 'app1':
if hasattr(segment.data, 'body'):
for field in segment.data.body.data.body.ifd0.fields:
if field.data is not None:
Reported by Pylint.
Line: 76
Column: 12
parts.append(('jfif_unit', str(segment.data.density_units._value_)))
if segment.marker._name_ == 'com':
parts.append(('comment', str(segment.data)))
if segment.marker._name_ == 'app1':
if hasattr(segment.data, 'body'):
for field in segment.data.body.data.body.ifd0.fields:
if field.data is not None:
parts.append((field.tag._name_, field.data.decode('UTF-8').strip('\x00')))
return parts
Reported by Pylint.
Line: 80
Column: 39
if hasattr(segment.data, 'body'):
for field in segment.data.body.data.body.ifd0.fields:
if field.data is not None:
parts.append((field.tag._name_, field.data.decode('UTF-8').strip('\x00')))
return parts
def parse_ico(data: bytes) -> Metadata:
img = ico.Ico(KaitaiStream(io.BytesIO(data)))
Reported by Pylint.
Line: 1
Column: 1
import io
import typing
from kaitaistruct import KaitaiStream
from mitmproxy.contrib.kaitaistruct import png
from mitmproxy.contrib.kaitaistruct import gif
from mitmproxy.contrib.kaitaistruct import jpeg
from mitmproxy.contrib.kaitaistruct import ico
Reported by Pylint.
Line: 14
Column: 1
Metadata = typing.List[typing.Tuple[str, str]]
def parse_png(data: bytes) -> Metadata:
img = png.Png(KaitaiStream(io.BytesIO(data)))
parts = [
('Format', 'Portable network graphics'),
('Size', f"{img.ihdr.width} x {img.ihdr.height} px")
]
Reported by Pylint.
test/mitmproxy/addons/test_mapremote.py
16 issues
Line: 1
Column: 1
import pytest
from mitmproxy.addons import mapremote
from mitmproxy.test import taddons
from mitmproxy.test import tflow
class TestMapRemote:
Reported by Pylint.
Line: 1
Column: 1
import pytest
from mitmproxy.addons import mapremote
from mitmproxy.test import taddons
from mitmproxy.test import tflow
class TestMapRemote:
Reported by Pylint.
Line: 8
Column: 1
from mitmproxy.test import tflow
class TestMapRemote:
def test_configure(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(mr, map_remote=["one/two/three"])
Reported by Pylint.
Line: 10
Column: 5
class TestMapRemote:
def test_configure(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(mr, map_remote=["one/two/three"])
with pytest.raises(Exception, match="Invalid regular expression"):
tctx.configure(mr, map_remote=["/foo/+/three"])
Reported by Pylint.
Line: 10
Column: 5
class TestMapRemote:
def test_configure(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(mr, map_remote=["one/two/three"])
with pytest.raises(Exception, match="Invalid regular expression"):
tctx.configure(mr, map_remote=["/foo/+/three"])
Reported by Pylint.
Line: 11
Column: 9
class TestMapRemote:
def test_configure(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(mr, map_remote=["one/two/three"])
with pytest.raises(Exception, match="Invalid regular expression"):
tctx.configure(mr, map_remote=["/foo/+/three"])
Reported by Pylint.
Line: 17
Column: 5
with pytest.raises(Exception, match="Invalid regular expression"):
tctx.configure(mr, map_remote=["/foo/+/three"])
def test_simple(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(
mr,
map_remote=[
Reported by Pylint.
Line: 17
Column: 5
with pytest.raises(Exception, match="Invalid regular expression"):
tctx.configure(mr, map_remote=["/foo/+/three"])
def test_simple(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(
mr,
map_remote=[
Reported by Pylint.
Line: 18
Column: 9
tctx.configure(mr, map_remote=["/foo/+/three"])
def test_simple(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(
mr,
map_remote=[
":example.org/images/:mitmproxy.org/img/",
Reported by Pylint.
Line: 26
Column: 13
":example.org/images/:mitmproxy.org/img/",
]
)
f = tflow.tflow()
f.request.url = b"https://example.org/images/test.jpg"
mr.request(f)
assert f.request.url == "https://mitmproxy.org/img/test.jpg"
def test_has_reply(self):
Reported by Pylint.
test/mitmproxy/script/test_concurrent.py
16 issues
Line: 3
Column: 1
import time
import pytest
from mitmproxy import controller
from mitmproxy.test import tflow
from mitmproxy.test import taddons
Reported by Pylint.
Line: 1
Column: 1
import time
import pytest
from mitmproxy import controller
from mitmproxy.test import tflow
from mitmproxy.test import taddons
Reported by Pylint.
Line: 11
Column: 1
from mitmproxy.test import taddons
class Thing:
def __init__(self):
self.reply = controller.DummyReply()
self.live = True
Reported by Pylint.
Line: 11
Column: 1
from mitmproxy.test import taddons
class Thing:
def __init__(self):
self.reply = controller.DummyReply()
self.live = True
Reported by Pylint.
Line: 17
Column: 1
self.live = True
class TestConcurrent:
def test_concurrent(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(
tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py"
Reported by Pylint.
Line: 18
Column: 5
class TestConcurrent:
def test_concurrent(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(
tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
Reported by Pylint.
Line: 18
Column: 5
class TestConcurrent:
def test_concurrent(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(
tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
Reported by Pylint.
Line: 20
Column: 13
class TestConcurrent:
def test_concurrent(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(
tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
)
f1, f2 = tflow.tflow(), tflow.tflow()
Reported by Pylint.
Line: 25
Column: 17
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
)
f1, f2 = tflow.tflow(), tflow.tflow()
tctx.cycle(sc, f1)
tctx.cycle(sc, f2)
start = time.time()
while time.time() - start < 5:
if f1.reply.state == f2.reply.state == "committed":
Reported by Pylint.
Line: 25
Column: 13
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
)
f1, f2 = tflow.tflow(), tflow.tflow()
tctx.cycle(sc, f1)
tctx.cycle(sc, f2)
start = time.time()
while time.time() - start < 5:
if f1.reply.state == f2.reply.state == "committed":
Reported by Pylint.
mitmproxy/addons/stickycookie.py
16 issues
Line: 59
Column: 3
assert flow.response
if self.flt:
for name, (value, attrs) in flow.response.cookies.items(multi=True):
# FIXME: We now know that Cookie.py screws up some cookies with
# valid RFC 822/1123 datetime specifications for expiry. Sigh.
dom_port_path = ckey(attrs, flow)
if domain_match(flow.request.host, dom_port_path[0]):
if cookies.is_expired(attrs):
Reported by Pylint.
Line: 88
Column: 3
if all(match):
cookie_list.extend(c.items())
if cookie_list:
# FIXME: we need to formalise this...
flow.metadata["stickycookie"] = True
flow.request.headers["cookie"] = cookies.format_cookie_header(cookie_list)
Reported by Pylint.
Line: 1
Column: 1
import collections
from http import cookiejar
from typing import List, Tuple, Dict, Optional # noqa
from mitmproxy import http, flowfilter, ctx, exceptions
from mitmproxy.net.http import cookies
TOrigin = Tuple[str, int, str]
Reported by Pylint.
Line: 11
Column: 1
TOrigin = Tuple[str, int, str]
def ckey(attrs: Dict[str, str], f: http.HTTPFlow) -> TOrigin:
"""
Returns a (domain, port, path) tuple.
"""
domain = f.request.host
path = "/"
Reported by Pylint.
Line: 24
Column: 1
return (domain, f.request.port, path)
def domain_match(a: str, b: str) -> bool:
if cookiejar.domain_match(a, b): # type: ignore
return True
elif cookiejar.domain_match(a, b.strip(".")): # type: ignore
return True
return False
Reported by Pylint.
Line: 24
Column: 1
return (domain, f.request.port, path)
def domain_match(a: str, b: str) -> bool:
if cookiejar.domain_match(a, b): # type: ignore
return True
elif cookiejar.domain_match(a, b.strip(".")): # type: ignore
return True
return False
Reported by Pylint.
Line: 24
Column: 1
return (domain, f.request.port, path)
def domain_match(a: str, b: str) -> bool:
if cookiejar.domain_match(a, b): # type: ignore
return True
elif cookiejar.domain_match(a, b.strip(".")): # type: ignore
return True
return False
Reported by Pylint.
Line: 25
Column: 5
def domain_match(a: str, b: str) -> bool:
if cookiejar.domain_match(a, b): # type: ignore
return True
elif cookiejar.domain_match(a, b.strip(".")): # type: ignore
return True
return False
Reported by Pylint.
Line: 32
Column: 1
return False
class StickyCookie:
def __init__(self):
self.jar: Dict[TOrigin, Dict[str, str]] = collections.defaultdict(dict)
self.flt: Optional[flowfilter.TFilter] = None
def load(self, loader):
Reported by Pylint.
Line: 37
Column: 5
self.jar: Dict[TOrigin, Dict[str, str]] = collections.defaultdict(dict)
self.flt: Optional[flowfilter.TFilter] = None
def load(self, loader):
loader.add_option(
"stickycookie", Optional[str], None,
"Set sticky cookie filter. Matched against requests."
)
Reported by Pylint.