Skip to content

Clean up "Not Yet Implemented" tests #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions gssapi/creds.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ def store(self, store=None, usage='both', mech=None,
raise NotImplementedError("Your GSSAPI implementation does "
"not have support for RFC 5588")

return rcred_cred_store.store_cred(self, usage, mech,
overwrite, set_default)
return rcred_rfc5588.store_cred(self, usage, mech,
overwrite, set_default)
else:
if rcred_cred_store is None:
raise NotImplementedError("Your GSSAPI implementation does "
Expand Down Expand Up @@ -332,6 +332,7 @@ def add(self, desired_name, desired_mech, usage='both',
raise NotImplementedError("Your GSSAPI implementation does "
"not have support for manipulating "
"credential stores")
store = _encode_dict(store)

res = rcred_cred_store.add_cred_from(store, self, desired_name,
desired_mech, usage,
Expand Down
8 changes: 6 additions & 2 deletions gssapi/raw/sec_contexts.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def accept_sec_context(input_token not None, Creds acceptor_cred=None,
free(bdng)

cdef Name on = Name()
cdef Creds oc = Creds()
cdef Creds oc = None
cdef OID py_mech_type
if maj_stat == GSS_S_COMPLETE or maj_stat == GSS_S_CONTINUE_NEEDED:
if output_ttl == GSS_C_INDEFINITE:
Expand All @@ -319,7 +319,11 @@ def accept_sec_context(input_token not None, Creds acceptor_cred=None,
output_ttl_py = output_ttl

on.raw_name = initiator_name
oc.raw_creds = delegated_cred

if delegated_cred is not NULL:
oc = Creds()
oc.raw_creds = delegated_cred

if mech_type is not NULL:
py_mech_type = OID()
py_mech_type.raw_oid = mech_type[0]
Expand Down
139 changes: 109 additions & 30 deletions gssapi/tests/test_high_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from gssapi import exceptions as excs
from gssapi.tests._utils import _extension_test
from gssapi.tests import k5test as kt
from gssapi._utils import import_gssapi_extension


TARGET_SERVICE_NAME = b'host'
Expand Down Expand Up @@ -153,7 +152,34 @@ def test_acquire_by_method(self, str_name, kwargs):

@_extension_test('rfc5588', 'RFC 5588')
def test_store_acquire(self):
self.skipTest("Not Yet Implemented")
# we need to acquire a forwardable ticket
svc_princ = SERVICE_PRINCIPAL.decode("UTF-8")
self.realm.kinit(svc_princ, flags=['-k', '-f'])

target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)

client_creds = gsscreds.Credentials(usage='initiate')
client_ctx = gssctx.SecurityContext(
name=target_name, creds=client_creds,
flags=gb.RequirementFlag.delegate_to_peer)

client_token = client_ctx.step()

server_creds = gsscreds.Credentials(usage='accept')
server_ctx = gssctx.SecurityContext(creds=server_creds)
server_ctx.step(client_token)

deleg_creds = server_ctx.delegated_creds
deleg_creds.shouldnt_be_none()

store_res = deleg_creds.store(usage='initiate', set_default=True)
store_res.usage.should_be('initiate')
store_res.mech_types.should_include(gb.MechType.kerberos)

reacquired_creds = gsscreds.Credentials(desired_name=deleg_creds.name,
usage='initiate')
reacquired_creds.shouldnt_be_none()

@_extension_test('cred_store', 'credentials store')
def test_store_into_acquire_from(self):
Expand Down Expand Up @@ -181,7 +207,10 @@ def test_store_into_acquire_from(self):
retrieved_creds.shouldnt_be_none()

def test_create_from_other(self):
self.skipTest("Not Yet Implemented")
raw_creds = gb.acquire_cred(None, cred_usage='accept').creds

high_level_creds = gsscreds.Credentials(raw_creds)
high_level_creds.usage.should_be('accept')

@true_false_perms('name', 'lifetime', 'usage', 'mechs')
def test_inquire(self, str_name, kwargs):
Expand Down Expand Up @@ -235,11 +264,41 @@ def test_inquire_by_mech(self, str_name, kwargs):
resp.usage.should_be_none()

def test_add(self):
self.skipTest("Not Yet Implemented")
input_creds = gsscreds.Credentials(gb.Creds())
name = gssnames.Name(SERVICE_PRINCIPAL)
new_creds = input_creds.add(name, gb.MechType.kerberos,
usage='initiate')

new_creds.shouldnt_be_none()
new_creds.should_be_a(gsscreds.Credentials)

@_extension_test('cred_store', 'credentials store')
def test_store_into_add_from(self):
CCACHE = 'FILE:{tmpdir}/other_ccache'.format(tmpdir=self.realm.tmpdir)
KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir)
store = {'ccache': CCACHE, 'keytab': KT}

princ_name = 'service/cs@' + self.realm.realm
self.realm.addprinc(princ_name)
self.realm.extract_keytab(princ_name, KT)
self.realm.kinit(princ_name, None, ['-k', '-t', KT])

initial_creds = gsscreds.Credentials(desired_name=None,
usage='initiate')

# NB(directxman12): we don't test add_cred_from because it really requires
# multiple mechanism support, which would mean something
# like requiring NTLM libraries
store_res = initial_creds.store(store, overwrite=True)

store_res.mech_types.shouldnt_be_none()
store_res.mech_types.shouldnt_be_empty()
store_res.usage.should_be('initiate')

name = gssnames.Name(princ_name)
input_creds = gsscreds.Credentials(gb.Creds())
retrieved_creds = input_creds.add(name, gb.MechType.kerberos,
store=store)

retrieved_creds.shouldnt_be_none()
retrieved_creds.should_be_a(gsscreds.Credentials)

@_extension_test('cred_imp_ext', 'credentials import-export')
def test_export(self):
Expand Down Expand Up @@ -267,37 +326,54 @@ def test_pickle_unpickle(self):

@exist_perms(lifetime=30, desired_mechs=[gb.MechType.kerberos],
usage='initiate')
@_extension_test('s4u', 'S4U')
def test_impersonate(self, str_name, kwargs):
if import_gssapi_extension('s4u') is None:
self.skipTest("The S4U GSSAPI extension is not supported "
"by your GSSAPI implementation")
else:
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
# TODO(directxman12): make this use the high-level SecurityContext
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free everything but the token
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
# TODO(directxman12): make this use the high-level SecurityContext
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free everything but the token

server_name = self.name
server_creds = gsscreds.Credentials(desired_name=server_name,
usage='both')
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_cred=server_creds)
server_name = self.name
server_creds = gsscreds.Credentials(desired_name=server_name,
usage='both')
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_cred=server_creds)

imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs)
imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs)

