# Inventory Items Retrieval Module
This document provides a comprehensive overview of the modules responsible for retrieving inventory data and stock levels from Linnworks. These modules fetch data from Linnworks and then update the database with the received inventory data.

## Code Overview

This guide focuses on three primary modules:

- Inventory
- 3PL
- Inventory Levels

Most of the code is reused between these modules, as they fetch and load data from the same source.

## Inventory

This module retrieves inventory items using the Linnworks API. It comprises several functions and a class that together manage the retrieval and updating of inventory data.

### Database connection context manager: `get_db_cursor`

The `get_db_cursor` function is a context manager for handling database connections. It establishes a connection, yields it for use, and ensures the connection is closed after use. It also handles any errors that occur during database operations. This context manager is reused throughout the code.

```python
@contextmanager
def get_db_cursor():
    # Open a connection and cursor for the caller to use.
    conn = psycopg2.connect(
        host=config.database_host,
        database=config.database_name,
        user=config.database_user,
        password=config.database_password
    )
    cur = conn.cursor()
    try:
        yield conn, cur
    except PgError as pg_error:
        inv_log.error("An error occurred with the database %s", pg_error)
        conn.rollback()
    finally:
        # Always release the cursor and connection.
        cur.close()
        conn.close()
```

### The `InventoryStock` class

The `InventoryStock` class extends the `LinnApi` class and includes methods for managing inventory stock using the Linnworks API. Its key method is `get_stock_items`, which retrieves stock items from the Linnworks inventory.

### The `retrieve_inventory_data` function

The `retrieve_inventory_data` function retrieves inventory data and updates the database with the data yielded by `get_stock_items`.

```python
for i in range(0, len(inventory_response), batch_size):
    batch = inventory_response[i:i + batch_size]
    # Shape each item in the batch into a row tuple for the insert query.
    data = [
        (
            response["itemtitle"],
            response["itemnumber"],
            response["barcodenumber"],
            response["metadata"],
            response["purchaseprice"],
            response["retailprice"],
            response["categoryname"],
            response["stockitemid"],
            response["quantity"],
            response["weight"]
        )
        for response in batch
    ]
    try:
        with get_db_cursor() as (conn, cur):
            insert_query = inventory_query
            execute_values(cur, insert_query, data)
            conn.commit()
    except PgError as pg_error:
        inv_log.error("An error occurred with the database %s", pg_error)
```

## 3PL Integration

This module extracts item titles, descriptions, prices, extended properties, and images. Linnworks provides a 3PL endpoint, `stock/getstockitemsfullbyids`, which we can use to retrieve inventory items, provided we have the inventory ID. In the Inventory module we have already fetched the main inventory data, which contains the inventory ID, so we fetch the IDs from our database and send them to the API asynchronously (a sketch of this lookup is shown below). This module has one main function and several helper functions that assist the main function in retrieving the inventory items.
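The ID lookup itself is not reproduced in this document; the usage section later calls a `get_params_from_db(offset, limit)` helper for this purpose. The following is only a minimal sketch of what that lookup could look like, reusing the `get_db_cursor` context manager from above. The table and column names in the query are assumptions; the real query lives in the code repository.

```python
# Sketch only: an assumed implementation of the ID lookup used by the 3PL and
# Levels modules. The table/column names ("inventory", "stockitemid") are guesses.
async def get_params_from_db(offset: int, limit: int) -> list:
    with get_db_cursor() as (conn, cur):
        cur.execute(
            "SELECT stockitemid FROM inventory ORDER BY stockitemid LIMIT %s OFFSET %s",
            (limit, offset),
        )
        return [row[0] for row in cur.fetchall()]
```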
### Main function: `get_all_data`

`get_all_data` is an asynchronous function that retrieves and processes inventory data from the Linnworks API. It sends a POST request to the `stock/getstockitemsfullbyids` endpoint to retrieve inventory items, using the inventory IDs from `params_list`. The function creates tasks for these requests and runs them asynchronously using asyncio's `gather` method. If any errors occur during the HTTP request, such as a client error or a timeout error, they are logged and the function returns. After successfully retrieving the responses, the function processes and inserts the data into the database by calling the `process_responses` helper for each type of data (titles, prices, descriptions, images, and extended properties).

```python
tasks = []
url = 'stock/getstockitemsfullbyids'
payload = {"request": {"stockitemids": params_list, "datarequirements": [2, 3, 4, 5, 6, 7]}}
encoded_payload = urllib.parse.urlencode(payload)
tasks.append(linn_api.make_request(
    "post", url, session=session, payload=encoded_payload))
responses = await asyncio.gather(*tasks)

# Transform and insert each kind of data extracted from the responses.
await process_responses(responses, title_query, extract_titles_data)
await process_responses(responses, price_query, extract_prices_data)
await process_responses(responses, description_query, extract_description_data)
await process_responses(responses, image_query, extract_images_data)
await process_responses(responses, properties_query, extract_extended_properties_data)
```

Note: this is just a code snippet; find the actual code in the code repository.

### Helper functions

There are several helper functions.

`process_responses` is an asynchronous function used to transform the data fetched from the API. It processes the responses in batches of a predefined size (`batch_size`). For each batch, it extracts the required data using the `data_extractor` function and inserts it into the database using the provided SQL query (`query`). If an error occurs during data insertion, it skips the current iteration and continues with the next batch.

```python
async def process_responses(responses, query, data_extractor):
    start_time = time.perf_counter()
    for i in range(0, len(responses), batch_size):
        batch = responses[i:i + batch_size]
        if not batch:
            continue
        data = data_extractor(batch)
        data = set(data)  # de-duplicate rows before inserting
        error_occurred = await insert_data(query, data)
        if error_occurred:
            continue  # skip this iteration if an error occurred
```

`insert_data` is used to write the data to the database after it has been extracted and transformed.

```python
async def insert_data(query, data):
    error_occurred = False
    try:
        with get_db_cursor() as (conn, cur):
            execute_values(cur, query, data)
            conn.commit()
    except PgError as pg_error:
        threepl.error("An error occurred with the database %s", str(pg_error))
        conn.rollback()
        error_occurred = True
    return error_occurred
```

The extract functions include `extract_titles_data`, `extract_prices_data`, `extract_description_data`, `extract_images_data`, and `extract_extended_properties_data`. They are designed to extract data from a batch of items, where the batch is expected to be a list of dictionaries and each dictionary represents an item with its associated properties. These functions are passed to `process_responses` in the main function.

```python
def extract_titles_data(batch: list[Any]) -> list[tuple[Any, ...]]:
    return [
        (
            item["stockitemid"],
            item["source"],
            item["subsource"],
            item["title"]
        )
        for response in batch if response
        for res in response.get("stockitemsfullextended", [])
        for item in res.get("itemchanneltitles", [])
    ]

# Called from the main function:
await process_responses(responses, title_query, extract_titles_data)
```

## Levels

Stock levels are a crucial part of inventory management and need to be updated regularly. While we could have used the `stock/getstockitemsfullbyids` endpoint to retrieve the levels, that would also mean regularly fetching the items themselves, which isn't necessary. In this module we use the `stock/getstocklevel_batch` endpoint to fetch the levels on a regular basis in a more efficient manner. The code is quite similar to that of the 3PL integration, as they perform almost the same function.

This module contains only one function, `update_stock_levels`.

### `update_stock_levels`

This asynchronous function is designed to update stock levels regularly. It makes multiple HTTP requests concurrently, gathers the responses, and updates the stock levels in the database.
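The levels module's code is not reproduced in this document, so the following is only an illustrative sketch of the shape described above. It reuses the `linn_api.make_request` and `get_db_cursor` helpers shown earlier; the payload keys, response field names, and `levels_query` insert statement are assumptions rather than the actual implementation.

```python
# Sketch only: the real implementation lives in the levels module.
async def update_stock_levels(session, params_list) -> None:
    url = 'stock/getstocklevel_batch'
    tasks = []
    for params in params_list:
        # Payload keys are assumed; check the repository for the real request shape.
        payload = urllib.parse.urlencode({"request": {"stockitemids": params}})
        tasks.append(linn_api.make_request("post", url, session=session, payload=payload))
    responses = await asyncio.gather(*tasks)

    # Field names below are assumptions about the response shape.
    data = [
        (item["stockitemid"], item["quantity"])
        for response in responses if response
        for item in response.get("stocklevels", [])
    ]
    try:
        with get_db_cursor() as (conn, cur):
            execute_values(cur, levels_query, data)  # levels_query: hypothetical upsert
            conn.commit()
    except PgError as pg_error:
        inv_log.error("An error occurred with the database %s", pg_error)
```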
## Usage

To run this pipeline, the functions are imported into one module from which they can be executed.

### Inventory: `main_inventory`

This asynchronous function is the main entry point for managing inventory. It retrieves stock items and processes each item to retrieve its inventory data.

```python
async def main_inventory() -> None:
    inventory_stock = InventoryStock()
    stock_responses = inventory_stock.get_stock_items()
    async for inventory_response in stock_responses:
        await retrieve_inventory_data(inventory_response)
```

The function starts by initializing an `InventoryStock` object. It then calls that object's `get_stock_items` method to retrieve the stock items and iterates over them asynchronously, calling `retrieve_inventory_data` for each one to process it and store its inventory data.

### 3PL Integration: `update_inventory_data`

This asynchronous function updates inventory data. It fetches parameters from the database in batches and makes multiple HTTP requests concurrently to retrieve the data. After all requests for a batch are completed, it waits for 60 seconds before proceeding with the next batch. This process continues until there are no more parameters to fetch from the database.

```python
async def update_inventory_data() -> None:
    total_start_time = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        offset = 0
        limit = 150
        while True:
            params_list = []
            # Collect up to 200 batches of parameters from the database.
            for _ in range(200):
                params = await get_params_from_db(offset, limit)
                if not params:
                    break
                params_list.append(params)
                offset += limit
            if not params_list:
                break
            # One task per batch, executed concurrently.
            tasks = [get_all_data(session, params) for params in params_list]
            await asyncio.gather(*tasks)
            await asyncio.sleep(60)
```

The function starts by initializing an aiohttp client session. It then enters a loop in which it fetches parameters from the database in batches of 150 (up to a maximum of 200 batches per minute). For each batch it creates a task calling `get_all_data` with the current session and parameters, then executes all tasks concurrently using `asyncio.gather()`. After all tasks for the current batch are completed, it waits for 60 seconds before proceeding with the next batch. The function also keeps track of the total time taken to update the inventory data.

### Levels: `update_level_data`

This asynchronous function updates level data. It fetches parameters from the database in batches and uses these parameters to update the stock levels. This process continues until there are no more parameters to fetch from the database.

```python
async def update_level_data() -> None:
    async with aiohttp.ClientSession() as session:
        offset = 0
        limit = 200
        while True:
            params_list = []
            # Collect up to 50 batches of parameters from the database.
            for _ in range(50):
                params = await get_params_from_db(offset, limit)
                if not params:
                    break
                params_list.append(params)
                offset += limit
            if not params_list:
                break
            await update_stock_levels(session, params_list)
```

The function starts by initializing an aiohttp client session. It then enters a loop in which it fetches parameters from the database in batches of 200 (up to a maximum of 50 batches at a time). It then calls `update_stock_levels` with the current session and the collected parameter batches. The loop continues until there are no more parameters to fetch from the database.
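The module that imports and executes these entry points is not shown in this document. A minimal sketch of what such a runner could look like is given below; the module paths and the command-line handling are assumptions, not the actual code.

```python
# runner.py — illustrative sketch only; the real entry-point module is in the repository.
import asyncio
import sys

from inventory import main_inventory          # assumed module paths
from threepl import update_inventory_data
from levels import update_level_data

PIPELINES = {
    "inventory": main_inventory,
    "3pl": update_inventory_data,
    "levels": update_level_data,
}

if __name__ == "__main__":
    # e.g. `python runner.py levels` — a scheduler chooses which pipeline to run.
    asyncio.run(PIPELINES[sys.argv[1]]())
```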
## Conclusion

This is how the ETL pipeline is designed to function. This documentation has provided an overview of the Inventory Items Retrieval Module's functionality, its components, and their usage; the module is designed for efficient and automated inventory management. The individual processes are scheduled to execute at their respective times.
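The scheduler itself is not described in this document. Purely as an illustration, the entry points could be run on their own intervals with a small in-process loop like the one below; the intervals and module paths are assumptions, and the actual deployment may use cron or another scheduler instead.

```python
# Illustrative only: run each pipeline on its own assumed interval in one process.
import asyncio

from inventory import main_inventory          # assumed module paths
from threepl import update_inventory_data
from levels import update_level_data

async def run_periodically(pipeline, interval_seconds: int) -> None:
    while True:
        await pipeline()
        await asyncio.sleep(interval_seconds)

async def main() -> None:
    await asyncio.gather(
        run_periodically(main_inventory, 24 * 60 * 60),         # assumed: items daily
        run_periodically(update_inventory_data, 6 * 60 * 60),   # assumed: 3PL every 6 hours
        run_periodically(update_level_data, 15 * 60),           # assumed: levels every 15 minutes
    )

if __name__ == "__main__":
    asyncio.run(main())
```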