Añadido tests para la revocación de certificados

This commit is contained in:
Daniel Armengod 2024-02-01 21:35:31 +01:00
parent 62375123f4
commit f67c688259
2 changed files with 63 additions and 15 deletions

View File

@ -1,5 +1,7 @@
import asyncio import asyncio
import base64
import datetime import datetime
import zlib
from typing import Any from typing import Any
import didkit import didkit
@ -8,6 +10,8 @@ import jinja2
from jinja2 import Environment, FileSystemLoader, select_autoescape from jinja2 import Environment, FileSystemLoader, select_autoescape
from ast import literal_eval from ast import literal_eval
from pyroaring import BitMap
def generate_did_controller_key(): def generate_did_controller_key():
return didkit.generate_ed25519_key() return didkit.generate_ed25519_key()
@ -22,9 +26,12 @@ def generate_generic_vc_id():
return "https://pangea.org/credentials/42" return "https://pangea.org/credentials/42"
async def resolve_keydid(keydid): def resolve_did(keydid):
async def inner():
return await didkit.resolve_did(keydid, "{}") return await didkit.resolve_did(keydid, "{}")
return asyncio.run(inner())
def webdid_from_controller_key(key): def webdid_from_controller_key(key):
""" """
@ -33,7 +40,7 @@ def webdid_from_controller_key(key):
""" """
keydid = keydid_from_controller_key(key) # "did:key:<...>" keydid = keydid_from_controller_key(key) # "did:key:<...>"
pubkeyid = keydid.rsplit(":")[-1] # <...> pubkeyid = keydid.rsplit(":")[-1] # <...>
document = json.loads(asyncio.run(resolve_keydid(keydid))) # Documento DID en terminos "key" document = json.loads(resolve_did(keydid)) # Documento DID en terminos "key"
webdid_url = f"did:web:idhub.pangea.org:{pubkeyid}" # nueva URL: "did:web:idhub.pangea.org:<...>" webdid_url = f"did:web:idhub.pangea.org:{pubkeyid}" # nueva URL: "did:web:idhub.pangea.org:<...>"
webdid_url_owner = webdid_url + "#owner" webdid_url_owner = webdid_url + "#owner"
# Reemplazamos los campos del documento DID necesarios: # Reemplazamos los campos del documento DID necesarios:
@ -91,11 +98,8 @@ def sign_credential(unsigned_vc: str, jwk_issuer):
def verify_credential(vc): def verify_credential(vc):
""" """
Returns a (bool, str) tuple indicating whether the credential is valid. Returns a (bool, str) tuple indicating whether the credential is valid.
Checks performed:
* The credential is valid in signature and form, and
* The credential validates itself against its declared schema.
If the boolean is true, the credential is valid and the second argument can be ignored. If the boolean is true, the credential is valid and the second argument can be ignored.
If it is false, the VC is invalid and the second argument contains a string (which is a valid JSON object) with further information. If it is false, the VC is invalid and the second argument contains a JSON object with further information.
""" """
async def inner(): async def inner():
str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}') str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}')
@ -103,12 +107,31 @@ def verify_credential(vc):
ok = res["warnings"] == [] and res["errors"] == [] ok = res["warnings"] == [] and res["errors"] == []
return ok, str_res return ok, str_res
(ok, res) = asyncio.run(inner()) valid, reason = asyncio.run(inner())
if not ok: if not valid:
# The credential doesn't pass signature checks, so early return return valid, reason
return ok, res # Credential passes basic signature verification. Now check it against its schema.
return ok, res # TODO: check agasint schema
pass
# Credential verifies against its schema. Now check revocation status.
vc = json.loads(vc)
if "credentialStatus" in vc:
revocation_index = int(vc["credentialStatus"]["revocationBitmapIndex"]) # NOTE: THIS FIELD SHOULD BE SERIALIZED AS AN INTEGER, BUT IOTA DOCUMENTAITON SERIALIZES IT AS A STRING. DEFENSIVE CAST ADDED JUST IN CASE.
vc_issuer = vc["issuer"]["id"] # This is a DID
issuer_did_document = json.loads(resolve_did(vc_issuer)) # TODO: implement a caching layer so we don't have to fetch the DID (and thus the revocation list) every time a VC is validated.
issuer_revocation_list = issuer_did_document["service"][0]
assert issuer_revocation_list["type"] == "RevocationBitmap2022"
revocation_bitmap = BitMap.deserialize(
zlib.decompress(
base64.b64decode(
issuer_revocation_list["serviceEndpoint"].rsplit(",")[1]
)
)
)
if revocation_index in revocation_bitmap:
return False, "Credential has been revoked by the issuer"
# Fallthrough means all is good.
return True, "Credential passes all checks"
def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str) -> str: def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str) -> str:
@ -136,8 +159,8 @@ def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_di
def verify_presentation(vp): def verify_presentation(vp):
""" """
Returns a (bool, str) tuple indicating whether the credential is valid. Returns a (bool, str) tuple indicating whether the presentation is valid.
If the boolean is true, the credential is valid and the second argument can be ignored. If the boolean is true, the presentation is valid and the second argument can be ignored.
If it is false, the VC is invalid and the second argument contains a JSON object with further information. If it is false, the VC is invalid and the second argument contains a JSON object with further information.
""" """
async def inner(): async def inner():

25
main.py
View File

@ -70,6 +70,31 @@ def test_all_vcs(use_webdid=False):
print(e) print(e)
def did_web_issue_vc_and_check_revocation(vc_name, revoked=True):
jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}'
jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}'
did_issuer = "did:web:idhub.pangea.org:did-registry:allRevoked" if revoked else "did:web:idhub.pangea.org:did-registry:noneRevoked"
did_subject = didkit.key_to_did("key", jwk_subject)
vc_template = json.load(open(f'../../schemas/vc_templates/{vc_name}.json'))
data_base = json.load(open(f'../../schemas/vc_examples/base--data.json'))
data_base["issuer"]["id"] = did_issuer
data_base["credentialSubject"]["id"] = did_subject
data_specific = json.load(open(f'../../schemas/vc_examples/{vc_name}--data.json'))
data = deep_merge_dict(data_base, data_specific)
vc_rendered_unsigned = deep_merge_dict(vc_template, data)
signed_credential = idhub_ssikit.render_and_sign_credential(
vc_rendered_unsigned,
jwk_issuer,
)
ok, reason = idhub_ssikit.verify_credential(signed_credential)
print(ok)
print(reason)
def did_web_issue_vc_test_newstyle(vc_name): def did_web_issue_vc_test_newstyle(vc_name):
jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}' jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}'
jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}' jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}'