imp_creds.shouldnt_be_none()
imp_creds.should_be_a(gsscreds.Credentials)
imp_creds.shouldnt_be_none()
imp_creds.should_be_a(gsscreds.Credentials)

@_extension_test('s4u', 'S4U')
def test_add_with_impersonate(self):
self.skipTest("Not Yet Implemented")
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx = gssctx.SecurityContext(name=target_name)
client_token = client_ctx.step()

server_creds = gsscreds.Credentials(usage='both')
server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept')
server_ctx.step(client_token)

# use empty creds to test here
input_creds = gsscreds.Credentials(gb.Creds())
new_creds = input_creds.add(server_ctx.initiator_name,
gb.MechType.kerberos,
impersonator=server_creds,
usage='initiate')

new_creds.shouldnt_be(None)
new_creds.should_be_a(gsscreds.Credentials)


class NamesTestCase(_GSSAPIKerberosTestCase):
def test_create_from_other(self):
self.skipTest("Not Yet Implemented")
raw_name = gb.import_name(SERVICE_PRINCIPAL)
high_level_name = gssnames.Name(raw_name)

bytes(high_level_name).should_be(SERVICE_PRINCIPAL)

def test_create_from_name_no_type(self):
name = gssnames.Name(SERVICE_PRINCIPAL)
Expand Down Expand Up @@ -398,11 +474,14 @@ def setUp(self):
def _create_client_ctx(self, **kwargs):
return gssctx.SecurityContext(name=self.target_name, **kwargs)

def test_process_token(self):
self.skipTest("Not Yet Implemented")
# NB(directxman12): we skip testing process_context_token, because there is
# no concrete, non-deprecated was to obtain an "async"
# token

def test_create_from_other(self):
self.skipTest("Not Yet Implemented")
raw_client_ctx, raw_server_ctx = self._create_completed_contexts()
high_level_ctx = gssctx.SecurityContext(raw_client_ctx)
high_level_ctx.target_name.should_be(self.target_name)

@exist_perms(desired_lifetime=30, flags=[],
mech_type=gb.MechType.kerberos,
Expand Down
36 changes: 32 additions & 4 deletions gssapi/tests/test_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ def test_inquire_context(self):
is_open.should_be_a(bool)
is_open.should_be_true()

def test_process_context_token(self):
# TODO(directxman12): figure out how to write a test for this
self.skipTest("Not Yet Implemented")
# NB(directxman12): We don't test `process_context_token` because
# there is no clear non-deprecated way to test it

@_extension_test('s4u', 'S4U')
def test_add_cred_impersonate_name(self):
Expand Down Expand Up @@ -296,7 +295,36 @@ def test_acquire_creds_impersonate_name(self):

@_extension_test('rfc5588', 'RFC 5588')
def test_store_cred_acquire_cred(self):
self.skipTest("Not Yet Implemented")
# we need to acquire a forwardable ticket
svc_princ = SERVICE_PRINCIPAL.decode("UTF-8")
self.realm.kinit(svc_princ, flags=['-k', '-f'])

target_name = gb.import_name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)

client_creds = gb.acquire_cred(None, cred_usage='initiate').creds
client_ctx_resp = gb.init_sec_context(
target_name, cred=client_creds,
flags=gb.RequirementFlag.delegate_to_peer)

client_token = client_ctx_resp[3]

server_creds = gb.acquire_cred(None, cred_usage='accept').creds
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_cred=server_creds)

deleg_creds = server_ctx_resp.delegated_creds
deleg_creds.shouldnt_be_none()
store_res = gb.store_cred(deleg_creds, cred_usage='initiate',
set_default=True)

store_res.shouldnt_be_none()
store_res.usage.should_be('initiate')
store_res.mech_types.should_include(gb.MechType.kerberos)

deleg_name = gb.inquire_cred(deleg_creds).name
acq_resp = gb.acquire_cred(deleg_name, cred_usage='initiate')
acq_resp.shouldnt_be_none()

@_extension_test('cred_store', 'credentials store')
def test_store_cred_into_acquire_cred(self):
Expand Down