Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions ZeepLibrary/ws_wsse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import xmlsec
from zeep import ns
from zeep.utils import detect_soap_env
from zeep.wsse.signature import Signature, _sign_node, _make_sign_key, _make_verify_key, _verify_envelope_with_key
from zeep.wsse.utils import ensure_id, get_security_header, WSU
from datetime import datetime, timedelta
from lxml import etree
from lxml.etree import QName

def _sign_envelope_with_key(envelope, key, actor=None):
soap_env = detect_soap_env(envelope)
# Create the Signature node.
signature = xmlsec.template.create(
envelope,
xmlsec.Transform.EXCL_C14N,
xmlsec.Transform.RSA_SHA1,
)

# Add a KeyInfo node with X509Data child to the Signature. XMLSec will fill
# in this template with the actual certificate details when it signs.
key_info = xmlsec.template.ensure_key_info(signature)
x509_data = xmlsec.template.add_x509_data(key_info)
xmlsec.template.x509_data_add_issuer_serial(x509_data)
xmlsec.template.x509_data_add_certificate(x509_data)

# Insert the Signature node in the wsse:Security header.
security = get_security_header(envelope)
if actor:
security.set(QName(soap_env, 'actor'), actor)

security.insert(0, signature)

timestamp = WSU('Timestamp')
created = datetime.utcnow()
expired = created + timedelta(seconds=1 * 60)

timestamp.append(WSU('Created', created.replace(microsecond=0).isoformat()+'Z'))
timestamp.append(WSU('Expires', expired.replace(microsecond=0).isoformat()+'Z'))
security.append(timestamp)


# Perform the actual signing.
ctx = xmlsec.SignatureContext()
ctx.key = key
_sign_node(ctx, signature, envelope.find(QName(soap_env, 'Body')), digest_method=xmlsec.Transform.SHA256)
_sign_node(ctx, signature, security.find(QName(ns.WSU, 'Timestamp')), digest_method=xmlsec.Transform.SHA256)
ctx.sign(signature)

# Place the X509 data inside a WSSE SecurityTokenReference within
# KeyInfo. The recipient expects this structure, but we can't rearrange
# like this until after signing, because otherwise xmlsec won't populate
# the X509 data (because it doesn't understand WSSE).
sec_token_ref = etree.SubElement(
key_info, QName(ns.WSSE, 'SecurityTokenReference'))
sec_token_ref.append(x509_data)

class WSSignature(Signature):
def __init__(self, key_file, certfile, password=None, actor=None, verify='use self'):
super(WSSignature, self).__init__(key_file, certfile, password)
self.actor = actor
if verify == 'use self':
self.verify_o = self
else:
self.verify_o = verify

def apply(self, envelope, headers):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key(envelope, key, actor=self.actor)
return envelope, headers

def verify(self, envelope):
if not self.verify_o:
return envelope

key = _make_verify_key(self.verify_o.cert_data)
_verify_envelope_with_key(envelope, key)
return envelope
27 changes: 24 additions & 3 deletions ZeepLibrary/zeeplibrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from email.mime.image import MIMEImage
from email.encoders import encode_7or8bit, encode_base64, encode_noop
import base64

from ZeepLibrary.ws_wsse import WSSignature

class ZeepLibraryException(Exception):
def __str__(self):
Expand Down Expand Up @@ -233,7 +233,8 @@ def create_client(self,
auth=None,
proxies=None,
cert=None,
verify=None):
verify=None,
wsse=None):
session = requests.Session()
session.cert = cert
session.proxies = proxies
Expand All @@ -242,7 +243,7 @@ def create_client(self,
session.auth = requests.auth.HTTPBasicAuth(auth[0], auth[1])
transport = zeep.transports.Transport(session=session)

client = zeep.Client(wsdl, transport=transport)
client = zeep.Client(wsdl, transport=transport, wsse=wsse)
client.attachments = []

self._add_client(client, alias)
Expand Down Expand Up @@ -318,6 +319,26 @@ def switch_client(self, alias):

return current_active_client_alias

@keyword()
def wssignature(self, key_file, cert_file, password=None, actor=None, verify=False):
"""This keyword contructs a wsse signing object that can be used
by the keyword Create Session in the wsse argument.

key_file and cert_file are PEM format files for pythons requests module.
The password is to unlock the key file if it is secured with a password.
If actor is given a value, it is used to populate the actor attribute on the signature.

verify, if given, should be another WSSignature object for verifying the response
or the word "DISABLED" to explicitly disable signature verification.

When verify is not given, the same key as the request is used.
"""
if hasattr(verify, 'upper') and verify.upper() == 'DISABLED':
return WSSignature(key_file, cert_file, password=password, actor=actor, verify=False)
elif verify:
return WSSignature(key_file, cert_file, password=password, actor=actor, verify=verify)
else:
return WSSignature(key_file, cert_file, password=password, actor=actor)

# Utilities.
def _guess_mimetype(filename):
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setuptools.setup(
name="robotframework-zeeplibrary",
version="0.9.2",
version="0.9.3",
author="Bart Kleijngeld",
author_email="[email protected]",
description="Robot Framework library for using Zeep.",
Expand All @@ -11,5 +11,5 @@
classifiers=(
"Operating System :: OS Independent",
),
install_requires="zeep>=2.5.0",
install_requires=("zeep>=2.5.0", "xmlsec>=1.3"),
)