Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions tests/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#
from os import environ

from six import text_type

FAKE = environ.get('FAKE')
RANDOM_DOMAIN = environ.get('RANDOM_DOMAIN')
Expand Down Expand Up @@ -49,5 +48,5 @@
NGTS_SCOPE = environ.get('NGTS_SCOPE')
NGTS_ZONE = environ.get('NGTS_ZONE')

if RANDOM_DOMAIN and not isinstance(RANDOM_DOMAIN, text_type):
if RANDOM_DOMAIN and not isinstance(RANDOM_DOMAIN, str):
RANDOM_DOMAIN = RANDOM_DOMAIN.decode()
10 changes: 5 additions & 5 deletions tests/test_tpp_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import binascii
import time
import unittest
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from cryptography import x509
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -48,7 +48,7 @@ def test_tpp_token_enroll(self):
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
self.assertEqual(cert_config['Origin'], "Venafi VCert-Python")
except Exception as err:
self.fail(f"Error in test: {err.message}")
self.fail(f"Error in test: {str(err)}")

def test_tpp_token_enroll_with_service_generated_csr(self):
cn = f"{random_word(10)}.venafi.example.com"
Expand All @@ -58,7 +58,7 @@ def test_tpp_token_enroll_with_service_generated_csr(self):
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
self.assertEqual(cert_config['Origin'], "Venafi VCert-Python")
except Exception as err:
self.fail(f"Error in test: {err.message}")
self.fail(f"Error in test: {str(err)}")

def test_tpp_token_enroll_with_custom_fields(self):
cn = f"{random_word(10)}.venafi.example.com"
Expand Down Expand Up @@ -213,14 +213,14 @@ def test_tpp_token_enroll_valid_hours(self):
request.custom_fields = custom_fields
request.validity_hours = 144
request.issuer_hint = IssuerHint.MICROSOFT
expected_date = datetime.utcnow() + timedelta(hours=request.validity_hours)
expected_date = datetime.now(timezone.utc) + timedelta(hours=request.validity_hours)

self.tpp_conn.request_cert(request, self.tpp_zone)
cert = self.tpp_conn.retrieve_cert(request)

cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
assert isinstance(cert, x509.Certificate)
expiration_date = cert.not_valid_after
expiration_date = cert.not_valid_after_utc
# Due to some roundings and delays in operations on the server side, the certificate expiration date
# is not exactly the same as the one used in the request. A gap is allowed in this scenario to compensate
# this delays and roundings.
Expand Down
3 changes: 1 addition & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.x509.oid import NameOID
from six import string_types

from test_env import RANDOM_DOMAIN
from vcert import CertificateRequest, FakeConnection, TPPConnection, TPPTokenConnection, CSR_ORIGIN_SERVICE
Expand Down Expand Up @@ -141,7 +140,7 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()
if isinstance(public_key, string_types):
if isinstance(public_key, str):
public_key = public_key.encode()
if public_key:
source_public_key_pem = serialization.load_pem_public_key(
Expand Down
10 changes: 5 additions & 5 deletions tests/test_vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import binascii
import time
import unittest
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from cryptography import x509
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -123,14 +123,14 @@ def test_cloud_enroll_valid_hours(self):
]
request.custom_fields = custom_fields
request.validity_hours = 144
expected_date = datetime.utcnow() + timedelta(hours=request.validity_hours)
expected_date = datetime.now(timezone.utc) + timedelta(hours=request.validity_hours)

self.cloud_conn.request_cert(request, self.cloud_zone)
cert = self.cloud_conn.retrieve_cert(request)

cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
assert isinstance(cert, x509.Certificate)
expiration_date = cert.not_valid_after
expiration_date = cert.not_valid_after_utc
# Due to some roundings and delays in operations on the server side, the certificate expiration date
# is not exactly the same as the one used in the request. A gap is allowed in this scenario to compensate
# this delays and roundings.
Expand Down Expand Up @@ -196,7 +196,7 @@ def test_enroll_ec_key_certificate(self):
p_key = serialization.load_pem_private_key(data=cert.key.encode(), password=password.encode(),
backend=default_backend())
except Exception as e:
log.error(msg=f"Error parsing Private Key: {e.message}")
log.error(msg=f"Error parsing Private Key: {str(e)}")

