315 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			315 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import traceback
 | |
| 
 | |
| import frappe
 | |
| import taxjar
 | |
| from frappe import _
 | |
| from frappe.contacts.doctype.address.address import get_company_address
 | |
| from frappe.utils import cint, flt
 | |
| 
 | |
| from erpnext import get_default_company
 | |
| 
 | |
| TAX_ACCOUNT_HEAD = frappe.db.get_single_value("TaxJar Settings", "tax_account_head")
 | |
| SHIP_ACCOUNT_HEAD = frappe.db.get_single_value("TaxJar Settings", "shipping_account_head")
 | |
| TAXJAR_CREATE_TRANSACTIONS = frappe.db.get_single_value("TaxJar Settings", "taxjar_create_transactions")
 | |
| TAXJAR_CALCULATE_TAX = frappe.db.get_single_value("TaxJar Settings", "taxjar_calculate_tax")
 | |
| SUPPORTED_COUNTRY_CODES = ["AT", "AU", "BE", "BG", "CA", "CY", "CZ", "DE", "DK", "EE", "ES", "FI",
 | |
| 	"FR", "GB", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL", "PL", "PT", "RO",
 | |
| 	"SE", "SI", "SK", "US"]
 | |
| SUPPORTED_STATE_CODES = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FL', 'GA', 'HI', 'ID', 'IL',
 | |
| 	'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE',
 | |
| 	'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD',
 | |
| 	'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']
 | |
| 
 | |
| 
 | |
| def get_client():
 | |
| 	taxjar_settings = frappe.get_single("TaxJar Settings")
 | |
| 
 | |
| 	if not taxjar_settings.is_sandbox:
 | |
| 		api_key = taxjar_settings.api_key and taxjar_settings.get_password("api_key")
 | |
| 		api_url = taxjar.DEFAULT_API_URL
 | |
| 	else:
 | |
| 		api_key = taxjar_settings.sandbox_api_key and taxjar_settings.get_password("sandbox_api_key")
 | |
| 		api_url = taxjar.SANDBOX_API_URL
 | |
| 
 | |
| 	if api_key and api_url:
 | |
| 		client = taxjar.Client(api_key=api_key, api_url=api_url)
 | |
| 		client.set_api_config('headers', {
 | |
| 				'x-api-version': '2020-08-07'
 | |
| 			})
 | |
| 		return client
 | |
| 
 | |
| 
 | |
| def create_transaction(doc, method):
 | |
| 	"""Create an order transaction in TaxJar"""
 | |
| 
 | |
| 	if not TAXJAR_CREATE_TRANSACTIONS:
 | |
| 		return
 | |
| 
 | |
| 	client = get_client()
 | |
| 
 | |
| 	if not client:
 | |
| 		return
 | |
| 
 | |
| 	sales_tax = sum([tax.tax_amount for tax in doc.taxes if tax.account_head == TAX_ACCOUNT_HEAD])
 | |
| 
 | |
| 	if not sales_tax:
 | |
| 		return
 | |
| 
 | |
| 	tax_dict = get_tax_data(doc)
 | |
| 
 | |
| 	if not tax_dict:
 | |
| 		return
 | |
| 
 | |
| 	tax_dict['transaction_id'] = doc.name
 | |
| 	tax_dict['transaction_date'] = frappe.utils.today()
 | |
| 	tax_dict['sales_tax'] = sales_tax
 | |
| 	tax_dict['amount'] = doc.total + tax_dict['shipping']
 | |
| 
 | |
| 	try:
 | |
| 		if doc.is_return:
 | |
| 			client.create_refund(tax_dict)
 | |
| 		else:
 | |
| 			client.create_order(tax_dict)
 | |
| 	except taxjar.exceptions.TaxJarResponseError as err:
 | |
| 		frappe.throw(_(sanitize_error_response(err)))
 | |
| 	except Exception as ex:
 | |
| 		print(traceback.format_exc(ex))
 | |
| 
 | |
| 
 | |
| def delete_transaction(doc, method):
 | |
| 	"""Delete an existing TaxJar order transaction"""
 | |
| 
 | |
| 	if not TAXJAR_CREATE_TRANSACTIONS:
 | |
| 		return
 | |
| 
 | |
| 	client = get_client()
 | |
| 
 | |
| 	if not client:
 | |
| 		return
 | |
| 
 | |
| 	client.delete_order(doc.name)
 | |
| 
 | |
| 
 | |
| def get_tax_data(doc):
 | |
| 	from_address = get_company_address_details(doc)
 | |
| 	from_shipping_state = from_address.get("state")
 | |
| 	from_country_code = frappe.db.get_value("Country", from_address.country, "code")
 | |
| 	from_country_code = from_country_code.upper()
 | |
| 
 | |
| 	to_address = get_shipping_address_details(doc)
 | |
| 	to_shipping_state = to_address.get("state")
 | |
| 	to_country_code = frappe.db.get_value("Country", to_address.country, "code")
 | |
| 	to_country_code = to_country_code.upper()
 | |
