How to block brute-force attacks against your WordPress and live happy


Some time ago I installed a wonderful plugin on my WordPress to help me secure this blog (and others).

It’s named Sucuri Security and it comes from sucuri.net. It scans your WordPress for the most common mistakes and adds some interesting features.

One of these features is the ability to trace failed logins and save them to a log file in JSON format, one entry per line, like this:

{"user_login":"admin","user_password":"","attempt_time":1422522535,"remote_addr":"91.121.48.49","user_agent":false}
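As a small illustration, one such entry can be decoded with Python's standard `json` module. This is only a sketch: the sample is the line shown above, the variable names are mine, and `attempt_time` is treated as a Unix timestamp (which is how the script later compares it with the current time).

```python
import json
from datetime import datetime, timezone

# Sample entry copied from the log line shown above.
line = ('{"user_login":"admin","user_password":"","attempt_time":1422522535,'
        '"remote_addr":"91.121.48.49","user_agent":false}')

entry = json.loads(line)

# attempt_time is assumed to be seconds since the Unix epoch.
when = datetime.fromtimestamp(entry["attempt_time"], tz=timezone.utc)

print(entry["remote_addr"], entry["user_login"], when.isoformat())
```

The two fields that matter for blocking are `remote_addr` (who tried) and `attempt_time` (when).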

This made me think of a way to exploit this information to temporarily block the attacker's IP, so I wrote a simple Python script to “abuse” their log.

This script scans the log files (more than one if you have several WordPress installations) and:

  • checks that the IP appears at least three times (to spare someone who simply mistyped the password)
  • checks that the IP is not already blocked
  • adds the IP to a custom iptables chain to block it on port 80 for half a day

This is the script; feel free to use it at your own risk.
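Before getting to the full script, the selection step from the list above can be sketched on its own, without touching iptables. This is a simplified illustration, not the script itself: `frequent_offenders` is a hypothetical helper, the addresses come from documentation ranges, and the non-JSON first line in the sample is an assumption about what such a log might contain.

```python
import json
from collections import Counter

def frequent_offenders(lines, threshold=3):
    """Return the set of IPs that appear in at least `threshold` log entries."""
    counts = Counter()
    for line in lines:
        try:
            entry = json.loads(line)
        except ValueError:
            # Not valid JSON: skip the line, as the full script does.
            continue
        if isinstance(entry, dict) and "remote_addr" in entry:
            counts[entry["remote_addr"]] += 1
    return {ip for ip, n in counts.items() if n >= threshold}

sample = [
    "<?php exit(0); ?>",  # hypothetical non-JSON line
    '{"remote_addr": "203.0.113.7", "attempt_time": 1422522535}',
    '{"remote_addr": "203.0.113.7", "attempt_time": 1422522540}',
    '{"remote_addr": "198.51.100.9", "attempt_time": 1422522541}',
    '{"remote_addr": "203.0.113.7", "attempt_time": 1422522550}',
]

print(frequent_offenders(sample))
```

Only the address with three failed attempts is selected; the one that failed once is left alone.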

The script is written for Python 2. To use it you need to install the python-iptables and ipaddress packages (available from pip):

pip install python-iptables
pip install ipaddress

import datetime
import json
import time
import iptc
import ipaddress

fl_file="sucuri-failedlogins.php"

##change for your own configuration
sdir=["/srv/www/wp-uploads/oneblog/sucuri","/srv/www/wp-uploads/anotherblog/sucuri"]

##change for your own configuration
##(replace myexternalip with a real address, otherwise ipaddress cannot parse it)
ignored_network = ["127.0.0.0/8","192.168.0.0/16","10.0.0.0/8", "myexternalip/32"]

chain="wordpressban"

##in seconds, change for your own needs
waittime=43200

def goodIP(ipaddr):
    #if ip is on ignored network list return False otherwise True
    #(unicode() is needed because the Python 2 ipaddress package expects unicode strings)
    target=ipaddress.IPv4Network(unicode(ipaddr),strict=False)
    for n in ignored_network:
        net=ipaddress.IPv4Network(unicode(n),strict=False)
        if net.overlaps(target):
            return False
    return True

def resetchain():
    table = iptc.Table(iptc.Table.FILTER)
    for tt in table.chains:
        if tt.name == chain:
            tt.flush()

def checkChain():
    #return the ban chain, creating it and hooking it into INPUT on first use
    table = iptc.Table(iptc.Table.FILTER)
    for ch in table.chains:
        if ch.name == chain:
            return ch
    ch = table.create_chain(chain)
    #catch-all rule: accept traffic that no ban rule dropped
    #(ban rules are inserted above it)
    rule = iptc.Rule()
    target = iptc.Target(rule,"ACCEPT")
    rule.target = target
    ch.insert_rule(rule)
    #send incoming tcp traffic for port 80 through the ban chain
    rule2 = iptc.Rule()
    rule2.protocol = "tcp"
    match2 = iptc.Match(rule2, "tcp")
    match2.dport = "80"
    rule2.add_match(match2)
    target2 = iptc.Target(rule2, chain)
    rule2.target = target2
    ch2 = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
    ch2.insert_rule(rule2)
    return ch

def AddChain(wpc,ip):
    rule = iptc.Rule()
    rule.src = ip
    target = iptc.Target(rule, "DROP")
    rule.target = target
    wpc.insert_rule(rule)

def checkLogs():
    #count failed logins per ip and remember the time of the last attempt
    attacker={}
    for wpdir in sdir:
        ff=open(wpdir + '/' + fl_file)
        for line in ff.readlines():
            jline = ""
            try:
                jline = json.loads(line)
            except ValueError:
                #skip lines that are not valid JSON
                pass
            if len(jline) > 0:
                ip = jline['remote_addr']
                if ip in attacker:
                    attacker[ip]['count'] += 1
                else:
                    attacker[ip] = {'count': 1}
                attacker[ip]['time'] = jline['attempt_time']
        ff.close()
    return attacker

def mainLoop(wpchain,attD):
    now=int(time.time())
    for badguy in attD.keys():
        if attD[badguy]['count'] >= 3 and goodIP(badguy):
            #iptc reports rule sources as address/netmask
            srcaddr=badguy + '/255.255.255.255'
            #the ban is over once waittime has passed since the last attempt
            expired = attD[badguy]['time'] + waittime < now
            found=0
            for rule in wpchain.rules:
                if srcaddr == rule.src :
                    if expired:
                        #delete from chain since waittime is passed
                        wpchain.delete_rule(rule)
                    found=1
            #ban only recent attackers, otherwise an expired ip
            #would be added again on the next run
            if found == 0 and not expired:
                AddChain(wpchain,srcaddr)

#flush the whole chain on the runs between midnight and 2am
if datetime.datetime.now().hour < 2:
    resetchain()
wpch = checkChain()
attackD = checkLogs()
mainLoop(wpch,attackD)
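The `goodIP` check in the script can also be tried in isolation. The sketch below is a variant for Python 3, where `ipaddress` is part of the standard library and no `unicode()` conversion is needed. The name `is_bannable` and the network list are assumptions for illustration, not part of the script above.

```python
import ipaddress

# Hypothetical ignore list; replace with your own networks.
IGNORED_NETWORKS = ["127.0.0.0/8", "192.168.0.0/16", "10.0.0.0/8"]

def is_bannable(ip, ignored=IGNORED_NETWORKS):
    """Return True when `ip` lies outside every ignored network."""
    addr = ipaddress.ip_address(ip)
    return not any(
        addr in ipaddress.ip_network(net, strict=False) for net in ignored
    )

print(is_bannable("91.121.48.49"))   # address from the log sample
print(is_bannable("192.168.1.20"))   # private address, should never be banned
```

Keeping your own address in the ignore list matters: a few mistyped passwords would otherwise lock you out of port 80 for half a day.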
 
 

I launch it every hour from cron:

10 * * * * /pathtothescript/checklogins.py

On the runs shortly after midnight (the `hour < 2` check) the script flushes all the rules in the defined chain, so the ban list is rebuilt from scratch every day.
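To see which of the hourly runs actually trigger the flush, the `hour < 2` check can be replayed against the cron schedule. This is a small sketch: `should_flush` is a hypothetical helper mirroring the condition in the script, and the date is arbitrary.

```python
import datetime

def should_flush(now):
    """Mirror of the script's `hour < 2` condition."""
    return now.hour < 2

# Cron fires at minute 10 of every hour; collect the hours whose run would flush.
day = datetime.date(2015, 1, 29)  # arbitrary example date
flush_runs = [
    hour for hour in range(24)
    if should_flush(datetime.datetime.combine(day, datetime.time(hour, 10)))
]

print(flush_runs)  # [0, 1]
```

With this schedule the chain is flushed at 00:10 and again at 01:10. If you want exactly one flush per day, tightening the condition to `hour < 1` would be one way to get it.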