Skip to main content

Servicios Externos

Integración Stripe

Configuración Stripe

# managementProject/settings.py
STRIPE_SECRET_KEY = os.getenv('STRIPE_SECRET_KEY')
STRIPE_PUBLISHABLE_KEY = os.getenv('STRIPE_PUBLISHABLE_KEY')
STRIPE_WEBHOOK_SECRET = os.getenv('STRIPE_WEBHOOK_SECRET')

# managementApp/viewsfolder/StripeViews.py
import stripe
from django.conf import settings

stripe.api_key = settings.STRIPE_SECRET_KEY

Products and Pricing

class StripeProductsView(APIView):
def get(self, request):
try:
products = stripe.Product.list(active=True)

products_with_prices = []
for product in products.data:
prices = stripe.Price.list(product=product.id, active=True)

products_with_prices.append({
'id': product.id,
'name': product.name,
'description': product.description,
'images': product.images,
'metadata': product.metadata,
'prices': [{
'id': price.id,
'amount': price.unit_amount,
'currency': price.currency,
'interval': price.recurring.interval if price.recurring else None,
'interval_count': price.recurring.interval_count if price.recurring else None
} for price in prices.data]
})

return Response({
'success': True,
'data': products_with_prices
})

except Exception as e:
return Response({
'success': False,
'message': f'Error obteniendo productos: {str(e)}'
}, status=500)

Payment Intents

class CreatePaymentIntentView(APIView):
def post(self, request):
try:
amount = request.data.get('amount')
currency = request.data.get('currency', 'cop')
price_id = request.data.get('price_id')

# Create payment intent
intent = stripe.PaymentIntent.create(
amount=amount,
currency=currency,
metadata={
'user_id': request.user.id,
'price_id': price_id
}
)

return Response({
'success': True,
'data': {
'client_secret': intent.client_secret,
'payment_intent_id': intent.id
}
})

except Exception as e:
return Response({
'success': False,
'message': f'Error creando payment intent: {str(e)}'
}, status=500)

Webhooks

class StripeWebhookView(APIView):
permission_classes = []

def post(self, request):
payload = request.body
sig_header = request.META.get('HTTP_STRIPE_SIGNATURE')
endpoint_secret = settings.STRIPE_WEBHOOK_SECRET

try:
event = stripe.Webhook.construct_event(
payload, sig_header, endpoint_secret
)
except ValueError:
return Response({'error': 'Invalid payload'}, status=400)
except stripe.error.SignatureVerificationError:
return Response({'error': 'Invalid signature'}, status=400)

# Handle the event
if event['type'] == 'payment_intent.succeeded':
payment_intent = event['data']['object']
self.handle_successful_payment(payment_intent)

elif event['type'] == 'customer.subscription.created':
subscription = event['data']['object']
self.handle_new_subscription(subscription)

elif event['type'] == 'customer.subscription.updated':
subscription = event['data']['object']
self.handle_subscription_update(subscription)

return Response({'status': 'success'})

def handle_successful_payment(self, payment_intent):
"""Process successful payment"""
user_id = payment_intent['metadata'].get('user_id')
amount = payment_intent['amount']

# Update user subscription status
try:
user = CustomUser.objects.get(id=user_id)
# Process payment logic here
except CustomUser.DoesNotExist:
pass

def handle_new_subscription(self, subscription):
"""Process new subscription"""
customer_id = subscription['customer']
subscription_id = subscription['id']

# Create local subscription record
try:
# Find user by Stripe customer ID
user = CustomUser.objects.get(stripe_customer_id=customer_id)

SubscriptionModel.objects.create(
user=user,
stripe_subscription_id=subscription_id,
stripe_customer_id=customer_id,
status='active',
plan_name=subscription['items']['data'][0]['price']['nickname'],
amount=subscription['items']['data'][0]['price']['unit_amount'],
currency=subscription['currency'],
interval=subscription['items']['data'][0]['price']['recurring']['interval'],
current_period_start=datetime.fromtimestamp(subscription['current_period_start']),
current_period_end=datetime.fromtimestamp(subscription['current_period_end'])
)
except CustomUser.DoesNotExist:
pass

AWS S3 Integration

S3 Configuration

# managementProject/settings.py
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_S3_BUCKET_NAME = os.getenv('AWS_S3_BUCKET_NAME')
AWS_REGION_NAME = os.getenv('AWS_REGION_NAME', 'us-east-1')

File Upload Utils

# managementApp/utilsfolder/FileUtils.py
import boto3
from django.conf import settings
import uuid
import os

