95 lines
3.5 KiB
Python
95 lines
3.5 KiB
Python
import json
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
schema_sendhttprequest = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "sendhttprequest",
|
|
"description": "Send an HTTP request with full control over method, headers, and body/params.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"url": {"type": "string", "description": "Target URL"},
|
|
"method": {
|
|
"type": "string",
|
|
"enum": ["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
|
|
"description": "HTTP method"
|
|
},
|
|
"authorization": {
|
|
"type": "string",
|
|
"description": "Bearer token or full Authorization header value (e.g. 'Bearer <token>' or 'Basic <base64>')"
|
|
},
|
|
"content_type": {
|
|
"type": "string",
|
|
"description": "Content-Type header (e.g. 'application/json', 'application/x-www-form-urlencoded')"
|
|
},
|
|
"data": {
|
|
"type": "object",
|
|
"description": "JSON body for POST/PUT/PATCH requests (will be serialized as JSON). Ignored for GET/DELETE/HEAD/OPTIONS."
|
|
},
|
|
"params": {
|
|
"type": "object",
|
|
"description": "Query parameters as key-value dict (appended to URL)"
|
|
}
|
|
},
|
|
"required": ["url", "method"]
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
def sendhttprequest(url, method, authorization=None, content_type=None, data=None, params=None):
|
|
try:
|
|
if params:
|
|
url_parts = list(urllib.parse.urlparse(url))
|
|
query = dict(urllib.parse.parse_qsl(url_parts[4]))
|
|
query.update(params)
|
|
url_parts[4] = urllib.parse.urlencode(query)
|
|
url = urllib.parse.urlunparse(url_parts)
|
|
|
|
headers = {}
|
|
if authorization:
|
|
headers["Authorization"] = authorization
|
|
if content_type:
|
|
headers["Content-Type"] = content_type
|
|
|
|
body = None
|
|
if data is not None and method.upper() in ("POST", "PUT", "PATCH"):
|
|
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
|
|
if content_type is None:
|
|
headers["Content-Type"] = "application/json"
|
|
|
|
req = urllib.request.Request(url, data=body, headers=headers, method=method.upper())
|
|
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
status = resp.status
|
|
resp_headers = dict(resp.getheaders())
|
|
raw = resp.read()
|
|
content_type_resp = resp_headers.get("Content-Type", "")
|
|
if "application/json" in content_type_resp:
|
|
try:
|
|
resp_body = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
resp_body = raw.decode("utf-8", errors="replace")
|
|
else:
|
|
resp_body = raw.decode("utf-8", errors="replace")
|
|
|
|
return json.dumps({
|
|
"status": status,
|
|
"headers": resp_headers,
|
|
"body": resp_body
|
|
}, ensure_ascii=False, indent=2)
|
|
|
|
except urllib.error.HTTPError as e:
|
|
return json.dumps({
|
|
"status": e.code,
|
|
"headers": dict(e.headers),
|
|
"body": e.read().decode("utf-8", errors="replace")
|
|
}, ensure_ascii=False, indent=2)
|
|
except urllib.error.URLError as e:
|
|
return f"Error: {e.reason}"
|
|
except Exception as e:
|
|
return f"Error: {str(e)}"
|