fix: (Linter) Write queries using QB/ORM and other minor lines for semgrep to skip

This commit is contained in:
marination 2022-02-02 12:58:31 +05:30
parent 780e29b42e
commit 4b62d2d7fe
13 changed files with 89 additions and 84 deletions

View File

@ -435,7 +435,7 @@ def get_existing_payment_request_amount(ref_dt, ref_dn):
""", (ref_dt, ref_dn)) """, (ref_dt, ref_dn))
return flt(existing_payment_request_amount[0][0]) if existing_payment_request_amount else 0 return flt(existing_payment_request_amount[0][0]) if existing_payment_request_amount else 0
def get_gateway_details(args): def get_gateway_details(args): # nosemgrep
"""return gateway and payment account of default payment gateway""" """return gateway and payment account of default payment gateway"""
if args.get("payment_gateway_account"): if args.get("payment_gateway_account"):
return get_payment_gateway_account(args.get("payment_gateway_account")) return get_payment_gateway_account(args.get("payment_gateway_account"))

View File

@ -41,7 +41,7 @@ class TestECommerceSettings(unittest.TestCase):
def test_tax_rule_validation(self): def test_tax_rule_validation(self):
frappe.db.sql("update `tabTax Rule` set use_for_shopping_cart = 0") frappe.db.sql("update `tabTax Rule` set use_for_shopping_cart = 0")
frappe.db.commit() frappe.db.commit() # nosemgrep
cart_settings = self.get_cart_settings() cart_settings = self.get_cart_settings()
cart_settings.enabled = 1 cart_settings.enabled = 1

View File

@ -57,16 +57,19 @@ class WebsiteItem(WebsiteGenerator):
self.publish_unpublish_desk_item(publish=True) self.publish_unpublish_desk_item(publish=True)
if not self.get("__islocal"): if not self.get("__islocal"):
self.old_website_item_groups = frappe.db.sql_list(""" wig = frappe.qb.DocType("Website Item Group")
select query = (
item_group frappe.qb.from_(wig)
from .select(wig.item_group)
`tabWebsite Item Group` .where(
where (wig.parentfield == "website_item_groups")
parentfield='website_item_groups' & (wig.parenttype == "Website Item")
and parenttype='Website Item' & (wig.parent == self.name)
and parent=%s )
""", self.name) )
result = query.run(as_list=True)
self.old_website_item_groups = [x[0] for x in result]
def on_update(self): def on_update(self):
invalidate_cache_for_web_item(self) invalidate_cache_for_web_item(self)
@ -330,18 +333,22 @@ class WebsiteItem(WebsiteGenerator):
return tab_values return tab_values
def get_recommended_items(self, settings): def get_recommended_items(self, settings):
items = frappe.db.sql(f""" ri = frappe.qb.DocType("Recommended Items")
select wi = frappe.qb.DocType("Website Item")
ri.website_item_thumbnail, ri.website_item_name,
ri.route, ri.item_code query = (
from frappe.qb.from_(ri)
`tabRecommended Items` ri, `tabWebsite Item` wi .join(wi).on(ri.item_code == wi.item_code)
where .select(
ri.item_code = wi.item_code ri.item_code, ri.route,
and ri.parent = '{self.name}' ri.website_item_name,
and wi.published = 1 ri.website_item_thumbnail
order by ri.idx ).where(
""", as_dict=1) (ri.parent == self.name)
& (wi.published == 1)
).orderby(ri.idx)
)
items = query.run(as_dict=True)
if settings.show_price: if settings.show_price:
is_guest = frappe.session.user == "Guest" is_guest = frappe.session.user == "Guest"

View File

@ -57,7 +57,7 @@ def remove_from_wishlist(item_code):
"parent": frappe.session.user "parent": frappe.session.user
} }
) )
frappe.db.commit() frappe.db.commit() # nosemgrep
wishlist_items = frappe.db.get_values( wishlist_items = frappe.db.get_values(
"Wishlist Item", "Wishlist Item",

View File

@ -99,18 +99,14 @@ class ProductFiltersBuilder:
if not attributes: if not attributes:
return [] return []
result = frappe.db.sql( result = frappe.get_all(
""" "Item Variant Attribute",
select filters={
distinct attribute, attribute_value "attribute": ["in", attributes],
from "attribute_value": ["is", "set"]
`tabItem Variant Attribute` },
where fields=["attribute", "attribute_value"],
attribute in %(attributes)s distinct=True
and attribute_value is not null
""",
{"attributes": attributes},
as_dict=1,
) )
attribute_value_map = {} attribute_value_map = {}

View File

@ -585,10 +585,20 @@ def get_shipping_rules(quotation=None, cart_settings=None):
if quotation.shipping_address_name: if quotation.shipping_address_name:
country = frappe.db.get_value("Address", quotation.shipping_address_name, "country") country = frappe.db.get_value("Address", quotation.shipping_address_name, "country")
if country: if country:
shipping_rules = frappe.db.sql_list("""select distinct sr.name sr_country = frappe.qb.DocType("Shipping Rule Country")
from `tabShipping Rule Country` src, `tabShipping Rule` sr sr = frappe.qb.DocType("Shipping Rule")
where src.country = %s and query = (
sr.disabled != 1 and sr.name = src.parent""", country) frappe.qb.from_(sr_country)
.join(sr).on(sr.name == sr_country.parent)
.select(sr.name)
.distinct()
.where(
(sr_country.country == country)
& (sr.disabled != 1)
)
)
result = query.run(as_list=True)
shipping_rules = [x[0] for x in result]
return shipping_rules return shipping_rules

View File

@ -60,7 +60,7 @@ def get_item_codes_by_attributes(attribute_filters, template_item_code=None):
NULL NULL
'''.format(attribute_query=attribute_query, variant_of_query=variant_of_query) '''.format(attribute_query=attribute_query, variant_of_query=variant_of_query)
item_codes = set([r[0] for r in frappe.db.sql(query, query_values)]) item_codes = set([r[0] for r in frappe.db.sql(query, query_values)]) # nosemgrep
items.append(item_codes) items.append(item_codes)
res = list(set.intersection(*items)) res = list(set.intersection(*items))

View File

@ -17,7 +17,7 @@ def execute():
"website_warehouse", "web_long_description", "website_content", "thumbnail"] "website_warehouse", "web_long_description", "website_content", "thumbnail"]
# get all valid columns (fields) from Item master DB schema # get all valid columns (fields) from Item master DB schema
item_table_fields = frappe.db.sql("desc `tabItem`", as_dict=1) item_table_fields = frappe.db.sql("desc `tabItem`", as_dict=1) # nosemgrep
item_table_fields = [d.get('Field') for d in item_table_fields] item_table_fields = [d.get('Field') for d in item_table_fields]
# prepare fields to query from Item, check if the web field exists in Item master # prepare fields to query from Item, check if the web field exists in Item master

