# /jobs/submit
import os
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import requests
import json
# Deck endpoint that receives job submissions
api_endpoint = "https://live.deck.co/api/v1/jobs/submit"

# API credentials (sent as the x-deck-client-id / x-deck-secret headers)
client_id = "xxx"
secret = "xxx"

# Base64-encoded AES-256 key used to encrypt the job input
encryption_key = "xxx"

# Job input; it is serialized to JSON and encrypted before being sent.
# NOTE: this name shadows the built-in input(); it is kept because the
# submission code further down reads it.
input = {
    "source_guid": "xxx",
    "username": "xxx",
    "password": "xxx",
}
class AESEncryptionHelper:
    """AES-256-GCM helper that exchanges keys and payloads as Base64 text.

    An encrypted payload is the Base64 encoding of
    ``nonce (NONCE_SIZE bytes) + ciphertext + tag (TAG_SIZE bytes)``.
    Keys are Base64 strings that must decode to exactly KEY_SIZE bytes.
    """

    KEY_SIZE = 32  # 256 bits / 8
    NONCE_SIZE = 12  # 96 bits / 8
    TAG_SIZE = 16  # 128 bits / 8

    @staticmethod
    def _decode_key(base64_key: str) -> bytes:
        """Decode a Base64 key and verify that it is KEY_SIZE bytes long.

        Raises:
            ValueError: If the key is empty, is not valid Base64, or does
                not decode to KEY_SIZE bytes.
        """
        if not base64_key:
            raise ValueError("Key cannot be empty")
        key = base64.b64decode(base64_key)
        if len(key) != AESEncryptionHelper.KEY_SIZE:
            raise ValueError(f"Key must be {AESEncryptionHelper.KEY_SIZE} bytes")
        return key

    @staticmethod
    def generate_key() -> str:
        """
        Generates a new AES-256 key suitable for use with AES-GCM.

        Note: This is for testing and demonstration purposes only.
        Valid encryption keys for production use will be provided by Deck.

        Returns:
            The Base64 encoding of KEY_SIZE random bytes.
        """
        key = os.urandom(AESEncryptionHelper.KEY_SIZE)
        return base64.b64encode(key).decode('utf-8')

    @staticmethod
    def encrypt(plaintext: str, base64_key: str) -> str:
        """Encrypts a plaintext string using AES-256-GCM.

        Args:
            plaintext: Non-empty text to encrypt; it is encoded as UTF-8.
            base64_key: Base64 string that decodes to a KEY_SIZE-byte key.

        Returns:
            Base64 text of ``nonce + ciphertext + tag``.

        Raises:
            ValueError: If the plaintext is empty or the key is invalid.
        """
        if not plaintext:
            raise ValueError("Plaintext cannot be empty")
        key = AESEncryptionHelper._decode_key(base64_key)

        # A fresh random nonce is generated for every message.
        nonce = os.urandom(AESEncryptionHelper.NONCE_SIZE)

        # AESGCM.encrypt already returns the ciphertext with the tag
        # appended, so prefixing the nonce yields nonce + ciphertext + tag.
        sealed = AESGCM(key).encrypt(nonce, plaintext.encode('utf-8'), None)
        return base64.b64encode(nonce + sealed).decode('utf-8')

    @staticmethod
    def decrypt(encrypted_data: str, base64_key: str) -> str:
        """Decrypts data that was encrypted using AES-256-GCM.

        Args:
            encrypted_data: Base64 text of ``nonce + ciphertext + tag``.
            base64_key: Base64 string that decodes to a KEY_SIZE-byte key.

        Returns:
            The decrypted text, decoded as UTF-8.

        Raises:
            ValueError: If an argument is empty, the key is invalid, or the
                payload is too short to hold a nonce and a tag.
            cryptography.exceptions.InvalidTag: Raised by AESGCM.decrypt
                when the payload fails authentication.
        """
        if not encrypted_data:
            raise ValueError("Encrypted data cannot be empty")
        key = AESEncryptionHelper._decode_key(base64_key)

        data = base64.b64decode(encrypted_data)
        if len(data) < AESEncryptionHelper.NONCE_SIZE + AESEncryptionHelper.TAG_SIZE:
            raise ValueError("Invalid encrypted data format")

        # Everything after the nonce is ciphertext + tag, which is exactly
        # the layout AESGCM.decrypt expects.
        nonce = data[:AESEncryptionHelper.NONCE_SIZE]
        sealed = data[AESEncryptionHelper.NONCE_SIZE:]
        return AESGCM(key).decrypt(nonce, sealed, None).decode('utf-8')
# Serialize the job input to JSON and encrypt it with the shared key.
# encrypt() is a static method; the helper instance is only a convenience.
helper = AESEncryptionHelper()
json_input = json.dumps(input)
encrypted_input = helper.encrypt(json_input, encryption_key)

# Only the "input" field carries encrypted data; the job code is sent as-is.
payload = {
    "job_code": "EnsureConnection",
    "input": encrypted_input,
}
print("Request Body:")
print(json.dumps(payload, indent=4))

# Submit the job. requests applies no timeout by default, so an explicit one
# keeps the script from blocking forever if the API never answers.
response = requests.post(
    api_endpoint,
    headers={
        "Content-Type": "application/json+encrypted",
        "x-deck-client-id": client_id,
        "x-deck-secret": secret,
    },
    json=payload,
    timeout=30,
)

# The response body comes back as plain (unencrypted) JSON.
if response.status_code in (200, 202):
    response_data = response.json()
    print("\r\nResponse Body:")
    print(response_data)
else:
    # Any other status: report the code only.
    print(response.status_code)
Request Body:
{
"job_code": "EnsureConnection",
"input": "h7ukEzOQdAbp4Nhr+584KaWN9SigCJVcYYKNK2vsviW80Nyzy9OaqmAod1csatH6wS7GvNZicj3EL2n5O5VOjtt1aMV3/1vcEz/ztj8PeoFhzeHBe3bz6tngeTmGcAFyQFOUTWQG3d/E9RQiDVdbIF5OVjxdZW75n+9/H8HmDg6LiZ02BEqA5TzNncBl0ZEWlDetXAuR7o8pUg=="
}
Response Body:
{'job_guid': '20ada455-d577-42de-7762-08ddcbf231fd', 'job_code': 'EnsureConnection'}