class S3Manager:
def __init__(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=settings.AWS_ACCESS_KEY,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_REGION_NAME
)
self.bucket_name = settings.AWS_S3_BUCKET_NAME

def upload_file(self, file, folder_path='uploads/'):
"""Upload file to S3 and return URL"""
try:
# Generate unique filename
file_extension = os.path.splitext(file.name)[1]
unique_filename = f"{uuid.uuid4()}{file_extension}"
s3_key = f"{folder_path}{unique_filename}"

# Upload file
self.s3_client.upload_fileobj(
file,
self.bucket_name,
s3_key,
ExtraArgs={
'ContentType': file.content_type,
'ACL': 'public-read'
}
)

# Generate URL
file_url = f"https://{self.bucket_name}.s3.{settings.AWS_REGION_NAME}.amazonaws.com/{s3_key}"

return {
'success': True,
'url': file_url,
's3_key': s3_key
}

except Exception as e:
return {
'success': False,
'error': str(e)
}

def delete_file(self, s3_key):
"""Delete file from S3"""
try:
self.s3_client.delete_object(
Bucket=self.bucket_name,
Key=s3_key
)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}

def generate_presigned_url(self, s3_key, expiration=3600):
"""Generate presigned URL for temporary access"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': s3_key},
ExpiresIn=expiration
)
return {'success': True, 'url': url}
except Exception as e:
return {'success': False, 'error': str(e)}

Image Upload View

class PropertyImageUploadView(APIView):
def post(self, request):
try:
property_id = request.data.get('property_id')
images = request.FILES.getlist('images')

if not property_id or not images:
return Response({
'success': False,
'message': 'property_id e imágenes son requeridos'
}, status=400)

property_instance = PropertyModel.objects.get(id=property_id)
s3_manager = S3Manager()
uploaded_images = []

for index, image in enumerate(images):
# Validate file type
if not image.content_type.startswith('image/'):
continue

# Upload to S3
result = s3_manager.upload_file(
image,
f'properties/{property_id}/'
)

if result['success']:
# Create PropertyImage record
property_image = PropertyImage.objects.create(
property=property_instance,
image_url=result['url'],
is_main=(index == 0),
order_index=index
)

uploaded_images.append({
'id': property_image.id,
'url': result['url'],
'is_main': property_image.is_main
})

return Response({
'success': True,
'message': f'{len(uploaded_images)} imágenes subidas exitosamente',
'data': uploaded_images
})

except PropertyModel.DoesNotExist:
return Response({
'success': False,
'message': 'Propiedad no encontrada'
}, status=404)
except Exception as e:
return Response({
'success': False,
'message': f'Error subiendo imágenes: {str(e)}'
}, status=500)

AWS SES Email Service

SES Configuration

# managementProject/settings.py
AWS_SES_REGION_NAME = os.getenv('AWS_SES_REGION_NAME', 'us-east-1')
DEFAULT_FROM_EMAIL = os.getenv('DEFAULT_FROM_EMAIL', 'noreply@alojaplus.com')

Email Manager

# managementApp/utilsfolder/EmailUtils.py
import boto3
from django.conf import settings
from django.template.loader import render_to_string
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

class EmailManager:
def __init__(self):
self.ses_client = boto3.client(
'ses',
aws_access_key_id=settings.AWS_ACCESS_KEY,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_SES_REGION_NAME
)

def send_welcome_email(self, user_email, user_name):
"""Send welcome email to new user"""
subject = "¡Bienvenido a AlojaPlus!"

# Render template
html_content = render_to_string('emails/welcome.html', {
'user_name': user_name,
'login_url': f"{settings.DOMAIN}/auth/sign-in"
})

return self.send_email(user_email, subject, html_content)

def send_contract_signature_request(self, user_email, user_name, contract_id, sign_url):
"""Send contract signature request"""
subject = "Solicitud de Firma de Contrato - AlojaPlus"

html_content = render_to_string('emails/contract_signature.html', {
'user_name': user_name,
'contract_id': contract_id,
'sign_url': sign_url,
'expires_in': "7 días"
})

return self.send_email(user_email, subject, html_content)

def send_payment_reminder(self, user_email, user_name, amount, due_date):
"""Send payment reminder"""
subject = "Recordatorio de Pago - AlojaPlus"

html_content = render_to_string('emails/payment_reminder.html', {
'user_name': user_name,
'amount': amount,
'due_date': due_date,
'payment_url': f"{settings.DOMAIN}/admin/payments"
})

return self.send_email(user_email, subject, html_content)

def send_email(self, to_email, subject, html_content):
"""Send email using AWS SES"""
try:
response = self.ses_client.send_email(
Source=settings.DEFAULT_FROM_EMAIL,
Destination={'ToAddresses': [to_email]},
Message={
'Subject': {'Data': subject, 'Charset': 'UTF-8'},
'Body': {
'Html': {'Data': html_content, 'Charset': 'UTF-8'}
}
}
)

return {
'success': True,
'message_id': response['MessageId']
}

except Exception as e:
return {
'success': False,
'error': str(e)
}

Email Templates

<!-- managementApp/templates/emails/welcome.html -->
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Bienvenido a AlojaPlus</title>
</head>
<body>
<div style="max-width: 600px; margin: 0 auto; font-family: Arial, sans-serif;">
<div style="background-color: #0070F3; color: white; padding: 20px; text-align: center;">
<h1>¡Bienvenido a AlojaPlus!</h1>
</div>

<div style="padding: 20px;">
<p>Hola {{ user_name }},</p>

<p>¡Gracias por unirte a AlojaPlus! Tu cuenta ha sido creada exitosamente.</p>

<p>Ahora puedes acceder a nuestra plataforma para gestionar tus propiedades, contratos y pagos.</p>

<div style="text-align: center; margin: 30px 0;">
<a href="{{ login_url }}"
style="background-color: #0070F3; color: white; padding: 12px 24px; text-decoration: none; border-radius: 5px; display: inline-block;">
Iniciar Sesión
</a>
</div>

<p>Si tienes alguna pregunta, no dudes en contactarnos.</p>

<p>Saludos,<br>El equipo de AlojaPlus</p>
</div>

<div style="background-color: #f8f9fa; padding: 15px; text-align: center; font-size: 12px; color: #666;">
© 2024 AlojaPlus. Todos los derechos reservados.
</div>
</div>
</body>
</html>

Error Handling para Servicios

Service Error Handler

# managementApp/utilsfolder/ServiceErrorHandler.py
import logging
from django.http import JsonResponse

logger = logging.getLogger(__name__)

class ServiceErrorHandler:
@staticmethod
def handle_stripe_error(error):
"""Handle Stripe API errors"""
if hasattr(error, 'user_message'):
message = error.user_message
else:
message = "Error processing payment"

logger.error(f"Stripe error: {str(error)}")

return {
'success': False,
'message': message,
'error_type': 'stripe_error'
}

@staticmethod
def handle_aws_error(error):
"""Handle AWS service errors"""
error_code = getattr(error, 'response', {}).get('Error', {}).get('Code', 'Unknown')

if error_code == 'NoSuchBucket':
message = "Storage configuration error"
elif error_code == 'AccessDenied':
message = "Storage access denied"
else:
message = "Storage service error"

logger.error(f"AWS error: {str(error)}")

return {
'success': False,
'message': message,
'error_type': 'aws_error'
}

@staticmethod
def handle_generic_service_error(service_name, error):
"""Handle generic service errors"""
message = f"Error connecting to {service_name}"

logger.error(f"{service_name} error: {str(error)}")

return {
'success': False,
'message': message,
'error_type': 'service_error'
}

Retry Logic

# managementApp/utilsfolder/RetryUtils.py
import time
import functools

def retry_on_failure(max_retries=3, delay=1, exponential_backoff=True):
"""Decorator for retrying failed service calls"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise e

wait_time = delay * (2 ** attempt) if exponential_backoff else delay
time.sleep(wait_time)

return None
return wrapper
return decorator

# Uso
@retry_on_failure(max_retries=3, delay=1)
def upload_to_s3_with_retry(file, folder_path):
s3_manager = S3Manager()
return s3_manager.upload_file(file, folder_path)

Health Checks

Service Health Monitoring

class ServiceHealthView(APIView):
permission_classes = []

def get(self, request):
health_status = {
'status': 'healthy',
'services': {}
}

# Check database
try:
from django.db import connection
cursor = connection.cursor()
cursor.execute("SELECT 1")
health_status['services']['database'] = 'healthy'
except Exception as e:
health_status['services']['database'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'

# Check S3
try:
s3_manager = S3Manager()
s3_manager.s3_client.head_bucket(Bucket=s3_manager.bucket_name)
health_status['services']['s3'] = 'healthy'
except Exception as e:
health_status['services']['s3'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'

# Check Stripe
try:
stripe.Account.retrieve()
health_status['services']['stripe'] = 'healthy'
except Exception as e:
health_status['services']['stripe'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'

status_code = 200 if health_status['status'] == 'healthy' else 503
return Response(health_status, status=status_code)