View File

@ -24,17 +24,17 @@ def execute():
settings = frappe.get_doc("E Commerce Settings") settings = frappe.get_doc("E Commerce Settings")
def map_into_e_commerce_settings(doctype, fields): def map_into_e_commerce_settings(doctype, fields):
data = frappe.db.sql(""" singles = frappe.qb.DocType("Singles")
Select query = (
field, value frappe.qb.from_(singles)
from `tabSingles` .select(
where singles.field, singles.value
doctype='{doctype}' ).where(
and field in ({fields}) (singles.doctype == doctype)
""".format( & (singles.field in fields)
doctype=doctype, )
fields=(",").join(['%s'] * len(fields)) )
), tuple(fields), as_dict=1) data = query.run(as_dict=True)
# {'enable_attribute_filters': '1', ...} # {'enable_attribute_filters': '1', ...}
mapper = {row.field: row.value for row in data} mapper = {row.field: row.value for row in data}
@ -51,10 +51,12 @@ def execute():
# move filters and attributes tables to E Commerce Settings from Products Settings # move filters and attributes tables to E Commerce Settings from Products Settings
for doctype in ("Website Filter Field", "Website Attribute"): for doctype in ("Website Filter Field", "Website Attribute"):
frappe.db.sql("""Update `tab{doctype}` frappe.db.set_value(
set doctype,
parenttype = 'E Commerce Settings', {"parent": "Products Settings"},
parent = 'E Commerce Settings' {
where "parenttype": "E Commerce Settings",
parent = 'Products Settings' "parent": "E Commerce Settings"
""".format(doctype=doctype)) },
update_modified=False
)

View File

@ -29,7 +29,7 @@ def create_fiscal_year_and_company(args):
'domain': args.get('domains')[0] 'domain': args.get('domains')[0]
}).insert() }).insert()
def enable_shopping_cart(args): def enable_shopping_cart(args): # nosemgrep
# Needs price_lists # Needs price_lists
frappe.get_doc({ frappe.get_doc({
"doctype": "E Commerce Settings", "doctype": "E Commerce Settings",

View File

@ -535,7 +535,7 @@ def create_bank_account(args):
# bank account same as a CoA entry # bank account same as a CoA entry
pass pass
def update_shopping_cart_settings(args): def update_shopping_cart_settings(args): # nosemgrep
shopping_cart = frappe.get_doc("E Commerce Settings") shopping_cart = frappe.get_doc("E Commerce Settings")
shopping_cart.update({ shopping_cart.update({
"enabled": 1, "enabled": 1,

View File

@ -53,9 +53,7 @@ def get_product_data(search=None, start=0, limit=12):
# order by # order by
query += """ ORDER BY ranking desc, modified desc limit %s, %s""" % (cint(start), cint(limit)) query += """ ORDER BY ranking desc, modified desc limit %s, %s""" % (cint(start), cint(limit))
return frappe.db.sql(query, { return frappe.db.sql(query, {"search": search}, as_dict=1) # nosemgrep
"search": search
}, as_dict=1)
@frappe.whitelist(allow_guest=True) @frappe.whitelist(allow_guest=True)
def search(query): def search(query):

View File

@ -56,30 +56,22 @@ def get_category_records(categories):
categorical_data = {} categorical_data = {}
for category in categories: for category in categories:
if category == "item_group": if category == "item_group":
categorical_data["item_group"] = frappe.db.sql(""" categorical_data["item_group"] = frappe.db.get_all(
Select "Item Group",
name, parent_item_group, is_group, image, route filters={
from "parent_item_group": "All Item Groups",
`tabItem Group` "show_in_website": 1
where },
parent_item_group = 'All Item Groups' fields=["name", "parent_item_group", "is_group", "image", "route"],
and show_in_website = 1 as_dict=True
""", )
as_dict=1)
else: else:
doctype = frappe.unscrub(category) doctype = frappe.unscrub(category)
fields = ["name"] fields = ["name"]
if frappe.get_meta(doctype, cached=True).get_field("image"): if frappe.get_meta(doctype, cached=True).get_field("image"):
fields += ["image"] fields += ["image"]
categorical_data[category] = frappe.db.sql( categorical_data[category] = frappe.db.get_all(doctype, fields=fields, as_dict=True)
f"""
Select
{",".join(fields)}
from
`tab{doctype}`
""",
as_dict=1)
return categorical_data return categorical_data