| 
 | |
| 	shipping = sum([tax.tax_amount for tax in doc.taxes if tax.account_head == SHIP_ACCOUNT_HEAD])
 | |
| 
 | |
| 	line_items = [get_line_item_dict(item, doc.docstatus) for item in doc.items]
 | |
| 
 | |
| 	if from_shipping_state not in SUPPORTED_STATE_CODES:
 | |
| 		from_shipping_state = get_state_code(from_address, 'Company')
 | |
| 
 | |
| 	if to_shipping_state not in SUPPORTED_STATE_CODES:
 | |
| 		to_shipping_state = get_state_code(to_address, 'Shipping')
 | |
| 
 | |
| 	tax_dict = {
 | |
| 		'from_country': from_country_code,
 | |
| 		'from_zip': from_address.pincode,
 | |
| 		'from_state': from_shipping_state,
 | |
| 		'from_city': from_address.city,
 | |
| 		'from_street': from_address.address_line1,
 | |
| 		'to_country': to_country_code,
 | |
| 		'to_zip': to_address.pincode,
 | |
| 		'to_city': to_address.city,
 | |
| 		'to_street': to_address.address_line1,
 | |
| 		'to_state': to_shipping_state,
 | |
| 		'shipping': shipping,
 | |
| 		'amount': doc.net_total,
 | |
| 		'plugin': 'erpnext',
 | |
| 		'line_items': line_items
 | |
| 	}
 | |
| 	return tax_dict
 | |
| 
 | |
| def get_state_code(address, location):
 | |
| 	if address is not None:
 | |
| 		state_code = get_iso_3166_2_state_code(address)
 | |
| 		if state_code not in SUPPORTED_STATE_CODES:
 | |
| 			frappe.throw(_("Please enter a valid State in the {0} Address").format(location))
 | |
| 	else:
 | |
| 		frappe.throw(_("Please enter a valid State in the {0} Address").format(location))
 | |
| 
 | |
| 	return state_code
 | |
| 
 | |
| def get_line_item_dict(item, docstatus):
 | |
| 	tax_dict = dict(
 | |
| 		id = item.get('idx'),
 | |
| 		quantity = item.get('qty'),
 | |
| 		unit_price = item.get('rate'),
 | |
| 		product_tax_code = item.get('product_tax_category')
 | |
| 	)
 | |
| 
 | |
| 	if docstatus == 1:
 | |
| 		tax_dict.update({
 | |
| 			'sales_tax':item.get('tax_collectable')
 | |
| 		})
 | |
| 
 | |
| 	return tax_dict
 | |
| 
 | |
| def set_sales_tax(doc, method):
 | |
| 	if not TAXJAR_CALCULATE_TAX:
 | |
| 		return
 | |
| 
 | |
| 	if not doc.items:
 | |
| 		return
 | |
| 
 | |
| 	if check_sales_tax_exemption(doc):
 | |
| 		return
 | |
| 
 | |
| 	tax_dict = get_tax_data(doc)
 | |
| 
 | |
| 	if not tax_dict:
 | |
| 		# Remove existing tax rows if address is changed from a taxable state/country
 | |
| 		setattr(doc, "taxes", [tax for tax in doc.taxes if tax.account_head != TAX_ACCOUNT_HEAD])
 | |
| 		return
 | |
| 
 | |
| 	# check if delivering within a nexus
 | |
| 	check_for_nexus(doc, tax_dict)
 | |
| 
 | |
| 	tax_data = validate_tax_request(tax_dict)
 | |
| 	if tax_data is not None:
 | |
| 		if not tax_data.amount_to_collect:
 | |
| 			setattr(doc, "taxes", [tax for tax in doc.taxes if tax.account_head != TAX_ACCOUNT_HEAD])
 | |
| 		elif tax_data.amount_to_collect > 0:
 | |
| 			# Loop through tax rows for existing Sales Tax entry
 | |
| 			# If none are found, add a row with the tax amount
 | |
| 			for tax in doc.taxes:
 | |
| 				if tax.account_head == TAX_ACCOUNT_HEAD:
 | |
| 					tax.tax_amount = tax_data.amount_to_collect
 | |
| 
 | |
| 					doc.run_method("calculate_taxes_and_totals")
 | |
| 					break
 | |
| 			else:
 | |
| 				doc.append("taxes", {
 | |
| 					"charge_type": "Actual",
 | |
| 					"description": "Sales Tax",
 | |
| 					"account_head": TAX_ACCOUNT_HEAD,
 | |
| 					"tax_amount": tax_data.amount_to_collect
 | |
| 				})
 | |
| 			# Assigning values to tax_collectable and taxable_amount fields in sales item table
 | |
| 			for item in tax_data.breakdown.line_items:
 | |
| 				doc.get('items')[cint(item.id)-1].tax_collectable = item.tax_collectable
 | |
| 				doc.get('items')[cint(item.id)-1].taxable_amount = item.taxable_amount
 | |
| 
 | |
| 			doc.run_method("calculate_taxes_and_totals")
 | |