if p_key:
self.assertIsInstance(p_key, EllipticCurvePrivateKey, "returned private key is not of type Elliptic Curve")
Expand All @@ -212,4 +212,4 @@ def test_cloud_retire_by_thumbprint(self):
ret_data = self.cloud_conn.retire_cert(ret_request)
assert ret_data is True
except Exception as e:
log.error(msg=f"Error retiring certificate by thumbprint: {e.message}")
log.error(msg=f"Error retiring certificate by thumbprint: {str(e)}")
13 changes: 6 additions & 7 deletions vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID, ExtensionOID
from six import string_types, binary_type

from .errors import VenafiConnectionError, ServerUnexptedBehavior, BadData, ClientBadData
from .http_status import HTTPStatus
Expand Down Expand Up @@ -346,16 +345,16 @@ def __init__(self, cert_id=None,

def __setattr__(self, key, value):
if key == "key_password":
if isinstance(value, string_types):
if isinstance(value, str):
value = value.encode()
elif key == "common_name":
if isinstance(value, binary_type):
if isinstance(value, bytes):
value = value.decode()
elif key == "key_type":
if value is not None and not isinstance(value, KeyType):
raise ClientBadData("key_type should be instance of vcert.KeyType")
elif key == "private_key":
if isinstance(value, string_types):
if isinstance(value, str):
value = serialization.load_pem_private_key(value.encode(),
password=self.key_password, backend=default_backend())
if isinstance(value, rsa.RSAPrivateKey):
Expand All @@ -368,9 +367,9 @@ def __setattr__(self, key, value):
raise ClientBadData(f"invalid private key type {type(value)}")
elif key == "csr":
self.csr_origin = CSR_ORIGIN_PROVIDED
if isinstance(value, binary_type):
if isinstance(value, bytes):
value = value.decode()
elif not (isinstance(value, string_types) or value is None):
elif not (isinstance(value, str) or value is None):
raise ClientBadData(f"invalid csr type {type(value)}")
if value:
csr = x509.load_pem_x509_csr(value.encode(), default_backend())
Expand Down Expand Up @@ -433,7 +432,7 @@ def build_csr(self):
if self.organization:
subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, self.organization))
if self.organizational_unit:
if isinstance(self.organizational_unit, string_types):
if isinstance(self.organizational_unit, str):
subject.append(x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.organizational_unit))
elif isinstance(self.organizational_unit, list):
for u in self.organizational_unit:
Expand Down
5 changes: 2 additions & 3 deletions vcert/connection_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import time

import requests
import six.moves.urllib.parse as urlparse
import urllib.parse as urlparse
from nacl.public import SealedBox
from six import string_types

from .common import (ZoneConfig, CertificateRequest, CommonConnection, Policy, get_ip_address, log_errors, MIME_JSON,
MIME_TEXT, MIME_ANY, CertField, KeyType, DEFAULT_TIMEOUT,
Expand Down Expand Up @@ -934,7 +933,7 @@ def _get_service_generated_csr_attr(self, request, zone):
csr_attr_map[CSR_ATTR_ORG] = ps.defaults.subject.org

if request.organizational_unit:
if isinstance(request.organizational_unit, string_types):
if isinstance(request.organizational_unit, str):
org_units = [request.organizational_unit]
else:
org_units = request.organizational_unit
Expand Down
4 changes: 2 additions & 2 deletions vcert/connection_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ def retrieve_cert(self, certificate_request):
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.utcnow()
datetime.datetime.now(datetime.timezone.utc)
).not_valid_after(
# Our certificate will be valid for 10 days
datetime.datetime.utcnow() + datetime.timedelta(days=10)
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=10)
).add_extension(
# csr_builder.extensions,
x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
Expand Down
6 changes: 3 additions & 3 deletions vcert/connection_tpp_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import logging as log
import re
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
from six.moves.urllib import parse as url_parse
from urllib import parse as url_parse

from .common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
CSR_ORIGIN_SERVICE, KeyType, CHAIN_OPTION_LAST, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, Policy, ZoneConfig
Expand Down Expand Up @@ -147,7 +147,7 @@ def request_cert(self, request, zone):
else:
exp_date_attr = IssuerHint.DEFAULT.json_value

expiration_date = datetime.utcnow() + timedelta(hours=request.validity_hours)
expiration_date = datetime.now(timezone.utc) + timedelta(hours=request.validity_hours)
formatted_expiration_date = expiration_date.strftime("%Y-%m-%d %H:%M:%S")

expiration_date = {'Name': exp_date_attr, 'Value': formatted_expiration_date}
Expand Down