The following issues were found
mitmproxy/utils/debug.py
24 issues
Line: 36
Column: 13
print("=======")
try:
import psutil
except:
print("(psutil not installed, skipping some debug info)")
else:
p = psutil.Process()
print("num threads: ", p.num_threads())
Reported by Pylint.
Line: 29
Column: 15
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
Reported by Pylint.
Line: 29
Column: 15
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
Reported by Pylint.
Line: 29
Column: 28
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
Reported by Pylint.
Line: 37
Column: 9
try:
import psutil
except:
print("(psutil not installed, skipping some debug info)")
else:
p = psutil.Process()
print("num threads: ", p.num_threads())
if hasattr(p, "num_fds"):
Reported by Pylint.
Line: 67
Column: 37
bthreads.append(i)
else:
print(i.name)
bthreads.sort(key=lambda x: x._thread_started)
for i in bthreads:
print(i._threadinfo())
print()
print("Memory")
Reported by Pylint.
Line: 69
Column: 19
print(i.name)
bthreads.sort(key=lambda x: x._thread_started)
for i in bthreads:
print(i._threadinfo())
print()
print("Memory")
print("=======")
gc.collect()
Reported by Pylint.
Line: 106
Column: 17
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout, testing=False):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append(
"\n# Thread: %s(%d)" % (
Reported by Pylint.
Line: 106
Column: 30
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout, testing=False):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append(
"\n# Thread: %s(%d)" % (
Reported by Pylint.
Line: 106
Column: 17
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout, testing=False):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append(
"\n# Thread: %s(%d)" % (
Reported by Pylint.
mitmproxy/test/tflow.py
24 issues
Line: 3
Column: 1
import uuid
from mitmproxy import connection
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
Reported by Pylint.
Line: 4
Column: 1
import uuid
from mitmproxy import connection
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
Reported by Pylint.
Line: 5
Column: 1
from mitmproxy import connection
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
from wsproto.frame_protocol import Opcode
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy import connection
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
from wsproto.frame_protocol import Opcode
Reported by Pylint.
Line: 7
Column: 1
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
from wsproto.frame_protocol import Opcode
Reported by Pylint.
Line: 8
Column: 1
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
from wsproto.frame_protocol import Opcode
def ttcpflow(client_conn=True, server_conn=True, messages=True, err=None) -> tcp.TCPFlow:
Reported by Pylint.
Line: 9
Column: 1
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
from wsproto.frame_protocol import Opcode
def ttcpflow(client_conn=True, server_conn=True, messages=True, err=None) -> tcp.TCPFlow:
if client_conn is True:
Reported by Pylint.
Line: 10
Column: 1
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.test.tutils import treq, tresp
from wsproto.frame_protocol import Opcode
def ttcpflow(client_conn=True, server_conn=True, messages=True, err=None) -> tcp.TCPFlow:
if client_conn is True:
client_conn = tclient_conn()
Reported by Pylint.
Line: 34
Column: 5
def twebsocketflow(messages=True, err=None, close_code=None, close_reason='') -> http.HTTPFlow:
flow = http.HTTPFlow(tclient_conn(), tserver_conn())
flow.request = http.Request(
"example.com",
80,
b"GET",
b"http",
Reported by Pylint.
Line: 138
Column: 5
err = terr()
f = DummyFlow(client_conn, server_conn)
f.error = err
f.reply = controller.DummyReply()
return f
def tclient_conn() -> connection.Client:
Reported by Pylint.
mitmproxy/addons/cut.py
24 issues
Line: 14
Column: 1
from mitmproxy.utils import strutils
import mitmproxy.types
import pyperclip
def headername(spec: str):
if not (spec.startswith("header[") and spec.endswith("]")):
raise exceptions.CommandError("Invalid header spec: %s" % spec)
Reported by Pylint.
Line: 51
Column: 3
elif isinstance(part, certs.Cert): # pragma: no cover
return part.to_pem().decode("ascii")
elif isinstance(part, list) and len(part) > 0 and isinstance(part[0], certs.Cert):
# TODO: currently this extracts only the very first cert as PEM-encoded string.
return part[0].to_pem().decode("ascii")
current = part
return str(current or "")
Reported by Pylint.
Line: 1
Column: 1
import io
import csv
import typing
import os.path
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import ctx
Reported by Pylint.
Line: 17
Column: 1
import pyperclip
def headername(spec: str):
if not (spec.startswith("header[") and spec.endswith("]")):
raise exceptions.CommandError("Invalid header spec: %s" % spec)
return spec[len("header["):-1].strip()
Reported by Pylint.
Line: 23
Column: 1
return spec[len("header["):-1].strip()
def is_addr(v):
return isinstance(v, tuple) and len(v) > 1
def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
path = cut.split(".")
Reported by Pylint.
Line: 23
Column: 1
return spec[len("header["):-1].strip()
def is_addr(v):
return isinstance(v, tuple) and len(v) > 1
def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
path = cut.split(".")
Reported by Pylint.
Line: 27
Column: 1
return isinstance(v, tuple) and len(v) > 1
def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
path = cut.split(".")
current: typing.Any = f
for i, spec in enumerate(path):
if spec.startswith("_"):
raise exceptions.CommandError("Can't access internal attribute %s" % spec)
Reported by Pylint.
Line: 27
Column: 1
return isinstance(v, tuple) and len(v) > 1
def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
path = cut.split(".")
current: typing.Any = f
for i, spec in enumerate(path):
if spec.startswith("_"):
raise exceptions.CommandError("Can't access internal attribute %s" % spec)
Reported by Pylint.
Line: 27
Column: 1
return isinstance(v, tuple) and len(v) > 1
def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
path = cut.split(".")
current: typing.Any = f
for i, spec in enumerate(path):
if spec.startswith("_"):
raise exceptions.CommandError("Can't access internal attribute %s" % spec)
Reported by Pylint.
Line: 38
Column: 13
if i == len(path) - 1:
if spec == "port" and is_addr(current):
return str(current[1])
if spec == "host" and is_addr(current):
return str(current[0])
elif spec.startswith("header["):
if not current:
return ""
return current.headers.get(headername(spec), "")
Reported by Pylint.
mitmproxy/addons/clientplayback.py
24 issues
Line: 32
Column: 24
"""
flow: http.HTTPFlow
def __init__(self, flow: http.HTTPFlow, context: Context):
super().__init__(context, context.client)
self.flow = flow
def _handle_event(self, event: events.Event) -> CommandGenerator[None]:
if isinstance(event, events.Start):
Reported by Pylint.
Line: 49
Column: 3
if content:
yield layers.http.ReceiveHttp(layers.http.RequestData(1, content))
if self.flow.request.trailers: # pragma: no cover
# TODO: Cover this once we support HTTP/1 trailers.
yield layers.http.ReceiveHttp(layers.http.RequestTrailers(1, self.flow.request.trailers))
yield layers.http.ReceiveHttp(layers.http.RequestEndOfMessage(1))
elif isinstance(event, (
layers.http.ResponseHeaders,
layers.http.ResponseData,
Reported by Pylint.
Line: 67
Column: 24
class ReplayHandler(server.ConnectionHandler):
layer: layers.HttpLayer
def __init__(self, flow: http.HTTPFlow, options: Options) -> None:
client = flow.client_conn.copy()
client.state = ConnectionState.OPEN
context = Context(client, options)
context.server = Server(
Reported by Pylint.
Line: 137
Column: 20
try:
h = ReplayHandler(self.inflight, self.options)
await h.replay()
except Exception:
ctx.log(f"Client replay has crashed!\n{traceback.format_exc()}", "error")
self.queue.task_done()
self.inflight = None
def check(self, f: flow.Flow) -> typing.Optional[str]:
Reported by Pylint.
Line: 1
Column: 1
import asyncio
import time
import traceback
import typing
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import exceptions
Reported by Pylint.
Line: 50
Column: 1
yield layers.http.ReceiveHttp(layers.http.RequestData(1, content))
if self.flow.request.trailers: # pragma: no cover
# TODO: Cover this once we support HTTP/1 trailers.
yield layers.http.ReceiveHttp(layers.http.RequestTrailers(1, self.flow.request.trailers))
yield layers.http.ReceiveHttp(layers.http.RequestEndOfMessage(1))
elif isinstance(event, (
layers.http.ResponseHeaders,
layers.http.ResponseData,
layers.http.ResponseTrailers,
Reported by Pylint.
Line: 64
Column: 1
ctx.log(f"Unexpected event during replay: {event}")
class ReplayHandler(server.ConnectionHandler):
layer: layers.HttpLayer
def __init__(self, flow: http.HTTPFlow, options: Options) -> None:
client = flow.client_conn.copy()
client.state = ConnectionState.OPEN
Reported by Pylint.
Line: 86
Column: 5
self.flow = flow
self.done = asyncio.Event()
async def replay(self) -> None:
self.server_event(events.Start())
await self.done.wait()
def log(self, message: str, level: str = "info") -> None:
ctx.log(f"[replay] {message}", level)
Reported by Pylint.
Line: 101
Column: 21
if isinstance(hook, (layers.http.HttpResponseHook, layers.http.HttpErrorHook)):
if self.transports:
# close server connections
for x in self.transports.values():
if x.handler:
x.handler.cancel()
await asyncio.wait([x.handler for x in self.transports.values() if x.handler])
# signal completion
self.done.set()
Reported by Pylint.
Line: 109
Column: 1
self.done.set()
class ClientPlayback:
playback_task: typing.Optional[asyncio.Task] = None
inflight: typing.Optional[http.HTTPFlow]
queue: asyncio.Queue
options: Options
Reported by Pylint.
mitmproxy/tools/web/master.py
24 issues
Line: 1
Column: 1
import tornado.httpserver
import tornado.ioloop
from tornado.platform.asyncio import AsyncIOMainLoop
from mitmproxy import addons
from mitmproxy import log
from mitmproxy import master
from mitmproxy import optmanager
from mitmproxy.addons import eventstore
Reported by Pylint.
Line: 2
Column: 1
import tornado.httpserver
import tornado.ioloop
from tornado.platform.asyncio import AsyncIOMainLoop
from mitmproxy import addons
from mitmproxy import log
from mitmproxy import master
from mitmproxy import optmanager
from mitmproxy.addons import eventstore
Reported by Pylint.
Line: 3
Column: 1
import tornado.httpserver
import tornado.ioloop
from tornado.platform.asyncio import AsyncIOMainLoop
from mitmproxy import addons
from mitmproxy import log
from mitmproxy import master
from mitmproxy import optmanager
from mitmproxy.addons import eventstore
Reported by Pylint.
Line: 48
Column: 29
self, self.options.web_debug
)
def _sig_view_add(self, view, flow):
app.ClientConnection.broadcast(
resource="flows",
cmd="add",
data=app.flow_to_json(flow)
)
Reported by Pylint.
Line: 48
Column: 29
self, self.options.web_debug
)
def _sig_view_add(self, view, flow):
app.ClientConnection.broadcast(
resource="flows",
cmd="add",
data=app.flow_to_json(flow)
)
Reported by Pylint.
Line: 55
Column: 32
data=app.flow_to_json(flow)
)
def _sig_view_update(self, view, flow):
app.ClientConnection.broadcast(
resource="flows",
cmd="update",
data=app.flow_to_json(flow)
)
Reported by Pylint.
Line: 55
Column: 32
data=app.flow_to_json(flow)
)
def _sig_view_update(self, view, flow):
app.ClientConnection.broadcast(
resource="flows",
cmd="update",
data=app.flow_to_json(flow)
)
Reported by Pylint.
Line: 62
Column: 32
data=app.flow_to_json(flow)
)
def _sig_view_remove(self, view, flow, index):
app.ClientConnection.broadcast(
resource="flows",
cmd="remove",
data=flow.id
)
Reported by Pylint.
Line: 62
Column: 44
data=app.flow_to_json(flow)
)
def _sig_view_remove(self, view, flow, index):
app.ClientConnection.broadcast(
resource="flows",
cmd="remove",
data=flow.id
)
Reported by Pylint.
Line: 62
Column: 32
data=app.flow_to_json(flow)
)
def _sig_view_remove(self, view, flow, index):
app.ClientConnection.broadcast(
resource="flows",
cmd="remove",
data=flow.id
)
Reported by Pylint.
mitmproxy/proxy/layers/modes.py
24 issues
Line: 63
Column: 9
@expect(events.Start)
def _handle_event(self, event: events.Event) -> layer.CommandGenerator[None]:
assert platform.original_addr is not None
socket = yield commands.GetSocket(self.context.client)
try:
self.context.server.address = platform.original_addr(socket)
except Exception as e:
yield commands.Log(f"Transparent mode failure: {e!r}")
Reported by Pylint.
Line: 66
Column: 16
socket = yield commands.GetSocket(self.context.client)
try:
self.context.server.address = platform.original_addr(socket)
except Exception as e:
yield commands.Log(f"Transparent mode failure: {e!r}")
self.child_layer = layer.NextLayer(self.context)
err = yield from self.finish_start()
Reported by Pylint.
Line: 1
Column: 1
import socket
import struct
from abc import ABCMeta
from typing import Optional
from mitmproxy import platform
from mitmproxy.net import server_spec
from mitmproxy.proxy import commands, events, layer
from mitmproxy.proxy.layers import tls
Reported by Pylint.
Line: 13
Column: 1
from mitmproxy.proxy.utils import expect
class HttpProxy(layer.Layer):
@expect(events.Start)
def _handle_event(self, event: events.Event) -> layer.CommandGenerator[None]:
child_layer = layer.NextLayer(self.context)
self._handle_event = child_layer.handle_event
yield from child_layer.handle_event(event)
Reported by Pylint.
Line: 25
Column: 5
"""Base layer for layers that gather connection destination info and then delegate."""
child_layer: layer.Layer
def finish_start(self) -> layer.CommandGenerator[Optional[str]]:
if self.context.options.connection_strategy == "eager" and self.context.server.address:
err = yield commands.OpenConnection(self.context.server)
if err:
self._handle_event = self.done # type: ignore
return err
Reported by Pylint.
Line: 37
Column: 5
return None
@expect(events.DataReceived, events.ConnectionClosed)
def done(self, _) -> layer.CommandGenerator[None]:
yield from ()
class ReverseProxy(DestinationKnown):
@expect(events.Start)
Reported by Pylint.
Line: 37
Column: 5
return None
@expect(events.DataReceived, events.ConnectionClosed)
def done(self, _) -> layer.CommandGenerator[None]:
yield from ()
class ReverseProxy(DestinationKnown):
@expect(events.Start)
Reported by Pylint.
Line: 41
Column: 1
yield from ()
class ReverseProxy(DestinationKnown):
@expect(events.Start)
def _handle_event(self, event: events.Event) -> layer.CommandGenerator[None]:
spec = server_spec.parse_with_mode(self.context.options.mode)[1]
self.context.server.address = spec.address
Reported by Pylint.
Line: 59
Column: 1
yield commands.CloseConnection(self.context.client)
class TransparentProxy(DestinationKnown):
@expect(events.Start)
def _handle_event(self, event: events.Event) -> layer.CommandGenerator[None]:
assert platform.original_addr is not None
socket = yield commands.GetSocket(self.context.client)
try:
Reported by Pylint.
Line: 62
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
class TransparentProxy(DestinationKnown):
@expect(events.Start)
def _handle_event(self, event: events.Event) -> layer.CommandGenerator[None]:
assert platform.original_addr is not None
socket = yield commands.GetSocket(self.context.client)
try:
self.context.server.address = platform.original_addr(socket)
except Exception as e:
yield commands.Log(f"Transparent mode failure: {e!r}")
Reported by Bandit.
examples/contrib/test_jsondump.py
23 issues
Line: 4
Column: 1
import json
import base64
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
import requests_mock
Reported by Pylint.
Line: 5
Column: 1
import base64
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
import requests_mock
example_dir = tutils.test_data.push("../examples")
Reported by Pylint.
Line: 6
Column: 1
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
import requests_mock
example_dir = tutils.test_data.push("../examples")
Reported by Pylint.
Line: 8
Column: 1
from mitmproxy.test import tutils
from mitmproxy.test import taddons
import requests_mock
example_dir = tutils.test_data.push("../examples")
class TestJSONDump:
Reported by Pylint.
Line: 14
Column: 38
class TestJSONDump:
def echo_response(self, request, context):
self.request = {'json': request.json(), 'headers': request.headers}
return ''
def flow(self, resp_content=b'message'):
times = dict(
Reported by Pylint.
Line: 15
Column: 9
class TestJSONDump:
def echo_response(self, request, context):
self.request = {'json': request.json(), 'headers': request.headers}
return ''
def flow(self, resp_content=b'message'):
times = dict(
timestamp_start=746203272,
Reported by Pylint.
Line: 56
Column: 25
entry = json.loads(inp.readline())
assert entry['response']['content'] == base64.b64encode(content).decode('utf-8')
def test_http(self, tmpdir):
with requests_mock.Mocker() as mock:
mock.post('http://my-server', text=self.echo_response)
with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/jsondump.py"))
tctx.configure(a, dump_destination='http://my-server',
Reported by Pylint.
Line: 1
Column: 1
import json
import base64
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
import requests_mock
Reported by Pylint.
Line: 13
Column: 1
example_dir = tutils.test_data.push("../examples")
class TestJSONDump:
def echo_response(self, request, context):
self.request = {'json': request.json(), 'headers': request.headers}
return ''
def flow(self, resp_content=b'message'):
Reported by Pylint.
Line: 14
Column: 5
class TestJSONDump:
def echo_response(self, request, context):
self.request = {'json': request.json(), 'headers': request.headers}
return ''
def flow(self, resp_content=b'message'):
times = dict(
Reported by Pylint.
docs/scripts/clirecording/screenplays.py
23 issues
Line: 1
Column: 1
#!/usr/bin/env python3
from clidirector import CliDirector
def record_user_interface(d: CliDirector):
tmux = d.start_session(width=120, height=36)
window = tmux.attached_window
Reported by Pylint.
Line: 6
Column: 1
from clidirector import CliDirector
def record_user_interface(d: CliDirector):
tmux = d.start_session(width=120, height=36)
window = tmux.attached_window
d.start_recording("recordings/mitmproxy_user_interface.cast")
d.message("Welcome to the mitmproxy tutorial. In this lesson we cover the user interface.")
Reported by Pylint.
Line: 6
Column: 1
from clidirector import CliDirector
def record_user_interface(d: CliDirector):
tmux = d.start_session(width=120, height=36)
window = tmux.attached_window
d.start_recording("recordings/mitmproxy_user_interface.cast")
d.message("Welcome to the mitmproxy tutorial. In this lesson we cover the user interface.")
Reported by Pylint.
Line: 6
Column: 1
from clidirector import CliDirector
def record_user_interface(d: CliDirector):
tmux = d.start_session(width=120, height=36)
window = tmux.attached_window
d.start_recording("recordings/mitmproxy_user_interface.cast")
d.message("Welcome to the mitmproxy tutorial. In this lesson we cover the user interface.")
Reported by Pylint.
Line: 103
Column: 1
d.end()
def record_intercept_requests(d: CliDirector):
tmux = d.start_session(width=120, height=36)
window = tmux.attached_window
d.start_recording("recordings/mitmproxy_intercept_requests.cast")
d.message("Welcome to the mitmproxy tutorial. In this lesson we cover the interception of requests.")
Reported by Pylint.
Line: 103
Column: 1
d.end()
def record_intercept_requests(d: CliDirector):
tmux = d.start_session(width=120, height=36)
window = tmux.attached_window
d.start_recording("recordings/mitmproxy_intercept_requests.cast")
d.message("Welcome to the mitmproxy tutorial. In this lesson we cover the interception of requests.")
Reported by Pylint.
Line: 108
Column: 1
window = tmux.attached_window
d.start_recording("recordings/mitmproxy_intercept_requests.cast")
d.message("Welcome to the mitmproxy tutorial. In this lesson we cover the interception of requests.")
d.pause(1)
d.exec("mitmproxy")
d.pause(3)
d.message("We first need to configure mitmproxy to intercept requests.")
Reported by Pylint.
Line: 123
Column: 1
d.message("Additionally, we use the filter `~q` to only intercept requests, but not responses.")
d.message("We combine both flow filters using `&`.")
d.message("Enter `~u /Dunedin & ~q` between the quotes of the `set intercept` command and press `ENTER`.")
d.exec("~u /Dunedin & ~q")
d.message("The bottom bar shows that the interception has been configured.")
d.message("Let’s generate a request using `curl` in a separate terminal.")
Reported by Pylint.
Line: 143
Column: 1
d.message("You see a new line in in the list of flows.")
d.message("The new flow is displayed in red to indicate that it has been intercepted.")
d.message("Put the focus (`>>`) on the intercepted flow. This is already the case in our example.")
d.message("Press `a` to resume this flow without making any changes.")
d.type("a")
d.pause(2)
d.focus_pane(pane_bottom)
Reported by Pylint.
Line: 159
Column: 1
d.press_key("Down")
d.pause(1)
d.message("Press `X` to kill this flow, i.e., discard it without forwarding it to its final destination `wttr.in`.")
d.type("X")
d.pause(3)
d.message("In the next lesson you will learn to modify intercepted flows.")
d.save_instructions("recordings/mitmproxy_intercept_requests_instructions.json")
Reported by Pylint.
mitmproxy/addons/command_history.py
23 issues
Line: 28
Column: 3
return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history"
def running(self):
# FIXME: We have a weird bug where the contract for configure is not followed and it is never called with
# confdir or command_history as updated.
self.configure("command_history") # pragma: no cover
def configure(self, updated):
if "command_history" in updated or "confdir" in updated:
Reported by Pylint.
Line: 44
Column: 20
history_str = "\n".join(self.history[-self.VACUUM_SIZE // 2:]) + "\n"
try:
self.history_file.write_text(history_str)
except Exception as e:
ctx.log.alert(f"Failed writing to {self.history_file}: {e}")
@command.command("commands.history.add")
def add_command(self, command: str) -> None:
if not command.strip():
Reported by Pylint.
Line: 48
Column: 27
ctx.log.alert(f"Failed writing to {self.history_file}: {e}")
@command.command("commands.history.add")
def add_command(self, command: str) -> None:
if not command.strip():
return
self.history.append(command)
if ctx.options.command_history:
Reported by Pylint.
Line: 57
Column: 20
try:
with self.history_file.open("a") as f:
f.write(f"{command}\n")
except Exception as e:
ctx.log.alert(f"Failed writing to {self.history_file}: {e}")
self.set_filter('')
@command.command("commands.history.get")
Reported by Pylint.
Line: 72
Column: 20
if self.history_file.exists():
try:
self.history_file.unlink()
except Exception as e:
ctx.log.alert(f"Failed deleting {self.history_file}: {e}")
self.history = []
self.set_filter('')
# Functionality to provide a filtered list that can be iterated through.
Reported by Pylint.
Line: 1
Column: 1
import os
import pathlib
import typing
from mitmproxy import command
from mitmproxy import ctx
class CommandHistory:
Reported by Pylint.
Line: 9
Column: 1
from mitmproxy import ctx
class CommandHistory:
VACUUM_SIZE = 1024
def __init__(self) -> None:
self.history: typing.List[str] = []
self.filtered_history: typing.List[str] = [""]
Reported by Pylint.
Line: 17
Column: 5
self.filtered_history: typing.List[str] = [""]
self.current_index: int = 0
def load(self, loader):
loader.add_option(
"command_history", bool, True,
"""Persist command history between mitmproxy invocations."""
)
Reported by Pylint.
Line: 17
Column: 5
self.filtered_history: typing.List[str] = [""]
self.current_index: int = 0
def load(self, loader):
loader.add_option(
"command_history", bool, True,
"""Persist command history between mitmproxy invocations."""
)
Reported by Pylint.
Line: 24
Column: 5
)
@property
def history_file(self) -> pathlib.Path:
return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history"
def running(self):
# FIXME: We have a weird bug where the contract for configure is not followed and it is never called with
# confdir or command_history as updated.
Reported by Pylint.
mitmproxy/proxy/layer.py
23 issues
Line: 15
Column: 1
from mitmproxy.proxy.commands import Command, StartHook
from mitmproxy.proxy.context import Context
T = TypeVar('T')
CommandGenerator = Generator[Command, Any, T]
"""
A function annotated with CommandGenerator[bool] may yield commands and ultimately return a boolean value.
"""
Reported by Pylint.
Line: 18
Column: 1
T = TypeVar('T')
CommandGenerator = Generator[Command, Any, T]
"""
A function annotated with CommandGenerator[bool] may yield commands and ultimately return a boolean value.
"""
class Paused(NamedTuple):
"""
Reported by Pylint.
Line: 39
Column: 1
Most layers do not implement .handle_event directly, but instead implement ._handle_event, which
is called by the default implementation of .handle_event.
The default implementation of .handle_event allows layers to emulate blocking code:
When ._handle_event yields a command that has its blocking attribute set to True, .handle_event pauses
the execution of ._handle_event and waits until it is called with the corresponding CommandCompleted event.
All events encountered in the meantime are buffered and replayed after execution is resumed.
The result is code that looks like blocking code, but is not blocking:
Reported by Pylint.
Line: 40
Column: 1
is called by the default implementation of .handle_event.
The default implementation of .handle_event allows layers to emulate blocking code:
When ._handle_event yields a command that has its blocking attribute set to True, .handle_event pauses
the execution of ._handle_event and waits until it is called with the corresponding CommandCompleted event.
All events encountered in the meantime are buffered and replayed after execution is resumed.
The result is code that looks like blocking code, but is not blocking:
def _handle_event(self, event):
Reported by Pylint.
Line: 46
Column: 1
The result is code that looks like blocking code, but is not blocking:
def _handle_event(self, event):
err = yield OpenConnection(server) # execution continues here after a connection has been established.
Technically this is very similar to how coroutines are implemented.
"""
__last_debug_message: ClassVar[str] = ""
context: Context
Reported by Pylint.
Line: 118
Column: 5
"""Handle a proxy server event"""
yield from () # pragma: no cover
def handle_event(self, event: events.Event) -> CommandGenerator[None]:
if self._paused:
# did we just receive the reply we were waiting for?
pause_finished = (
isinstance(event, events.CommandCompleted) and
event.command is self._paused.command
Reported by Pylint.
Line: 118
Column: 5
"""Handle a proxy server event"""
yield from () # pragma: no cover
def handle_event(self, event: events.Event) -> CommandGenerator[None]:
if self._paused:
# did we just receive the reply we were waiting for?
pause_finished = (
isinstance(event, events.CommandCompleted) and
event.command is self._paused.command
Reported by Pylint.
Line: 128
Suggestion:
https://bandit.readthedocs.io/en/latest/plugins/b101_assert_used.html
if self.debug is not None:
yield self.__debug(f"{'>>' if pause_finished else '>!'} {event}")
if pause_finished:
assert isinstance(event, events.CommandCompleted)
yield from self.__continue(event)
else:
self._paused_event_queue.append(event)
else:
if self.debug is not None:
Reported by Bandit.
Line: 152
Column: 17
if self.debug is not None:
if not isinstance(command, commands.Log):
yield self.__debug(f"<< {command}")
if command.blocking is True:
# We only want this layer to block, the outer layers should not block.
# For example, take an HTTP/2 connection: If we intercept one particular request,
# we don't want all other requests in the connection to be blocked as well.
# We signal to outer layers that this command is already handled by assigning our layer to
# `.blocking` here (upper layers explicitly check for `is True`).
Reported by Pylint.
Line: 154
Column: 1
yield self.__debug(f"<< {command}")
if command.blocking is True:
# We only want this layer to block, the outer layers should not block.
# For example, take an HTTP/2 connection: If we intercept one particular request,
# we don't want all other requests in the connection to be blocked as well.
# We signal to outer layers that this command is already handled by assigning our layer to
# `.blocking` here (upper layers explicitly check for `is True`).
command.blocking = self
self._paused = Paused(
Reported by Pylint.