| 
 | |
| def check_for_nexus(doc, tax_dict):
 | |
| 	if not frappe.db.get_value('TaxJar Nexus', {'region_code': tax_dict["to_state"]}):
 | |
| 		for item in doc.get("items"):
 | |
| 			item.tax_collectable = flt(0)
 | |
| 			item.taxable_amount = flt(0)
 | |
| 
 | |
| 		for tax in doc.taxes:
 | |
| 			if tax.account_head == TAX_ACCOUNT_HEAD:
 | |
| 				doc.taxes.remove(tax)
 | |
| 		return
 | |
| 
 | |
| def check_sales_tax_exemption(doc):
 | |
| 	# if the party is exempt from sales tax, then set all tax account heads to zero
 | |
| 	sales_tax_exempted = hasattr(doc, "exempt_from_sales_tax") and doc.exempt_from_sales_tax \
 | |
| 		or frappe.db.has_column("Customer", "exempt_from_sales_tax") \
 | |
| 		and frappe.db.get_value("Customer", doc.customer, "exempt_from_sales_tax")
 | |
| 
 | |
| 	if sales_tax_exempted:
 | |
| 		for tax in doc.taxes:
 | |
| 			if tax.account_head == TAX_ACCOUNT_HEAD:
 | |
| 				tax.tax_amount = 0
 | |
| 				break
 | |
| 		doc.run_method("calculate_taxes_and_totals")
 | |
| 		return True
 | |
| 	else:
 | |
| 		return False
 | |
| 
 | |
| def validate_tax_request(tax_dict):
 | |
| 	"""Return the sales tax that should be collected for a given order."""
 | |
| 
 | |
| 	client = get_client()
 | |
| 
 | |
| 	if not client:
 | |
| 		return
 | |
| 
 | |
| 	try:
 | |
| 		tax_data = client.tax_for_order(tax_dict)
 | |
| 	except taxjar.exceptions.TaxJarResponseError as err:
 | |
| 		frappe.throw(_(sanitize_error_response(err)))
 | |
| 	else:
 | |
| 		return tax_data
 | |
| 
 | |
| 
 | |
| def get_company_address_details(doc):
 | |
| 	"""Return default company address details"""
 | |
| 
 | |
| 	company_address = get_company_address(get_default_company()).company_address
 | |
| 
 | |
| 	if not company_address:
 | |
| 		frappe.throw(_("Please set a default company address"))
 | |
| 
 | |
| 	company_address = frappe.get_doc("Address", company_address)
 | |
| 	return company_address
 | |
| 
 | |
| 
 | |
| def get_shipping_address_details(doc):
 | |
| 	"""Return customer shipping address details"""
 | |
| 
 | |
| 	if doc.shipping_address_name:
 | |
| 		shipping_address = frappe.get_doc("Address", doc.shipping_address_name)
 | |
| 	elif doc.customer_address:
 | |
| 		shipping_address = frappe.get_doc("Address", doc.customer_address_name)
 | |
| 	else:
 | |
| 		shipping_address = get_company_address_details(doc)
 | |
| 
 | |
| 	return shipping_address
 | |
| 
 | |
| 
 | |
| def get_iso_3166_2_state_code(address):
 | |
| 	import pycountry
 | |
| 	country_code = frappe.db.get_value("Country", address.get("country"), "code")
 | |
| 
 | |
| 	error_message = _("""{0} is not a valid state! Check for typos or enter the ISO code for your state.""").format(address.get("state"))
 | |
| 	state = address.get("state").upper().strip()
 | |
| 
 | |
| 	# The max length for ISO state codes is 3, excluding the country code
 | |
| 	if len(state) <= 3:
 | |
| 		# PyCountry returns state code as {country_code}-{state-code} (e.g. US-FL)
 | |
| 		address_state = (country_code + "-" + state).upper()
 | |
| 
 | |
| 		states = pycountry.subdivisions.get(country_code=country_code.upper())
 | |
| 		states = [pystate.code for pystate in states]
 | |
| 
 | |
| 		if address_state in states:
 | |
| 			return state
 | |
| 
 | |
| 		frappe.throw(_(error_message))
 | |
| 	else:
 | |
| 		try:
 | |
| 			lookup_state = pycountry.subdivisions.lookup(state)
 | |
| 		except LookupError:
 | |
| 			frappe.throw(_(error_message))
 | |
| 		else:
 | |
| 			return lookup_state.code.split('-')[1]
 | |
| 
 | |
| 
 | |
| def sanitize_error_response(response):
 | |
| 	response = response.full_response.get("detail")
 | |
| 	response = response.replace("_", " ")
 | |
| 
 | |
| 	sanitized_responses = {
 | |
| 		"to zip": "Zipcode",
 | |
| 		"to city": "City",
 | |
| 		"to state": "State",
 | |
| 		"to country": "Country"
 | |
| 	}
 | |
| 
 | |
| 	for k, v in sanitized_responses.items():
 | |
| 		response = response.replace(k, v)
 | |
| 
 | |
| 	return response
 |